Welcome to the Django and Celery tutorial.

You will learn what Celery is, why and when to use it, and how to set up a Django project with Celery, along with a few examples of different Celery tasks.

Git repository

The code from this tutorial is available on GitHub: Celery Tutorial on GitHub

What is Celery?

Celery is a distributed task queue system for Python.

What is a task queue?

A task queue is like a to-do list for your app. It stores tasks that need to be executed, along with their associated function calls and arguments. Tasks are typically processed in the order they were added, first-in-first-out.

What does "distributed" mean?

  • Multiple workers: Celery can distribute tasks across multiple worker processes. These workers can run on different servers, allowing for parallel processing of tasks.
  • Scalability: You can scale task processing horizontally by adding more workers on more servers to handle increased workloads without modifying your application.
  • Fault tolerance: If one worker fails or goes offline, tasks can be redistributed to other available workers.
  • Load balancing: Celery can distribute tasks evenly across available workers, preventing any single worker from becoming overloaded.
  • Geographical distribution: Workers can be located in different physical locations or data centers, which can be helpful if the workload needs to be performed in different physical locations.

Why and When to Use Celery

While working with Django, most of our code focuses on responding to HTTP requests from browsers and other HTTP clients.

It is important to respond to requests quickly. If a website takes longer than 500ms to respond, it can start to annoy users. A few seconds to generate a response is only acceptable in rare situations, such as submitting an e-commerce order.

By default, the Gunicorn HTTP server drops the connection if a response takes more than 30 seconds.

While your app might not be complex, there are types of tasks that take a long time to complete.

Applications may need to perform complex and time-consuming operations or interact with other services.

For example, sending an email can take anywhere from 50ms to several seconds.

The good news is that some operations can be performed outside the request/response cycle.

Back to the email example: if a user signs up, we can schedule the email sending and proceed with generating and sending the response to the client, without the delay of actually sending that email.

One way to achieve this is by using asynchronous features, but that alone doesn't guarantee successful completion of the operation, nor does it provide robust control over retries in case of a failure. Additionally, the work would be tied to a specific worker process: if that process is terminated, all in-flight async tasks are lost with it.

This is where task queues, and specifically Celery, come into play. When you need to perform a potentially time-consuming operation, you send a message to a task queue and move on. This message will be later picked up and executed by a Celery worker process. Common choices for the task queue database are Redis or RabbitMQ.

To perform an operation via a task queue, you need to define a task, which is essentially a function wrapped in a Celery task decorator. You then call it in a special way, which will be explained in this tutorial.
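
As a quick preview, here is roughly what that looks like (a sketch with a hypothetical send_welcome_email task; the exact mechanics are covered later in this tutorial):

from celery import shared_task

@shared_task(name="send_welcome_email")
def send_welcome_email(user_id):
    ...  # the slow work (rendering and sending the email) happens here

# somewhere in a view, instead of calling the function directly:
send_welcome_email.delay(user_id=42)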

Another use case for Celery is scheduled jobs. If you want your app to perform certain operations at specific intervals, such as every 10 minutes, every hour, at midnight, or on Saturday mornings, you will need a scheduler. A scheduler is a process that waits for the right moment to send a Celery task to the queue. That's its sole job.
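
To give you a flavor, here is a sketch of what a schedule can look like, using Celery's beat_schedule setting (the project we build later uses the RedBeat scheduler, which reads the same setting; the task name here is a placeholder):

from celery.schedules import crontab

app.conf.beat_schedule = {
    "every-10-minutes": {
        "task": "dummy_and_slow",  # the registered task name
        "schedule": 600.0,         # seconds between runs
    },
    "saturday-morning-report": {
        "task": "dummy_and_slow",
        "schedule": crontab(hour=8, minute=0, day_of_week="sat"),
    },
}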

A scheduler does not guarantee the precise timing of execution; it only sends messages to the queue at the correct time. When the execution actually happens depends on your queue workers' setup and the number of tasks in the queue. In some cases, it may not happen at all if no worker is available to process the messages.

You can have multiple queues to separate tasks and prevent clogging. For instance, you might have a separate queue for short-lived but important and time-sensitive tasks like sending emails and notifications. This leaves other potentially long-running tasks in a queue where you are less concerned about their timely completion.
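
For example, assuming you start a worker with -Q emails, you could route the hypothetical task from above to that queue explicitly:

# send the task to the "emails" queue instead of the default one
send_welcome_email.apply_async(kwargs={"user_id": 42}, queue="emails")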

An advanced use of Celery involves having multiple projects communicate asynchronously via different Celery queues. One application can send tasks that only another application reads from, and vice versa. Unless you need an immediate response from the other application (in which case HTTP calls would be necessary), this method of communication is convenient. The other application, often called a service or microservice, does not need to be running all the time or scaled to handle the workload immediately; it will process tasks from the queue as quickly as it can.

Warning

While Celery specifically and task queues in general are great for offloading work from the request/response cycle, they are not a silver bullet for poorly performing code. You cannot merely offload tasks to the queue and expect performance problems to disappear. If you send enough jobs to the task queue, it will eventually take a while for workers to process them. You can scale up the number of workers, but then the database might become a bottleneck, slowing down web requests because the background tasks overload the database.

I hope this introduction has been useful and provides enough context about what Celery is good for, when to use it, and what it actually does.

Starting a Django project with Celery using Docker

Create a directory for your project, switch to it and initialize a git repository:

mkdir celerytutorial
cd celerytutorial
git init

We will be using Docker for the development of our project. You will not need a Python installation outside of Docker.

Open your favorite code editor and create these files:

Dockerfile

The essential part of building a Docker image

FROM python:3.12.5-bullseye
SHELL ["/bin/bash", "--login", "-c"]
ARG USER_ID=1000
ARG GROUP_ID=1000
RUN groupadd -g $GROUP_ID -o app
RUN useradd -m -u $USER_ID -g $GROUP_ID -o -s /bin/bash app
ENV PIP_NO_CACHE_DIR off
ENV PIP_DISABLE_PIP_VERSION_CHECK on
ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1
ENV COLUMNS 80
RUN apt-get update \
 && apt-get install -y --force-yes \
 curl nano python3-pip gettext chrpath libssl-dev libxft-dev \
 libfreetype6 libfreetype6-dev  libfontconfig1 libfontconfig1-dev \
  && rm -rf /var/lib/apt/lists/*
WORKDIR /code/
COPY requirements.txt /code/
RUN pip install wheel
RUN pip install -r requirements.txt
COPY . /code/
USER app

requirements.txt

Needed to specify and manage Python package dependencies for our project

Django==5.1.1
django-environ==0.11.2
gunicorn==22.0.0
psycopg[binary]==3.2.1
whitenoise==6.7.0
Pillow==10.4.0
celery-redbeat==2.2.0
celery[redis]==5.4.0

docker-compose.yml

Needed to define and manage all the services for our project.

x-app: &app
  build: .
  restart: always
  env_file:
    - .env
  volumes:
    - .:/code
  links:
    - db
    - redis
  depends_on:
    - db
    - redis

services:
  redis:
    image: redis:7
    ports:
      - "6379:6379"
  db:
    image: postgres:16
    environment:
      - POSTGRES_USER=celerytutorial
      - POSTGRES_PASSWORD=celerytutorial
      - POSTGRES_DB=celerytutorial
    ports:
      - "5432:5432"
  web:
    <<: *app
    command: python manage.py runserver 0.0.0.0:9000
    ports:
      - "127.0.0.1:9000:9000"
  celery:
    <<: *app
    # the queue name after -Q must match the queue defined in project/celeryapp.py
    command: celery -A project.celeryapp:app worker -Q main -n celerytutorial.%%h --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
  beat:
    <<: *app
    command: celery -A project.celeryapp:app beat -S redbeat.RedBeatScheduler  --loglevel=DEBUG --pidfile /tmp/celerybeat.pid

.env

Create .env and .start.env files with identical content. .start.env will be added to version control, but not .env. .start.env serves as a template for developers to copy and modify locally, ensuring necessary variables are documented without exposing sensitive data.

DATABASE_URL=postgresql://celerytutorial:celerytutorial@db:5432/celerytutorial
REDIS_URL=redis://redis:6379/0
DEBUG=True
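
A developer setting up the project after cloning the repository would then create their local copy with:

cp .start.env .env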

.gitignore

Create this file in the root of the project. It lists all files and paths within the project that shouldn't be included in version control. Here is the content you should include to start; feel free to add paths as you see fit (e.g., temporary files created by your code editor or any other tool).

.vscode/*
.idea/*
node_modules/*
Thumbs.db
Thumbs.db:encryptable
ehthumbs.db
ehthumbs_vista.db
[Dd]esktop.ini
$RECYCLE.BIN/
.DS_Store
.AppleDouble
.LSOverride
__pycache__/
*.py[cod]
*$py.class
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
db.sqlite3
db.sqlite3-journal
*.log
*.mo
*.pot
celerybeat-schedule
celerybeat.pid
env/
.env

.dockerignore

Similar to .gitignore, .dockerignore is a list of files that shouldn't be included in Docker builds.

It is not super important here, but it can help reduce the size of the build image.

In this tutorial we are not using Python virtual environments on the host, but let's include env/ here anyway.

Including the .git folder here also helps reduce the size of the image.

env/
.git/

Pull images with Docker Compose

Go to the terminal and run this command to pull the images for the database services.

Running it makes Docker download images for services that use a prebuilt image and don't require a build.

This step is optional, because the images will be pulled anyway when the containers start.

docker compose pull


Build Docker image for Django Project

This is an optional step the first time around, because the image will be built when the containers start. But if you change requirements.txt later on, you will need to run this command again.

docker compose build


Start Django Project

Now let's start a shell within our web container.

docker compose run web bash

Now you will see a bash prompt from within a container. To leave this shell type exit or press CTRL-D.


Start our Django project:

django-admin startproject project .

Initial Django configuration

Open your code editor and let's edit some files.

Edit the project/settings.py file:

from pathlib import Path
import environ
import os

env = environ.Env(
    # set casting, default value
    DEBUG=(bool, False)
)

BASE_DIR = Path(__file__).resolve().parent.parent
# Take environment variables from .env file
environ.Env.read_env(os.path.join(BASE_DIR, ".env"))

SECRET_KEY = env("SECRET_KEY", default="change_me")

DEBUG = env("DEBUG", default=False)

ALLOWED_HOSTS = env.list("ALLOWED_HOSTS", default=["*"])
# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'whitenoise.middleware.WhiteNoiseMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'project.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

WSGI_APPLICATION = 'project.wsgi.application'


# Database
# https://docs.djangoproject.com/en/5.0/ref/settings/#databases

DATABASES = {
    "default": env.db(default="sqlite:///db.sqlite3"),
}
# Password validation
# https://docs.djangoproject.com/en/5.0/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]
SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https")
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {"console": {"class": "logging.StreamHandler"}},
    "loggers": {"": {"handlers": ["console"], "level": "DEBUG"}},
}

# Internationalization
# https://docs.djangoproject.com/en/5.0/topics/i18n/

LANGUAGE_CODE = 'en-us'

TIME_ZONE = 'UTC'

USE_I18N = True

USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/5.0/howto/static-files/

STATIC_URL = env.str("STATIC_URL", default="/static/")
STATIC_ROOT = env.str("STATIC_ROOT", default=BASE_DIR / "staticfiles")

WHITENOISE_USE_FINDERS = True
WHITENOISE_AUTOREFRESH = DEBUG


# Default primary key field type
# https://docs.djangoproject.com/en/5.0/ref/settings/#default-auto-field

DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'


MEDIA_ROOT = env("MEDIA_ROOT", default=BASE_DIR / "media")
MEDIA_URL = env("MEDIA_PATH", default="/media/")

REDIS_URL = env("REDIS_URL", default=None)

Create a file project/celeryapp.py with the following content:

import os
from celery import Celery
from kombu import Queue
from .settings import REDIS_URL

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
app = Celery("project")
app.autodiscover_tasks()
app.conf.broker_url = REDIS_URL
app.conf.result_backend = REDIS_URL
app.conf.accept_content = ["application/json"]
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.task_default_queue = "main"
app.conf.task_create_missing_queues = True
app.conf.task_queues = (Queue("main"),)
app.conf.broker_pool_limit = 1
app.conf.broker_connection_timeout = 30
app.conf.worker_prefetch_multiplier = 1
app.conf.redbeat_redis_url = REDIS_URL

This file configures Celery for your Django project:

  • Imports necessary modules
  • Loads environment variables
  • Initializes Celery app
  • Configures Celery settings (serialization, queues, timeouts, etc.)

Edit the file project/__init__.py:

This ensures the Celery app is imported whenever Django starts, so that shared tasks are bound to it:

from .celeryapp import app as celery_app

__all__ = [
    "celery_app",
]

Don't forget to save all files.

Apply migrations

Let's apply Django migrations and make sure our project and database connection works correctly.

In the Docker bash shell run this:

python manage.py migrate

By the way, if you closed the shell, or if you want to run a single command in Docker without launching a shell, you can do it another way:

docker compose run web python manage.py migrate

Start containers

This command will start all services defined in docker-compose.yml. Run it outside of the container shell.

docker compose up -d


Check logs of Django Project with docker compose logs

Now, to make sure our services are actually working, let's look at the logs.

docker compose logs -f

You will see logs for all services running. To stop following logs press CTRL-C.

To show logs for a specific service, you can specify which service(s) to follow:

docker compose logs -f web celery


Important notes:

  • If you make changes to the .env file, you'll need to restart Docker Compose for the changes to take effect. Do this by pressing CTRL-C to stop the current process (or docker compose stop), then run docker compose up again.
  • When modifying the docker-compose.yml file, you may need to perform a complete reset. Use docker compose down to stop and remove all containers and networks created by Docker Compose. Keep in mind that docker compose down will also remove all the DB data in the current setup. Then, restart the project with docker compose up -d.
  • From here it is implied that python commands are executed within a Docker Compose shell.

You can now see the app in your browser.

Please note that if you are on Windows + WSL, the http://0.0.0.0 address will not work; use http://localhost:9000 instead.

Start an application

In order to showcase Celery tasks, we need an app where they will live, a model, and some views.

python manage.py startapp mainapp

Create mainapp/tasks.py and leave it empty for now. That's where our Celery tasks will be defined.

Create a Report model in mainapp/models.py:

from django.db import models
from collections import namedtuple

REPORT_STATUSES = namedtuple("REPORT_STATUSES", "pending running finished error")._make(
    range(4)
)


class Report(models.Model):
    REPORT_STATUS_CHOICES = [
        (REPORT_STATUSES.pending, "Pending"),
        (REPORT_STATUSES.running, "Running"),
        (REPORT_STATUSES.finished, "Finished"),
        (REPORT_STATUSES.error, "Error"),
    ]
    dt_created = models.DateTimeField(auto_now_add=True)
    dt_started = models.DateTimeField(null=True, blank=True)
    dt_finished = models.DateTimeField(null=True, blank=True)
    complexity = models.PositiveIntegerField(default=10)
    status = models.IntegerField(
        choices=REPORT_STATUS_CHOICES, default=REPORT_STATUSES.pending
    )
    result = models.TextField(blank=True, null=True)

    def __str__(self):
        return f"Report {self.id}"

    class Meta:
        ordering = ["-dt_created"]
        verbose_name = "Report"
        verbose_name_plural = "Reports"

Let's create a page which we will use to trigger our tasks.

Create a view in mainapp/views.py:

from django.shortcuts import render

def index(request):
    return render(request, "mainapp/index.html")

Create a template in mainapp/templates/mainapp/index.html:

<html>
<head>
    <title>Tasks</title>
    <meta charset="UTF-8">
</head>

<body>
    <h1>Tasks</h1>
    <ul>
    </ul>
</body>
</html>

And add this view into URLConf in project/urls.py:

from django.contrib import admin
from django.urls import path
from mainapp.views import index
urlpatterns = [
    path('', index),
    path('admin/', admin.site.urls),
]

Add our application to INSTALLED_APPS in project/settings.py:

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "mainapp", # new
]

Let's create and apply migrations in Docker:

If you are already in Docker shell:

python manage.py makemigrations
python manage.py migrate

and open the app in the browser: http://localhost:9000/

Make the initial commit

git add .
git commit -m 'Initial Commit'

Go to GitHub and create a new repository.

When you create a repo, you will be offered a list of commands. Copy the one that starts with git remote add origin, paste it in your terminal (outside of the Docker container shell), and run it.

Run git push -u origin master

Refresh the GitHub repository page to see that your code is there. You can also check that none of the unwanted files got added to version control.

Deploying Django Celery app

Now, you might consider this step optional, but I insist it is very important. You should deploy your app sooner rather than later.

By deploying the app as early as possible, you will avoid the situation where you have finished the whole project and it works on your machine, but doesn't work when deployed.

Deploying your app only after you have finished the whole project means debugging would be more time-consuming, and you might end up having to rewrite larger chunks of code.

By deploying the project early and often, you make it easier to find the cause of a problem in the small, most recent code changes.

Go to Appliku Dashboard

Add an Ubuntu or a Debian server from a cloud provider of your choice, if you haven't already.

My personal favorite is Hetzner, especially their ARM64 Ampere VPS servers. Their performance is great and pricing is unbeatable.

Create an application from your GitHub repository.


From your application dashboard page go to databases and add a PostgreSQL database and a Redis one.


Next, add processes to run your app. From the Application overview page click the Add Processes button, or go to Settings -> Processes.

Click Add Process and add these 4 processes:

web: gunicorn project.wsgi --log-file -
release: python manage.py migrate
worker: celery -A project.celeryapp:app worker -Q main -n celerytutorial.%%h --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
beat: celery -A project.celeryapp:app beat -S redbeat.RedBeatScheduler --loglevel=DEBUG --pidfile /tmp/celerybeat.pid

Keep in mind that the web and release processes' names are special. The process called web will receive HTTP traffic from Nginx, and release is a command that is executed after each successful deployment.

The rest of the processes can have any names as long as they consist of letters and numbers, no special characters.

Click on Save and Deploy and the first deployment will start.

After deployment has finished, click on Open App -> Domain name.


Our app has been successfully deployed!

Appliku gives the app a subdomain with your app's name and an SSL certificate via Let's Encrypt. You can also add your own custom domain(s) in the application settings.

That was easy, right?

Now let's get back to the code. Time to add the first Celery task.

Simple Celery task

Go to mainapp/tasks.py and make the first task.

from celery import shared_task
import logging
import time
logger = logging.getLogger(__name__)


@shared_task(name="dummy_and_slow")
def dummy_and_slow():
    time.sleep(2)
    logger.debug("Dummy and slow task has finished")

This task doesn't do anything useful; it waits for 2 seconds, which is enough to illustrate the potential problem of long-running code.

Let's call it as a regular function from a view.

Go to mainapp/views.py and add another view.

from django.shortcuts import render
from .tasks import dummy_and_slow # new
import time # new

def index(request):
    return render(request, "mainapp/index.html")

def dummy_and_slow_view(request): # new
    start_time = time.time()
    dummy_and_slow()
    end_time = time.time()
    execution_time = end_time - start_time
    context = {"task_name": "Dummy and slow", "execution_time": round(execution_time, 2)}
    return render(request, "mainapp/generic.html", context=context)

Create a template mainapp/templates/mainapp/generic.html:

<html>

<head>
    <title>Task results</title>
    <meta charset="UTF-8">
</head>
<body>
    <h1>Task {{task_name}} has finished</h1>
    <h2>Execution time: {{execution_time}}s</h2>
</body>
</html>

and add the view to project/urls.py:

from django.contrib import admin
from django.urls import path
from mainapp.views import index, dummy_and_slow_view # added a view
urlpatterns = [
    path('', index),
    path('dummy_and_slow_view', dummy_and_slow_view, name="dummy_and_slow_view"), # new
    path('admin/', admin.site.urls),
]

And update mainapp/templates/mainapp/index.html to include a link to our view:

<html>
<head>
    <title>Tasks</title>
    <meta charset="UTF-8">
</head>

<body>
    <h1>Tasks</h1>
    <ul>
        <li>
            <a href="{% url "dummy_and_slow_view" %}">Dummy and slow</a>
        </li>
    </ul>
</body>
</html>

Go to our app in the browser at http://localhost:9000/, click on the new link, and let's see the result.

It took more than 2 seconds for the page to render. This is a very representative example of how long it might take to send a single email. Imagine a person clicking a "Sign Up" button and then waiting, watching the browser loading indicator spin. That's a suboptimal experience; let's make it better. Let's dispatch a Celery task.

The simplest way to do that is to go to our view and replace the direct function call dummy_and_slow() with a call to the dummy_and_slow.delay() method.

Now our view should look like this in mainapp/views.py:

def dummy_and_slow_view(request):
    start_time = time.time()
    dummy_and_slow.delay() # <-- this changed
    end_time = time.time()
    execution_time = end_time - start_time
    context = {"task_name": "Dummy and slow", "execution_time": round(execution_time, 2)}
    return render(request, "mainapp/generic.html", context=context)

Don't forget to restart Docker Compose: run docker compose down, then docker compose up -d.

Now when you open http://localhost:9000/dummy_and_slow_view you will see something like:

Execution time: 0.08s

You can see that the execution time of the view itself improved now that the Celery task is merely sent, and the long-running operation no longer blocks the request/response cycle.

You will see that the task was executed from the docker compose logs:

celery-1         | [2024-08-17 10:59:10,154: WARNING/ForkPoolWorker-1] Dummy and slow task has finished
celery-1         | [2024-08-17 10:59:10,156: INFO/ForkPoolWorker-1] Task dummy_and_slow[4c97e4ef-d73b-4623-8eec-b9fb5bea4753] succeeded in 2.002374355099164s: None

Celery API in more details

Celery Task definitions

There are two ways to define a task.

One way is to import the Celery app instance and use @app.task decorator. Another way is using the @shared_task decorator, which is not tied to a particular instance of Celery.

The key difference lies in how these tasks are registered and how they behave when multiple Celery app instances are involved:

@app.task decorator:

  • Requires importing a Celery app instance
  • Ties task to specific app instance
from project.celeryapp import app

@app.task
def sometask():
    pass

@shared_task decorator (recommended):

  • Not tied to specific Celery instance
  • Automatically discovered by any Celery app
from celery import shared_task

@shared_task
def another_task():
    pass

The "not tied to a particular Celery instance" aspect of @shared_task means that it's more flexible and can be easily shared across different parts of a large project without needing to pass around Celery app instances.

For many simple Celery setups, this distinction may not be noticeable or important.

If you are an author of a library, you have to use the @shared_task decorator, because you can't have your code tied to a particular Celery instance.

So, since at the end of the day @shared_task doesn't have any downsides, and usually there is only one Celery instance in a project, I suggest sticking to the @shared_task decorator.

Like any Python function, your task can be defined with or without arguments:

@shared_task(name="fn1")
def fn1():
    pass

@shared_task(name="fn2")
def fn2(dishes, pancake, tasty=False):
    pass

Three ways to call a Celery task

  1. Using .delay()
  2. Using .apply_async()
  3. Using current_app.send_task()

Sending a Celery task via .delay

This is the simplest way of sending a Celery task.

Instead of calling the function itself directly, you call its .delay() method.

Without arguments:

fn1.delay()

With arguments:

fn2.delay(1, 'good_one', tasty=True)

Sending a Celery task via .apply_async

This method allows you to pass additional parameters to the task itself as well as arguments to the task function.

Without arguments:

fn1.apply_async()

With positional arguments:

fn2.apply_async(args=(1, 'nice_one', True))

With keyword arguments:

fn2.apply_async(kwargs={"dishes": 1, "pancake": "best one", "tasty": True})

In addition to the arguments of the function itself, we can also pass options that affect the task's execution.

from datetime import datetime, timedelta, timezone

fn2.apply_async(
    kwargs={"dishes": 1, "pancake": "best one", "tasty": True},  # keyword arguments (use args=(...) for positional arguments instead)
    expires=10,  # in seconds; if a worker receives the task after this deadline, the task is skipped
    countdown=5,  # in seconds; execute the task no earlier than N seconds from now
    # eta=datetime.now(timezone.utc) + timedelta(days=1),  # alternative to countdown: execute no earlier than this datetime
    queue='priority',  # name of the queue to send the task to
    ignore_result=False,  # whether or not to store the result of the task in the result backend
    retry=False,  # whether to retry sending the message if the broker connection fails
)

Those were probably the most important and typical parameters you might want to use while sending tasks. Please do read the official documentation as there are a lot of caveats and detailed explanations for each of those arguments listed here and some more: Calling tasks

Sending tasks via current_app.send_task

send_task is similar to apply_async, but instead of importing a function, you use the name of the task as a string, the rest is similar to apply_async.

from celery import current_app

current_app.send_task(
    "fn2",
    kwargs={"dishes": 1, "pancacke": "best one", "tasty": True},
    queue="priority",
    expires=30,
)

Since you are not importing the function but calling the task by its name as a string, you have to make sure you know the task name and that it won't change when you rearrange the code later.

For this reason I recommend specifying the task name in the @shared_task decorator instead of relying on automatic task naming.

This way of calling Celery tasks is also my favorite, because it allows you to solve the circular import problem that occurs when you call a task from a Django model method and the task also imports that model.

The current_app.send_task approach also allows you to call tasks that are not defined in the code of the current project.

That's what I mentioned in the intro about having multiple projects interact with each other via the same Celery instance.

You can't import the function, so you have to use the send_task method and pass the name of the task as a string. That means you have to come up with a naming convention for tasks: use prefixes so task names don't overlap between projects. For example, prefix them with the name of the service: main_ for the main part of the app, video_ for a service that does video compression, and email_service_ for a service that sends emails. This will also help with automatic routing of tasks based on their names.

I recommend having a prefix for the project from day one, because if you need to introduce another prefix later, you won't have to fix all the existing task names. Even if you never need that, it is a good habit to have.
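
Name prefixes also make routing straightforward. Here is a sketch using Celery's task_routes setting with glob patterns (the queue names are hypothetical):

app.conf.task_routes = {
    "email_service_*": {"queue": "email_service"},  # all email service tasks go to their own queue
    "video_*": {"queue": "video"},                  # likewise for video compression tasks
}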

One important thing about .send_task: there is a Celery setting called task_always_eager (see Celery Configuration and Defaults: task_always_eager). If it is True, all tasks are executed locally, blocking until the task returns.

This setting does not affect tasks sent with .send_task: they will not be executed without a worker processing them.
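
For example, if you enable eager execution in a test configuration, keep this caveat in mind:

# e.g. in test configuration only -- tasks run inline, synchronously
app.conf.task_always_eager = True

fn2.delay()                   # executes immediately, in-process
current_app.send_task("fn2")  # still only sends a message; a worker is required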

How to correctly send a Celery task from a view

Usually, when your view is running, all interactions with the database are happening in a transaction.

This means that until the view finishes executing, the transaction is not committed, and as such, changes to the database won't be visible from the outside, including to your Celery task.

Django Docs: Database transactions

When you create or change a database object and immediately send a task with its ID, the task might start before the database update is complete.

This can cause errors because the task won't see the recent changes.

You might get an 'ObjectDoesNotExist' error if the task tries to access an object that hasn't been fully saved yet.

Let's see some examples.

# views.py
from celery import current_app
from django.contrib.auth.models import User
from django.shortcuts import render

def welcome(request):
    user_object = User.objects.create(**somearguments)  # placeholder arguments
    current_app.send_task("send_welcome_email", kwargs={"user_id": user_object.id})
    return render(request, template_name="mainapp/welcome.html")

# tasks.py
from celery import shared_task

@shared_task(name="send_welcome_email")
def send_welcome_email(user_id):
    user_object = User.objects.get(pk=user_id)
    send_an_email(template='mainapp/email_templates/welcome.html', user_object=user_object)  # placeholder email helper

This code looks fine at first, but the line user_object = User.objects.get(pk=user_id) will cause an ObjectDoesNotExist exception.

It will happen because the transaction will be committed after the view finishes, but our task has already been sent.

To fix that, we will use transaction.on_commit to run a function that sends our task after the transaction is committed.

# views.py
from celery import current_app
from functools import partial
from django.db import transaction

def welcome(request):
    user_object = User.objects.create(**somearguments)
    transaction.on_commit(partial(current_app.send_task, "send_welcome_email", kwargs={"user_id": user_object.id}))
    return render(request, template_name="mainapp/welcome.html")

Relevant Django Docs: Performing actions after commit

Workers setup

Let's review the worker's command line and its arguments.

To run a worker, the bare minimum is this command:

celery -A project.celeryapp:app worker

Here are important arguments you might want to use:

  • -Q: queues to consume tasks from.
  • --concurrency: determines how many tasks a worker can process simultaneously.
  • --loglevel: determines the verbosity of the logging output. Possible values: DEBUG, INFO, WARNING, ERROR, CRITICAL.
  • -n, --hostname: sets a custom hostname for the worker. Example: -n celerytutorial.%%h
  • --max-memory-per-child: sets a memory limit (in KiB) for each worker child process. When a worker child reaches this memory threshold, it will be replaced with a new process after completing its current task.
  • --max-tasks-per-child: same idea, but the limit is based on the number of tasks processed rather than memory.
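
Putting these together, a full worker invocation might look like this (note that %h is written as %%h in docker-compose.yml and Procfile-style process definitions, where % must be escaped; in a plain shell you write %h):

celery -A project.celeryapp:app worker \
    -Q main \
    -n celerytutorial.%h \
    --loglevel=INFO \
    --concurrency=2 \
    --max-memory-per-child=512000 \
    --max-tasks-per-child=100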

Example App: Generate reports

Let's explore a common challenge in web applications: generating reports efficiently as your data grows.

Imagine you've built a feature to generate reports in your Django application. Initially, it works well, especially in development with minimal data. However, as you deploy the application and accumulate more data over time, you start to notice some issues:

  • Report generation time increases significantly.
  • The process blocks the web server, leading to slow response times.
  • Complex reports with multiple database queries and data manipulations become problematic.
  • What was once a simple view function now struggles to handle the increased load.

This is where Celery, a distributed task queue, can help.

By moving the report generation process to a Celery task, we can:

  • Offload the heavy processing from the web server.
  • Allow the report to generate asynchronously in the background.
  • Improve the overall responsiveness of our application.

In this example, we'll simulate a time-consuming report generation process using time.sleep(). This simplification allows us to focus on the implementation of Celery tasks without getting bogged down in complex report logic.

Let's dive in and see how we can use Celery to handle our report generation more efficiently.

Go to mainapp/views.py and create 3 views: ReportCreateView, ReportDetailView and ReportListView

from django.views.generic.edit import CreateView
from django.views.generic.detail import DetailView
from django.views.generic.list import ListView
from django.urls import reverse  # Add this line
from .models import Report

class ReportCreateView(CreateView):
    model = Report
    fields = ["complexity", ]
    template_name = "mainapp/report_form.html"
    def get_success_url(self):
        return reverse("report_detail", kwargs={"pk": self.object.pk})

class ReportDetailView(DetailView):
    model = Report
    context_object_name = "report"
    template_name = "mainapp/report_detail.html"

class ReportListView(ListView):
    model = Report
    template_name = "mainapp/report_list.html"
    context_object_name = "reports"
    ordering = ["-dt_created"]
    paginate_by = 10

and our project/urls.py now looks like this, once we import our views and add the report-related URLs:

from django.contrib import admin
from django.urls import path
from mainapp.views import (
    index,
    dummy_and_slow_view,
    ReportCreateView,
    ReportDetailView,
    ReportListView,
)

urlpatterns = [
    path("", index),
    path("dummy_and_slow_view", dummy_and_slow_view, name="dummy_and_slow_view"),
    path("report", ReportListView.as_view(), name="report_list"),
    path("report/create", ReportCreateView.as_view(), name="report_create"),
    path("report/detail/<int:pk>", ReportDetailView.as_view(), name="report_detail"),
    path("admin/", admin.site.urls),
]

Make 5 new templates. If you are not familiar with Django templates, I urge you to go read the official documentation: Django Docs: Templates

mainapp/templates/mainapp/report_base.html

<html>
<head>
    <title>{% block title %}{% endblock %}</title>
    <meta charset="UTF-8">
</head>

<body>
    {% block content %}{% endblock %}
</body>
</html>

mainapp/templates/mainapp/report_form.html

{% extends "mainapp/report_base.html" %}

{% block content %}
    <h1>Create a report</h1>
    <form method="post">
        {% csrf_token %}
        {{ form.as_p }}
        <button type="submit">Submit</button>
    </form>
{% endblock %}

mainapp/templates/mainapp/report_list.html

{% extends "mainapp/report_base.html" %}

{% block content %}
    <h1>Report List</h1>
    <a href="{% url "report_create" %}">Create a report</a>
    {% if reports.exists %}
    <ul>
        {% for report in reports %}
            <li>
                <a href="{% url "mainapp:report_detail" report.id %}">{{ report.title }}</a>
            </li>
        {% endfor %}
    </ul>
    {% else %}
        <p>No reports available</p>
    {% endif %}
{% endblock %}

mainapp/templates/mainapp/report_detail.html

{% extends "mainapp/report_base.html" %}

{% block content %}
    <h1>Report of complexity: {{report.complexity}}</h1>
    <p>Created at: {{ report.dt_created }}</p>
    {% if report.dt_started %}
        <p>Started at: {{ report.dt_started }}</p>
    {% endif %}
    {% if report.status == 2 %}
        <p>Report is ready</p>
        <p>Finished at: {{ report.finished_at }}</p>
    {% elif report.status == 0 %}
        <p>Report is waiting in queue</p>
    {% elif report.status == 1 %}
        <p>Report is generating</p>
    {% else %}
        <p>Task has failed to generate</p>
    {% endif %}
    {% if report.status == 0 or report.status == 1 %}
        <p>Refresh page in 5 seconds.</p>
        <script>
            setTimeout(function(){
                window.location.reload(1);
            }, 5000);
        </script>
    {% endif %}
    {% if report.result %}<p>{{ report.result|safe }}</p>{% endif %}
{% endblock %}

Now, while the rest of the templates are really simple, the last one needs some explaining:

The report is available in the report variable.

Statuses are defined in the namedtuple REPORT_STATUSES.

They have indexes: pending is 0, running is 1, finished is 2, error is 3.

Maybe not the most convenient thing to work with in templates, but I wanted to show how you can use namedtuple.
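
If you want to avoid the magic numbers, one option (a sketch, not used in this tutorial's code) is to pass the namedtuple into the template context:

from django.views.generic.detail import DetailView
from .models import Report, REPORT_STATUSES

class ReportDetailView(DetailView):
    model = Report
    context_object_name = "report"
    template_name = "mainapp/report_detail.html"

    def get_context_data(self, **kwargs):
        # expose the status constants to the template
        context = super().get_context_data(**kwargs)
        context["STATUSES"] = REPORT_STATUSES
        return context

The template could then compare {% if report.status == STATUSES.finished %} instead of a literal 2.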

If the report is ready, the result and finish time are shown.

If the report has failed for whatever reason, it will state the error.

If the report is not yet ready, the page says so, and for statuses 0 and 1 (pending and running) it also refreshes itself every 5 seconds.

  • Pending: the status a report has when it is just created.
  • Running: a worker picked up the task and started working on it.
  • Error: the task failed gracefully (meaning it reported the failure and saved that to the DB).
  • Finished: the task has finished and written the report into the DB.

Lastly, the template for the report contents, which we will render to a string and store in the report model field result: mainapp/templates/mainapp/report_content.html

<table style="border:1px solid orange">
    <tr>
        <td>
            <h1>The sales report: {{report.complexity}}.</h1>
            <h2>Requested at: {{ report.dt_created }}.</h2>
            <p>It is a very insightful report for sales and marketing team.</p>
        </td>
    </tr>
</table>

Now let's modify our model: we need to add a save method, a generate method, and a generate_data method. The whole file mainapp/models.py will look this way:

from django.db import models
from collections import namedtuple
import time
from django.utils import timezone  # USE_TZ is enabled, so use timezone-aware datetimes
from django.db import transaction
from django.template.loader import render_to_string
from functools import partial
from celery import current_app

REPORT_STATUSES = namedtuple("REPORT_STATUSES", "pending running finished error")._make(
    range(4)
)


class Report(models.Model):
    REPORT_STATUS_CHOICES = [
        (REPORT_STATUSES.pending, "Pending"),
        (REPORT_STATUSES.running, "Running"),
        (REPORT_STATUSES.finished, "Finished"),
        (REPORT_STATUSES.error, "Error"),
    ]
    dt_created = models.DateTimeField(auto_now_add=True)
    dt_started = models.DateTimeField(null=True, blank=True)
    dt_finished = models.DateTimeField(null=True, blank=True)
    complexity = models.PositiveIntegerField(default=10)
    status = models.IntegerField(
        choices=REPORT_STATUS_CHOICES, default=REPORT_STATUSES.pending
    )
    result = models.TextField(blank=True, null=True)

    def __str__(self):
        return f"Report {self.id}"

    class Meta:
        ordering = ["-dt_created"]
        verbose_name = "Report"
        verbose_name_plural = "Reports"

    def save(self, *args, **kwargs):
        """This method is overridden to start the report generation task when the report is saved."""
        if self.status == REPORT_STATUSES.pending:
            self.dt_started = None
            self.dt_finished = None
        super().save(*args, **kwargs)
        if self.status == REPORT_STATUSES.pending:
            transaction.on_commit(
                partial(
                    current_app.send_task,
                    "mainapp_generate_report",
                    kwargs={"report_id": self.id},
                )
            )

    def generate_data(self):
        """This is where the actual generation of the report data would happen."""
        time.sleep(self.complexity)
        self.result = render_to_string("mainapp/report_content.html", {"report": self})

    def generate(self):
        """This method is called by the Celery task or Django admin action.
        Contains some boilerplate code to reflect of the progress and datetimes."""
        self.status = REPORT_STATUSES.running
        self.dt_started = timezone.now()
        self.save(update_fields=["status", "dt_started"])
        self.generate_data()
        self.status = REPORT_STATUSES.finished
        self.dt_finished = timezone.now()
        self.save(update_fields=["status", "dt_finished", "result"])

Add a Celery task to mainapp/tasks.py:

from .models import Report

@shared_task(name="mainapp_generate_report")
def generate_report(report_id):
    report = Report.objects.get(id=report_id)
    report.generate()

Let's break down what's happening:

  1. save method:
    • This is an overridden method of a Django model.
    • When a report is saved with a 'pending' status, it resets the start and finish times.
    • After saving, if the status is still 'pending', it schedules a Celery task.
  2. transaction.on_commit:
    • This ensures the task is only sent after the database transaction is committed.
    • It prevents potential issues where the task tries to access a record that hasn't been committed yet.
  3. current_app.send_task:
    • This is a Celery method to send a task to the queue.
    • The task name is "mainapp_generate_report".
    • It passes the report's ID as an argument.
  4. generate_data method:
    • This method simulates the actual report generation.
    • It uses time.sleep to mimic a time-consuming process.
    • The report content is rendered using a template.
  5. generate method:
    • This is the main method of the Report model, called by the Celery task.
    • It updates the report status to 'running' and sets the start time.
    • It calls generate_data to create the report content.
    • It updates the status to 'finished' and sets the finish time.
  6. Celery task:
    • Has a name defined in the decorator @shared_task(name="mainapp_generate_report").
    • It retrieves the report object and calls its generate method.
    • The generate_report function is the task that Celery will execute.

When a report is saved with a 'pending' status, it triggers an asynchronous Celery task. This task runs the report generation process in the background, updating the report's status and timestamps along the way.
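
As an aside, remember the retry control mentioned in the introduction. If you want a task like this to survive transient failures, Celery supports declarative retries. A sketch (not part of the tutorial's code):

from celery import shared_task
from .models import Report

@shared_task(
    name="mainapp_generate_report",
    autoretry_for=(Report.DoesNotExist,),  # e.g. if the task somehow runs before the row is committed
    retry_backoff=True,                    # wait exponentially longer between retries
    max_retries=3,
)
def generate_report(report_id):
    report = Report.objects.get(id=report_id)
    report.generate()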

Let's update our main template to include the link to reports and then see how it works.

Edit mainapp/templates/mainapp/index.html to include a new link

<html>
<head>
    <title>Tasks</title>
    <meta charset="UTF-8">
</head>

<body>
    <h1>Tasks</h1>
    <ul>
        <li>
            <a href="{% url "dummy_and_slow_view" %}">Dummy and slow</a>
        </li>
        <li>
            <a href="{% url "report_list" %}">Reports</a>
        </li>
    </ul>
</body>
</html>

Restart your Docker Compose if it is running: CTRL-C to stop, docker compose up -d to start.

In another terminal window make sure to run:

docker compose run web python manage.py makemigrations
docker compose run web python manage.py migrate

This creates and applies migrations.

If you are unfamiliar with this topic here is the relevant page in Django Docs: Migrations

Open our app in the browser: http://localhost:9000/

Click on the "Reports" link. image

Click on the "Create a report" link

The complexity is just the number of seconds the report will take to generate; leave it as is, that should be enough for the demonstration.

Click on the "Submit" button.

The page will say that the report is in the queue, because our Celery task hasn't started yet.

After 5 seconds, the page refreshes and the report is being generated.


5 seconds later, another refresh, the report is ready. The content of the report is in the orange box.


This is it!

Task locks to prevent race conditions

Use locking to avoid race conditions. The most common case where I needed locking was a scheduled task that runs every several minutes and can take a long time to complete – longer than the interval between runs.

When a task takes longer to finish than the interval, runs overlap and execute in parallel.

Most of the time this is an undesirable situation that can lead to inconsistent data and system overload.

For locking, I use django-redis https://github.com/jazzband/django-redis#locks

Let's say you run a task every 10 minutes to prepare a report on data.

The task usually takes a couple of minutes to finish, but it can take longer if you have an unusually big amount of data to process.

You also want the lock to expire eventually, in case a Celery worker executing the task silently went down without releasing the lock.

Also, you want to avoid unhandled exceptions showing up in Sentry every time tasks try to overlap and locking prevents that.

And to make it even more interesting, let's say we run report generation on a piece of data identified by an object ID of some kind. We need this for the sake of illustrating more complex, per-object locking.

Here is an example of a task with locking, meeting our requirements:

from celery import shared_task
from django.core.cache import cache
from redis.exceptions import LockError  # raised when the lock can't be acquired; requires the django-redis cache backend


@shared_task(name="main_prepare_a_massive_report")
def prepare_a_massive_report(data_id):
    try:
        with cache.lock(
            f"lock_report_{data_id}",  # use data_id in the lock key to get per-object task locking
            timeout=20 * 60,  # the lock expires after 20 minutes, even if never released
            blocking_timeout=1,  # another task will wait and try to acquire the lock for 1 second
        ):
            prepare_report(data_id)  # the actual report generation (placeholder)
    except LockError as err:
        print(f"prepare_a_massive_report exception: {err}")

Of course, the task will not be killed after 20 minutes if it is still running, and you might need to take additional steps to prevent it from running longer.

But if the worker silently died without releasing the lock, other tasks will be blocked for at most 20 minutes.
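
One such additional step (again a sketch) is Celery's built-in time limits, which stop a task that runs too long:

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(
    name="main_prepare_a_massive_report",
    soft_time_limit=19 * 60,  # raised as SoftTimeLimitExceeded inside the task
    time_limit=20 * 60,       # hard kill shortly after, matching the lock timeout above
)
def prepare_a_massive_report(data_id):
    try:
        prepare_report(data_id)  # the hypothetical heavy work, as above
    except SoftTimeLimitExceeded:
        pass  # clean up and exit gracefully before the hard limit hits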

Conclusion

Well done! You've made it to the end of our Django and Celery tutorial.

Let's recap what we've learned:

  • We explored what Celery is and why it's such a powerful tool for handling background tasks.
  • You've discovered how to create tasks that can run independently of your main application.
  • We've delved into various ways to call these tasks, giving you flexibility in your implementations.
  • Together, we built an example project that generates time-consuming reports, with an auto-refreshing page that shows a waiting/in-progress screen and displays the final report when it's ready.

This tutorial has equipped you with a solid foundation, but there's so much more to discover. Celery is an incredibly versatile tool and offers many advanced ways to use it.

Good luck, happy coding and happy deploying!