自学内容网 自学内容网

Day05:Python中的并发和并行(3)

使用 multiprocessing 模块实现多进程

Python 中的 multiprocessing 模块提供了一种创建进程的方式,允许你并行运行代码。这对于受全局解释器锁(GIL)限制的多线程可能有限的 CPU 密集型任务特别有用。通过利用多个核心,multiprocessing 可以显著提高计算密集型操作的性能。本课程将涵盖使用 multiprocessing 模块的基础知识,包括进程创建、通信和同步。

理解 multiprocessing 模块

multiprocessing 模块是一个支持使用与 threading 模块相似的 API 来创建进程的包。它提供了本地和远程并发,通过使用子进程而不是线程来有效规避全局解释器锁(GIL)。正因如此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。

关键概念

  • 进程: 执行中的程序实例。每个进程都有自己独立的内存空间,这意味着变量默认情况下不会在进程之间共享。
  • 池: 一组工作进程的集合。使用池比手动创建和销毁进程更高效,尤其是在你需要执行许多任务时。
  • 队列: 一种允许进程通过发送和接收消息进行通信的数据结构。
  • 管道: 两个进程之间的连接,允许它们发送和接收数据。
  • 锁: 一种同步原语,防止多个进程同时访问共享资源,避免竞态条件。
  • 值和数组: 多个进程可以访问的共享内存对象。

为什么使用多进程?

多进程特别适用于:

  • CPU 密集型任务: 那些大部分时间用于执行计算的任务,例如图像处理、科学模拟和数据分析。
  • 可并行化任务: 那些可以分解为更小、独立子任务的任务,这些子任务可以同时执行。
  • 绕过 GIL: CPython 中的全局解释器锁限制了线程的真正并行性。多进程通过使用独立的进程绕过这一限制。

创建进程

使用 multiprocessing 模块的基本方式是创建 Process 对象。每个 Process 对象会在一个单独的进程中运行目标函数。

基本进程创建

import multiprocessing
import time

def worker(num):
    """Worker function to be executed in a separate process"""
    print(f"Worker {num}: Starting")
    time.sleep(2)  # 模拟一些工作
    print(f"Worker {num}: Finishing")

if __name__ == '__main__':
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()  # 等待所有进程完成

    print("All workers finished")

解释:

  1. import multiprocessing: 导入必要的模块。
  2. worker(num): 定义每个进程将执行的函数。它接受一个数字作为参数来标识工作进程。
  3. multiprocessing.Process(target=worker, args=(i,)) : 创建一个 Process 对象,指定目标函数 (worker) 及其参数 (i)。注意 args=(i,) 中的逗号,这是将其作为元组所必需的,即使只有一个元素。
  4. p.start(): 启动进程,导致 worker 函数在一个单独的进程中被执行。
  5. p.join(): 等待进程完成后再继续。这确保了主程序不会在所有工作进程都完成之前退出。
  6. if __name__ == '__main__':: 这至关重要。在一些平台(尤其是 Windows)上,multiprocessing 模块要求主程序的入口点受此条件保护,以避免递归地生成新进程。

向进程传递参数

args 参数允许你向目标函数传递参数。它必须是一个元组。

import multiprocessing
import time

def calculate_square(number, result, square_sum):
    """
    Calculates the square of a number and stores the result in a shared memory array.
    Also updates the sum of squares in a shared memory value.
    """
    print(f"Calculating square of {number}")
    square = number * number
    result[number] = square  # 将正方形存储在结果数组中
    with square_sum.get_lock():
        square_sum.value += square
    print(f"Square of {number} is {square}")

if __name__ == '__main__':
    numbers = range(1, 6)
    result = multiprocessing.Array('i', len(numbers))  # 'i' for integer
    square_sum = multiprocessing.Value('i', 0)

    processes = []
    for i, number in enumerate(numbers):
        p = multiprocessing.Process(target=calculate_square, args=(number, result, square_sum))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("Result:", list(result))
    print("Sum of squares:", square_sum.value)

解释:

  1. multiprocessing.Array('i', len(numbers)) : 创建一个共享内存的整型数组。第一个参数,'i',指定数据类型(此处为整型)。第二个参数指定数组的大小。
  2. multiprocessing.Value('i', 0): 创建一个整型的共享内存值,初始值为 0。
  3. with square_sum.get_lock():: 在更新共享的 square_sum 值之前获取锁。这防止了竞态条件,即多个进程可能同时尝试更新值,导致结果错误。使用 with 语句确保锁在异常发生时自动释放。

进程名称和 ID

每个进程都有一个名称和进程 ID(PID)。您可以使用 name 和 pid 属性来访问 Process 对象。

import multiprocessing
import os

def worker():
    print(f"Worker name: {multiprocessing.current_process().name}")
    print(f"Worker PID: {os.getpid()}")

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, name="MyWorkerProcess")
    p.start()
    p.join()

解释:

  1. multiprocessing.current_process() : 返回代表当前进程的 Process 对象。
  2. os.getpid(): 返回当前进程的进程 ID。这是获取 PID 的另一种方法,使用 os 模块。
  3. name="MyWorkerProcess": 在创建进程时设置进程的名称。如果你不指定名称,它将默认为类似"Process-1"的名称。

进程池

进程池管理一组工作进程。当你有许多任务需要执行时,使用进程池通常比手动创建和销毁进程更高效。

使用 Pool.apply()

apply() 方法在一个工作进程中执行带有给定参数的函数,直到结果准备好才会解除阻塞。

import multiprocessing
import time

def cube(x):
    """Calculates the cube of a number"""
    time.sleep(1)  # 模拟一些工作
    return x * x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)  # 创建一个由 4 个工作进程组成的池
    numbers = [1, 2, 3, 4, 5]
    results = []
    for num in numbers:
        result = pool.apply(cube, args=(num,))
        results.append(result)

    pool.close()  # 阻止将更多任务提交到池中
    pool.join()   # 等待所有任务完成

    print("Results:", results)

解释:

  1. multiprocessing.Pool(processes=4) : 创建一个包含 4 个工作进程的进程池。参数 processes 指定要使用的工作进程数量。如果省略,则默认为系统中的 CPU 数量。
  2. pool.apply(cube, args=(num,)): 在其中一个工作进程中应用 cube 函数到参数 num。它会阻塞直到结果可用。
  3. pool.close(): 防止向进程池提交更多任务。在调用 join() 之前必须调用 close()
  4. pool.join(): 等待池中的所有任务完成。

使用 Pool.map()

map() 方法将一个函数应用于可迭代对象中的每个元素,并返回一个包含结果列表。这是一种并行化简单循环的便捷方式。

import multiprocessing
import time

def square(x):
    """Calculates the square of a number"""
    time.sleep(1)  # Simulate some work
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    numbers = [1, 2, 3, 4, 5]
    results = pool.map(square, numbers)  # Apply the square function to each number in parallel

    pool.close()
    pool.join()

    print("Results:", results)

解释:

  1. pool.map(square, numbers): 将函数 square 并行应用于 `numbers 列表中的每个数字。它返回一个包含结果的列表。结果的顺序与输入可迭代对象的顺序相对应。`

使用 Pool.imap()

imap() 方法与 map() 类似,但它返回一个迭代器,该迭代器在结果可用时产生结果。如果你不需要一次性获得所有结果,这种方式可能更高效。

import multiprocessing
import time

def double(x):
    """Calculates double of a number"""
    time.sleep(1)  # Simulate some work
    return x * 2

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    numbers = [1, 2, 3, 4, 5]
    results = pool.imap(double, numbers)  # Returns an iterator

    for result in results:
        print("Result:", result)

    pool.close()
    pool.join()

解释:

  1. pool.imap(double, numbers):将 double 函数应用于 numbers 列表中的每个数字,并返回一个迭代器。
  2. for result in results::在结果可用时进行迭代。这避免了在处理第一个结果之前等待所有任务完成。

使用 Pool.apply_async() 和 Pool.map_async()

这些是 apply() 和 map() 的异步版本。它们返回一个 AsyncResult 对象,允许你检查任务状态并在稍后获取结果。

import multiprocessing
import time

def triple(x):
    """Calculates the triple of a number"""
    time.sleep(1)  # Simulate some work
    return x * 3

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    numbers = [1, 2, 3, 4, 5]

    # Asynchronous map
    result = pool.map_async(triple, numbers)

    pool.close()
    pool.join()

    print("Results:", result.get())  # Get the results

解释:

  1. result = pool.map_async(triple, numbers) : 异步地将 triple 函数应用于 numbers 列表中的每个数字。它立即返回一个 AsyncResult 对象,而无需等待任务完成。
  2. result.get(): 等待所有任务完成并返回结果。如果任务已经完成,它会立即返回结果。

进程间通信 (IPC)

由于进程拥有独立的内存空间,你需要使用特殊的机制来在它们之间共享数据。multiprocessing 模块提供了多种实现这一功能的方法。

队列

队列是一种线程和进程安全的在进程间传递消息的方式。

import multiprocessing

def producer(queue):
    """Sends messages to the queue"""
    for i in range(5):
        message = f"Message {i}"
        print(f"Producer: Sending {message}")
        queue.put(message)

def consumer(queue):
    """Receives messages from the queue"""
    while True:
        message = queue.get()
        if message == "DONE":
            break
        print(f"Consumer: Received {message}")

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    queue.put("DONE")  # 发出信号让消费者退出
    consumer_process.join()

    print("Done")

解释:

  1. multiprocessing.Queue(): 创建一个进程安全的队列。
  2. queue.put(message): 将消息放入队列。
  3. message = queue.get(): 从队列中检索消息。它会阻塞直到消息可用。
  4. queue.put("DONE"): 发送一个特殊的"DONE"消息以通知消费者进程退出。这是终止等待消息的消费者进程的常用方法。

管道

管道为两个进程提供了一种简单的通信方式。它们通常用于单向通信。

import multiprocessing

def sender(conn, messages):
    """Sends messages through the connection"""
    for message in messages:
        print(f"Sender: Sending {message}")
        conn.send(message)
    conn.close()

def receiver(conn):
    """Receives messages from the connection"""
    while True:
        try:
            message = conn.recv()
            print(f"Receiver: Received {message}")
        except EOFError:
            break  # 连接已关闭
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()  # Create a pipe
    messages = ["Hello", "World", "From", "Pipe"]

    sender_process = multiprocessing.Process(target=sender, args=(child_conn, messages))
    receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))

    sender_process.start()
    receiver_process.start()

    sender_process.join()
    receiver_process.join()

    print("Done")

解释:

  1. parent_conn, child_conn = multiprocessing.Pipe() : 创建一个管道。它返回两个连接对象:parent_conn 和 child_conn
  2. conn.send(message): 通过连接发送消息。
  3. message = conn.recv(): 从连接中接收消息。它会在消息可用时阻塞。
  4. conn.close(): 关闭连接。完成使用后关闭连接很重要。
  5. EOFError: 当 sender 关闭管道的端点时,receiver 捕获 EOFError,表示没有更多消息可以接收。

共享内存

multiprocessing 模块提供了可以被多个进程访问的共享内存对象。这对于共享大量数据很有用。

import multiprocessing
import time

def increment(number, lock):
    """Increments the shared number"""
    for _ in range(100000):
        with lock:
            number.value += 1

if __name__ == '__main__':
    number = multiprocessing.Value('i', 0)  # 共享整数值
    lock = multiprocessing.Lock()  # 锁定同步

    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=increment, args=(number, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("Final value:", number.value)

解释:

  1. number = multiprocessing.Value('i', 0) : 创建一个整型的共享内存值,初始值为 0。
  2. lock = multiprocessing.Lock(): 创建一个锁,用于同步对共享内存的访问。
  3. with lock:: 在增加共享数字之前获取锁。这可以防止竞态条件。

同步原语

同步原语用于协调对共享资源的访问并防止竞态条件。《multiprocessing》模块提供了多种同步原语,包括:

  • 锁: 一种基本的锁,每次只允许一个进程获取。
  • RLock: 可被同一进程多次获取的可重入锁。
  • 信号量: 一种更通用的同步原语,允许有限数量的进程访问一个资源。
  • 条件: 允许进程等待直到某个条件满足。
  • 事件: 一种简单的信号机制,允许一个进程通知其他进程某个事件已经发生。

之前的示例展示了 Lock 的使用。这里是一个使用 Semaphore 的示例:

import multiprocessing
import time
import random

def worker(semaphore, worker_id):
    """Worker function that acquires and releases a semaphore"""
    semaphore.acquire()
    try:
        print(f"Worker {worker_id}: Acquired semaphore")
        time.sleep(random.random())  # 模拟一些工作
    finally:
        print(f"Worker {worker_id}: Releasing semaphore")
        semaphore.release()

if __name__ == '__main__':
    semaphore = multiprocessing.Semaphore(2)  # 每次只允许 2 名
    processes = []

    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(semaphore, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("All workers finished")

解释:

  1. semaphore = multiprocessing.Semaphore(2) :创建一个信号量,一次只允许 2 个进程获取它。
  2. semaphore.acquire():获取信号量。如果信号量已经达到最大计数,进程将会阻塞,直到另一个进程释放它。
  3. semaphore.release(): 释放信号量,允许另一个进程获取它。
  4. finally:: 确保信号量总是被释放,即使发生异常也是如此。

原文地址:https://blog.csdn.net/u011290209/article/details/149143521

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!