In this series of tutorials we'll build a set of tasks that illustrate how you can solve real issues, from simple to quite complex.

This will not be a typical tutorial where we set up Celery, learn how to send tasks and so on.

Instead, I will try to give actionable answers and advice on how to solve certain problems with Celery.

In this article we'll solve the problem of a view that takes too long to generate a response by moving the actual data generation into a Celery task.

Source code

Repository with the source code for this tutorial: https://github.com/appliku/tutorial_celery_data_for_view

Set up the project

Let's start by getting a fresh djangitos project template.

curl -sSL https://appliku.com/djangitos.zip > djangitos.zip
unzip djangitos.zip


mv djangitos-master celerytutorial
cd celerytutorial
cp start.env .env

Requirements

These are the packages that we will actively rely on in this tutorial:

Django==3.2.7
django-redis==5.0.0
celery==5.1.2
celery-redbeat==2.0.0
arrow==1.1.1
django-environ==0.6.0
django-extensions==3.1.3
factory-boy==3.2.0

You can find the rest of the requirements here: https://github.com/appliku/djangitos/blob/master/requirements.txt

You can explore the Celery setup in djangito/celeryapp.py and in the Procfile, where you'll find the commands used to run the Celery worker and scheduler.

By the way, for the scheduler we use celery-redbeat.

Create an app

Create an app called myapp:

docker-compose run web python manage.py startapp myapp

Add it to the PROJECT_APPS list in djangito/settings.py:

PROJECT_APPS = [
    'usermodel',
    'ses_sns',
    'myapp',  # new
]

Create an empty file myapp/tasks.py. Then create myapp/urls.py with the following content:

from django.urls import path
from myapp import views

urlpatterns = [

]

In djangito/urls.py add myapp urls to the root URLConf:

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('myapp/', include('myapp.urls')),  # new
    path('sns/', include('ses_sns.urls')),
    path('admin/', admin.site.urls),
    path('ckeditor/', include('ckeditor_uploader.urls')),
]

Let's prepare the base template.

Create a file myapp/templates/myapp/base.html with this content:

<!DOCTYPE html>
<html lang="en">
<head>
    <title>{% block title %}{% endblock %}</title>
    <meta charset="utf-8">
</head>

<body>
{% block content %}{% endblock %}
</body>
</html>

Now let's start building the view.

Create a baseline view and models

A view is usually expected to return a response in under 30 seconds at most.

For a number of reasons your view might need more time than that to generate a response.

A long response time can be caused by:
- preparing a massive report involving complex or numerous queries on a large set of database rows
- interacting with a third party API
- generally suboptimal performance of the view which you can't improve.

What to do?

In short:
- Decouple the logic from the view into a separate function
- Create a task that calls this function
- Invoke this task from the view.

Let's solve this problem with an example of a report.

Let's say we have a model PageView.

Define it in myapp/models.py:

from django.db import models


class PageView(models.Model):
    url = models.CharField(max_length=255)
    created_dt = models.DateTimeField(auto_now_add=True, db_index=True)
    response_time_ms = models.IntegerField(default=0)

    class Meta:
        verbose_name = 'Page View'
        verbose_name_plural = 'Page Views'
        ordering = ('-pk',)

    def __str__(self):
        return self.url

Create migrations and apply them:

docker-compose run web python manage.py makemigrations myapp
docker-compose run web python manage.py migrate

Make a view in myapp/views.py:

import arrow
from django.shortcuts import render
from myapp import models


def last_30_days_page(request, *args, **kwargs):
    threshold_dt = arrow.now().shift(days=-30).datetime
    qs = models.PageView.objects.all()
    slow_views = 0
    page_view: models.PageView
    for page_view in qs.iterator():
        if page_view.response_time_ms < 1000:
            continue
        if page_view.created_dt < threshold_dt:
            continue
        if page_view.url.find('blog') == -1:
            continue
        slow_views += 1

    return render(
        request,
        template_name="myapp/last_30_days_report.html",
        context={
            "slow_views": slow_views
        })

Note: this view is inefficient and ugly on purpose, like many real views that generate reports or are otherwise complex and not pretty.

And a template for this view, myapp/templates/myapp/last_30_days_report.html:

{% extends "myapp/base.html" %}

{% block title %}Last 30 days{% endblock %}

{% block content %}
    Number of slow page views in 30 days: {{ slow_views }}
{% endblock %}

Add this view to myapp/urls.py:

from django.urls import path
from myapp import views

urlpatterns = [
    path('last_30_days', views.last_30_days_page, name='last_30_days'),
]

Open the URL for our report: http://0.0.0.0:8060/myapp/last_30_days

It will say: Number of slow page views in 30 days: 0

Generating fake data

To see some results we need to add some data, and we'll need a lot of it to see the benefits of employing Celery tasks.

To do that we'll use factory_boy: https://factoryboy.readthedocs.io/en/stable/

Create a file myapp/factories.py:

import arrow
import factory
from myapp import models


class PageViewFactory(factory.django.DjangoModelFactory):
    class Meta:
        model = models.PageView

    created_dt = factory.Faker(
        'date_between_dates',
        date_start=arrow.now().shift(days=-60).datetime,
        date_end=arrow.now().datetime
    )
    url = factory.Faker('uri_path', deep=3)
    response_time_ms = factory.Faker('pyint', min_value=20, max_value=40000)

Create a management command to generate data in myapp/management/commands/generate_data.py:

from django.core.management.base import BaseCommand

from myapp.factories import PageViewFactory
from myapp.models import PageView


class Command(BaseCommand):
    def handle(self, *args, **options):
        PageView.objects.all().delete()
        PageViewFactory.create_batch(size=100000)

Run this command:

docker-compose run web python manage.py generate_data

A little patience here, it will take a while.

You can change the size= argument to increase or decrease the number of objects you want to create. If it is taking too long to generate the data press CTRL-C to stop the process and decrease the number.

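Now if you refresh the page, it will show the number of slow page views found in the generated data instead of 0. The exact number depends on the random data, because the view only counts slow, recent views of URLs that contain 'blog'.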

If we open the browser's developer tools, we'll see that this page takes quite a long time to generate a response. In my case it takes a whole second!

Generate report with a Celery task and caching

Let's solve this problem with a Celery task and a cache.

Create a file myapp/helpers.py where we'll put the logic from the view:

import arrow

from myapp import models


def calculate_last_30_days():
    threshold_dt = arrow.now().shift(days=-30).datetime
    qs = models.PageView.objects.all()
    slow_views = 0
    page_view: models.PageView
    for page_view in qs.iterator():
        if page_view.response_time_ms < 1000:
            continue
        if page_view.created_dt < threshold_dt:
            continue
        if page_view.url.find('blog') == -1:
            continue
        slow_views += 1
    return slow_views
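
A side benefit of moving the logic out of the view is that it becomes easier to test. As a minimal sketch, the three `continue` checks can be expressed as one plain function that needs no database. The name `is_slow_blog_view` and the sample rows are made up for illustration and are not part of the tutorial repository:

```python
from datetime import datetime, timedelta, timezone


def is_slow_blog_view(url, response_time_ms, created_dt, threshold_dt):
    """Return True when a page view should be counted in the report."""
    return (
        response_time_ms >= 1000        # slow: one second or more
        and created_dt >= threshold_dt  # recent: inside the 30-day window
        and "blog" in url               # only URLs that contain 'blog'
    )


now = datetime(2021, 9, 30, tzinfo=timezone.utc)
threshold = now - timedelta(days=30)

# Hypothetical rows: (url, response_time_ms, created_dt)
rows = [
    ("/blog/post-1", 1500, now - timedelta(days=1)),   # counted
    ("/blog/post-2", 200, now - timedelta(days=1)),    # too fast
    ("/blog/post-3", 5000, now - timedelta(days=45)),  # too old
    ("/shop/item", 5000, now - timedelta(days=1)),     # not a blog URL
]

slow_views = sum(is_slow_blog_view(u, ms, dt, threshold) for u, ms, dt in rows)
print(slow_views)  # 1
```

The conditions are the exact opposites of the `continue` checks in calculate_last_30_days, so both versions count the same rows.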

Open the empty myapp/tasks.py we created earlier and add the task that will call the function from helpers.py:

from celery import shared_task
from django.core.cache import cache

from myapp.helpers import calculate_last_30_days


@shared_task(name="generate_last_30_days_views")
def generate_last_30_days_views():
    cache.set("last_30_days", calculate_last_30_days(), 60)

Let's add another view to myapp/views.py:

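# These two imports go at the top of myapp/views.py
from celery import current_app
from django.core.cache import cache


def last_30_days_page_cached(request, *args, **kwargs):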
    slow_views = cache.get("last_30_days")
    if slow_views is None:
        current_app.send_task("generate_last_30_days_views", queue="default", ignore_result=True)
    return render(
        request,
        template_name="myapp/last_30_days_report_cached.html",
        context={
            "slow_views": slow_views
        })

For this view add another template, myapp/templates/myapp/last_30_days_report_cached.html:

{% extends "myapp/base.html" %}

{% block title %}Last 30 days{% endblock %}

{% block content %}
    {% if slow_views == None %}
        Please wait, we are calculating stats
        <script>
            setTimeout(() => {
                window.location.reload()
            }, 5000);
        </script>
    {% else %}
        Number of slow page views in 30 days: {{ slow_views }}
    {% endif %}
{% endblock %}

Add this view to myapp/urls.py:

from django.urls import path
from myapp import views

urlpatterns = [
    path('last_30_days_cached', views.last_30_days_page_cached, name='last_30_days_cached'),  # new
    path('last_30_days', views.last_30_days_page, name='last_30_days'),

]

Open the page for this new view: http://0.0.0.0:8060/myapp/last_30_days_cached

At first you will see the message "Please wait, we are calculating stats".

In 5 seconds the page will refresh and, if the Celery task has finished, it will display the data.

For example: Number of slow page views in 30 days: 2289.

It seems that we solved the problem, right?

Actually no.

If generating the report takes longer than the refresh interval, or if multiple users open the page while the report data is not ready, the view will create multiple tasks.

These tasks will pile up in the queue and degrade the performance of the app or, if you have enough concurrent workers, strangle the database with concurrent expensive queries.
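
To make the pile-up concrete, here is a small simulation in plain Python. The dict and the list are stand-ins I'm assuming for the Django cache and the Celery queue, and `handle_request` only mimics the decision made in last_30_days_page_cached:

```python
cache = {}  # stand-in for the Django cache
queue = []  # stand-in for the Celery queue


def handle_request():
    """Mimic the cached view: send a task whenever the data is missing."""
    slow_views = cache.get("last_30_days")
    if slow_views is None:
        queue.append("generate_last_30_days_views")
    return slow_views


# Five page loads arrive before any worker has finished the report.
for _ in range(5):
    handle_request()

print(len(queue))  # 5
```

Every request that arrives before the cache is filled adds one more copy of the same expensive task.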

We don't want that.

We need to implement locking.

Add locking using django-redis locks

Let's create a third view in myapp/views.py:

def last_30_days_page_cached_locks(request, *args, **kwargs):
    slow_views = cache.get("last_30_days")
    if slow_views is None:
        current_app.send_task(
            "generate_last_30_days_views_locks",
            queue="default",
            ignore_result=True)
    return render(
        request,
        template_name="myapp/last_30_days_report_cached.html",
        context={
            "slow_views": slow_views
        })

Note that the only change is which Celery task we call.

Add this view to myapp/urls.py:

from django.urls import path
from myapp import views

urlpatterns = [
    path('last_30_days_cached_locks',  # new
         views.last_30_days_page_cached_locks,
         name='last_30_days_cached_locks'),
    path('last_30_days_cached', views.last_30_days_page_cached, name='last_30_days_cached'),
    path('last_30_days', views.last_30_days_page, name='last_30_days'),

]

And add the task to myapp/tasks.py:

@shared_task(name="generate_last_30_days_views_locks")
def generate_last_30_days_views_locks():
    with cache.lock("30_days_report", timeout=60, blocking_timeout=1):
        cache.set("last_30_days", calculate_last_30_days(), 60)

By the way, don't forget to stop docker-compose by pressing CTRL-C and restart it with docker-compose up because, unlike Django's runserver, Celery doesn't restart on code changes.

Go to the new view:

http://0.0.0.0:8060/myapp/last_30_days_cached_locks

You will see the same behaviour, but this time it uses locking and doesn't overload your database and Celery workers with excessive data generation.

One other note here: the view still creates an additional task every time you open the page while the data is not ready, but when such a task is processed it will be skipped if another task is already running.
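
The "skip if another task is running" idea can be sketched without Redis. Below, a `threading.Lock` stands in for `cache.lock("30_days_report")`; this is an assumption for illustration, not the django-redis API. As far as I know, a redis-py lock used in a `with` block raises `LockError` when it can't be acquired within `blocking_timeout`, so in the real task a skipped run would likely show up as a failed task in the worker log unless you catch that exception.

```python
import threading

report_lock = threading.Lock()  # stand-in for cache.lock("30_days_report")
results = []


def generate_report(name):
    # Try to take the lock without waiting; give up if someone else holds it.
    if not report_lock.acquire(blocking=False):
        results.append((name, "skipped"))
        return
    try:
        results.append((name, "generated"))
    finally:
        report_lock.release()


# Simulate a second task arriving while the first one still holds the lock.
report_lock.acquire()
generate_report("task-2")
report_lock.release()

# Once the lock is free, the next task does the work.
generate_report("task-3")
print(results)  # [('task-2', 'skipped'), ('task-3', 'generated')]
```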

This is far from ideal, but it does 80% of the job: we solved the problem of the view taking too long to respond and prevented concurrently running tasks from generating the data.

Let's now improve this further by not sending tasks if the data is already being generated at the moment of the request.

Use a cache key to not send_task if a Celery task is already running

Add another task to myapp/tasks.py:

@shared_task(name="generate_last_30_days_v3")
def generate_last_30_days_v3():
    with cache.lock("30_days_report_locks_cache", timeout=60, blocking_timeout=1):
        cache.set("last_30_days_working", 1, 60)
        cache.set("last_30_days", calculate_last_30_days(), 60)
        cache.delete("last_30_days_working")

Add another view to myapp/views.py:

def last_30_days_page_v3(request, *args, **kwargs):
    slow_views = cache.get("last_30_days")
    last_30_days_working = cache.get("last_30_days_working")
    if slow_views is None and not last_30_days_working:
        current_app.send_task(
            "generate_last_30_days_v3",
            queue="default",
            ignore_result=True)
    return render(
        request,
        template_name="myapp/last_30_days_report_cached.html",
        context={
            "slow_views": slow_views
        })

Add it to myapp/urls.py:

from django.urls import path
from myapp import views

urlpatterns = [
    path('last_30_days_v3',
         views.last_30_days_page_v3,
         name='last_30_days_v3'),
    ...

Don't forget to stop docker-compose by pressing CTRL-C and then restart it with docker-compose up to make sure Celery is aware of our new task.

Let's make sure our task code works by opening the new view: http://0.0.0.0:8060/myapp/last_30_days_v3

How is this different from the previous code?

While our task is working, it sets a cache key to indicate that the data is being generated at the moment. When the view is requested, it checks both for the actual data and for a running task. If the data is not there and the task is not running, the view calls send_task. After the task finishes generating the data, it deletes the "last_30_days_working" cache key.

This cache key is also set to expire in 60 seconds in case data generation raises an exception and the task never cleans up. Handling possible exceptions is out of the scope of this article.
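
Here is a small plain-Python sketch of that decision logic. The dict and the list are assumed stand-ins for the Django cache and the Celery queue, and `view_v3` and `task_v3` only mimic the real view and task. The `try`/`finally` is my own addition, one possible way to clear the flag when the calculation fails instead of waiting for the 60-second expiry:

```python
cache = {}       # stand-in for the Django cache
sent_tasks = []  # stand-in for the Celery queue


def view_v3():
    """Send a task only when there is no data and no task is running."""
    no_data = cache.get("last_30_days") is None
    if no_data and not cache.get("last_30_days_working"):
        sent_tasks.append("generate_last_30_days_v3")


def task_v3(calculate):
    cache["last_30_days_working"] = 1
    try:
        cache["last_30_days"] = calculate()
    finally:
        # Runs even when calculate() raises, so the flag never gets stuck.
        cache.pop("last_30_days_working", None)


def failing_calculation():
    raise RuntimeError("report failed")


view_v3()                          # no data, nothing running: task is sent
cache["last_30_days_working"] = 1  # pretend a worker has started
view_v3()                          # a task is running: nothing is sent
cache.pop("last_30_days_working")

try:
    task_v3(failing_calculation)
except RuntimeError:
    pass

print(len(sent_tasks), "last_30_days_working" in cache)  # 1 False
```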

This approach is almost perfect:
- We have ensured that only one process of generating the data is running.
- We have also ensured that when data generation is already in progress, no more tasks are sent.

This is better than the previous example, but if the task hasn't started working yet, we'll still be sending more tasks.

Let's fix that.

Use cache key to mark that celery task is already sent

In this version we'll add another cache key to mark that the task was sent, and the view will also use this key to decide whether it needs to call send_task. When the task starts working, it deletes this key and sets the other one to show that it is working.

Create another task in myapp/tasks.py:

@shared_task(name="generate_last_30_days_v4")
def generate_last_30_days_v4():
    with cache.lock("30_days_report_locks_cache", timeout=60, blocking_timeout=1):
        cache.delete("last_30_days_task_sent")
        cache.set("last_30_days_working", 1, 60)
        cache.set("last_30_days", calculate_last_30_days(), 60)
        cache.delete("last_30_days_working")

Add another view in myapp/views.py:

def last_30_days_page_v4(request, *args, **kwargs):
    slow_views = cache.get("last_30_days")
    last_30_days_task_sent = cache.get("last_30_days_task_sent")
    last_30_days_working = cache.get("last_30_days_working")
    if slow_views is None and not last_30_days_working and not last_30_days_task_sent:
        cache.set("last_30_days_task_sent", 1, 60)
        current_app.send_task(
            "generate_last_30_days_v4",
            queue="default",
            ignore_result=True)
    return render(
        request,
        template_name="myapp/last_30_days_report_cached.html",
        context={
            "slow_views": slow_views
        })

New path in myapp/urls.py:

urlpatterns = [
    path('last_30_days_v4',
         views.last_30_days_page_v4,
         name='last_30_days_v4'),
     ...

As always, don't forget to restart docker-compose: press CTRL-C and run docker-compose up.

Let's see that it works at http://0.0.0.0:8060/myapp/last_30_days_v4

You should see that your data is being generated, then the page reloads and you get the number.
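
To see why the extra key helps, here is a plain-Python simulation of several requests arriving before the worker picks up the task. The dict and the list are assumed stand-ins for the Django cache and the Celery queue, and `view_v4` and `worker_run_one` only mimic the view and the task above:

```python
cache = {}  # stand-in for the Django cache
queue = []  # stand-in for the Celery queue


def view_v4():
    """Send a task only if there is no data, no running task and no sent task."""
    data = cache.get("last_30_days")
    busy = cache.get("last_30_days_working") or cache.get("last_30_days_task_sent")
    if data is None and not busy:
        cache["last_30_days_task_sent"] = 1
        queue.append("generate_last_30_days_v4")
    return data


def worker_run_one(calculate):
    """Mimic the v4 task: swap the 'sent' flag for the 'working' flag."""
    queue.pop(0)
    cache.pop("last_30_days_task_sent", None)
    cache["last_30_days_working"] = 1
    cache["last_30_days"] = calculate()
    cache.pop("last_30_days_working", None)


# Five requests arrive before the worker has started.
for _ in range(5):
    view_v4()
tasks_sent = len(queue)

worker_run_one(lambda: 2289)  # hypothetical report value
result = view_v4()
print(tasks_sent, result, len(queue))  # 1 2289 0
```

One caveat of this design: the view reads the key and then sets it in two separate steps, so two requests landing at exactly the same moment could still both send a task. Django's `cache.add`, which only sets a key if it doesn't exist yet, is one option to narrow that window.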

Conclusion

In this tutorial you've seen how to solve the problem of a view that takes too long to respond by gradually implementing Celery tasks and improving them with locks and caching.

There are several other ways to solve the initial problem. In this article I wanted to show a step-by-step process and the tradeoffs at each of these steps.