Skip to main content Brad's PyNotes

Concurrent Futures

TL;DR

The concurrent.futures module provides a simple, high-level interface for executing tasks concurrently using either threads or processes, making parallel programming accessible without dealing with low-level threading or multiprocessing details.

Interesting!

Unlike traditional threading, concurrent.futures automatically manages pool lifecycles and provides a unified interface - you can switch between thread-based and process-based execution by simply changing the executor class.

The Two Executors

Python’s concurrent.futures offers two main executor classes:

python code snippet start

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def slow_task(n):
    time.sleep(1)
    return n * n

# Thread-based (good for I/O-bound tasks)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(slow_task, [1, 2, 3, 4]))

# Process-based (good for CPU-bound tasks)
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(slow_task, [1, 2, 3, 4]))

python code snippet end

Submit vs Map

Two main ways to execute tasks - submit() for individual tasks, map() for batch processing:

python code snippet start

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_url(url):
    # Simulate web request
    return f"Data from {url}"

urls = ['http://site1.com', 'http://site2.com', 'http://site3.com']

# Using submit() - more control
with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"Got {data}")
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")

# Using map() - simpler for uniform tasks
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch_url, urls))

python code snippet end

Error Handling and Timeouts

Futures provide built-in error handling and timeout support:

python code snippet start

from concurrent.futures import ThreadPoolExecutor, TimeoutError

def risky_task(n):
    if n == 3:
        raise ValueError("Number 3 is unlucky!")
    time.sleep(n)
    return n * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, i) for i in range(1, 5)]
    
    for future in futures:
        try:
            result = future.result(timeout=2)  # 2-second timeout
            print(f"Result: {result}")
        except TimeoutError:
            print("Task timed out")
        except ValueError as e:
            print(f"Task failed: {e}")

python code snippet end

The concurrent.futures module transforms complex parallel programming into simple, readable code while handling the tricky details of thread and process management for you.

Threading and Multiprocessing provide lower-level control over concurrent execution.

Reference: concurrent.futures — Launching parallel tasks