Day05:Python中的并发和并行(3)
使用 multiprocessing
模块实现多进程
Python 中的 multiprocessing
模块提供了一种创建进程的方式,允许你并行运行代码。这对于受全局解释器锁(GIL)限制的多线程可能有限的 CPU 密集型任务特别有用。通过利用多个核心,multiprocessing 可以显著提高计算密集型操作的性能。本课程将涵盖使用 multiprocessing
模块的基础知识,包括进程创建、通信和同步。
理解 multiprocessing
模块
multiprocessing
模块是一个支持使用与 threading
模块相似的 API 来创建进程的包。它提供了本地和远程并发,通过使用子进程而不是线程来有效规避全局解释器锁(GIL)。正因如此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。
关键概念
- 进程: 执行中的程序实例。每个进程都有自己独立的内存空间,这意味着变量默认情况下不会在进程之间共享。
- 池: 一组工作进程的集合。使用池比手动创建和销毁进程更高效,尤其是在你需要执行许多任务时。
- 队列: 一种允许进程通过发送和接收消息进行通信的数据结构。
- 管道: 两个进程之间的连接,允许它们发送和接收数据。
- 锁: 一种同步原语,防止多个进程同时访问共享资源,避免竞态条件。
- 值和数组: 多个进程可以访问的共享内存对象。
为什么使用多进程?
多进程特别适用于:
- CPU 密集型任务: 那些大部分时间用于执行计算的任务,例如图像处理、科学模拟和数据分析。
- 可并行化任务: 那些可以分解为更小、独立子任务的任务,这些子任务可以同时执行。
- 绕过 GIL: CPython 中的全局解释器锁限制了线程的真正并行性。多进程通过使用独立的进程绕过这一限制。
创建进程
使用 multiprocessing
模块的基本方式是创建 Process
对象。每个 Process
对象会在一个单独的进程中运行目标函数。
基本进程创建
import multiprocessing
import time
def worker(num):
"""Worker function to be executed in a separate process"""
print(f"Worker {num}: Starting")
time.sleep(2) # 模拟一些工作
print(f"Worker {num}: Finishing")
if __name__ == '__main__':
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join() # 等待所有进程完成
print("All workers finished")
解释:
import multiprocessing
: 导入必要的模块。worker(num)
: 定义每个进程将执行的函数。它接受一个数字作为参数来标识工作进程。multiprocessing.Process(target=worker, args=(i,))
: 创建一个Process
对象,指定目标函数 (worker
) 及其参数 (i
)。注意args=(i,)
中的逗号,这是将其作为元组所必需的,即使只有一个元素。p.start()
: 启动进程,导致worker
函数在一个单独的进程中被执行。p.join()
: 等待进程完成后再继续。这确保了主程序不会在所有工作进程都完成之前退出。if __name__ == '__main__':
: 这至关重要。在一些平台(尤其是 Windows)上,multiprocessing
模块要求主程序的入口点受此条件保护,以避免递归地生成新进程。
向进程传递参数
args
参数允许你向目标函数传递参数。它必须是一个元组。
import multiprocessing
import time
def calculate_square(number, result, square_sum):
"""
Calculates the square of a number and stores the result in a shared memory array.
Also updates the sum of squares in a shared memory value.
"""
print(f"Calculating square of {number}")
square = number * number
result[number] = square # 将正方形存储在结果数组中
with square_sum.get_lock():
square_sum.value += square
print(f"Square of {number} is {square}")
if __name__ == '__main__':
numbers = range(1, 6)
result = multiprocessing.Array('i', len(numbers)) # 'i' for integer
square_sum = multiprocessing.Value('i', 0)
processes = []
for i, number in enumerate(numbers):
p = multiprocessing.Process(target=calculate_square, args=(number, result, square_sum))
processes.append(p)
p.start()
for p in processes:
p.join()
print("Result:", list(result))
print("Sum of squares:", square_sum.value)
解释:
multiprocessing.Array('i', len(numbers))
: 创建一个共享内存的整型数组。第一个参数,'i'
,指定数据类型(此处为整型)。第二个参数指定数组的大小。multiprocessing.Value('i', 0)
: 创建一个整型的共享内存值,初始值为 0。with square_sum.get_lock():
: 在更新共享的square_sum
值之前获取锁。这防止了竞态条件,即多个进程可能同时尝试更新值,导致结果错误。使用with
语句确保锁在异常发生时自动释放。
进程名称和 ID
每个进程都有一个名称和进程 ID(PID)。您可以使用 name
和 pid
属性来访问 Process
对象。
import multiprocessing
import os
def worker():
print(f"Worker name: {multiprocessing.current_process().name}")
print(f"Worker PID: {os.getpid()}")
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, name="MyWorkerProcess")
p.start()
p.join()
解释:
multiprocessing.current_process()
: 返回代表当前进程的Process
对象。os.getpid()
: 返回当前进程的进程 ID。这是获取 PID 的另一种方法,使用os
模块。name="MyWorkerProcess"
: 在创建进程时设置进程的名称。如果你不指定名称,它将默认为类似"Process-1"的名称。
进程池
进程池管理一组工作进程。当你有许多任务需要执行时,使用进程池通常比手动创建和销毁进程更高效。
使用 Pool.apply()
apply()
方法在一个工作进程中执行带有给定参数的函数,直到结果准备好才会解除阻塞。
import multiprocessing
import time
def cube(x):
"""Calculates the cube of a number"""
time.sleep(1) # 模拟一些工作
return x * x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4) # 创建一个由 4 个工作进程组成的池
numbers = [1, 2, 3, 4, 5]
results = []
for num in numbers:
result = pool.apply(cube, args=(num,))
results.append(result)
pool.close() # 阻止将更多任务提交到池中
pool.join() # 等待所有任务完成
print("Results:", results)
解释:
multiprocessing.Pool(processes=4)
: 创建一个包含 4 个工作进程的进程池。参数processes
指定要使用的工作进程数量。如果省略,则默认为系统中的 CPU 数量。pool.apply(cube, args=(num,))
: 在其中一个工作进程中应用cube
函数到参数num
。它会阻塞直到结果可用。pool.close()
: 防止向进程池提交更多任务。在调用join()
之前必须调用close()
。pool.join()
: 等待池中的所有任务完成。
使用 Pool.map()
map()
方法将一个函数应用于可迭代对象中的每个元素,并返回一个包含结果列表。这是一种并行化简单循环的便捷方式。
import multiprocessing
import time
def square(x):
"""Calculates the square of a number"""
time.sleep(1) # Simulate some work
return x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
numbers = [1, 2, 3, 4, 5]
results = pool.map(square, numbers) # Apply the square function to each number in parallel
pool.close()
pool.join()
print("Results:", results)
解释:
pool.map(square, numbers)
: 将函数square 并行应用于 `numbers 列表中的每个数字。它返回一个包含结果的列表。结果的顺序与输入可迭代对象的顺序相对应。`
使用 Pool.imap()
imap()
方法与 map()
类似,但它返回一个迭代器,该迭代器在结果可用时产生结果。如果你不需要一次性获得所有结果,这种方式可能更高效。
import multiprocessing
import time
def double(x):
"""Calculates double of a number"""
time.sleep(1) # Simulate some work
return x * 2
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
numbers = [1, 2, 3, 4, 5]
results = pool.imap(double, numbers) # Returns an iterator
for result in results:
print("Result:", result)
pool.close()
pool.join()
解释:
pool.imap(double, numbers)
:将double
函数应用于numbers
列表中的每个数字,并返回一个迭代器。for result in results:
:在结果可用时进行迭代。这避免了在处理第一个结果之前等待所有任务完成。
使用 Pool.apply_async()
和 Pool.map_async()
这些是 apply()
和 map()
的异步版本。它们返回一个 AsyncResult
对象,允许你检查任务状态并在稍后获取结果。
import multiprocessing
import time
def triple(x):
"""Calculates the triple of a number"""
time.sleep(1) # Simulate some work
return x * 3
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
numbers = [1, 2, 3, 4, 5]
# Asynchronous map
result = pool.map_async(triple, numbers)
pool.close()
pool.join()
print("Results:", result.get()) # Get the results
解释:
result = pool.map_async(triple, numbers)
: 异步地将triple
函数应用于numbers
列表中的每个数字。它立即返回一个AsyncResult
对象,而无需等待任务完成。result.get()
: 等待所有任务完成并返回结果。如果任务已经完成,它会立即返回结果。
进程间通信 (IPC)
由于进程拥有独立的内存空间,你需要使用特殊的机制来在它们之间共享数据。multiprocessing
模块提供了多种实现这一功能的方法。
队列
队列是一种线程和进程安全的在进程间传递消息的方式。
import multiprocessing
def producer(queue):
"""Sends messages to the queue"""
for i in range(5):
message = f"Message {i}"
print(f"Producer: Sending {message}")
queue.put(message)
def consumer(queue):
"""Receives messages from the queue"""
while True:
message = queue.get()
if message == "DONE":
break
print(f"Consumer: Received {message}")
if __name__ == '__main__':
queue = multiprocessing.Queue()
producer_process = multiprocessing.Process(target=producer, args=(queue,))
consumer_process = multiprocessing.Process(target=consumer, args=(queue,))
producer_process.start()
consumer_process.start()
producer_process.join()
queue.put("DONE") # 发出信号让消费者退出
consumer_process.join()
print("Done")
解释:
multiprocessing.Queue()
: 创建一个进程安全的队列。queue.put(message)
: 将消息放入队列。message = queue.get()
: 从队列中检索消息。它会阻塞直到消息可用。queue.put("DONE")
: 发送一个特殊的"DONE"消息以通知消费者进程退出。这是终止等待消息的消费者进程的常用方法。
管道
管道为两个进程提供了一种简单的通信方式。它们通常用于单向通信。
import multiprocessing
def sender(conn, messages):
"""Sends messages through the connection"""
for message in messages:
print(f"Sender: Sending {message}")
conn.send(message)
conn.close()
def receiver(conn):
"""Receives messages from the connection"""
while True:
try:
message = conn.recv()
print(f"Receiver: Received {message}")
except EOFError:
break # 连接已关闭
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe() # Create a pipe
messages = ["Hello", "World", "From", "Pipe"]
sender_process = multiprocessing.Process(target=sender, args=(child_conn, messages))
receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))
sender_process.start()
receiver_process.start()
sender_process.join()
receiver_process.join()
print("Done")
解释:
parent_conn, child_conn = multiprocessing.Pipe()
: 创建一个管道。它返回两个连接对象:parent_conn
和child_conn
。conn.send(message)
: 通过连接发送消息。message = conn.recv()
: 从连接中接收消息。它会在消息可用时阻塞。conn.close()
: 关闭连接。完成使用后关闭连接很重要。EOFError
: 当sender
关闭管道的端点时,receiver
捕获EOFError
,表示没有更多消息可以接收。
共享内存
multiprocessing
模块提供了可以被多个进程访问的共享内存对象。这对于共享大量数据很有用。
import multiprocessing
import time
def increment(number, lock):
"""Increments the shared number"""
for _ in range(100000):
with lock:
number.value += 1
if __name__ == '__main__':
number = multiprocessing.Value('i', 0) # 共享整数值
lock = multiprocessing.Lock() # 锁定同步
processes = []
for _ in range(3):
p = multiprocessing.Process(target=increment, args=(number, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print("Final value:", number.value)
解释:
number = multiprocessing.Value('i', 0)
: 创建一个整型的共享内存值,初始值为 0。lock = multiprocessing.Lock()
: 创建一个锁,用于同步对共享内存的访问。with lock:
: 在增加共享数字之前获取锁。这可以防止竞态条件。
同步原语
同步原语用于协调对共享资源的访问并防止竞态条件。《multiprocessing
》模块提供了多种同步原语,包括:
- 锁: 一种基本的锁,每次只允许一个进程获取。
- RLock: 可被同一进程多次获取的可重入锁。
- 信号量: 一种更通用的同步原语,允许有限数量的进程访问一个资源。
- 条件: 允许进程等待直到某个条件满足。
- 事件: 一种简单的信号机制,允许一个进程通知其他进程某个事件已经发生。
之前的示例展示了 Lock
的使用。这里是一个使用 Semaphore
的示例:
import multiprocessing
import time
import random
def worker(semaphore, worker_id):
"""Worker function that acquires and releases a semaphore"""
semaphore.acquire()
try:
print(f"Worker {worker_id}: Acquired semaphore")
time.sleep(random.random()) # 模拟一些工作
finally:
print(f"Worker {worker_id}: Releasing semaphore")
semaphore.release()
if __name__ == '__main__':
semaphore = multiprocessing.Semaphore(2) # 每次只允许 2 名
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(semaphore, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print("All workers finished")
解释:
semaphore = multiprocessing.Semaphore(2)
:创建一个信号量,一次只允许 2 个进程获取它。semaphore.acquire()
:获取信号量。如果信号量已经达到最大计数,进程将会阻塞,直到另一个进程释放它。semaphore.release()
: 释放信号量,允许另一个进程获取它。finally:
: 确保信号量总是被释放,即使发生异常也是如此。
原文地址:https://blog.csdn.net/u011290209/article/details/149143521
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!