Skip to main content Brad's PyNotes

PEP 492 Async Await

TL;DR

PEP 492 introduced async def and await keywords, creating native coroutines that make asynchronous programming cleaner and more intuitive than generator-based approaches.

Interesting!

Unlike regular functions that run to completion, async functions can be paused mid-execution with await and resumed later - enabling concurrent execution without threads!

Basic Async Functions

python code snippet start

import asyncio

async def fetch_data():
    """An async function (coroutine)."""
    print("Starting to fetch data...")
    await asyncio.sleep(1)  # Simulate async operation
    print("Data fetched!")
    return {"id": 1, "name": "Alice"}

async def main():
    # Call async function with await
    data = await fetch_data()
    print(f"Received: {data}")

# Run the async function
asyncio.run(main())

python code snippet end

Concurrent Execution

python code snippet start

import asyncio
import time

async def task(name, duration):
    print(f"Task {name} starting...")
    await asyncio.sleep(duration)
    print(f"Task {name} completed after {duration}s")
    return f"Result from {name}"

async def run_concurrent():
    start_time = time.time()
    
    # Run tasks concurrently
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1), 
        task("C", 3)
    )
    
    end_time = time.time()
    print(f"All tasks completed in {end_time - start_time:.1f} seconds")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
# Output shows tasks run concurrently, not sequentially

python code snippet end

Async Context Managers

python code snippet start

import asyncio

class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.5)  # Simulate connection time
        print("Connected!")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.2)  # Simulate cleanup
        print("Connection closed!")
    
    async def query(self, sql):
        print(f"Executing: {sql}")
        await asyncio.sleep(0.3)  # Simulate query time
        return [{"id": 1, "name": "Alice"}]

async def database_example():
    async with AsyncDatabase() as db:
        results = await db.query("SELECT * FROM users")
        print(f"Query results: {results}")

asyncio.run(database_example())

python code snippet end

Async Iterators

python code snippet start

import asyncio

class AsyncCounter:
    def __init__(self, max_count):
        self.max_count = max_count
        self.count = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.count >= self.max_count:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.5)  # Simulate async work
        self.count += 1
        return self.count

async def async_iteration_example():
    async for number in AsyncCounter(3):
        print(f"Async number: {number}")

asyncio.run(async_iteration_example())

python code snippet end

HTTP Client Example

python code snippet start

import asyncio
import aiohttp  # pip install aiohttp

async def fetch_url(session, url):
    """Fetch a single URL."""
    try:
        async with session.get(url) as response:
            return {
                'url': url,
                'status': response.status,
                'length': len(await response.text())
            }
    except Exception as e:
        return {'url': url, 'error': str(e)}

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2', 
        'https://httpbin.org/json',
        'https://httpbin.org/uuid'
    ]
    
    async with aiohttp.ClientSession() as session:
        start_time = time.time()
        
        # Fetch all URLs concurrently
        results = await asyncio.gather(
            *[fetch_url(session, url) for url in urls]
        )
        
        end_time = time.time()
        
        print(f"Fetched {len(urls)} URLs in {end_time - start_time:.1f} seconds")
        for result in results:
            if 'error' in result:
                print(f"Error for {result['url']}: {result['error']}")
            else:
                print(f"{result['url']}: {result['status']} ({result['length']} chars)")

# asyncio.run(fetch_multiple_urls())  # Uncomment to run

python code snippet end

Producer-Consumer Pattern

python code snippet start

import asyncio
import random

async def producer(queue, name):
    """Produce items and put them in the queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await queue.put(item)
        print(f"Produced: {item}")
    
    await queue.put(None)  # Signal completion
    print(f"Producer {name} finished")

async def consumer(queue, name):
    """Consume items from the queue."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        
        await asyncio.sleep(random.uniform(0.2, 0.4))  # Simulate processing
        print(f"Consumer {name} processed: {item}")
        queue.task_done()
    
    print(f"Consumer {name} finished")

async def producer_consumer_example():
    queue = asyncio.Queue(maxsize=3)
    
    # Start producer and consumer concurrently
    await asyncio.gather(
        producer(queue, "Producer-1"),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2")
    )

asyncio.run(producer_consumer_example())

python code snippet end

Error Handling

python code snippet start

import asyncio

async def might_fail(should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError("Something went wrong!")
    return "Success!"

async def error_handling_example():
    tasks = [
        might_fail(False),
        might_fail(True),
        might_fail(False)
    ]
    
    # Handle errors gracefully
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(error_handling_example())

python code snippet end

Async vs Sync Comparison

python code snippet start

import asyncio
import time
import requests

def sync_fetch(url):
    """Synchronous fetch."""
    response = requests.get(url)
    return len(response.text)

async def async_fetch(session, url):
    """Asynchronous fetch."""
    async with session.get(url) as response:
        text = await response.text()
        return len(text)

def compare_sync_vs_async():
    urls = ['https://httpbin.org/delay/1'] * 3
    
    # Synchronous - runs sequentially
    start = time.time()
    sync_results = [sync_fetch(url) for url in urls]
    sync_time = time.time() - start
    
    # Asynchronous - runs concurrently
    async def async_main():
        async with aiohttp.ClientSession() as session:
            return await asyncio.gather(
                *[async_fetch(session, url) for url in urls]
            )
    
    start = time.time()
    async_results = asyncio.run(async_main())
    async_time = time.time() - start
    
    print(f"Sync time: {sync_time:.1f}s")
    print(f"Async time: {async_time:.1f}s") 
    print(f"Speedup: {sync_time/async_time:.1f}x")

# compare_sync_vs_async()  # Uncomment to run

python code snippet end

When to Use Async

python code snippet start

# Good for I/O-bound operations:
async def good_use_cases():
    # Network requests
    async with aiohttp.ClientSession() as session:
        await session.get('https://api.example.com')
    
    # File I/O (with aiofiles)
    # async with aiofiles.open('file.txt') as f:
    #     content = await f.read()
    
    # Database queries (with asyncpg, aiomysql, etc.)
    # await db.execute("SELECT * FROM users")

# NOT good for CPU-bound operations:
def cpu_intensive_task():
    # This blocks the event loop - use multiprocessing instead
    total = sum(i ** 2 for i in range(1000000))
    return total

# Don't do this in async functions:
async def bad_example():
    result = cpu_intensive_task()  # Blocks everything!
    return result

python code snippet end

Async/await transforms I/O-bound Python code from blocking sequential execution to efficient concurrent processing - perfect for web scraping, API calls, and any network-heavy applications!

To dive deeper into practical async programming, explore Python's asyncio module which provides the runtime infrastructure for these coroutines.

Reference: PEP 492 – Coroutines with async and await syntax