PEP 492: Async and Await
TL;DR
PEP 492 introduced async def
and await
keywords, creating native coroutines that make asynchronous programming cleaner and more intuitive than generator-based approaches.
Interesting!
Unlike regular functions that run to completion, async
functions can be paused mid-execution with await
and resumed later — enabling concurrent execution without threads!
Basic Async Functions
python code snippet start
import asyncio
async def fetch_data():
    """Coroutine that pretends to fetch a record over the network.

    Suspends for one second via ``asyncio.sleep`` to stand in for real I/O,
    then returns a small sample payload.
    """
    print("Starting to fetch data...")
    await asyncio.sleep(1)  # Simulate async operation
    print("Data fetched!")
    record = {"id": 1, "name": "Alice"}
    return record
async def main():
    """Await the simulated fetch and echo what came back."""
    data = await fetch_data()
    print(f"Received: {data}")


# Drive the coroutine: asyncio.run creates, runs, and closes a fresh event loop.
asyncio.run(main())
python code snippet end
Concurrent Execution
python code snippet start
import asyncio
import time
async def task(name, duration):
    """Sleep for *duration* seconds, then return a result string for *name*."""
    print(f"Task {name} starting...")
    await asyncio.sleep(duration)  # yields control so other tasks can run
    print(f"Task {name} completed after {duration}s")
    result = f"Result from {name}"
    return result
async def run_concurrent():
    """Launch three tasks at once and report the total wall-clock time.

    Because the tasks overlap, total time is the longest single task (~3s),
    not the 2+1+3 = 6s a sequential run would take.
    """
    start_time = time.time()
    # gather() schedules every coroutine concurrently and returns results
    # in the same order the coroutines were passed in.
    jobs = [task("A", 2), task("B", 1), task("C", 3)]
    results = await asyncio.gather(*jobs)
    end_time = time.time()
    print(f"All tasks completed in {end_time - start_time:.1f} seconds")
    print(f"Results: {results}")


asyncio.run(run_concurrent())
# Output shows tasks run concurrently, not sequentially
python code snippet end
Async Context Managers
python code snippet start
import asyncio
class AsyncDatabase:
    """Toy async database showing the async context-manager protocol.

    ``__aenter__``/``__aexit__`` let instances be used with ``async with``;
    all "work" is simulated with ``asyncio.sleep``.
    """

    async def __aenter__(self):
        """Simulate opening a connection; return the connected database."""
        print("Connecting to database...")
        await asyncio.sleep(0.5)  # Simulate connection time
        print("Connected!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Simulate tearing down the connection (runs even on error)."""
        print("Closing database connection...")
        await asyncio.sleep(0.2)  # Simulate cleanup
        print("Connection closed!")

    async def query(self, sql):
        """Simulate executing *sql* and return one canned row."""
        print(f"Executing: {sql}")
        await asyncio.sleep(0.3)  # Simulate query time
        rows = [{"id": 1, "name": "Alice"}]
        return rows
async def database_example():
    """Open the toy database with ``async with`` and run a single query."""
    async with AsyncDatabase() as db:
        results = await db.query("SELECT * FROM users")
        print(f"Query results: {results}")


asyncio.run(database_example())
python code snippet end
Async Iterators
python code snippet start
import asyncio
class AsyncCounter:
    """Async iterator yielding 1..max_count, pausing 0.5s before each value.

    Implements ``__aiter__``/``__anext__`` so it can drive ``async for``.
    """

    def __init__(self, max_count):
        # max_count: how many values to yield; count: values yielded so far
        self.max_count = max_count
        self.count = 0

    def __aiter__(self):
        # The counter is its own iterator.
        return self

    async def __anext__(self):
        if self.count < self.max_count:
            await asyncio.sleep(0.5)  # Simulate async work
            self.count += 1
            return self.count
        # Exhausted: async iteration stops via StopAsyncIteration.
        raise StopAsyncIteration
async def async_iteration_example():
    """Drive AsyncCounter with ``async for``, printing each value."""
    async for number in AsyncCounter(3):
        print(f"Async number: {number}")


asyncio.run(async_iteration_example())
python code snippet end
HTTP Client Example
python code snippet start
import asyncio
import aiohttp # pip install aiohttp
async def fetch_url(session, url):
    """Fetch *url* with the given aiohttp *session*.

    Returns a dict with ``url``/``status``/``length`` on success, or
    ``url``/``error`` if anything goes wrong — so gather() never sees
    an exception from this coroutine.
    """
    try:
        async with session.get(url) as response:
            body = await response.text()
            return {
                'url': url,
                'status': response.status,
                'length': len(body)
            }
    except Exception as e:  # network errors, timeouts, bad TLS, ...
        return {'url': url, 'error': str(e)}
async def fetch_multiple_urls():
    """Fetch several URLs concurrently and print a per-URL summary.

    Bug fix: this snippet called ``time.time()`` without importing ``time``
    anywhere in the snippet, so it raised ``NameError`` at runtime. The
    import is now done locally so the example is self-contained.
    """
    import time  # needed for the wall-clock timing below

    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid'
    ]
    async with aiohttp.ClientSession() as session:
        start_time = time.time()
        # Fetch all URLs concurrently; gather preserves input order.
        results = await asyncio.gather(
            *[fetch_url(session, url) for url in urls]
        )
        end_time = time.time()
        print(f"Fetched {len(urls)} URLs in {end_time - start_time:.1f} seconds")
        for result in results:
            if 'error' in result:
                print(f"Error for {result['url']}: {result['error']}")
            else:
                print(f"{result['url']}: {result['status']} ({result['length']} chars)")
# asyncio.run(fetch_multiple_urls()) # Uncomment to run
python code snippet end
Producer-Consumer Pattern
python code snippet start
import asyncio
import random
async def producer(queue, name):
    """Put five named items on *queue*, then a ``None`` end-of-stream marker."""
    for i in range(5):
        await asyncio.sleep(random.uniform(0.1, 0.5))  # simulate variable work
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None)  # Signal completion
    print(f"Producer {name} finished")
async def consumer(queue, name):
    """Consume items from the queue until the ``None`` sentinel arrives.

    Bug fix: the producer enqueues a single ``None`` sentinel, but the
    example runs TWO consumers. Previously only one consumer ever saw the
    sentinel; the other waited forever on ``queue.get()`` and
    ``asyncio.gather`` never returned (deadlock). Each consumer now
    re-enqueues the sentinel before exiting so every consumer shuts down.
    """
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # pass the sentinel on to other consumers
            queue.task_done()
            break
        await asyncio.sleep(random.uniform(0.2, 0.4))  # Simulate processing
        print(f"Consumer {name} processed: {item}")
        queue.task_done()
    print(f"Consumer {name} finished")
async def producer_consumer_example():
    """Wire one producer to two consumers through a bounded queue."""
    queue = asyncio.Queue(maxsize=3)  # backpressure: producer blocks when full
    # Producer and both consumers run concurrently until all complete.
    await asyncio.gather(
        producer(queue, "Producer-1"),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )


asyncio.run(producer_consumer_example())
python code snippet end
Error Handling
python code snippet start
import asyncio
async def might_fail(should_fail=False):
    """Sleep one second, then either return a success string or raise.

    Raises:
        ValueError: when *should_fail* is true.
    """
    await asyncio.sleep(1)
    if not should_fail:
        return "Success!"
    raise ValueError("Something went wrong!")
async def error_handling_example():
    """Run a mix of passing and failing coroutines without losing results."""
    # return_exceptions=True makes gather() hand exceptions back as values
    # instead of cancelling the whole batch on the first failure.
    results = await asyncio.gather(
        might_fail(False),
        might_fail(True),
        might_fail(False),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")


asyncio.run(error_handling_example())
python code snippet end
Async vs Sync Comparison
python code snippet start
import asyncio
import time
import requests
def sync_fetch(url):
    """Blocking fetch: return the length of the response body for *url*."""
    # requests.get blocks the calling thread until the response arrives.
    return len(requests.get(url).text)
async def async_fetch(session, url):
    """Non-blocking fetch: return the body length for *url* via *session*."""
    async with session.get(url) as response:
        return len(await response.text())
def compare_sync_vs_async():
    """Time three sequential ``requests`` calls against the same fetches
    run concurrently with aiohttp, then print the speedup.

    Bug fix: this snippet never imported ``aiohttp``, so ``async_main``
    raised ``NameError`` when run. The import is now done locally so the
    example is self-contained.
    """
    import aiohttp  # pip install aiohttp; not imported at the top of this snippet

    urls = ['https://httpbin.org/delay/1'] * 3

    # Synchronous: each request blocks until the previous one finishes.
    start = time.time()
    sync_results = [sync_fetch(url) for url in urls]
    sync_time = time.time() - start

    # Asynchronous: all three requests are in flight at the same time.
    async def async_main():
        async with aiohttp.ClientSession() as session:
            return await asyncio.gather(
                *[async_fetch(session, url) for url in urls]
            )

    start = time.time()
    async_results = asyncio.run(async_main())
    async_time = time.time() - start

    print(f"Sync time: {sync_time:.1f}s")
    print(f"Async time: {async_time:.1f}s")
    print(f"Speedup: {sync_time/async_time:.1f}x")
# compare_sync_vs_async() # Uncomment to run
python code snippet end
When to Use Async
python code snippet start
# Good for I/O-bound operations:
async def good_use_cases():
    """Illustrate the I/O-bound workloads where async/await pays off."""
    # Network requests: the event loop runs other tasks while we wait.
    async with aiohttp.ClientSession() as session:
        await session.get('https://api.example.com')
    # File I/O (with aiofiles)
    # async with aiofiles.open('file.txt') as f:
    #     content = await f.read()
    # Database queries (with asyncpg, aiomysql, etc.)
    # await db.execute("SELECT * FROM users")
# NOT good for CPU-bound operations:
def cpu_intensive_task():
    """Sum the squares 0²..999999² — pure CPU work.

    Number crunching like this holds the GIL and blocks the event loop;
    run it in a process pool rather than inside a coroutine.
    """
    squares = (i ** 2 for i in range(1000000))
    return sum(squares)
# Don't do this in async functions:
async def bad_example():
    """Anti-pattern: synchronous CPU work inside a coroutine.

    The call below never awaits, so the event loop is stalled for its
    entire duration — no other task can make progress.
    """
    return cpu_intensive_task()  # Blocks everything!
python code snippet end
Async/await transforms I/O-bound Python code from blocking sequential execution into efficient concurrent processing — perfect for web scraping, API calls, and any other network-heavy application!
To dive deeper into practical async programming, explore Python's asyncio module which provides the runtime infrastructure for these coroutines.