multiprocessing
是一个使用类似于线程模块的API来支持多进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程有效的避开全局解释器锁(GIL),由于这个原因,多处理模块允许程序员充分利用给定机器上的多个处理器。
使用多进程往往是用来处理CPU密集型的需求,如科学计算;如果是I/O密集型,则可以使用多线程去处理,比如爬虫。
创建和启动进程(Process类)
用法与Threading模块中的Thread类似。
进程池(Pool类)
1.apply和apply_async
2.map和map_async
3.两种的区别
区别 | apply_async | map_async |
---|---|---|
任务数量 | 用于异步执行单个任务 | 用于异步执行多个任务,将一个函数应用到一个可迭代对象的每个元素上 |
返回值 | 返回一个AsyncResult 对象,表示单个任务的结果。 | 返回一个AsyncResult 对象,表示多个任务的结果列表。 |
参数传递 | 需要传递函数和函数的参数。 | 需要传递函数和一个可迭代对象。 |
使用场景 | 适用于需要异步执行单个任务的场景。 | 适用于需要并行处理大量数据或计算密集型任务的场景。 |
4.close与join方法
close
方法用于阻止更多的任务提交到进程池,而join
方法用于等待所有工作进程完成其任务。在使用with
语句上下文管理器时,close
和join
会被自动调用,但在手动管理进程池时,需要显式调用这两个方法。如下所示。
from multiprocessing import Pool
def f(x):
if x == 5:
raise ValueError("Invalid value")
return x*x
if __name__ == '__main__':
pool = Pool(processes=4)
results = []
try:
for i in range(10):
try:
result = pool.apply(f, (i,)) # 同步执行
results.append(result)
except ValueError as e:
print(f"Caught exception: {e}")
print(results)
finally:
pool.close() # 阻止更多的任务提交到进程池
pool.join() # 等待所有工作进程完成
进程间通信
multiprocessing
模块提供了多种进程间通信(IPC)机制,包括队列(Queue)和管道(Pipe)。
使用队列(Queue)
Queue
类是线程和进程安全的,适用于在多个进程之间传递数据。
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # 输出: [42, None, 'hello']
p.join()
使用管道(Pipe)
Pipe
类返回一对连接对象,分别代表管道的两端。每个连接对象都有send()
和recv()
方法。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
recive_conn, send_conn = Pipe(duplex=False) #默认为False,表示只能单向通信,conn1只能接收消息,conn2只能发送消息。true为双向。
p = Process(target=f, args=(send_conn,))
p.start()
print(recive_conn.recv()) # 输出: [42, None, 'hello']
p.join()
共享内存
multiprocessing
模块还提供了共享内存对象,如Value
和Array
,用于在进程间共享数据。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value) # 输出: 3.1415927
print(arr[:]) # 输出: [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END