Master async programming in Python. Learn event loops, coroutines, tasks, aiohttp for concurrent HTTP requests, and when async actually makes sense vs. threads or multiprocessing.
Async Python isn't just "faster sync code." It's a fundamentally different execution model. Understand it deeply and you'll build I/O-bound services that handle thousands of concurrent operations on a single CPU core.
Synchronous code is a single chef making one dish at a time. Threaded code is multiple chefs, each making one dish. Async code is one chef juggling many dishes, working on whichever one is ready next.
This only helps when tasks spend time waiting (on network, disk, database). For CPU-bound work, async doesn't help — use multiprocessing.
```python
import asyncio

async def fetch_data():
    await asyncio.sleep(1)  # simulate I/O
    return "data"

# Calling a coroutine doesn't run it — it returns a coroutine object
coro = fetch_data()
print(coro)  # <coroutine object ...>

# Run it
result = asyncio.run(fetch_data())
```
The await keyword yields control back to the event loop. The loop can then work on other coroutines until this one is ready to resume.
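You can watch that hand-off directly. A minimal sketch (names and delays are arbitrary): two coroutines record their progress, and each `await asyncio.sleep()` is a suspension point where the loop switches to whichever coroutine is ready:

```python
import asyncio

order = []

async def step(name, delay):
    order.append(f"{name} start")
    await asyncio.sleep(delay)  # suspension point: control returns to the loop
    order.append(f"{name} done")

async def main():
    # Both coroutines start before either finishes
    await asyncio.gather(step("a", 0.05), step("b", 0.01))

asyncio.run(main())
print(order)  # ['a start', 'b start', 'b done', 'a done']
```

Note that "b" finishes before "a" even though "a" started first: the loop resumes whichever coroutine's wait ends soonest.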
```python
async def slow():
    await asyncio.sleep(1)

async def main():
    await slow()  # 1s
    await slow()  # 1s
    await slow()  # 1s
# Total: 3 seconds
```

```python
async def main():
    await asyncio.gather(slow(), slow(), slow())
# Total: 1 second
```
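You can verify that timing yourself without waiting three seconds; the same pattern scaled down to 0.1-second sleeps (a minimal, self-contained sketch):

```python
import asyncio
import time

async def slow():
    await asyncio.sleep(0.1)

async def main():
    start = time.perf_counter()
    # Three 0.1s sleeps run concurrently: total is ~0.1s, not 0.3s
    await asyncio.gather(slow(), slow(), slow())
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")
```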
```python
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(slow())
        tg.create_task(slow())
        tg.create_task(slow())
# Total: 1 second, with structured exception handling
```
TaskGroup is the modern way — it guarantees all tasks complete (or all cancel on first error).
Install aiohttp:
pip install aiohttp
Fetch 100 URLs concurrently:

```python
import aiohttp
import asyncio
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

urls = ['https://httpbin.org/delay/1'] * 100
start = time.time()
results = asyncio.run(fetch_all(urls))
elapsed = time.time() - start
print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")
# ~1.5 seconds vs. ~100 seconds sequential
```
Don't DDoS yourself. Use a semaphore:
```python
async def fetch_with_limit(sem, session, url):
    async with sem:  # max N concurrent
        async with session.get(url) as r:
            return await r.text()

async def main(urls):
    sem = asyncio.Semaphore(10)  # 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, u) for u in urls]
        return await asyncio.gather(*tasks)
```
Wrap any await in asyncio.timeout (Python 3.11+):

```python
try:
    async with asyncio.timeout(5):
        await slow_operation()
except asyncio.TimeoutError:
    print("Too slow!")
```
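On Python versions before 3.11, asyncio.wait_for gives the same behavior, cancelling the awaited operation when the deadline passes. A runnable sketch with a shortened timeout:

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)

async def main():
    try:
        # wait_for cancels slow_operation() and raises TimeoutError on expiry
        await asyncio.wait_for(slow_operation(), timeout=0.05)
        return "ok"
    except asyncio.TimeoutError:
        return "too slow"

print(asyncio.run(main()))  # too slow
```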
Or per-request timeouts with aiohttp:
```python
timeout = aiohttp.ClientTimeout(total=10, connect=3)
async with aiohttp.ClientSession(timeout=timeout) as session:
    ...
```
With gather, the first exception propagates immediately, but the sibling tasks are not cancelled: they keep running in the background. Pass return_exceptions=True to collect errors as results instead:
```python
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        print(f"Failed: {r}")
```
TaskGroup raises ExceptionGroup (Python 3.11+):
```python
try:
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(fetch(url))
except* aiohttp.ClientError as eg:
    for exc in eg.exceptions:
        print(f"HTTP error: {exc}")
```
Stream results as they arrive:
```python
async def fetch_paginated(base_url):
    page = 1
    while True:
        # assumes fetch returns parsed JSON (a dict), not raw text
        data = await fetch(f"{base_url}?page={page}")
        if not data['results']:
            break
        for item in data['results']:
            yield item
        page += 1

async for item in fetch_paginated('https://api.example.com/users'):
    process(item)
```
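The same `async for` machinery works with any awaitable source, so you can try it without a network. A self-contained sketch:

```python
import asyncio

async def countdown(n):
    # An async generator: it can await between yields without blocking the loop
    while n > 0:
        await asyncio.sleep(0.01)
        yield n
        n -= 1

async def main():
    return [x async for x in countdown(3)]

print(asyncio.run(main()))  # [3, 2, 1]
```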
```python
import asyncpg

class Database:
    async def __aenter__(self):
        self.conn = await asyncpg.connect('postgres://...')
        return self.conn

    async def __aexit__(self, *args):
        await self.conn.close()

async with Database() as conn:
    await conn.fetch('SELECT * FROM users')
```
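If a full class feels heavy, contextlib.asynccontextmanager builds the same kind of context manager from a generator. A self-contained sketch (FakeConn is a stand-in for a real connection object):

```python
import asyncio
from contextlib import asynccontextmanager

class FakeConn:
    # Stand-in for a real connection (hypothetical, for illustration)
    def __init__(self):
        self.closed = False
    async def close(self):
        self.closed = True

@asynccontextmanager
async def database():
    conn = FakeConn()  # in real code: await asyncpg.connect(...)
    try:
        yield conn
    finally:
        await conn.close()  # always runs, even if the body raises

async def main():
    async with database() as conn:
        pass
    return conn.closed

print(asyncio.run(main()))  # True
```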
If you must call sync code (e.g., pandas, PIL):
```python
import asyncio

def cpu_heavy(data):
    # CPU-bound work
    return processed

async def main(data):
    # Runs in thread pool, doesn't block the event loop
    result = await asyncio.to_thread(cpu_heavy, data)
```
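asyncio.to_thread was added in Python 3.9; on older versions, the equivalent is loop.run_in_executor with the default thread pool. Note that threads only help when the sync code blocks on I/O or releases the GIL (as much of pandas and PIL does internally); for pure-Python CPU work, pass a concurrent.futures.ProcessPoolExecutor to run_in_executor instead. A sketch of the run_in_executor form:

```python
import asyncio
import time

def blocking_io():
    # Stands in for a sync library call that blocks
    time.sleep(0.01)
    return "done"

async def main():
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_io)

print(asyncio.run(main()))  # done
```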
Django 4.1+ supports async views and ORM queries:
```python
async def async_view(request):
    posts = [p async for p in Post.objects.filter(published=True)]
    return JsonResponse({'posts': [...]})
```
But be aware: a single piece of sync middleware forces Django to hop the request between the event loop and a worker thread, costing you most of the async benefit. Use async-capable middleware and an ASGI server (Daphne, Uvicorn).
Enable debug mode:
```python
asyncio.run(main(), debug=True)
```
Use logging to trace task scheduling. Key tools:
- asyncio.all_tasks() — see running tasks
- task.get_stack() — inspect where a task is stuck
- aiomonitor — live REPL into running event loop
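A quick sketch of asyncio.all_tasks() in action (the task name here is arbitrary):

```python
import asyncio

async def worker():
    await asyncio.sleep(60)

async def main():
    task = asyncio.create_task(worker(), name="worker-1")
    await asyncio.sleep(0)  # yield once so the task starts
    # all_tasks() returns every not-yet-finished task in the running loop,
    # including the task executing main() itself
    names = {t.get_name() for t in asyncio.all_tasks()}
    task.cancel()
    return names

names = asyncio.run(main())
print(sorted(names))
```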
For extra throughput, two common swaps:

- uvloop as the event loop (drop-in replacement, ~2x faster)
- orjson for JSON (10x faster than stdlib)

```python
import uvloop
uvloop.install()  # before asyncio.run()
```
Async Python shines for I/O-bound concurrent workloads: API clients, scrapers, WebSocket servers, database-heavy services. Master coroutines, tasks, semaphores, and structured concurrency, and you'll build services that handle 10,000 concurrent connections on a single Python process.