Day05:Python中的并发和并行(3)
使用 multiprocessing 模块实现多进程
Python 中的 multiprocessing 模块提供了一种创建进程的方式,允许你并行运行代码。这对于受全局解释器锁(GIL)限制的多线程可能有限的 CPU 密集型任务特别有用。通过利用多个核心,multiprocessing 可以显著提高计算密集型操作的性能。本课程将涵盖使用 multiprocessing 模块的基础知识,包括进程创建、通信和同步。
理解 multiprocessing 模块
multiprocessing 模块是一个支持使用与 threading 模块相似的 API 来创建进程的包。它提供了本地和远程并发,通过使用子进程而不是线程来有效规避全局解释器锁(GIL)。正因如此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。
关键概念
- 进程: 执行中的程序实例。每个进程都有自己独立的内存空间,这意味着变量默认情况下不会在进程之间共享。
- 池: 一组工作进程的集合。使用池比手动创建和销毁进程更高效,尤其是在你需要执行许多任务时。
- 队列: 一种允许进程通过发送和接收消息进行通信的数据结构。
- 管道: 两个进程之间的连接,允许它们发送和接收数据。
- 锁: 一种同步原语,防止多个进程同时访问共享资源,避免竞态条件。
- 值和数组: 多个进程可以访问的共享内存对象。
为什么使用多进程?
多进程特别适用于:
- CPU 密集型任务: 那些大部分时间用于执行计算的任务,例如图像处理、科学模拟和数据分析。
- 可并行化任务: 那些可以分解为更小、独立子任务的任务,这些子任务可以同时执行。
- 绕过 GIL: CPython 中的全局解释器锁限制了线程的真正并行性。多进程通过使用独立的进程绕过这一限制。
创建进程
使用 multiprocessing 模块的基本方式是创建 Process 对象。每个 Process 对象会在一个单独的进程中运行目标函数。
基本进程创建
import multiprocessing
import time
def worker(num):
"""Worker function to be executed in a separate process"""
print(f"Worker {num}: Starting")
time.sleep(2) # 模拟一些工作
print(f"Worker {num}: Finishing")
if __name__ == '__main__':
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join() # 等待所有进程完成
print("All workers finished")
解释:
import multiprocessing: 导入必要的模块。worker(num): 定义每个进程将执行的函数。它接受一个数字作为参数来标识工作进程。multiprocessing.Process(target=worker, args=(i,)): 创建一个Process对象,指定目标函数 (worker) 及其参数 (i)。注意args=(i,)中的逗号,这是将其作为元组所必需的,即使只有一个元素。p.start(): 启动进程,导致worker函数在一个单独的进程中被执行。p.join(): 等待进程完成后再继续。这确保了主程序不会在所有工作进程都完成之前退出。if __name__ == '__main__':: 这至关重要。在一些平台(尤其是 Windows)上,multiprocessing模块要求主程序的入口点受此条件保护,以避免递归地生成新进程。
向进程传递参数
args 参数允许你向目标函数传递参数。它必须是一个元组。
import multiprocessing
import time
def calculate_square(number, result, square_sum):
"""
Calculates the square of a number and stores the result in a shared memory array.
Also updates the sum of squares in a shared memory value.
"""
print(f"Calculating square of {number}")
square = number * number
result[number] = square # 将正方形存储在结果数组中
with square_sum.get_lock():
square_sum.value += square
print(f"Square of {number} is {square}")
if __name__ == '__main__':
numbers = range(1, 6)
result = multiprocessing.Array('i', len(numbers)) # 'i' for integer
square_sum = multiprocessing.Value('i', 0)
processes = []
for i, number in enumerate(numbers):
p = multiprocessing.Process(target=calculate_square, args=(number, result, square_sum))
processes.append(p)
p.start()
for p in processes:
p.join()
print("Result:", list(result))
print("Sum of squares:", square_sum.value)
解释:
multiprocessing.Array('i', len(numbers)): 创建一个共享内存的整型数组。第一个参数,'i',指定数据类型(此处为整型)。第二个参数指定数组的大小。multiprocessing.Value('i', 0): 创建一个整型的共享内存值,初始值为 0。with square_sum.get_lock():: 在更新共享的square_sum值之前获取锁。这防止了竞态条件,即多个进程可能同时尝试更新值,导致结果错误。使用with语句确保锁在异常发生时自动释放。
进程名称和 ID
每个进程都有一个名称和进程 ID(PID)。您可以使用 name 和 pid 属性来访问 Process 对象。
import multiprocessing
import os
def worker():
print(f"Worker name: {multiprocessing.current_process().name}")
print(f"Worker PID: {os.getpid()}")
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, name="MyWorkerProcess")
p.start()
p.join()
解释:
multiprocessing.current_process(): 返回代表当前进程的Process对象。os.getpid(): 返回当前进程的进程 ID。这是获取 PID 的另一种方法,使用os模块。name="MyWorkerProcess": 在创建进程时设置进程的名称。如果你不指定名称,它将默认为类似"Process-1"的名称。
进程池
进程池管理一组工作进程。当你有许多任务需要执行时,使用进程池通常比手动创建和销毁进程更高效。
使用 Pool.apply()
apply() 方法在一个工作进程中执行带有给定参数的函数,直到结果准备好才会解除阻塞。
import multiprocessing
import time
def cube(x):
"""Calculates the cube of a number"""
time.sleep(1) # 模拟一些工作
return x * x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4) # 创建一个由 4 个工作进程组成的池
numbers = [1, 2, 3, 4, 5]
results = []
for num in numbers:
result = pool.apply(cube, args=(num,))
results.append(result)
pool.close() # 阻止将更多任务提交到池中
pool.join() # 等待所有任务完成
print("Results:", results)
解释:
multiprocessing.Pool(processes=4): 创建一个包含 4 个工作进程的进程池。参数processes指定要使用的工作进程数量。如果省略,则默认为系统中的 CPU 数量。pool.apply(cube, args=(num,)): 在其中一个工作进程中应用cube函数到参数num。它会阻塞直到结果可用。pool.close(): 防止向进程池提交更多任务。在调用join()之前必须调用close()。pool.join(): 等待池中的所有任务完成。
使用 Pool.map()
map() 方法将一个函数应用于可迭代对象中的每个元素,并返回一个包含结果列表。这是一种并行化简单循环的便捷方式。
import multiprocessing
import time
def square(x):
"""Calculates the square of a number"""
time.sleep(1) # Simulate some work
return x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
numbers = [1, 2, 3, 4, 5]
results = pool.map(square, numbers) # Apply the square function to each number in parallel
pool.close()
pool.join()
print("Results:", results)
解释:
pool.map(square, numbers): 将函数square 并行应用于 `numbers 列表中的每个数字。它返回一个包含结果的列表。结果的顺序与输入可迭代对象的顺序相对应。`
使用 Pool.imap()
imap() 方法与 map() 类似,但它返回一个迭代器,该迭代器在结果可用时产生结果。如果你不需要一次性获得所有结果,这种方式可能更高效。
import multiprocessing
import time
def double(x):
"""Calculates double of a number"""
time.sleep(1) # Simulate some work
return x * 2
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
numbers = [1, 2, 3, 4, 5]
results = pool.imap(double, numbers) # Returns an iterator
for result in results:
print("Result:", result)
pool.close()
pool.join()
解释:
pool.imap(double, numbers):将double函数应用于numbers列表中的每个数字,并返回一个迭代器。for result in results::在结果可用时进行迭代。这避免了在处理第一个结果之前等待所有任务完成。
使用 Pool.apply_async() 和 Pool.map_async()
这些是 apply() 和 map() 的异步版本。它们返回一个 AsyncResult 对象,允许你检查任务状态并在稍后获取结果。
import multiprocessing
import time
def triple(x):
"""Calculates the triple of a number"""
time.sleep(1) # Simulate some work
return x * 3
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
numbers = [1, 2, 3, 4, 5]
# Asynchronous map
result = pool.map_async(triple, numbers)
pool.close()
pool.join()
print("Results:", result.get()) # Get the results
解释:
result = pool.map_async(triple, numbers): 异步地将triple函数应用于numbers列表中的每个数字。它立即返回一个AsyncResult对象,而无需等待任务完成。result.get(): 等待所有任务完成并返回结果。如果任务已经完成,它会立即返回结果。
进程间通信 (IPC)
由于进程拥有独立的内存空间,你需要使用特殊的机制来在它们之间共享数据。multiprocessing 模块提供了多种实现这一功能的方法。
队列
队列是一种线程和进程安全的在进程间传递消息的方式。
import multiprocessing
def producer(queue):
"""Sends messages to the queue"""
for i in range(5):
message = f"Message {i}"
print(f"Producer: Sending {message}")
queue.put(message)
def consumer(queue):
"""Receives messages from the queue"""
while True:
message = queue.get()
if message == "DONE":
break
print(f"Consumer: Received {message}")
if __name__ == '__main__':
queue = multiprocessing.Queue()
producer_process = multiprocessing.Process(target=producer, args=(queue,))
consumer_process = multiprocessing.Process(target=consumer, args=(queue,))
producer_process.start()
consumer_process.start()
producer_process.join()
queue.put("DONE") # 发出信号让消费者退出
consumer_process.join()
print("Done")
解释:
multiprocessing.Queue(): 创建一个进程安全的队列。queue.put(message): 将消息放入队列。message = queue.get(): 从队列中检索消息。它会阻塞直到消息可用。queue.put("DONE"): 发送一个特殊的"DONE"消息以通知消费者进程退出。这是终止等待消息的消费者进程的常用方法。
管道
管道为两个进程提供了一种简单的通信方式。它们通常用于单向通信。
import multiprocessing
def sender(conn, messages):
"""Sends messages through the connection"""
for message in messages:
print(f"Sender: Sending {message}")
conn.send(message)
conn.close()
def receiver(conn):
"""Receives messages from the connection"""
while True:
try:
message = conn.recv()
print(f"Receiver: Received {message}")
except EOFError:
break # 连接已关闭
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe() # Create a pipe
messages = ["Hello", "World", "From", "Pipe"]
sender_process = multiprocessing.Process(target=sender, args=(child_conn, messages))
receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))
sender_process.start()
receiver_process.start()
sender_process.join()
receiver_process.join()
print("Done")
解释:
parent_conn, child_conn = multiprocessing.Pipe(): 创建一个管道。它返回两个连接对象:parent_conn和child_conn。conn.send(message): 通过连接发送消息。message = conn.recv(): 从连接中接收消息。它会在消息可用时阻塞。conn.close(): 关闭连接。完成使用后关闭连接很重要。EOFError: 当sender关闭管道的端点时,receiver捕获EOFError,表示没有更多消息可以接收。
共享内存
multiprocessing 模块提供了可以被多个进程访问的共享内存对象。这对于共享大量数据很有用。
import multiprocessing
import time
def increment(number, lock):
"""Increments the shared number"""
for _ in range(100000):
with lock:
number.value += 1
if __name__ == '__main__':
number = multiprocessing.Value('i', 0) # 共享整数值
lock = multiprocessing.Lock() # 锁定同步
processes = []
for _ in range(3):
p = multiprocessing.Process(target=increment, args=(number, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print("Final value:", number.value)
解释:
number = multiprocessing.Value('i', 0): 创建一个整型的共享内存值,初始值为 0。lock = multiprocessing.Lock(): 创建一个锁,用于同步对共享内存的访问。with lock:: 在增加共享数字之前获取锁。这可以防止竞态条件。
同步原语
同步原语用于协调对共享资源的访问并防止竞态条件。《multiprocessing》模块提供了多种同步原语,包括:
- 锁: 一种基本的锁,每次只允许一个进程获取。
- RLock: 可被同一进程多次获取的可重入锁。
- 信号量: 一种更通用的同步原语,允许有限数量的进程访问一个资源。
- 条件: 允许进程等待直到某个条件满足。
- 事件: 一种简单的信号机制,允许一个进程通知其他进程某个事件已经发生。
之前的示例展示了 Lock 的使用。这里是一个使用 Semaphore 的示例:
import multiprocessing
import time
import random
def worker(semaphore, worker_id):
"""Worker function that acquires and releases a semaphore"""
semaphore.acquire()
try:
print(f"Worker {worker_id}: Acquired semaphore")
time.sleep(random.random()) # 模拟一些工作
finally:
print(f"Worker {worker_id}: Releasing semaphore")
semaphore.release()
if __name__ == '__main__':
semaphore = multiprocessing.Semaphore(2) # 每次只允许 2 名
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(semaphore, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print("All workers finished")
解释:
semaphore = multiprocessing.Semaphore(2):创建一个信号量,一次只允许 2 个进程获取它。semaphore.acquire():获取信号量。如果信号量已经达到最大计数,进程将会阻塞,直到另一个进程释放它。semaphore.release(): 释放信号量,允许另一个进程获取它。finally:: 确保信号量总是被释放,即使发生异常也是如此。
原文地址:https://blog.csdn.net/u011290209/article/details/149143521
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!
