Multiprocessing Module
TL;DR
The multiprocessing module creates separate Python processes that bypass the Global Interpreter Lock (GIL), enabling true parallel execution for CPU-intensive tasks.
Interesting!
Unlike threading, multiprocessing actually uses multiple CPU cores simultaneously - each process has its own Python interpreter and memory space!
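As a quick orientation, here is a minimal sketch that simply reports how many cores are available for those processes to run on (mp.cpu_count() is the standard-library call; the number depends on your machine):
python code snippet start
import multiprocessing as mp

if __name__ == '__main__':
    # Each process gets its own interpreter; this reports how many
    # CPU cores are available for them to run on in parallel.
    print(f"Available CPU cores: {mp.cpu_count()}")
python code snippet end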
Basic Process Creation
python code snippet start
import multiprocessing as mp
import time

def worker_task(name, duration):
    print(f"Worker {name} starting...")
    time.sleep(duration)
    print(f"Worker {name} finished after {duration} seconds")

if __name__ == '__main__':
    # Create and start processes
    processes = []
    for i in range(3):
        p = mp.Process(target=worker_task, args=(f"Process-{i}", 2))
        processes.append(p)
        p.start()

    # Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed!")
python code snippet end
Process Pools for Parallel Work
python code snippet start
import multiprocessing as mp

def cpu_intensive_task(n):
    """Simulate CPU-intensive work."""
    result = 0
    for i in range(n * 1000000):
        result += i ** 2
    return result

def square_number(x):
    return x * x

if __name__ == '__main__':
    # Process pool for parallel execution
    with mp.Pool(processes=4) as pool:
        # Map function across multiple inputs
        numbers = [1, 2, 3, 4, 5, 6, 7, 8]
        squared = pool.map(square_number, numbers)
        print(f"Squared: {squared}")

        # apply_async for non-blocking execution
        result = pool.apply_async(cpu_intensive_task, (100,))
        print("Doing other work while task runs...")
        print(f"Task result: {result.get()}")  # Blocks until complete
python code snippet end
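pool.map() passes a single argument to each call. When the worker needs several arguments, the standard pool.starmap() unpacks argument tuples for you; here is a minimal sketch (the power_number function is only an illustrative example, not part of the snippet above):
python code snippet start
import multiprocessing as mp

def power_number(base, exponent):
    """Example worker that needs two arguments."""
    return base ** exponent

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        # starmap unpacks each tuple into positional arguments
        pairs = [(2, 3), (3, 2), (4, 2), (5, 3)]
        results = pool.starmap(power_number, pairs)
        print(results)  # [8, 9, 16, 125]
python code snippet end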
Inter-Process Communication
python code snippet start
import multiprocessing as mp
import time

def producer(queue, items):
    """Put items into the queue."""
    for item in items:
        print(f"Producing: {item}")
        queue.put(item)
        time.sleep(0.5)
    queue.put(None)  # Signal completion

def consumer(queue):
    """Get items from the queue."""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consuming: {item}")
        time.sleep(1)

if __name__ == '__main__':
    # Create a queue for communication
    queue = mp.Queue()

    # Create producer and consumer processes
    items = ["apple", "banana", "cherry", "date"]
    producer_process = mp.Process(target=producer, args=(queue, items))
    consumer_process = mp.Process(target=consumer, args=(queue,))

    # Start both processes
    producer_process.start()
    consumer_process.start()

    # Wait for completion
    producer_process.join()
    consumer_process.join()
python code snippet end
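Queues work well when several processes produce or consume items. For simple two-way communication between exactly two processes, the standard library also provides mp.Pipe(); a minimal sketch (the child function here is hypothetical, purely for illustration):
python code snippet start
import multiprocessing as mp

def child(conn):
    """Receive a message, reply, and close the connection."""
    message = conn.recv()
    conn.send(f"Child got: {message}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()  # Two connected endpoints
    p = mp.Process(target=child, args=(child_conn,))
    p.start()

    parent_conn.send("hello")
    print(parent_conn.recv())  # "Child got: hello"
    p.join()
python code snippet end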
Shared Memory
python code snippet start
import multiprocessing as mp
import time

def worker_with_shared_data(shared_value, shared_array, lock, worker_id):
    """Worker that modifies shared data safely."""
    for i in range(5):
        with lock:  # Prevent race conditions
            shared_value.value += 1
            shared_array[worker_id] = shared_value.value
            print(f"Worker {worker_id}: value={shared_value.value}, array={shared_array[:]}")
        time.sleep(0.1)

if __name__ == '__main__':
    # Create shared memory objects
    shared_value = mp.Value('i', 0)          # Shared integer
    shared_array = mp.Array('i', [0, 0, 0])  # Shared array
    lock = mp.Lock()                         # Synchronization primitive

    # Create processes that share data
    processes = []
    for i in range(3):
        p = mp.Process(target=worker_with_shared_data,
                       args=(shared_value, shared_array, lock, i))
        processes.append(p)
        p.start()

    # Wait for all processes
    for p in processes:
        p.join()

    print(f"Final value: {shared_value.value}")
    print(f"Final array: {shared_array[:]}")
python code snippet end
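Note that mp.Value and mp.Array are created with an internal lock by default, so for simple counters you can use their .get_lock() method instead of passing a separate mp.Lock() around. A minimal sketch:
python code snippet start
import multiprocessing as mp

def increment(counter):
    """Increment a shared counter using its built-in lock."""
    for _ in range(1000):
        with counter.get_lock():  # += on .value is not atomic, so lock around it
            counter.value += 1

if __name__ == '__main__':
    counter = mp.Value('i', 0)  # Synchronized by default
    processes = [mp.Process(target=increment, args=(counter,)) for _ in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Final count: {counter.value}")  # Expect 4000
python code snippet end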
Manager Objects
python code snippet start
import multiprocessing as mp

def worker_with_manager(shared_dict, shared_list, worker_id):
    """Worker using manager objects for complex shared data."""
    shared_dict[f"worker_{worker_id}"] = f"Hello from worker {worker_id}"
    shared_list.append(f"Message from worker {worker_id}")

if __name__ == '__main__':
    # Create manager for complex shared objects
    with mp.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        # Create processes
        processes = []
        for i in range(4):
            p = mp.Process(target=worker_with_manager,
                           args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()

        # Wait for completion
        for p in processes:
            p.join()

        print("Shared dictionary:", dict(shared_dict))
        print("Shared list:", list(shared_list))
python code snippet end
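One common caveat with manager proxies: mutating an object nested inside a managed container (for example, appending to a plain list stored in a manager.dict()) is not propagated back automatically; you generally need to reassign the value. A minimal sketch of the pattern, using a hypothetical add_score helper:
python code snippet start
import multiprocessing as mp

def add_score(shared_dict, name, score):
    """Reassign the nested list so the manager sees the change."""
    scores = shared_dict[name]   # Local copy of the nested list
    scores.append(score)         # Mutating only the copy would be lost
    shared_dict[name] = scores   # Reassignment propagates the update

if __name__ == '__main__':
    with mp.Manager() as manager:
        shared_dict = manager.dict({"alice": []})
        p = mp.Process(target=add_score, args=(shared_dict, "alice", 42))
        p.start()
        p.join()
        print(dict(shared_dict))  # {'alice': [42]}
python code snippet end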
Real-World Example: Parallel File Processing
python code snippet start
import multiprocessing as mp
import os
import time
from pathlib import Path

def process_file(file_path):
    """Process a single file (simulate work)."""
    file_size = os.path.getsize(file_path)
    time.sleep(0.1)  # Simulate processing time
    return {
        'file': file_path.name,
        'size': file_size,
        'processed_by': mp.current_process().name
    }

def parallel_file_processor(directory_path, max_workers=4):
    """Process all files in a directory using multiple processes."""
    directory = Path(directory_path)
    files = [f for f in directory.iterdir() if f.is_file()]

    if not files:
        print("No files to process")
        return []

    # Process files in parallel
    with mp.Pool(processes=max_workers) as pool:
        results = pool.map(process_file, files)

    return results

if __name__ == '__main__':
    # Example usage
    current_dir = "."
    results = parallel_file_processor(current_dir)

    print(f"Processed {len(results)} files:")
    for result in results:
        print(f"  {result['file']}: {result['size']} bytes (by {result['processed_by']})")
python code snippet end
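The same pattern can also be written with concurrent.futures.ProcessPoolExecutor from the standard library, which layers a future-based API over a process pool. A minimal sketch of the idea as a standalone snippet (count_bytes is just an illustrative worker, not part of the example above):
python code snippet start
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

def count_bytes(file_path):
    """Return a (name, size) pair for one file."""
    return file_path.name, file_path.stat().st_size

if __name__ == '__main__':
    files = [f for f in Path(".").iterdir() if f.is_file()]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map works like pool.map but yields results lazily
        for name, size in executor.map(count_bytes, files):
            print(f"{name}: {size} bytes")
python code snippet end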
CPU-Bound vs I/O-Bound Tasks
python code snippet start
import multiprocessing as mp
import time

import requests

def cpu_bound_task(n):
    """CPU-intensive task - benefits from multiprocessing."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def io_bound_task(url):
    """I/O-intensive task - threading might be sufficient."""
    try:
        response = requests.get(url, timeout=5)
        return len(response.content)
    except requests.RequestException:
        return 0

if __name__ == '__main__':
    # CPU-bound: use multiprocessing
    numbers = [1000000] * 4

    start_time = time.time()
    with mp.Pool() as pool:
        results = pool.map(cpu_bound_task, numbers)
    mp_time = time.time() - start_time
    print(f"Multiprocessing (CPU-bound): {mp_time:.2f} seconds")

    # For comparison - sequential execution
    start_time = time.time()
    results = [cpu_bound_task(n) for n in numbers]
    seq_time = time.time() - start_time
    print(f"Sequential (CPU-bound): {seq_time:.2f} seconds")

    print(f"Speedup: {seq_time/mp_time:.1f}x")
python code snippet end
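For the I/O-bound side of the comparison, threads (or asyncio) are usually enough because the GIL is released while waiting on the network. A minimal sketch using ThreadPoolExecutor with the io_bound_task from above (the URLs are placeholders):
python code snippet start
from concurrent.futures import ThreadPoolExecutor
import time

import requests

def io_bound_task(url):
    """Download a page and return its size in bytes."""
    try:
        response = requests.get(url, timeout=5)
        return len(response.content)
    except requests.RequestException:
        return 0

if __name__ == '__main__':
    urls = ["https://example.com"] * 4  # Placeholder URLs

    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        sizes = list(executor.map(io_bound_task, urls))
    print(f"Threading (I/O-bound): {time.time() - start_time:.2f} seconds, sizes={sizes}")
python code snippet end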
Best Practices
python code snippet start
import multiprocessing as mp

def safe_worker(data):
    """Simple worker defined at module level so it can be pickled."""
    return data * 2

# Always protect the entry point with the __main__ guard.
# This prevents infinite process spawning on Windows (and with the 'spawn' start method).
if __name__ == '__main__':
    # Set start method (optional, platform-dependent)
    mp.set_start_method('spawn', force=True)  # Use 'spawn' for cross-platform behavior

    # Use context managers for automatic cleanup
    with mp.Pool() as pool:
        results = pool.map(safe_worker, [1, 2, 3, 4, 5])
        print(results)

    # For single processes, always join
    process = mp.Process(target=safe_worker, args=(10,))
    process.start()
    process.join()  # Wait for completion

    print("All done!")
python code snippet end
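Two more habits worth having: exceptions raised in a worker are re-raised in the parent when you collect the result, and apply_async results accept a timeout so a stuck worker does not block forever. A minimal sketch (flaky_worker is a hypothetical function for illustration):
python code snippet start
import multiprocessing as mp

def flaky_worker(x):
    """Hypothetical worker that sometimes fails."""
    if x < 0:
        raise ValueError("negative input")
    return x * 2

if __name__ == '__main__':
    with mp.Pool() as pool:
        async_result = pool.apply_async(flaky_worker, (-1,))
        try:
            # get() re-raises the worker's exception and accepts a timeout
            print(async_result.get(timeout=5))
        except ValueError as exc:
            print(f"Worker failed: {exc}")
        except mp.TimeoutError:
            print("Worker took too long")
python code snippet end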
Multiprocessing is perfect for CPU-intensive tasks where you need to utilize multiple cores - just remember that each process has its own memory space, so communication requires the special mechanisms shown above! Compare with threading for I/O-bound tasks and asyncio for concurrent I/O. Use with pathlib for file processing and logging across processes.
Reference: multiprocessing — Process-based parallelism