multiprocessing
是一个使用类似于线程模块的API来支持多进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程有效的避开全局解释器锁(GIL),由于这个原因,多处理模块允许程序员充分利用给定机器上的多个处理器。
使用多进程往往是用来处理CPU密集型的需求,如科学计算;如果是I/O密集型,则可以使用多线程去处理,比如爬虫。
创建和启动进程(Process类)
用法与Threading模块中的Thread类似。
![图片[1] - python多进程模块multiprocessing - 正则时光](https://www.regular.cc/wp-content/uploads/2024/07/image-86.png)
进程池(Pool类)
1.apply和apply_async
![图片[2] - python多进程模块multiprocessing - 正则时光](https://www.regular.cc/wp-content/uploads/2024/07/image-87-1024x324.png)
2.map和map_async
![图片[3] - python多进程模块multiprocessing - 正则时光](https://www.regular.cc/wp-content/uploads/2024/07/image-88-1024x196.png)
3.两种的区别
区别 | apply_async | map_async |
---|---|---|
任务数量 | 用于异步执行单个任务 | 用于异步执行多个任务,将一个函数应用到一个可迭代对象的每个元素上 |
返回值 | 返回一个AsyncResult 对象,表示单个任务的结果。 | 返回一个AsyncResult 对象,表示多个任务的结果列表。 |
参数传递 | 需要传递函数和函数的参数。 | 需要传递函数和一个可迭代对象。 |
使用场景 | 适用于需要异步执行单个任务的场景。 | 适用于需要并行处理大量数据或计算密集型任务的场景。 |
4.close与join方法
close
方法用于阻止更多的任务提交到进程池,而join
方法用于等待所有工作进程完成其任务。在使用with
语句上下文管理器时,close
和join
会被自动调用,但在手动管理进程池时,需要显式调用这两个方法。如下所示。
from multiprocessing import Pooldef f(x):if x == 5:raise ValueError("Invalid value")return x*xif __name__ == '__main__':pool = Pool(processes=4)results = []try:for i in range(10):try:result = pool.apply(f, (i,)) # 同步执行results.append(result)except ValueError as e:print(f"Caught exception: {e}")print(results)finally:pool.close() # 阻止更多的任务提交到进程池pool.join() # 等待所有工作进程完成from multiprocessing import Pool def f(x): if x == 5: raise ValueError("Invalid value") return x*x if __name__ == '__main__': pool = Pool(processes=4) results = [] try: for i in range(10): try: result = pool.apply(f, (i,)) # 同步执行 results.append(result) except ValueError as e: print(f"Caught exception: {e}") print(results) finally: pool.close() # 阻止更多的任务提交到进程池 pool.join() # 等待所有工作进程完成from multiprocessing import Pool def f(x): if x == 5: raise ValueError("Invalid value") return x*x if __name__ == '__main__': pool = Pool(processes=4) results = [] try: for i in range(10): try: result = pool.apply(f, (i,)) # 同步执行 results.append(result) except ValueError as e: print(f"Caught exception: {e}") print(results) finally: pool.close() # 阻止更多的任务提交到进程池 pool.join() # 等待所有工作进程完成
进程间通信
multiprocessing
模块提供了多种进程间通信(IPC)机制,包括队列(Queue)和管道(Pipe)。
使用队列(Queue)
Queue
类是线程和进程安全的,适用于在多个进程之间传递数据。
from multiprocessing import Process, Queuedef f(q):q.put([42, None, 'hello'])if __name__ == '__main__':q = Queue()p = Process(target=f, args=(q,))p.start()print(q.get()) # 输出: [42, None, 'hello']p.join()from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # 输出: [42, None, 'hello'] p.join()from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # 输出: [42, None, 'hello'] p.join()
使用管道(Pipe)
Pipe
类返回一对连接对象,分别代表管道的两端。每个连接对象都有send()
和recv()
方法。
from multiprocessing import Process, Pipedef f(conn):conn.send([42, None, 'hello'])conn.close()if __name__ == '__main__':recive_conn, send_conn = Pipe(duplex=False) #默认为False,表示只能单向通信,conn1只能接收消息,conn2只能发送消息。true为双向。p = Process(target=f, args=(send_conn,))p.start()print(recive_conn.recv()) # 输出: [42, None, 'hello']p.join()from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': recive_conn, send_conn = Pipe(duplex=False) #默认为False,表示只能单向通信,conn1只能接收消息,conn2只能发送消息。true为双向。 p = Process(target=f, args=(send_conn,)) p.start() print(recive_conn.recv()) # 输出: [42, None, 'hello'] p.join()from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': recive_conn, send_conn = Pipe(duplex=False) #默认为False,表示只能单向通信,conn1只能接收消息,conn2只能发送消息。true为双向。 p = Process(target=f, args=(send_conn,)) p.start() print(recive_conn.recv()) # 输出: [42, None, 'hello'] p.join()
共享内存
multiprocessing
模块还提供了共享内存对象,如Value
和Array
,用于在进程间共享数据。
from multiprocessing import Process, Value, Arraydef f(n, a):n.value = 3.1415927for i in range(len(a)):a[i] = -a[i]if __name__ == '__main__':num = Value('d', 0.0)arr = Array('i', range(10))p = Process(target=f, args=(num, arr))p.start()p.join()print(num.value) # 输出: 3.1415927print(arr[:]) # 输出: [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) # 输出: 3.1415927 print(arr[:]) # 输出: [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) # 输出: 3.1415927 print(arr[:]) # 输出: [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END