Concurrent Futures
TL;DR
The concurrent.futures module provides a simple, high-level interface for executing tasks concurrently using either threads or processes, making parallel programming accessible without dealing with low-level threading or multiprocessing details.
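The core idea fits in a few lines. You hand a callable to an executor, get back a Future straight away, and ask the Future for the result when you need it. In this minimal sketch, add() is a hypothetical example task.

python code snippet start
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    # Hypothetical example task
    return a + b

with ThreadPoolExecutor() as executor:
    # submit() schedules add(2, 3) and immediately returns a Future
    future = executor.submit(add, 2, 3)
    # result() blocks until the call has finished, then returns its value
    total = future.result()

print(total)  # 5
python code snippet end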
Interesting!
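Unlike the lower-level threading module, concurrent.futures manages the worker pool's lifecycle for you, especially when the executor is used as a context manager. It also provides a unified interface: you can switch between thread-based and process-based execution by changing the executor class. For a process pool, the function and its arguments must be picklable.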
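The "managed lifecycle" comes mostly from the with statement. Leaving the block calls shutdown(wait=True), which waits for all submitted work to finish. The sketch below shows the context-manager form next to a roughly equivalent manual version. square() is a hypothetical helper used only for illustration.

python code snippet start
from concurrent.futures import ThreadPoolExecutor

def square(n):
    # Hypothetical helper used only for illustration
    return n * n

# Context-manager form: leaving the block calls executor.shutdown(wait=True)
with ThreadPoolExecutor(max_workers=2) as executor:
    auto_results = list(executor.map(square, range(5)))

# Manual form: shut the pool down in finally so it happens even on errors
executor = ThreadPoolExecutor(max_workers=2)
try:
    manual_results = list(executor.map(square, range(5)))
finally:
    executor.shutdown(wait=True)

# A pool that has been shut down refuses new work with RuntimeError
try:
    executor.submit(square, 3)
    rejected = False
except RuntimeError:
    rejected = True

print(auto_results)    # [0, 1, 4, 9, 16]
print(manual_results)  # [0, 1, 4, 9, 16]
print(rejected)        # True
python code snippet end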
The Two Executors
Python’s concurrent.futures offers two main executor classes:
python code snippet start
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def slow_task(n):
    time.sleep(1)
    return n * n

# The __main__ guard is required for ProcessPoolExecutor on platforms
# that start worker processes with "spawn" (e.g. Windows, macOS)
if __name__ == "__main__":
    # Thread-based (good for I/O-bound tasks)
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(slow_task, [1, 2, 3, 4]))

    # Process-based (good for CPU-bound tasks)
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(slow_task, [1, 2, 3, 4]))
python code snippet end
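Threads help with I/O-bound work because one thread can wait while the others keep going. This sketch times the same four calls twice, once in sequence and once through a four-worker thread pool. The sleep is shortened to half a second here, and exact timings will vary by machine. For CPU-bound pure-Python code, the GIL in standard CPython builds stops threads from overlapping computation like this, which is why ProcessPoolExecutor is the usual choice there.

python code snippet start
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task(n):
    # Shortened to 0.5 s; the sleep stands in for waiting on I/O
    time.sleep(0.5)
    return n * n

inputs = [1, 2, 3, 4]

# Sequential baseline: the four sleeps run back to back (about 2 s)
start = time.perf_counter()
sequential_results = [slow_task(n) for n in inputs]
sequential_time = time.perf_counter() - start

# Thread pool with one worker per input: the sleeps overlap (about 0.5 s)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    pool_results = list(executor.map(slow_task, inputs))
pool_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, thread pool: {pool_time:.2f}s")
python code snippet end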
Submit vs Map
There are two main ways to run tasks. submit() schedules a single call and returns a Future, while map() applies one function across a batch of inputs:
python code snippet start
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_url(url):
    # Simulate web request
    return f"Data from {url}"

urls = ['http://site1.com', 'http://site2.com', 'http://site3.com']

# Using submit() - more control
with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"Got {data}")
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")

# Using map() - simpler for uniform tasks
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch_url, urls))
python code snippet end
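The two approaches also differ in ordering. map() returns results in the same order as its inputs. as_completed() yields futures as they finish. In the sketch below, delayed_echo() and its delays are hypothetical and chosen only to make the difference visible. The completion order depends on scheduling, so it is typical rather than guaranteed.

python code snippet start
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def delayed_echo(delay):
    # Hypothetical task: sleep for `delay` seconds, then return it
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map(): results follow input order, regardless of finish time
    map_order = list(executor.map(delayed_echo, delays))

    # as_completed(): each future is yielded as soon as it finishes
    futures = [executor.submit(delayed_echo, d) for d in delays]
    completion_order = [f.result() for f in as_completed(futures)]

print("map:", map_order)                  # [0.3, 0.1, 0.2]
print("as_completed:", completion_order)  # typically [0.1, 0.2, 0.3]
python code snippet end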
Error Handling and Timeouts
Futures provide built-in error handling and timeout support:
python code snippet start
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def risky_task(n):
    if n == 3:
        raise ValueError("Number 3 is unlucky!")
    time.sleep(n)
    return n * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, i) for i in range(1, 5)]
    for future in futures:
        try:
            # Wait at most 1.5 seconds for this result (task 4 sleeps 4 s)
            result = future.result(timeout=1.5)
            print(f"Result: {result}")
        except TimeoutError:
            # The task keeps running; the with block still waits for it on exit
            print("Task timed out")
        except ValueError as e:
            # Exceptions raised inside the task are re-raised by result()
            print(f"Task failed: {e}")
python code snippet end
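An exception raised inside a task is stored on its Future rather than lost. result() re-raises it, while exception() returns it, or None, without raising. With map(), the error only surfaces when iteration reaches the failing item, so any earlier results are still delivered. A minimal sketch, reusing risky_task from above without the sleep:

python code snippet start
from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 3:
        raise ValueError("Number 3 is unlucky!")
    return n * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, i) for i in range(1, 5)]
    # exception() waits for each task and returns its error (or None) without raising
    errors = [f.exception() for f in futures]

    # With map(), the error is raised when iteration reaches the failing item
    collected = []
    try:
        for value in executor.map(risky_task, range(1, 5)):
            collected.append(value)
    except ValueError as exc:
        print(f"map() stopped early: {exc}")

print([type(e).__name__ if e is not None else None for e in errors])
# [None, None, 'ValueError', None]
print(collected)  # [2, 4]
python code snippet end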
The concurrent.futures module transforms complex parallel programming into simple, readable code while handling the tricky details of thread and process management for you.
For lower-level control over concurrent execution, the threading and multiprocessing modules remain available.