
Async Python Deep Dive: asyncio, aiohttp, and Concurrent Programming

Master async programming in Python. Learn event loops, coroutines, tasks, aiohttp for concurrent HTTP requests, and when async actually makes sense vs. threads or multiprocessing.

DjangoZen Team, Apr 17, 2026

Async Python isn't just "faster sync code." It's a fundamentally different execution model. Understand it deeply and you'll build I/O-bound services that handle thousands of concurrent operations on a single CPU core.

The Mental Model

Synchronous code is a single chef making one dish at a time. Threaded code is multiple chefs, each making one dish. Async code is one chef juggling many dishes, working on whichever one is ready next.

This only helps when tasks spend time waiting (on network, disk, database). For CPU-bound work, async doesn't help — use multiprocessing.

Coroutines 101

import asyncio

async def fetch_data():
    await asyncio.sleep(1)  # simulate I/O
    return "data"

# Calling a coroutine function doesn't run it; it returns a coroutine object
coro = fetch_data()
print(coro)  # <coroutine object ...>

# Run it by handing the coroutine object to the event loop
result = asyncio.run(coro)
print(result)  # data

When the awaited operation is not ready yet, await suspends the coroutine and hands control back to the event loop. The loop can then run other coroutines until this one is ready to resume.
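To see that hand-off in action, here is a minimal sketch in which two coroutines take turns. The names worker and interleave are made up for this illustration, and asyncio.sleep(0) is used only as the smallest possible suspension point.

```python
import asyncio


async def worker(name, log):
    # Each pass records a step, then suspends so the event loop
    # can run whatever else is ready.
    for step in range(2):
        log.append(f"{name}{step}")
        await asyncio.sleep(0)


async def interleave():
    log = []
    # gather() schedules both workers on the same event loop.
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(interleave()))
```

The log alternates between the two workers instead of finishing "a" before starting "b", because every await gives the other coroutine a turn.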

Running Multiple Coroutines

Sequential (not really async)

async def slow():
    await asyncio.sleep(1)

async def main():
    await slow()  # 1s
    await slow()  # 1s
    await slow()  # 1s
    # Total: 3 seconds

Concurrent with gather

async def main():
    await asyncio.gather(slow(), slow(), slow())
    # Total: 1 second
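The difference is easy to measure. The sketch below times both styles with shorter sleeps; compare, sequential, and concurrent are illustrative helper names, and exact timings will vary by machine.

```python
import asyncio
import time


async def slow(delay=0.1):
    await asyncio.sleep(delay)
    return delay


async def sequential():
    # Each await finishes before the next coroutine even starts.
    return [await slow(), await slow(), await slow()]


async def concurrent():
    # All three sleeps overlap; results come back in argument order.
    return await asyncio.gather(slow(), slow(), slow())


async def compare():
    start = time.perf_counter()
    await sequential()
    sequential_time = time.perf_counter() - start

    start = time.perf_counter()
    await concurrent()
    concurrent_time = time.perf_counter() - start
    return sequential_time, concurrent_time


if __name__ == "__main__":
    seq, con = asyncio.run(compare())
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```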

TaskGroup (Python 3.11+)

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(slow())
        tg.create_task(slow())
        tg.create_task(slow())
    # Total: 1 second, with structured exception handling

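TaskGroup is the modern way: when the async with block exits, every task has finished. If one task fails, the remaining tasks are cancelled and the errors are raised together as an ExceptionGroup.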

Real-World Example: Concurrent HTTP Requests

Install aiohttp:

pip install aiohttp

Fetch 100 URLs concurrently:

import aiohttp
import asyncio
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

urls = ['https://httpbin.org/delay/1'] * 100

start = time.perf_counter()
results = asyncio.run(fetch_all(urls))
elapsed = time.perf_counter() - start
print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")
# Roughly 1-2 seconds (depends on the server and network) vs. ~100 seconds sequentially

Semaphores: Limiting Concurrency

Don't DDoS yourself. Use a semaphore:

async def fetch_with_limit(sem, session, url):
    async with sem:  # max N concurrent
        async with session.get(url) as r:
            return await r.text()

async def main(urls):
    sem = asyncio.Semaphore(10)  # 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, u) for u in urls]
        return await asyncio.gather(*tasks)
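You can check that the cap holds without touching the network. The sketch below replaces the HTTP call with a short sleep and records the highest number of jobs running at once; limited_job, run_jobs, and the state dictionary are illustrative names.

```python
import asyncio


async def limited_job(sem, state):
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for a network request
        state["active"] -= 1


async def run_jobs(total, limit):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, state) for _ in range(total)))
    return state["peak"]


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_jobs(total=20, limit=5)))
```

All 20 jobs are scheduled up front, but the semaphore never lets more than 5 run at once.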

Timeouts

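# asyncio.timeout() requires Python 3.11+
async def main():
    try:
        async with asyncio.timeout(5):
            await slow_operation()  # placeholder for any awaitable work
    except TimeoutError:  # asyncio.TimeoutError is an alias on 3.11+
        print("Too slow!")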

Or per-request timeouts with aiohttp:

timeout = aiohttp.ClientTimeout(total=10, connect=3)
async with aiohttp.ClientSession(timeout=timeout) as session:
    ...
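asyncio.timeout() only exists on Python 3.11 and later. On older interpreters, asyncio.wait_for gives a similar deadline around a single awaitable. A minimal sketch; slow_operation and with_deadline are placeholder names, and returning a string on timeout is just one possible fallback.

```python
import asyncio


async def slow_operation(delay):
    await asyncio.sleep(delay)
    return "done"


async def with_deadline(delay, limit):
    try:
        # wait_for cancels the inner coroutine if the limit expires.
        return await asyncio.wait_for(slow_operation(delay), timeout=limit)
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(with_deadline(delay=0.01, limit=1)))
    print(asyncio.run(with_deadline(delay=1, limit=0.01)))
```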

Error Handling

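With gather, the first exception propagates to the caller right away, but the sibling tasks are not cancelled and keep running in the background. Use return_exceptions=True to collect errors alongside results: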

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        print(f"Failed: {r}")
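A small experiment makes gather's default behavior concrete: when one awaitable fails, gather raises immediately, yet a slower sibling still runs to completion. The function names below are made up for the demonstration.

```python
import asyncio


async def fail_fast():
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def slow_sibling(log):
    await asyncio.sleep(0.05)
    log.append("sibling finished")


async def demo():
    log = []
    sibling = asyncio.create_task(slow_sibling(log))
    try:
        await asyncio.gather(fail_fast(), sibling)
    except ValueError:
        log.append("gather raised")
    # The sibling was not cancelled by the failure, so it can still be awaited.
    await sibling
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If you need siblings to stop on the first failure, cancel them yourself or use a TaskGroup.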

TaskGroup raises ExceptionGroup (Python 3.11+):

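async def fetch_all(urls):
    try:
        async with aiohttp.ClientSession() as session:
            async with asyncio.TaskGroup() as tg:
                for url in urls:
                    # fetch() is the (session, url) helper defined earlier
                    tg.create_task(fetch(session, url))
    except* aiohttp.ClientError as eg:
        for exc in eg.exceptions:
            print(f"HTTP error: {exc}")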

Async Generators

Stream results as they arrive:

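async def fetch_paginated(session, base_url):
    page = 1
    while True:
        # Assumes the API returns JSON shaped like {"results": [...]}
        async with session.get(base_url, params={'page': page}) as response:
            data = await response.json()
        if not data['results']:
            break
        for item in data['results']:
            yield item
        page += 1

async def main():
    async with aiohttp.ClientSession() as session:
        async for item in fetch_paginated(session, 'https://api.example.com/users'):
            process(item)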
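The same pagination pattern can be tried offline. In this sketch the PAGES dictionary and get_page function stand in for a real API (both are invented for illustration), so the generator logic can run without a server.

```python
import asyncio

# Hypothetical API data: page number -> items on that page.
PAGES = {1: ["ann", "bob"], 2: ["cy"], 3: []}


async def get_page(page):
    await asyncio.sleep(0)  # stand-in for the network round trip
    return {"results": PAGES.get(page, [])}


async def iter_items():
    page = 1
    while True:
        data = await get_page(page)
        if not data["results"]:
            break  # an empty page marks the end
        for item in data["results"]:
            yield item
        page += 1


async def collect():
    # An async comprehension drains the async generator.
    return [item async for item in iter_items()]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```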

Async Context Managers

import asyncpg

class Database:
    async def __aenter__(self):
        self.conn = await asyncpg.connect('postgres://...')
        return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        # Always close the connection, even if the block raised
        await self.conn.close()

async def main():
    async with Database() as conn:
        await conn.fetch('SELECT * FROM users')
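For simple cases, contextlib.asynccontextmanager can express the same setup and teardown without writing a class. The sketch below uses a FakeConnection class, invented here so the example runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for a real database connection (illustration only)."""

    def __init__(self):
        self.closed = False

    async def fetch(self, query):
        await asyncio.sleep(0)
        return [f"row for {query}"]

    async def close(self):
        self.closed = True


@asynccontextmanager
async def database():
    conn = FakeConnection()
    try:
        yield conn  # the body of the async with block runs here
    finally:
        await conn.close()  # runs even if the body raised


async def query_once():
    async with database() as conn:
        rows = await conn.fetch("SELECT 1")
    return rows, conn.closed


if __name__ == "__main__":
    print(asyncio.run(query_once()))
```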

Running Blocking Code

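If you must call blocking sync code (e.g., pandas, PIL), run it in a worker thread:

import asyncio

def blocking_work(data):
    # Placeholder for a blocking call, e.g. a pandas or PIL operation
    return data

async def main(data):
    # Runs in a worker thread, so the event loop stays responsive (Python 3.9+).
    # Pure-Python CPU-bound code still competes for the GIL; use a process pool for that.
    result = await asyncio.to_thread(blocking_work, data)
    return result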
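One way to check that the event loop really stays responsive is to run a heartbeat task alongside the blocking call. In this sketch blocking_io, heartbeat, and demo are illustrative names, and time.sleep stands in for any blocking library call.

```python
import asyncio
import time


def blocking_io(seconds):
    time.sleep(seconds)  # blocks only the worker thread it runs in
    return "finished"


async def heartbeat(ticks):
    # Keeps ticking as long as the event loop is free to run it.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def demo():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    result = await asyncio.to_thread(blocking_io, 0.2)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)  # absorb the cancellation
    return result, len(ticks)


if __name__ == "__main__":
    result, tick_count = asyncio.run(demo())
    print(result, "with", tick_count, "heartbeats during the blocking call")
```

Calling blocking_io directly inside the coroutine would freeze the loop instead, and the heartbeat would tick only once.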

Async in Django

Django has supported async views since 3.1, and 4.1 added an async interface to the ORM:

async def async_view(request):
    posts = [p async for p in Post.objects.filter(published=True)]
    return JsonResponse({'posts': [...]})

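But be aware: if any synchronous middleware is installed, Django has to run each request in a thread to accommodate it, which gives up most of the benefit of async views. Use async-capable middleware and an ASGI server (Daphne, Uvicorn).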

When NOT to Use Async

  • CPU-bound work (use multiprocessing)
  • Code with blocking libraries you can't replace
  • Simple scripts where the complexity cost isn't worth it
  • Shared mutable state (locks in async are tricky)
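The last bullet deserves a concrete case. Even on a single thread, an await between reading and writing shared state lets another task slip in. The sketch below shows the lost updates and an asyncio.Lock fix; all names are invented for the demonstration.

```python
import asyncio


async def unsafe_increment(state):
    current = state["count"]
    await asyncio.sleep(0)  # another task may run here and read the same value
    state["count"] = current + 1


async def safe_increment(state, lock):
    async with lock:  # read-modify-write happens as one unit
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1


async def run(n):
    unsafe = {"count": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["count"], safe["count"]


if __name__ == "__main__":
    unsafe_total, safe_total = asyncio.run(run(10))
    print(f"without lock: {unsafe_total}, with lock: {safe_total}")
```

The lock makes the counter correct, but it also runs the increments one after another, which is the trade-off that makes shared state awkward in async code.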

Debugging Async Code

Enable debug mode:

asyncio.run(main(), debug=True)

Use logging to trace task scheduling. Key tools:

  • asyncio.all_tasks(): see running tasks
  • task.get_stack(): inspect where a task is stuck
  • aiomonitor: live REPL into a running event loop
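Naming tasks makes that inspection far more readable. A minimal sketch, with idle, snapshot, and the worker-N naming scheme chosen only for this illustration:

```python
import asyncio


async def idle():
    await asyncio.sleep(0.05)


async def snapshot():
    tasks = [
        asyncio.create_task(idle(), name=f"worker-{i}") for i in range(3)
    ]
    await asyncio.sleep(0)  # give the new tasks a chance to start
    current = asyncio.current_task()
    # all_tasks() returns every unfinished task on the running loop,
    # including the one calling it, so filter that one out.
    names = sorted(
        task.get_name() for task in asyncio.all_tasks() if task is not current
    )
    await asyncio.gather(*tasks)
    return names


if __name__ == "__main__":
    print(asyncio.run(snapshot()))
```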

Performance Tuning

  • Use uvloop as the event loop (drop-in replacement; its own benchmarks report roughly 2-4x faster than the default loop)
  • Use orjson for JSON (often several times faster than the stdlib json module; measure on your own payloads)
  • Avoid creating unnecessary tasks, since each one has memory overhead
  • Batch operations when possible

import uvloop
uvloop.install()  # before asyncio.run()

Summary

Async Python shines for I/O-bound concurrent workloads: API clients, scrapers, WebSocket servers, database-heavy services. Master coroutines, tasks, semaphores, and structured concurrency, and you'll build services that handle thousands of concurrent connections in a single Python process.
