Celery is a powerful task queue that allows more complex workflows than just sending one-off tasks.

In this tutorial we will explore how to optimize the performance of your app by running tasks in parallel.

For that Celery offers two options – groups and chords.

Typical cases for tasks you would like to run in parallel are:

  • waiting for responses from a slow external API resource,
  • running heavy computations on different nodes.
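
To make the benefit concrete, here is a minimal sketch that uses only the Python standard library (threads, not Celery). The slow_call function and the delays are made up for illustration; they stand in for slow external API requests.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_call(delay):
    """Hypothetical stand-in for a slow external API request."""
    time.sleep(delay)
    return delay


delays = [0.1, 0.2, 0.3]

# One after another: total time is roughly the sum of all delays.
start = time.perf_counter()
sequential = [slow_call(d) for d in delays]
sequential_elapsed = time.perf_counter() - start

# Side by side: total time is roughly the longest single delay.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(delays)) as pool:
    parallel = list(pool.map(slow_call, delays))
parallel_elapsed = time.perf_counter() - start

print(f"sequential: {sequential_elapsed:.2f}s, parallel: {parallel_elapsed:.2f}s")
```

Both runs return the same results; only the waiting time differs. Celery gives us the same effect across worker processes and machines.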

Objectives

For this tutorial we'll have several tasks that simulate long-running code using time.sleep() with a random delay.

Also, to understand how retries work in groups and chords, we'll raise exceptions at random.

Both of these things are typical for working with any external API.

Source code:

https://github.com/appliku/celery_groups_and_chords

Set up the project

For this tutorial let's use the Djangitos template.

This tutorial requires Docker installed on your machine.

curl -sSL https://github.com/appliku/djangitos/archive/refs/heads/master.zip > djangitos.zip
unzip djangitos.zip


mv djangitos-master celery_groups_and_chords
cd celery_groups_and_chords
cp start.env .env
docker-compose build

If you are using PyCharm on a Mac, you can open the project with open -a pycharm . (the trailing dot stands for the current directory).

In order to create all DB tables and create a user run these commands:

docker-compose run web python manage.py migrate
docker-compose run web python manage.py makesuperuser

The last command will print a password for the newly created superuser; make sure you copy it somewhere, we'll need it. To run the project, execute the following command: docker-compose up.

Create an app

Let's create an app that will have a model ReportGroup. For every created report we will execute a group of tasks. We will also create a ReportGroupTaskLog model to store task runs and timestamps.

We will create similar models ReportChord and ReportChordTaskLog for implementation using chords.

We will not create any views, but use Django Admin instead.

Run the following command to create the app:

docker-compose run web python manage.py startapp reports

We need to add reports app to INSTALLED_APPS.

In Djangitos, which uses django-configurations, you do that by opening project/settings.py and adding the app to the PROJECT_APPS list so it looks like this:

class ProjectConfig(BaseConfig):
    PROJECT_APPS = BaseConfig.PROJECT_APPS + [
        # add your apps here
        'reports'
    ]

Example With Celery Groups

Create Django models for Celery Groups

Open the reports/models.py and let's add our models.

from django.db import models

from reports.tuples import REPORT_STATUS


class ReportGroup(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)

    def __str__(self):
        return str(self.pk)

    class Meta:
        verbose_name = "Report"
        verbose_name_plural = "Reports"
        ordering = ('-pk',)


class ReportGroupTaskLog(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    report = models.ForeignKey(ReportGroup, on_delete=models.CASCADE)
    task_name = models.CharField(max_length=255)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    retry = models.IntegerField(default=0)
    created_dt = models.DateTimeField(auto_now_add=True)
    finished_dt = models.DateTimeField(null=True)

    def __str__(self):
        return f"{self.report_id} {self.task_name}"

    class Meta:
        verbose_name = "Report Task Log"
        verbose_name_plural = "Reports Task Log"
        ordering = ('created_dt',)

Create a file reports/tuples.py with a namedtuple for statuses:

from collections import namedtuple

REPORT_STATUS = namedtuple('REPORT_STATUS', 'processing error finished')._make(range(3))
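
If the one-liner above looks cryptic, this short sketch shows what it produces. It repeats the definition from reports/tuples.py; the two lookup dictionaries are only there for illustration and are not used elsewhere in the project.

```python
from collections import namedtuple

# Same definition as in reports/tuples.py.
REPORT_STATUS = namedtuple('REPORT_STATUS', 'processing error finished')._make(range(3))

# Each field holds the integer that ends up in the database column.
print(REPORT_STATUS.processing, REPORT_STATUS.error, REPORT_STATUS.finished)  # 0 1 2

# Illustrative lookups in both directions.
status_by_name = dict(REPORT_STATUS._asdict())
name_by_value = {value: name for name, value in status_by_name.items()}
```

So REPORT_STATUS.processing is simply 0, which is why it can be used both in STATUS_CHOICES and as the default of an IntegerField.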

Open reports/admin.py and let's include our code to display those models in the Django admin panel.

from django.contrib import admin

from reports.models import ReportGroupTaskLog, ReportGroup


class ReportGroupTaskLogInline(admin.TabularInline):
    model = ReportGroupTaskLog
    extra = 0


class ReportGroupAdmin(admin.ModelAdmin):
    inlines = [ReportGroupTaskLogInline]
    list_display = ['id', 'status', 'created_dt', ]
    list_filter = ['status', ]
    readonly_fields = ('status',)


admin.site.register(ReportGroup, ReportGroupAdmin)

Now it is time to make and apply migrations.

Run the following command:

docker-compose run web python manage.py makemigrations
docker-compose run web python manage.py migrate

Let's look at the admin interface: open http://127.0.0.1:8060/admin/reports/reportgroup/ and log in with the email and password from the makesuperuser command output.

Django Admin Group Report List

Now we need to send a task when a model instance is created.

Let's do this on model save.

We need to track whether we have already sent a Celery task.

So add a field is_celery_task_sent to the ReportGroup model. We need it to make sure we send the task only once per object, when the object is created.

To send the task, all we need is to override the save method so that it sends the task and updates is_celery_task_sent.

We could instead check whether self.pk is set before saving, but that won't work with, for example, a UUID primary key, which is already populated before the first save. That's why we use a dedicated field.

class ReportGroup(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    is_celery_task_sent = models.BooleanField(default=False)

    def do_celery(self):
        pass

    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        if not self.is_celery_task_sent:
            self.is_celery_task_sent = True
            self.do_celery()
            self.save(update_fields=['is_celery_task_sent', ])

We also added the do_celery method, where we'll send the Celery task.
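
The nested save() call inside save() may look like infinite recursion, so here is a minimal sketch of the same guard in plain Python. FakeReport and its counters are hypothetical stand-ins; no Django or database is involved.

```python
class FakeReport:
    """Hypothetical stand-in for ReportGroup, without Django."""

    def __init__(self):
        self.is_celery_task_sent = False
        self.dispatched = 0  # how often do_celery() ran
        self.writes = 0      # how often the row would be written

    def do_celery(self):
        self.dispatched += 1

    def _write(self):
        # Stands in for models.Model.save().
        self.writes += 1

    def save(self):
        self._write()
        if not self.is_celery_task_sent:
            self.is_celery_task_sent = True
            self.do_celery()
            # The nested call sees the flag already set, so it stops here.
            self.save()


report = FakeReport()
report.save()  # first save: writes twice, dispatches once
report.save()  # later saves: one write, no dispatch
print(report.dispatched, report.writes)  # 1 3
```

The flag flips before the nested call, so the recursion goes exactly one level deep and the task is dispatched once, no matter how often the object is saved afterwards.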

In reports/admin.py, add is_celery_task_sent to ReportGroupAdmin.readonly_fields as well. Basically, we don't need any fields on this model to be editable.

The same goes for our inline class: we don't need any editable fields there. In addition, let's set can_delete to False to clean up the page a bit.

And to make our timestamps display seconds let's add methods to both admin models too.

# reports/admin.py

from django.contrib import admin

from reports.models import ReportGroupTaskLog, ReportGroup


class ReportGroupTaskLogInline(admin.TabularInline):
    model = ReportGroupTaskLog
    extra = 0
    can_delete = False
    readonly_fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry',)
    fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry')

    def created_dt_precise(self, obj: ReportGroupTaskLog):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")

    def finished_dt_precise(self, obj: ReportGroupTaskLog):
        if not obj.finished_dt:
            return "---"
        return obj.finished_dt.strftime("%Y-%m-%d %H:%M:%S")


class ReportGroupAdmin(admin.ModelAdmin):
    inlines = [ReportGroupTaskLogInline]
    list_display = ['id', 'status', 'created_dt', ]
    list_filter = ['status', ]
    readonly_fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)
    fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)

    def created_dt_precise(self, obj: ReportGroup):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")


admin.site.register(ReportGroup, ReportGroupAdmin)

Since we have updated our model fields, run makemigrations and migrate commands:

docker-compose run web python manage.py makemigrations
docker-compose run web python manage.py migrate

Creating celery tasks for group

We will create 5 tasks.

Three tasks will do the "hard work" of time.sleep(), and the other two will update our report with the finished or error state.

Also we'll make our tasks fail with a random chance to simulate API communication errors and how they will be handled by retries.

To illustrate a complete failure we'll make every fifth report fail in the longest task.

Create a file reports/tasks.py.

import random
import time
import arrow
from celery import shared_task

from reports.models import ReportGroup
from reports.tuples import REPORT_STATUS



@shared_task(
    name='report_group_short',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_short(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_short',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(1, 10)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.finished_dt = arrow.now().datetime
    log.status = REPORT_STATUS.finished
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_group_medium',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_medium(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_medium',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 30)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_group_long',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_long(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_long',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 100)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
        if report.id % 5 == 0:
            raise ValueError("Unrecoverable API error")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name="report_group_finished",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_finished(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_finished',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.finished
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True


@shared_task(
    name="report_group_error",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_error(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_error',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.error
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True

Each of our processing tasks creates a log record so we can keep track of when it was executed and what its current state is.
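
All tasks share the same retry options, so it helps to see what delays they can produce. The sketch below reflects my understanding of how the delay is derived when retry_backoff=True, retry_backoff_max=500 and retry_jitter=True: exponential growth starting at one second, capped at the maximum, with jitter picking a random value between zero and that cap. backoff_delay is a hypothetical helper written for this tutorial, not part of Celery's API, so treat the numbers as an approximation and check the Celery docs for your version.

```python
import random


def backoff_delay(retries, factor=1, maximum=500, jitter=False, rng=random):
    """Seconds to wait before retry number `retries` (0-based).

    Assumed model: exponential growth, capped at `maximum`, optionally
    replaced by a random value between 0 and the capped delay.
    """
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        delay = rng.randrange(delay + 1)
    return delay


# Upper bounds for the five retries allowed by max_retries=5.
upper_bounds = [backoff_delay(n) for n in range(5)]
print(upper_bounds)  # [1, 2, 4, 8, 16]

# With jitter, the actual delay lies somewhere between 0 and the upper bound.
jittered = [backoff_delay(n, jitter=True) for n in range(5)]
print(jittered)
```

Under these assumptions the cap of 500 seconds is never reached with only five retries; it would only matter from the tenth retry on.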

In the reports/models.py let's update the do_celery method:

# reports/models.py

from celery import chord, group, signature
from django.db import transaction


class ReportGroup(models.Model):
    # ...
    def do_celery(self):
        s_short = signature("report_group_short", kwargs={"report_id": self.pk})
        s_medium = signature("report_group_medium", kwargs={"report_id": self.pk})
        s_long = signature("report_group_long", kwargs={"report_id": self.pk})
        tasks = [s_short, s_medium, s_long]
        s_error = signature("report_group_error", kwargs={"report_id": self.pk}, immutable=True)
        s_finished = signature("report_group_finished", kwargs={"report_id": self.pk}, immutable=True)
        job = group(*tasks)
        job.link(s_finished)
        job.link_error(s_error)
        transaction.on_commit(lambda: job.apply_async())

Here we create five signatures: three for our hard-working tasks, and two callbacks that set the status of the report to error or finished. Both callbacks are marked immutable since we don't need any result passed to them. Keep in mind that group callbacks don't get any results from the group anyway; I just wanted to have the same signatures in both the group and the chord examples.
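
To illustrate what "immutable" means for a callback, here is a much simplified model in plain Python. FakeSignature, run_as_callback and the two mark_* functions are hypothetical names invented for this sketch; this is not how Celery is implemented, it only mimics the behaviour described above.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class FakeSignature:
    """Hypothetical, much simplified model of a task signature."""
    func: Callable
    kwargs: dict = field(default_factory=dict)
    immutable: bool = False

    def run_as_callback(self, parent_result):
        # A mutable signature gets the parent's result as first argument;
        # an immutable one is called with exactly the arguments it was built with.
        if self.immutable:
            return self.func(**self.kwargs)
        return self.func(parent_result, **self.kwargs)


def mark_finished(report_id):
    return f"report {report_id} finished"


def mark_finished_with_result(result, report_id):
    return f"report {report_id} finished after {result}s"


immutable_cb = FakeSignature(mark_finished, {"report_id": 1}, immutable=True)
mutable_cb = FakeSignature(mark_finished_with_result, {"report_id": 1})

print(immutable_cb.run_as_callback(7))  # report 1 finished
print(mutable_cb.run_as_callback(7))    # report 1 finished after 7s
```

Our callbacks only need report_id, so the immutable variant keeps their function signatures simple.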

Head over to admin http://127.0.0.1:8060/admin/reports/reportgroup/ and create a new report. For convenience – click "save and continue editing" so you will stay on the page of the report.

Group Reports List

Create a report in Django Admin

On the report change page, refresh the page to see the task logs appear.

You will see that all of them are in Processing state.

Report Processing

Refresh again and you will see that the shorter task has already finished while the others are still working.

Report partially finished

There will also be a fourth task, "report_group_finished".

Refresh once more:

Report hasn't finished, but callback already finished

Finally, after one retry, the last task has finished.

Report finished

Key takeaways?

  • The callback for a group is called as soon as one of the tasks in the group finishes.
  • The callback is called only once per group.
  • Callbacks don't receive results from the group tasks.
  • You can't rely on the group callback to do something when the whole group is finished.
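
The behaviour we just observed can be imitated with threads from the standard library. This is a rough simulation, not Celery: I assume the callback hangs on a single task (here the first one submitted), and the task names and delays are made up.

```python
import time
from concurrent.futures import ThreadPoolExecutor

events = []  # list.append is thread-safe in CPython


def work(name, delay):
    time.sleep(delay)
    events.append(f"{name} done")
    return name


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [
        pool.submit(work, "short", 0.05),
        pool.submit(work, "medium", 0.25),
        pool.submit(work, "long", 0.45),
    ]
    # The "finished" callback hangs on one task only, not on the whole group.
    futures[0].add_done_callback(lambda f: events.append("callback fired"))

# Leaving the with-block waits for all three tasks.
print(events)
```

In the printed list the callback entry appears before the long task is done, which is the same thing the admin screenshots show for report_group_finished.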

Celery Chords Example

Create models for chords

Now let's create similar models for chords.

They should have same structure, but we'll use chords instead of groups and call new tasks for chords.

Add this to the reports/models.py:

class ReportChord(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    is_celery_task_sent = models.BooleanField(default=False)

    def __str__(self):
        return str(self.pk)

    class Meta:
        verbose_name = "Chord Report"
        verbose_name_plural = "Chord Reports"
        ordering = ('-pk',)

    def do_celery(self):
        s_short = signature("report_chord_short", kwargs={"report_id": self.pk})
        s_medium = signature("report_chord_medium", kwargs={"report_id": self.pk})
        s_long = signature("report_chord_long", kwargs={"report_id": self.pk})
        tasks = [s_short, s_medium, s_long]
        s_error = signature("report_chord_error", kwargs={"report_id": self.pk}, immutable=True)
        s_finished = signature("report_chord_finished", kwargs={"report_id": self.pk}, immutable=True)
        s_finished.link_error(s_error)
        transaction.on_commit(lambda: chord(tasks)(s_finished))

    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        if not self.is_celery_task_sent:
            self.is_celery_task_sent = True
            self.do_celery()
            self.save(update_fields=['is_celery_task_sent', ])


class ReportChordTaskLog(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    report = models.ForeignKey(ReportChord, on_delete=models.CASCADE)
    task_name = models.CharField(max_length=255)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    retry = models.IntegerField(default=0)
    created_dt = models.DateTimeField(auto_now_add=True)
    finished_dt = models.DateTimeField(null=True)

    def __str__(self):
        return f"{self.report_id} {self.task_name}"

    class Meta:
        verbose_name = "Chord Report Task Log"
        verbose_name_plural = "Chord Reports Task Log"
        ordering = ('created_dt',)

What has changed is the implementation of ReportChord.do_celery: we use chord.

Add the code for Django Admin to reports/admin.py for the newly created models (remember to import ReportChord and ReportChordTaskLog from reports.models there):

class ReportChordTaskLogInline(admin.TabularInline):
    model = ReportChordTaskLog
    extra = 0
    can_delete = False
    readonly_fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry',)
    fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry')

    def created_dt_precise(self, obj: ReportChordTaskLog):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")

    def finished_dt_precise(self, obj: ReportChordTaskLog):
        if not obj.finished_dt:
            return "---"
        return obj.finished_dt.strftime("%Y-%m-%d %H:%M:%S")


class ReportChordAdmin(admin.ModelAdmin):
    inlines = [ReportChordTaskLogInline]
    list_display = ['id', 'status', 'created_dt', ]
    list_filter = ['status', ]
    readonly_fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)
    fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)

    def created_dt_precise(self, obj: ReportChord):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")


admin.site.register(ReportChord, ReportChordAdmin)

Celery Tasks for Chords

Add these imports and the code for the tasks that we'll use in ReportChord. We need separate tasks because they have to populate ReportChordTaskLog records instead of the ones for groups; otherwise they are the same.

from reports.models import ReportGroup, ReportChord  # added ReportChord

# ....

@shared_task(
    name='report_chord_short',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_short(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_short',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(1, 10)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.finished_dt = arrow.now().datetime
    log.status = REPORT_STATUS.finished
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_chord_medium',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_medium(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_medium',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 30)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_chord_long',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_long(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_long',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 100)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
        if report.id % 5 == 0:
            raise ValueError("Unrecoverable API error")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e

    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name="report_chord_finished",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_finished(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_finished',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.finished
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True


@shared_task(
    name="report_chord_error",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_error(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_error',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.error
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True

Now make and apply migrations:

docker-compose run web python manage.py makemigrations
docker-compose run web python manage.py migrate

It is important to restart Celery, since it doesn't reload on code changes. In the console where docker-compose up is running, press CTRL-C and start it again. If the containers didn't really stop or you don't see any changes, run docker-compose stop and then start everything again with docker-compose up.

When the containers are up, go to the new admin page for Chord Reports, http://127.0.0.1:8060/admin/reports/reportchord/, and click "Add Chord Report".

Click "Save and continue" and refresh the page after a couple of seconds.

Chord Report Started

After a refresh you will see that one task has finished and the callback hasn't been called yet:

Chord Report Partially finished

Finally, once all chord tasks have finished, the "finished" callback is called:

Chord Report Finished

Now let's look at an example of a report that fails hard.

I have created a ReportChord that got ID 15.

After 6 attempts (the initial run plus the 5 retries set in the task definition), report_chord_long gave up and the error callback report_chord_error was called, setting the whole report to the error state.

Chord Hard Failure
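
The "6 attempts for 5 retries" arithmetic is easy to get wrong, so here is a small stdlib-only sketch of it. run_with_retries and always_failing are hypothetical helpers; the loop retries immediately and leaves out the backoff delay that the real tasks have.

```python
def run_with_retries(task, max_retries=5):
    """Call `task` until it succeeds or the retries are used up.

    Returns (result, attempts). Re-raises the last error on hard failure.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except ValueError:
            # attempts - 1 retries have been used so far.
            if attempts - 1 >= max_retries:
                raise


calls = []


def always_failing():
    calls.append(1)
    raise ValueError("Unrecoverable API error")


try:
    run_with_retries(always_failing, max_retries=5)
except ValueError:
    print(f"gave up after {len(calls)} attempts")  # gave up after 6 attempts
```

The first run does not count as a retry, which is why the task log shows six rows for report_chord_long before the error callback runs.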

Key Takeaways?

  • The callback for a chord is called when all of the tasks in the given chord have finished.
  • The callback is called only once per chord.
  • You can rely on the chord callback to do something after the whole chord is finished.
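
For comparison with the group behaviour, this is a rough stdlib-only imitation of a chord: the callback runs once, after every task has finished, and sees all results. It uses threads rather than Celery, and work and on_all_finished are made-up names for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(complexity):
    # Sleep proportionally to the complexity, then report it back.
    time.sleep(complexity / 100)
    return complexity


def on_all_finished(results):
    """Hypothetical body callback: sees every result at once."""
    return sum(results)


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(work, c) for c in (3, 2, 1)]
    # result() blocks until that task is done, so the list is complete
    # (and in submission order) before the callback runs.
    results = [f.result() for f in futures]

total = on_all_finished(results)
print(results, total)  # [3, 2, 1] 6
```

If one of the tasks raised an exception, result() would re-raise it and on_all_finished would never run, which loosely mirrors how a failed chord ends up in the error callback instead.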