Back to Blog

Asynchronous Task Processing with Celery and Django

admin
December 2, 2025 4 min read
233 views
Master background task processing using Celery for email sending, report generation, and long-running operations.

Asynchronous Task Processing with Celery

Background tasks are essential for any production Django application. Here's how we use Celery in DjangoZen for emails, notifications, and more.

Why Celery?

  • Non-blocking operations: Don't make users wait
  • Scheduled tasks: Cron-like periodic tasks
  • Retry logic: Automatic failure recovery
  • Scalability: Distribute tasks across workers
  • Monitoring: Track task status and results

Installation & Setup

pip install celery redis

Create celery.py in your project:

# djzen/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djzen.settings')

app = Celery('djzen')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# Optional: Configure task routes
app.conf.task_routes = {
    'eshop.tasks.send_*': {'queue': 'emails'},
    'eshop.tasks.generate_*': {'queue': 'reports'},
}

Configure settings:

# settings.py
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

# Task settings
CELERY_TASK_ALWAYS_EAGER = False  # Set True for testing
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

Creating Tasks

Email Sending Task

# eshop/tasks.py
from celery import shared_task
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.conf import settings

@shared_task(bind=True, max_retries=3)
def send_order_confirmation(self, order_id):
    """Send order confirmation email"""
    try:
        from .models import Order
        order = Order.objects.select_related('user').prefetch_related('items__product').get(id=order_id)

        subject = f'Order Confirmation #{order.order_number}'

        # Render templates
        html_content = render_to_string('emails/order_confirmation.html', {
            'order': order,
            'site_url': settings.SITE_URL,
        })
        text_content = render_to_string('emails/order_confirmation.txt', {
            'order': order,
        })

        # Send email
        email = EmailMultiAlternatives(
            subject=subject,
            body=text_content,
            from_email=settings.DEFAULT_FROM_EMAIL,
            to=[order.user.email],
        )
        email.attach_alternative(html_content, 'text/html')
        email.send()

        # Update order
        order.confirmation_sent = True
        order.save(update_fields=['confirmation_sent'])

        return f'Email sent to {order.user.email}'

    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))


@shared_task
def send_newsletter(subscriber_ids, subject, content):
    """Send newsletter to multiple subscribers"""
    from .models import NewsletterSubscription

    subscribers = NewsletterSubscription.objects.filter(
        id__in=subscriber_ids,
        is_active=True
    )

    sent_count = 0
    for subscriber in subscribers:
        try:
            html_content = render_to_string('emails/newsletter.html', {
                'content': content,
                'email': subscriber.email,
                'unsubscribe_url': f'{settings.SITE_URL}/newsletter/unsubscribe/?email={subscriber.email}',
            })

            email = EmailMultiAlternatives(
                subject=subject,
                body=content,
                from_email=settings.DEFAULT_FROM_EMAIL,
                to=[subscriber.email],
            )
            email.attach_alternative(html_content, 'text/html')
            email.send()
            sent_count += 1

        except Exception as e:
            print(f'Failed to send to {subscriber.email}: {e}')

    return f'Sent {sent_count}/{len(subscriber_ids)} emails'

Report Generation

@shared_task(bind=True, time_limit=300)
def generate_sales_report(self, user_id, date_from, date_to):
    """Generate sales report PDF"""
    import io
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas
    from django.core.files.base import ContentFile

    from .models import Order, Report

    # Query data
    orders = Order.objects.filter(
        created_at__range=[date_from, date_to],
        status='completed'
    ).select_related('user')

    # Generate PDF
    buffer = io.BytesIO()
    p = canvas.Canvas(buffer, pagesize=letter)

    # Add content...
    p.drawString(100, 750, f'Sales Report: {date_from} to {date_to}')
    p.drawString(100, 730, f'Total Orders: {orders.count()}')
    p.drawString(100, 710, f'Total Revenue: €{orders.aggregate(total=Sum("total"))["total"]:.2f}')

    p.save()
    buffer.seek(0)

    # Save report
    report = Report.objects.create(
        user_id=user_id,
        report_type='sales',
        status='completed',
    )
    report.file.save(
        f'sales_report_{date_from}_{date_to}.pdf',
        ContentFile(buffer.read())
    )

    # Notify user
    send_notification.delay(
        user_id,
        'Your sales report is ready',
        f'/reports/{report.id}/'
    )

    return report.id

License Generation

@shared_task
def generate_license_keys(order_id):
    """Generate license keys for purchased products"""
    import secrets
    from .models import Order, License

    order = Order.objects.prefetch_related('items__product').get(id=order_id)

    licenses_created = []
    for item in order.items.filter(product__requires_license=True):
        for _ in range(item.quantity):
            license_key = f'DJZ-{secrets.token_hex(4).upper()}-{secrets.token_hex(4).upper()}'

            license = License.objects.create(
                user=order.user,
                product=item.product,
                order=order,
                license_key=license_key,
                license_type=item.license_type,
                is_active=True,
            )
            licenses_created.append(license_key)

    return licenses_created

Periodic Tasks

Schedule recurring tasks with Celery Beat:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-expired-sessions': {
        'task': 'eshop.tasks.cleanup_expired_sessions',
        'schedule': crontab(hour=3, minute=0),  # Daily at 3 AM
    },
    'send-trial-expiration-reminders': {
        'task': 'eshop.tasks.send_trial_expiration_emails',
        'schedule': crontab(hour=9, minute=0),  # Daily at 9 AM
    },
    'update-site-statistics': {
        'task': 'eshop.tasks.update_site_stats',
        'schedule': crontab(minute='*/15'),  # Every 15 minutes
    },
    'generate-sitemap': {
        'task': 'eshop.tasks.generate_sitemap',
        'schedule': crontab(hour=2, minute=0, day_of_week=0),  # Weekly
    },
}
# tasks.py
@shared_task
def cleanup_expired_sessions():
    """Remove expired download links and sessions"""
    from django.utils import timezone
    from .models import DownloadLink

    expired = DownloadLink.objects.filter(
        expires_at__lt=timezone.now()
    )
    count = expired.count()
    expired.delete()

    return f'Cleaned up {count} expired download links'


@shared_task
def send_trial_expiration_emails():
    """Send reminder emails for expiring trials"""
    from datetime import timedelta
    from django.utils import timezone
    from .models import License

    # Trials expiring in 3 days
    expiring_soon = License.objects.filter(
        license_type='trial',
        is_active=True,
        expires_at__date=timezone.now().date() + timedelta(days=3),
        expiration_reminder_sent=False
    ).select_related('user', 'product')

    for license in expiring_soon:
        send_trial_expiration_reminder.delay(license.id)
        license.expiration_reminder_sent = True
        license.save(update_fields=['expiration_reminder_sent'])

    return f'Sent {expiring_soon.count()} expiration reminders'

Running Celery

Development:

# Start worker
celery -A djzen worker -l INFO

# Start beat scheduler
celery -A djzen beat -l INFO

# Or both together
celery -A djzen worker -B -l INFO

Production with systemd:

# /etc/systemd/system/celery.service
[Unit]
Description=Celery Worker
After=network.target

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/djzen
ExecStart=/var/www/djzen/venv/bin/celery -A djzen multi start worker1 \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log \
    --loglevel=INFO
ExecStop=/var/www/djzen/venv/bin/celery multi stopwait worker1 \
    --pidfile=/var/run/celery/%n.pid
ExecReload=/var/www/djzen/venv/bin/celery -A djzen multi restart worker1 \
    --pidfile=/var/run/celery/%n.pid

[Install]
WantedBy=multi-user.target

Monitoring with Flower

pip install flower
celery -A djzen flower --port=5555

Best Practices

  1. Keep tasks small - Break large tasks into subtasks
  2. Use task chains - chain(task1.s(), task2.s())
  3. Handle failures - Implement retry logic
  4. Set time limits - Prevent runaway tasks
  5. Monitor queues - Watch for backlogs
  6. Use priorities - Critical tasks first

Celery makes your Django app production-ready. Check our templates with Celery pre-configured!

Comments (0)

Please login to leave a comment.

No comments yet. Be the first to comment!