📜  Python 多进程处理

📅  最后修改于: 2020-10-28 01:39:02             🧑  作者: Mango

Python 多进程处理

在本文中,我们将学习如何使用Python实现多进程。我们还将讨论其高级概念。

什么是多进程?

多进程是系统并行运行一个或多个进程的能力。简而言之,多进程使用单个计算机系统中的两个或多个CPU。此方法还能够在多个进程之间分配任务。

处理单元共享主存储器和外围设备以同时处理程序。多进程应用程序分成较小的部分,并独立运行。操作系统将每个进程分配给处理器。

Python提供了一个称为multiprocessing的内置软件包,该软件包支持交换过程。在进行多进程之前,我们必须了解过程对象。

为什么要进行多进程?

多进程对于在计算机系统内执行多项任务至关重要。假设一台没有多处理器或单处理器的计算机。我们同时将各种过程分配给该系统。

然后,它将不得不中断上一个任务,并转移到另一个任务以使所有进程继续进行。就像Chef独自在厨房工作一样简单。他必须完成一些任务来烹饪食物,例如切割,清洁,烹饪,揉面团,烘烤等。

因此,多进程对于不间断地同时执行多个任务至关重要。它还使跟踪所有任务变得容易。因此,出现了多进程的概念。

  • 可以将多进程表示为具有多个中央处理器的计算机。
  • 多核处理器是指具有两个或更多独立单元的单个计算组件。

在多进程中,CPU可以一次分配多个任务,每个任务都有自己的处理器。

Python的多进程

Python提供了多进程模块,可以在单个系统中执行多个任务。它提供了一个用户友好且直观的API,可用于多进程。

让我们了解多重处理的简单示例。

范例-

from multiprocessing import Process
   def disp():
      print ('Hello !! Welcome to Python Tutorial')
      if __name__ == '__main__':
      p = Process(target=disp)
      p.start()
      p.join()

输出:

'Hello !! Welcome to Python Tutorial'

说明:

在上面的代码中,我们导入了Process类,然后在disp()函数创建了Process对象。然后,我们使用start()方法开始该过程,并使用join()方法完成该过程。我们还可以使用args关键字在已声明的函数传递参数。

让我们了解带有参数的多重处理的以下示例。

示例-2

# Python multiprocessing example
# importing the multiprocessing module

import multiprocessing
def cube(n):
   # This function will print the cube of the given number
   print("The Cube is: {}".format(n * n * n))

def square(n):
    # This function will print the square of the given number
   print("The Square is: {}".format(n * n))

if __name__ == "__main__":
   # creating two processes
   process1 = multiprocessing.Process(target= square, args=(5, ))
   process2 = multiprocessing.Process(target= cube, args=(5, ))

   # Here we start the process 1
   process1.start()
   # Here we start process 2
   process2.start()

   # The join() method is used to wait for process 1 to complete
   process1.join()
   # It is used to wait for process 1 to complete
   process2.join()

   # Print if both processes are completed
   print("Both processes are finished")

输出:

The Cube is: 125
The Square is: 25
Both processes are finished

说明-

在上面的示例中,我们创建了两个函数-cube()函数计算给定数字的立方体,square()函数计算给定数字的平方。

接下来,我们定义具有两个参数的Process类的过程对象。第一个参数是代表要执行的函数的目标,第二个参数是args代表要在函数内传递的函数。

process1 = multiprocessing.Process(target= square, args=(5, ))
process2 = multiprocessing.Process(target= cube, args=(5, ))

我们使用了start()方法来启动该过程。

process1.start()
process2.start()

正如我们在输出中看到的那样,它等待进程1完成,然后等待进程2完成。最后一条语句在两个进程完成之后执行。

Python多进程类

Python多进程模块提供了许多用于构建并行程序的类。我们将讨论其主要类-进程,队列和锁定。在前面的示例中,我们已经讨论了Process类。现在,我们将讨论Queue和Lock类。

让我们看一下获取当前系统中CPU数量的简单示例。

范例-

import multiprocessing
print("The number of CPU currently working in system : ", multiprocessing.cpu_count())

输出:

('The number of CPU currently woking in system : ', 32)

以上CPU数量因您的PC而异。对我们来说,核心数是32。

使用队列类的Python多进程

我们知道Queue是数据结构的重要组成部分。 Python多进程与基于“先进先出”概念的数据结构队列完全相同。队列通常存储Python对象,并且在进程之间共享数据中起着至关重要的作用。

在流程的目标函数中将队列作为参数传递,以允许流程使用数据。队列提供了put()函数来插入数据,而get()函数则从队列中获取数据。让我们了解以下示例。

范例-

# Importing Queue Class

from multiprocessing import Queue

fruits = ['Apple', 'Orange', 'Guava', 'Papaya', 'Banana']
count = 1
# creating a queue object
queue = Queue()
print('pushing items to the queue:')
for fr in fruits:
    print('item no: ', count, ' ', fr)
    queue.put(fr)
    count += 1

print('\npopping items from the queue:')
count = 0
while not queue.empty():
    print('item no: ', count, ' ', queue.get())
    count += 1

输出:

pushing items to the queue:
('item no: ', 1, ' ', 'Apple')
('item no: ', 2, ' ', 'Orange')
('item no: ', 3, ' ', 'Guava')
('item no: ', 4, ' ', 'Papaya')
('item no: ', 5, ' ', 'Banana')

popping items from the queue:
('item no: ', 0, ' ', 'Apple')
('item no: ', 1, ' ', 'Orange')
('item no: ', 2, ' ', 'Guava')
('item no: ', 3, ' ', 'Papaya')
('item no: ', 4, ' ', 'Banana')

说明-

在上面的代码中,我们导入了Queue类并初始化了名为Fruits的列表。接下来,我们为1分配一个计数。count变量将对元素总数进行计数。然后,我们通过调用Queue()方法创建了队列对象。该对象将用于在队列中执行操作。在for循环中,我们使用put()函数在队列中逐个插入元素,并在每次循环迭代时将计数增加1。

Python多进程锁类

多进程Lock类用于获取对该进程的锁定,以便我们可以让另一个进程执行类似的代码,直到释放该锁定为止。 Lock类主要执行两项任务。第一个是使用acquire()函数获取锁,第二个是使用release()函数释放锁。

Python多进程示例

假设我们有多个任务。因此,我们创建了两个队列:第一个队列将维护任务,另一个将存储完整的任务日志。下一步是实例化流程以完成任务。如前所述,Queue类已经同步,因此我们不需要使用Lock类来获取锁。

在下面的示例中,我们将所有多进程类合并在一起。让我们看下面的例子。

范例-

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue 


def jobTodo(tasks_to_perform, complete_tasks):
    while True:
        try:

            # The try block to catch task from the queue.
            # The get_nowait() function is used to
            # raise queue.Empty exception if the queue is empty.

            task = tasks_to_perform.get_nowait()

        except queue.Empty:

            break
        else:

                # if no exception has been raised, the else block will execute
                # add the task completion
                

            print(task)
            complete_tasks.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    total_task = 8
    total_number_of_processes = 3
    tasks_to_perform = Queue()
    complete_tasks = Queue()
    number_of_processes = []

    for i in range(total_task):
        tasks_to_perform.put("Task no " + str(i))

    # defining number of processes
    for w in range(total_number_of_processes):
        p = Process(target=jobTodo, args=(tasks_to_perform, complete_tasks))
        number_of_processes.append(p)
        p.start()

    # completing process
    for p in number_of_processes:
        p.join()

    # print the output
    while not complete_tasks.empty():
        print(complete_tasks.get())

    return True


if __name__ == '__main__':
    main()

输出:

Task no 2
Task no 5
Task no 0
Task no 3
Task no 6
Task no 1
Task no 4
Task no 7
Task no 0 is done by Process-1
Task no 1 is done by Process-3
Task no 2 is done by Process-2
Task no 3 is done by Process-1
Task no 4 is done by Process-3
Task no 5 is done by Process-2
Task no 6 is done by Process-1
Task no 7 is done by Process-3

Python多进程池

Python多进程池对于跨多个输入值并行执行函数至关重要。它还可用于跨进程分配输入数据(数据并行性)。考虑以下多进程池示例。

范例-

from multiprocessing import Pool
import time

w = (["V", 5], ["X", 2], ["Y", 1], ["Z", 3])


def work_log(data_for_work):
    print(" Process name is %s waiting time is %s seconds" % (data_for_work[0], data_for_work[1]))
    time.sleep(int(data_for_work[1]))
    print(" Process %s Executed." % data_for_work[0])


def handler():
    p = Pool(2)
    p.map(work_log, w)

if __name__ == '__main__':
    handler()

输出:

Process name is V waiting time is 5 seconds
Process V Executed.
Process name is X waiting time is 2 seconds
Process X Executed.
Process name is Y waiting time is 1 seconds
Process Y Executed.
Process name is Z waiting time is 3 seconds
Process Z Executed.

让我们了解多进程池的另一个示例。

示例-2

from multiprocessing import Pool
def fun(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(fun, [1, 2, 3]))

输出:

[1, 8, 27]

代理对象

代理对象称为驻留在不同进程中的共享对象。该对象也称为代理。多个代理对象可能具有相似的引用。代理对象由各种方法组成,这些方法用于调用其引用对象的相应方法。以下是代理对象的示例。

范例-

from multiprocessing import Manager
manager = Manager()
l = manager.list([i*i for i in range(10)])
print(l)
print(repr(l))
print(l[4])
print(l[2:5])

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

16
[4, 9, 16]

代理对象是可拾取的,因此我们可以在进程之间传递它们。这些对象还用于控制同步级别。

多重处理的常用功能

到目前为止,我们已经讨论了使用Python进行多处理的基本概念。多处理本身就是一个广泛的主题,对于在单个系统中执行各种任务至关重要。我们正在定义一些基本功能,这些功能通常用于实现多处理。

Method Description
pipe() The pipe() function returns a pair of connection objects.
run() The run() method is used to represent the process activities.
start() The start()method is used to start the process.
join([timeout]) The join() method is used to block the process until the process whose join() method is called terminates. The timeout is optional argument.
is_alive() It returns if process is alive.
terminate() As the name suggests, it is used to terminate the process. Always remember – the terminate() method is used in Linux, for Windows, we use TerminateProcess() method.
kill() This method is similar to the terminate() but using the SIGKILL signal on Unix.
close() This method is used to close the Process object and releases all resources associated with it.
qsize() It returns the approximate size of the queue.
empty() If queue is empty, it returns True.
full() It returns True, if queue is full.
get_await() This method is equivalent get(False).
get() This method is used to get elements from the queue. It removes and returns an element from queue.
put() This method is used to insert an element into the queue.
cpu_count() It returns the number of working CPU within the system.
current_process() It returns the Process object corresponding to the current process.
parent_process() It returns the parent Process object corresponding to the current process.
task_done() This function is used indicate that an enqueued task is completed.
join_thread() This method is used to join the background thread