Brad's PyNotes

Multiprocessing Module

TL;DR

The multiprocessing module creates separate Python processes that bypass the Global Interpreter Lock (GIL), enabling true parallel execution for CPU-intensive tasks.

Interesting!

Unlike threading, multiprocessing actually uses multiple CPU cores simultaneously - each process has its own Python interpreter and memory space!
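
You can see that isolation directly: a module-level variable changed in a child process stays untouched in the parent, because the child works on its own copy of the interpreter state. Here is a minimal sketch (the counter variable and bump_counter function are invented for the demo); mp.cpu_count() just reports how many cores are available to spread work across.

python code snippet start

import multiprocessing as mp

counter = 0  # module-level state; each process gets its own copy

def bump_counter():
    """Runs in the child process and modifies the child's copy of the global."""
    global counter
    counter += 100
    print(f"Child process sees counter = {counter}")

if __name__ == '__main__':
    print(f"CPU cores available: {mp.cpu_count()}")
    p = mp.Process(target=bump_counter)
    p.start()
    p.join()
    print(f"Parent process still sees counter = {counter}")  # 0 - memory is not shared

python code snippet end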

Basic Process Creation

python code snippet start

import multiprocessing as mp
import time

def worker_task(name, duration):
    print(f"Worker {name} starting...")
    time.sleep(duration)
    print(f"Worker {name} finished after {duration} seconds")

if __name__ == '__main__':
    # Create and start processes
    processes = []
    for i in range(3):
        p = mp.Process(target=worker_task, args=(f"Process-{i}", 2))
        processes.append(p)
        p.start()
    
    # Wait for all processes to complete
    for p in processes:
        p.join()
    
    print("All processes completed!")

python code snippet end

Process Pools for Parallel Work

python code snippet start

import multiprocessing as mp

def cpu_intensive_task(n):
    """Simulate CPU-intensive work."""
    result = 0
    for i in range(n * 1000000):
        result += i ** 2
    return result

def square_number(x):
    return x * x

if __name__ == '__main__':
    # Process Pool for parallel execution
    with mp.Pool(processes=4) as pool:
        # Map function across multiple inputs
        numbers = [1, 2, 3, 4, 5, 6, 7, 8]
        squared = pool.map(square_number, numbers)
        print(f"Squared: {squared}")
        
        # Apply async for non-blocking execution
        result = pool.apply_async(cpu_intensive_task, (100,))
        print("Doing other work while task runs...")
        print(f"Task result: {result.get()}")  # Blocks until complete

python code snippet end
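
pool.map passes exactly one argument to each call. Two related features worth knowing, sketched below with an invented power function: Pool.starmap unpacks argument tuples for multi-argument functions, and AsyncResult.get() accepts a timeout so a slow task can't block forever.

python code snippet start

import multiprocessing as mp

def power(base, exponent):
    """Example two-argument function (invented for this demo)."""
    return base ** exponent

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        # starmap unpacks each tuple into the function's positional arguments
        print(pool.starmap(power, [(2, 3), (3, 2), (4, 2)]))  # [8, 9, 16]

        # get() accepts a timeout; it raises multiprocessing.TimeoutError if exceeded
        async_result = pool.apply_async(power, (2, 10))
        print(async_result.get(timeout=5))  # 1024

python code snippet end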

Inter-Process Communication

python code snippet start

import multiprocessing as mp
import time

def producer(queue, items):
    """Put items into the queue."""
    for item in items:
        print(f"Producing: {item}")
        queue.put(item)
        time.sleep(0.5)
    queue.put(None)  # Signal completion

def consumer(queue):
    """Get items from the queue."""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consuming: {item}")
        time.sleep(1)

if __name__ == '__main__':
    # Create a queue for communication
    queue = mp.Queue()
    
    # Create producer and consumer processes
    items = ["apple", "banana", "cherry", "date"]
    producer_process = mp.Process(target=producer, args=(queue, items))
    consumer_process = mp.Process(target=consumer, args=(queue,))
    
    # Start both processes
    producer_process.start()
    consumer_process.start()
    
    # Wait for completion
    producer_process.join()
    consumer_process.join()

python code snippet end
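
Queues handle many producers and consumers; for simple two-endpoint communication the module also offers mp.Pipe, which returns a pair of connected endpoints. A minimal sketch (the child function here is invented for the demo):

python code snippet start

import multiprocessing as mp

def child(conn):
    """Receive one message over the pipe, answer it, and close our end."""
    message = conn.recv()
    conn.send(f"child got: {message}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()  # two connected endpoints
    p = mp.Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())  # child got: hello
    p.join()

python code snippet end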

Shared Memory

python code snippet start

import multiprocessing as mp
import time

def worker_with_shared_data(shared_value, shared_array, lock, worker_id):
    """Worker that modifies shared data safely."""
    for i in range(5):
        with lock:  # Prevent race conditions
            shared_value.value += 1
            shared_array[worker_id] = shared_value.value
            print(f"Worker {worker_id}: value={shared_value.value}, array={list(shared_array[:])}")
        time.sleep(0.1)

if __name__ == '__main__':
    # Create shared memory objects
    shared_value = mp.Value('i', 0)  # Shared integer
    shared_array = mp.Array('i', [0, 0, 0])  # Shared array
    lock = mp.Lock()  # Synchronization primitive
    
    # Create processes that share data
    processes = []
    for i in range(3):
        p = mp.Process(target=worker_with_shared_data, 
                      args=(shared_value, shared_array, lock, i))
        processes.append(p)
        p.start()
    
    # Wait for all processes
    for p in processes:
        p.join()
    
    print(f"Final value: {shared_value.value}")
    print(f"Final array: {list(shared_array[:])}")

python code snippet end
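
The typecodes passed to Value and Array come from the array module: 'i' is a signed int, and 'd' would be a double. Both objects live in shared, ctypes-backed memory, and because an update like shared_value.value += 1 is a read-modify-write, the explicit Lock is what keeps concurrent workers from losing increments.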

Manager Objects

python code snippet start

import multiprocessing as mp

def worker_with_manager(shared_dict, shared_list, worker_id):
    """Worker using manager objects for complex shared data."""
    shared_dict[f"worker_{worker_id}"] = f"Hello from worker {worker_id}"
    shared_list.append(f"Message from worker {worker_id}")

if __name__ == '__main__':
    # Create manager for complex shared objects
    with mp.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        
        # Create processes
        processes = []
        for i in range(4):
            p = mp.Process(target=worker_with_manager, 
                          args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()
        
        # Wait for completion
        for p in processes:
            p.join()
        
        print("Shared dictionary:", dict(shared_dict))
        print("Shared list:", list(shared_list))

python code snippet end
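
Manager objects are proxies: the actual dict and list live in a separate server process, and every access goes through it. That makes them slower than Value and Array, but they let you share regular Python containers without worrying about typecodes.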

Real-World Example: Parallel File Processing

python code snippet start

import multiprocessing as mp
import os
import time
from pathlib import Path

def process_file(file_path):
    """Process a single file (simulate work)."""
    file_size = os.path.getsize(file_path)
    time.sleep(0.1)  # Simulate processing time
    return {
        'file': file_path.name,
        'size': file_size,
        'processed_by': mp.current_process().name
    }

def parallel_file_processor(directory_path, max_workers=4):
    """Process all files in directory using multiple processes."""
    directory = Path(directory_path)
    files = [f for f in directory.iterdir() if f.is_file()]
    
    if not files:
        print("No files to process")
        return []
    
    # Process files in parallel
    with mp.Pool(processes=max_workers) as pool:
        results = pool.map(process_file, files)
    
    return results

if __name__ == '__main__':
    # Example usage
    current_dir = "."
    results = parallel_file_processor(current_dir)
    
    print(f"Processed {len(results)} files:")
    for result in results:
        print(f"  {result['file']}: {result['size']} bytes (by {result['processed_by']})")

python code snippet end

CPU-Bound vs I/O-Bound Tasks

python code snippet start

import multiprocessing as mp
import threading
import time
import requests

def cpu_bound_task(n):
    """CPU-intensive task - benefits from multiprocessing."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def io_bound_task(url):
    """I/O-intensive task - threading might be sufficient."""
    try:
        response = requests.get(url, timeout=5)
        return len(response.content)
    except requests.RequestException:
        return 0

if __name__ == '__main__':
    # CPU-bound: Use multiprocessing
    numbers = [1000000] * 4
    
    start_time = time.time()
    with mp.Pool() as pool:
        results = pool.map(cpu_bound_task, numbers)
    mp_time = time.time() - start_time
    
    print(f"Multiprocessing (CPU-bound): {mp_time:.2f} seconds")
    
    # For comparison - sequential execution
    start_time = time.time()
    results = [cpu_bound_task(n) for n in numbers]
    seq_time = time.time() - start_time
    
    print(f"Sequential (CPU-bound): {seq_time:.2f} seconds")
    print(f"Speedup: {seq_time/mp_time:.1f}x")

python code snippet end
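
For the io_bound_task above, extra processes buy little because the workers spend their time waiting on the network rather than the CPU. A hedged sketch of the threading side of the comparison, using concurrent.futures.ThreadPoolExecutor and placeholder URLs (not from the original):

python code snippet start

from concurrent.futures import ThreadPoolExecutor
import requests

def io_bound_task(url):
    """Same idea as io_bound_task above: download a page and report its size."""
    try:
        response = requests.get(url, timeout=5)
        return len(response.content)
    except requests.RequestException:
        return 0

if __name__ == '__main__':
    urls = ["https://example.com"] * 4  # placeholder URLs for illustration
    # Threads overlap the network waits without the cost of extra processes
    with ThreadPoolExecutor(max_workers=4) as executor:
        sizes = list(executor.map(io_bound_task, urls))
    print(f"Downloaded sizes: {sizes}")

python code snippet end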

Best Practices

python code snippet start

import multiprocessing as mp

def safe_worker(data):
    """Always protect your main code with if __name__ == '__main__'."""
    return data * 2

# This prevents infinite process spawning on Windows
if __name__ == '__main__':
    # Set start method (optional, platform-dependent)
    mp.set_start_method('spawn', force=True)  # Use 'spawn' for cross-platform
    
    # Use context managers for automatic cleanup
    with mp.Pool() as pool:
        results = pool.map(safe_worker, [1, 2, 3, 4, 5])
        print(results)
    
    # For single processes, always join
    process = mp.Process(target=safe_worker, args=(10,))
    process.start()
    process.join()  # Wait for completion
    
    print("All done!")

python code snippet end
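
Note that 'spawn' is already the default start method on Windows and (since Python 3.8) on macOS, while Linux has traditionally defaulted to 'fork'; forcing 'spawn' costs a little startup time per process but gives identical behaviour everywhere.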

Multiprocessing is perfect for CPU-intensive tasks where you need to utilize multiple cores - just remember that each process has its own memory space, so communication requires special mechanisms like queues, pipes, and shared memory! Compare with threading for I/O-bound tasks and asyncio for concurrent I/O. Combine it with pathlib for file processing and with logging across processes.

Reference: multiprocessing — Process-based parallelism