python多进程模块multiprocessing

multiprocessing是一个使用类似于线程模块的API来支持多进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程有效的避开全局解释器锁(GIL),由于这个原因,多处理模块允许程序员充分利用给定机器上的多个处理器。

使用多进程往往是用来处理CPU密集型的需求,如科学计算;如果是I/O密集型,则可以使用多线程去处理,比如爬虫。

创建和启动进程(Process类)

用法与Threading模块中的Thread类似。

图片[1] - python多进程模块multiprocessing - 正则时光

进程池(Pool类)

1.apply和apply_async

图片[2] - python多进程模块multiprocessing - 正则时光

2.map和map_async

图片[3] - python多进程模块multiprocessing - 正则时光

3.两种的区别

区别apply_asyncmap_async
任务数量用于异步执行单个任务用于异步执行多个任务,将一个函数应用到一个可迭代对象的每个元素上
返回值返回一个AsyncResult对象,表示单个任务的结果。返回一个AsyncResult对象,表示多个任务的结果列表。
参数传递需要传递函数和函数的参数。需要传递函数和一个可迭代对象。
使用场景适用于需要异步执行单个任务的场景。适用于需要并行处理大量数据或计算密集型任务的场景。

4.close与join方法

close方法用于阻止更多的任务提交到进程池,而join方法用于等待所有工作进程完成其任务。在使用with语句上下文管理器时,closejoin会被自动调用,但在手动管理进程池时,需要显式调用这两个方法。如下所示。

from multiprocessing import Pool

def f(x):
    if x == 5:
        raise ValueError("Invalid value")
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)
    results = []
    try:
        for i in range(10):
            try:
                result = pool.apply(f, (i,))  # 同步执行
                results.append(result)
            except ValueError as e:
                print(f"Caught exception: {e}")
        print(results)
    finally:
        pool.close()  # 阻止更多的任务提交到进程池
        pool.join()   # 等待所有工作进程完成

进程间通信

multiprocessing模块提供了多种进程间通信(IPC)机制,包括队列(Queue)和管道(Pipe)。

使用队列(Queue)

Queue类是线程和进程安全的,适用于在多个进程之间传递数据。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # 输出: [42, None, 'hello']
    p.join()

使用管道(Pipe)

Pipe类返回一对连接对象,分别代表管道的两端。每个连接对象都有send()recv()方法。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    recive_conn, send_conn = Pipe(duplex=False)  #默认为False,表示只能单向通信,conn1只能接收消息,conn2只能发送消息。true为双向。
    p = Process(target=f, args=(send_conn,))
    p.start()
    print(recive_conn.recv())  # 输出: [42, None, 'hello']
    p.join()

共享内存

multiprocessing模块还提供了共享内存对象,如ValueArray,用于在进程间共享数据。

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)  # 输出: 3.1415927
    print(arr[:])  # 输出: [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享