Build a production-ready real-time Django app. Covers Channels architecture, WebSocket consumers, group messaging, authentication, Redis channel layer, and deployment with Daphne + nginx.
Traditional Django is request/response. Channels extends Django to handle WebSockets, long-running connections, and background tasks — all within the Django project you already know.
Use WebSockets for: - Live chat - Real-time notifications - Collaborative editing - Live dashboards - Trading tickers - Multiplayer games
HTTP polling wastes resources. Server-Sent Events only go server → client. WebSockets are full duplex and efficient.
Browser <--WS--> Daphne (ASGI) <--> Channel Layer (Redis) <--> Consumer
The channel layer (typically Redis) lets multiple Django workers communicate. Without it, a consumer can only send to connections on its own process.
pip install channels channels_redis daphne
Add to settings:
INSTALLED_APPS = [
'daphne', # must be first
'django.contrib.staticfiles',
...
'channels',
]
ASGI_APPLICATION = 'myproject.asgi.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {'hosts': [('127.0.0.1', 6379)]},
},
}
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
django_asgi_app = get_asgi_application()
from chat.routing import websocket_urlpatterns
application = ProtocolTypeRouter({
'http': django_asgi_app,
'websocket': AuthMiddlewareStack(
URLRouter(websocket_urlpatterns)
),
})
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.group_name = f'chat_{self.room_name}'
# Join the room group
await self.channel_layer.group_add(self.group_name, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.group_name, self.channel_name)
async def receive(self, text_data):
data = json.loads(text_data)
message = data['message']
user = self.scope['user']
# Broadcast to everyone in the room
await self.channel_layer.group_send(self.group_name, {
'type': 'chat_message',
'message': message,
'username': user.username if user.is_authenticated else 'Anonymous',
})
async def chat_message(self, event):
await self.send(text_data=json.dumps({
'message': event['message'],
'username': event['username'],
}))
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
const socket = new WebSocket(`ws://${window.location.host}/ws/chat/lobby/`);
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`${data.username}: ${data.message}`);
};
document.querySelector('#send').addEventListener('click', () => {
const message = document.querySelector('#input').value;
socket.send(JSON.stringify({message}));
});
AuthMiddlewareStack populates scope['user'] from Django sessions. For token auth:
from channels.middleware import BaseMiddleware
from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
class TokenAuthMiddleware(BaseMiddleware):
async def __call__(self, scope, receive, send):
query_string = scope['query_string'].decode()
params = dict(p.split('=') for p in query_string.split('&') if '=' in p)
token_key = params.get('token')
if token_key:
scope['user'] = await self.get_user(token_key)
else:
scope['user'] = AnonymousUser()
return await super().__call__(scope, receive, send)
@database_sync_to_async
def get_user(self, token_key):
try:
return Token.objects.get(key=token_key).user
except Token.DoesNotExist:
return AnonymousUser()
Client connects with: wss://example.com/ws/chat/?token=abc123
Consumers are async. Use database_sync_to_async:
from channels.db import database_sync_to_async
class ChatConsumer(AsyncWebsocketConsumer):
@database_sync_to_async
def save_message(self, content):
return Message.objects.create(
user=self.scope['user'],
room=self.room_name,
content=content,
)
async def receive(self, text_data):
data = json.loads(text_data)
msg = await self.save_message(data['message'])
...
Or use async ORM directly (Django 4.1+):
msg = await Message.objects.acreate(user=..., content=...)
To push a message to connected WebSockets from a normal view or Celery task:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def notify_user(user_id, message):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f'user_{user_id}',
{'type': 'notification', 'message': message},
)
from channels.testing import WebsocketCommunicator
from myproject.asgi import application
async def test_chat():
communicator = WebsocketCommunicator(application, '/ws/chat/lobby/')
connected, _ = await communicator.connect()
assert connected
await communicator.send_json_to({'message': 'Hello'})
response = await communicator.receive_json_from()
assert response['message'] == 'Hello'
await communicator.disconnect()
[Unit]
Description=Daphne ASGI server
After=network.target
[Service]
User=www-data
WorkingDirectory=/srv/myapp
ExecStart=/srv/myapp/venv/bin/daphne -b 127.0.0.1 -p 8001 myproject.asgi:application
Restart=always
[Install]
WantedBy=multi-user.target
server {
listen 80;
server_name example.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
}
location /ws/ {
proxy_pass http://127.0.0.1:8001;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_read_timeout 86400; # keep WS alive
}
}
class ReliableSocket {
constructor(url) {
this.url = url;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onclose = () => setTimeout(() => this.connect(), 2000);
}
}
ALLOWED_HOSTS is only for HTTP)For simple "server push" without true bidirectional needs, Server-Sent Events (SSE) are simpler:
from django.http import StreamingHttpResponse
def sse_view(request):
def stream():
while True:
yield f"data: {json.dumps({'time': time.time()})}\n\n"
time.sleep(1)
return StreamingHttpResponse(stream(), content_type='text/event-stream')
Channels unlocks real-time Django. Master consumers, groups, authentication, and deployment — and you can build chat, notifications, live dashboards, and collaborative apps without leaving the Django ecosystem.