Celery is a powerful task queue that allows more complex workflows than just sending one-off tasks.
In this tutorial we will explore how to optimize the performance of your app by running tasks in parallel.
For that, Celery offers two options – groups and chords.
Typical cases for tasks you would like to run in parallel are:
- waiting for responses from a slow external API,
- running heavy computations on different nodes.
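Before diving into Celery, here is a rough stdlib-only sketch (not Celery itself) of why running slow I/O-bound work in parallel pays off: the wall-clock time approaches the longest single task rather than the sum of all of them.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_api_call(delay):
    """Stand-in for a slow external API: just sleeps and echoes the delay."""
    time.sleep(delay)
    return delay


delays = [0.2, 0.2, 0.2]

start = time.monotonic()
with ThreadPoolExecutor(max_workers=3) as pool:
    # All three "calls" run concurrently, so total wall time is
    # close to the longest single delay (~0.2s), not the sum (~0.6s).
    results = list(pool.map(slow_api_call, delays))
elapsed = time.monotonic() - start
```

Celery's groups and chords give you the same effect, but distributed across workers instead of threads in one process.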
Objectives¶
For this tutorial we'll have several tasks in which we simulate long-running code with time.sleep()
and a random delay.
Also, to understand how retries work in groups and chords, we'll simulate randomly thrown exceptions.
Both of these are typical when working with any external API.
Source code:¶
https://github.com/appliku/celery_groups_and_chords
Set up the project¶
For this tutorial let's use the Djangitos template.
This tutorial requires Docker installed on your machine.
curl -sSL https://github.com/appliku/djangitos/archive/refs/heads/master.zip > djangitos.zip
unzip djangitos.zip
mv djangitos-master celery_groups_and_chords
cd celery_groups_and_chords
cp start.env .env
docker-compose build
If you are using PyCharm on Mac you can open the project with open -a pycharm .
In order to create all DB tables and a user, run these commands:
docker-compose run web python manage.py migrate
docker-compose run web python manage.py makesuperuser
The last command will print a password for the newly created superuser; make sure you copy it somewhere. We'll need it.
To run the project, execute the following command: docker-compose up
Create an app¶
Let's create an app with a model ReportGroup. For every created report we will execute a group of tasks. We will also create a ReportGroupTaskLog model to store task runs and timestamps.
We will create similar models, ReportChord and ReportChordTaskLog, for the implementation using chords.
We will not create any views, but use the Django Admin instead.
Run the following command to create the app:
docker-compose run web python manage.py startapp reports
We need to add the reports app to INSTALLED_APPS.
In Djangitos, which uses django-configurations, open project/settings.py and add it to the PROJECT_APPS list so it looks like this:
class ProjectConfig(BaseConfig):
    PROJECT_APPS = BaseConfig.PROJECT_APPS + [
        # add your apps here
        'reports',
    ]
Example With Celery Groups¶
Create Django models for Celery Groups¶
Open reports/models.py and let's add our models.
from django.db import models

from reports.tuples import REPORT_STATUS


class ReportGroup(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)

    def __str__(self):
        return str(self.pk)

    class Meta:
        verbose_name = "Report"
        verbose_name_plural = "Reports"
        ordering = ('-pk',)


class ReportGroupTaskLog(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    report = models.ForeignKey(ReportGroup, on_delete=models.CASCADE)
    task_name = models.CharField(max_length=255)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    retry = models.IntegerField(default=0)
    created_dt = models.DateTimeField(auto_now_add=True)
    finished_dt = models.DateTimeField(null=True)

    def __str__(self):
        return f"{self.report_id} {self.task_name}"

    class Meta:
        verbose_name = "Report Task Log"
        verbose_name_plural = "Report Task Logs"
        ordering = ('created_dt',)
Create a file reports/tuples.py
with a namedtuple
for statuses:
from collections import namedtuple
REPORT_STATUS = namedtuple('REPORT_STATUS', 'processing error finished')._make(range(3))
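To see what this one-liner actually produces, the statuses simply map to the integers 0, 1, 2, which is what the IntegerField choices expect:

```python
from collections import namedtuple

# Same one-liner as in reports/tuples.py.
REPORT_STATUS = namedtuple('REPORT_STATUS', 'processing error finished')._make(range(3))

# _make(range(3)) fills the fields in order, so:
#   REPORT_STATUS.processing -> 0
#   REPORT_STATUS.error      -> 1
#   REPORT_STATUS.finished   -> 2
```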
Open reports/admin.py and let's add the code to display those models in the Django admin panel.
from django.contrib import admin

from reports.models import ReportGroupTaskLog, ReportGroup


class ReportGroupTaskLogInline(admin.TabularInline):
    model = ReportGroupTaskLog
    extra = 0


class ReportGroupAdmin(admin.ModelAdmin):
    inlines = [ReportGroupTaskLogInline]
    list_display = ['id', 'status', 'created_dt', ]
    list_filter = ['status', ]
    readonly_fields = ('status',)


admin.site.register(ReportGroup, ReportGroupAdmin)
Now it is time to make and apply migrations.
Run the following command:
docker-compose run web python manage.py makemigrations
docker-compose run web python manage.py migrate
Let's see the admin interface: open http://127.0.0.1:8060/admin/reports/reportgroup/ and log in with the email and password from the makesuperuser command logs.
Now we need to send a task on model creation.
Let's do this on model save.
We need to track whether we have already sent a Celery task, so add a field is_celery_task_sent to the ReportGroup model. We need this to make sure we send the task only once per model, on creation of the object.
To send the task, we override the save method so that it sends the task and updates is_celery_task_sent.
We could also check whether self.pk is set, but that, for example, won't work for a UUID primary key, which is why we use a dedicated field.
class ReportGroup(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    is_celery_task_sent = models.BooleanField(default=False)

    def do_celery(self):
        pass

    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        if not self.is_celery_task_sent:
            self.is_celery_task_sent = True
            self.do_celery()
            self.save(update_fields=['is_celery_task_sent', ])
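The send-once logic of the flag can be sketched without Django at all; FakeReport below is a hypothetical plain-Python stand-in, not the real model:

```python
class FakeReport:
    """Plain-Python stand-in for the model, to show the send-once flag logic."""

    def __init__(self):
        self.is_celery_task_sent = False
        self.tasks_sent = 0

    def do_celery(self):
        self.tasks_sent += 1

    def save(self):
        # First save: the flag is False, so we flip it and send the task
        # exactly once; every later save skips this branch entirely.
        if not self.is_celery_task_sent:
            self.is_celery_task_sent = True
            self.do_celery()


report = FakeReport()
report.save()  # sends the task
report.save()  # no-op as far as the task is concerned
```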
We also added a do_celery method where we'll send the Celery task.
In reports/admin.py, add is_celery_task_sent to ReportGroupAdmin.readonly_fields as well. Basically, we don't need any fields editable on this model.
The same goes for our inline class: we don't need any fields editable there either. In addition, let's set can_delete to False to clean up the page a bit.
And to make our timestamps display seconds, let's add methods to both admin classes too.
# reports/admin.py
from django.contrib import admin

from reports.models import ReportGroupTaskLog, ReportGroup


class ReportGroupTaskLogInline(admin.TabularInline):
    model = ReportGroupTaskLog
    extra = 0
    can_delete = False
    readonly_fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry',)
    fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry')

    def created_dt_precise(self, obj: ReportGroupTaskLog):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")

    def finished_dt_precise(self, obj: ReportGroupTaskLog):
        if not obj.finished_dt:
            return "---"
        return obj.finished_dt.strftime("%Y-%m-%d %H:%M:%S")


class ReportGroupAdmin(admin.ModelAdmin):
    inlines = [ReportGroupTaskLogInline]
    list_display = ['id', 'status', 'created_dt', ]
    list_filter = ['status', ]
    readonly_fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)
    fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)

    def created_dt_precise(self, obj: ReportGroup):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")


admin.site.register(ReportGroup, ReportGroupAdmin)
Since we have updated our model fields, run makemigrations
and migrate
commands:
docker-compose run web python manage.py makemigrations
docker-compose run web python manage.py migrate
Creating celery tasks for group¶
We will create 5 tasks.
Three tasks will do the hard work of time.sleep(), and the other two will update our report with the finished or error state.
We'll also make our tasks fail with a random chance, to simulate API communication errors and show how they are handled by retries.
To illustrate a complete failure, we'll make every fifth report fail in the longest task.
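A word on the retry settings we're about to use: with retry_backoff=True, Celery waits roughly factor * 2 ** retries seconds between attempts, capped at retry_backoff_max (retry_jitter then randomizes the wait). A quick sketch of the delays, assuming a backoff factor of 1:

```python
def backoff_countdown(retries, factor=1, maximum=500):
    """Approximate Celery's exponential backoff (before jitter is applied)."""
    return min(maximum, factor * (2 ** retries))


# Waits before retries 1..5, matching max_retries=5, retry_backoff_max=500:
delays = [backoff_countdown(r) for r in range(5)]
# -> [1, 2, 4, 8, 16]; a hypothetical retry 20 would be capped at 500.
```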
Create a file reports/tasks.py:
import random
import time

import arrow
from celery import shared_task

from reports.models import ReportGroup
from reports.tuples import REPORT_STATUS


@shared_task(
    name='report_group_short',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_short(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_short',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(1, 10)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.finished_dt = arrow.now().datetime
    log.status = REPORT_STATUS.finished
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_group_medium',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_medium(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_medium',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 30)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_group_long',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_long(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_long',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 100)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
        if report.id % 5 == 0:
            # Every fifth report fails unconditionally, exhausting all retries.
            raise ValueError("Unrecoverable API error")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name="report_group_finished",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_finished(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_finished',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.finished
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True


@shared_task(
    name="report_group_error",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_group_error(self, report_id):
    try:
        report = ReportGroup.objects.get(pk=report_id)
    except ReportGroup.DoesNotExist:
        return False
    log = report.reportgrouptasklog_set.create(
        task_name='report_group_error',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.error
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True
Our processing tasks create a log record for themselves, so we can keep track of when they were executed and what their current state is.
In reports/models.py, let's update the do_celery method:
# reports/models.py
from celery import signature, group, chord
from django.db import transaction


class ReportGroup(models.Model):
    # ...

    def do_celery(self):
        s_short = signature("report_group_short", kwargs={"report_id": self.pk})
        s_medium = signature("report_group_medium", kwargs={"report_id": self.pk})
        s_long = signature("report_group_long", kwargs={"report_id": self.pk})
        tasks = [s_short, s_medium, s_long]
        s_error = signature("report_group_error", kwargs={"report_id": self.pk}, immutable=True)
        s_finished = signature("report_group_finished", kwargs={"report_id": self.pk}, immutable=True)
        job = group(*tasks)
        job.link(s_finished)
        job.link_error(s_error)
        # Only send the tasks once the DB transaction has committed,
        # so the worker is guaranteed to find the report row.
        transaction.on_commit(lambda: job.apply_async())
Here we create 5 signatures: 3 for our hard-working tasks, and the other two as callbacks to set the status of the report to failed or finished. Both callbacks are marked immutable since we don't need any result passed to them. But keep in mind that a group callback doesn't get any results from the group anyway; I just wanted to have the same signatures in both the groups and chords examples.
Head over to the admin at http://127.0.0.1:8060/admin/reports/reportgroup/ and create a new report. For convenience, click "Save and continue editing" so you stay on the report's page.
On the report change page, refresh to see the task logs appear.
You will see that all of them are in the Processing state.
Refresh again and you will see that the shorter task has already finished, while the others are still working.
There will also be a fourth task, "report_group_finished".
Refresh once more:
finally, after one retry, the last task has finished.
Key takeaways¶
- The callback for a group is called when one of the tasks in the group finishes.
- The callback is called only once per group.
- Callbacks don't receive results from the group's tasks.
- You can't rely on the group callback to do something when the whole group is finished.
Celery Chords Example¶
Create models for chords¶
Now let's create similar models for chords.
They have the same structure, but we'll use chords instead of groups and call the new chord-specific tasks.
Add this to reports/models.py:
class ReportChord(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    is_celery_task_sent = models.BooleanField(default=False)

    def __str__(self):
        return str(self.pk)

    class Meta:
        verbose_name = "Chord Report"
        verbose_name_plural = "Chord Reports"
        ordering = ('-pk',)

    def do_celery(self):
        s_short = signature("report_chord_short", kwargs={"report_id": self.pk})
        s_medium = signature("report_chord_medium", kwargs={"report_id": self.pk})
        s_long = signature("report_chord_long", kwargs={"report_id": self.pk})
        tasks = [s_short, s_medium, s_long]
        s_error = signature("report_chord_error", kwargs={"report_id": self.pk}, immutable=True)
        s_finished = signature("report_chord_finished", kwargs={"report_id": self.pk}, immutable=True)
        # If the chord body fails, run the error callback instead.
        s_finished.link_error(s_error)
        transaction.on_commit(lambda: chord(tasks)(s_finished))

    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        if not self.is_celery_task_sent:
            self.is_celery_task_sent = True
            self.do_celery()
            self.save(update_fields=['is_celery_task_sent', ])


class ReportChordTaskLog(models.Model):
    STATUS_CHOICES = (
        (REPORT_STATUS.processing, 'Processing'),
        (REPORT_STATUS.error, 'Error'),
        (REPORT_STATUS.finished, 'Finished'),
    )
    report = models.ForeignKey(ReportChord, on_delete=models.CASCADE)
    task_name = models.CharField(max_length=255)
    status = models.IntegerField(choices=STATUS_CHOICES, default=REPORT_STATUS.processing)
    retry = models.IntegerField(default=0)
    created_dt = models.DateTimeField(auto_now_add=True)
    finished_dt = models.DateTimeField(null=True)

    def __str__(self):
        return f"{self.report_id} {self.task_name}"

    class Meta:
        verbose_name = "Chord Report Task Log"
        verbose_name_plural = "Chord Report Task Logs"
        ordering = ('created_dt',)
What has changed is the implementation of ReportChord.do_celery – we use chord, and the chord-specific task names.
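The chord behaviour – run the header tasks in parallel, then hand all of their results to a single callback – can be mimicked with the stdlib (an analogy only, not how Celery works internally):

```python
from concurrent.futures import ThreadPoolExecutor


def short_task():
    return 1


def medium_task():
    return 2


def long_task():
    return 3


def finished_callback(results):
    # Unlike a group callback, a chord callback receives every result.
    return sum(results)


with ThreadPoolExecutor() as pool:
    futures = [pool.submit(t) for t in (short_task, medium_task, long_task)]
    # The callback runs only after *all* header tasks have completed.
    results = [f.result() for f in futures]

total = finished_callback(results)
```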
Add code to reports/admin.py for the newly created models:
from reports.models import ReportChordTaskLog, ReportChord  # add to the existing imports


class ReportChordTaskLogInline(admin.TabularInline):
    model = ReportChordTaskLog
    extra = 0
    can_delete = False
    readonly_fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry',)
    fields = ('task_name', 'status', 'created_dt_precise', 'finished_dt_precise', 'retry')

    def created_dt_precise(self, obj: ReportChordTaskLog):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")

    def finished_dt_precise(self, obj: ReportChordTaskLog):
        if not obj.finished_dt:
            return "---"
        return obj.finished_dt.strftime("%Y-%m-%d %H:%M:%S")


class ReportChordAdmin(admin.ModelAdmin):
    inlines = [ReportChordTaskLogInline]
    list_display = ['id', 'status', 'created_dt', ]
    list_filter = ['status', ]
    readonly_fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)
    fields = ('id', 'status', 'is_celery_task_sent', 'created_dt_precise',)

    def created_dt_precise(self, obj: ReportChord):
        return obj.created_dt.strftime("%Y-%m-%d %H:%M:%S")


admin.site.register(ReportChord, ReportChordAdmin)
Celery Tasks for Chords¶
Add these imports and the code for the tasks that we'll use in ReportChord. The reason for separate tasks is that we need to populate ReportChordTaskLog instead of the group logs; otherwise they are the same.
from reports.models import ReportGroup, ReportChord  # added ReportChord

# ....


@shared_task(
    name='report_chord_short',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_short(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_short',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(1, 10)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.finished_dt = arrow.now().datetime
    log.status = REPORT_STATUS.finished
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_chord_medium',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_medium(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_medium',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 30)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name='report_chord_long',
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_long(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_long',
        status=REPORT_STATUS.processing,
        retry=self.request.retries
    )
    complexity = random.randint(10, 100)
    time.sleep(complexity)
    try:
        if random.random() < 0.1:
            raise ValueError("API Connectivity Failure")
        if report.id % 5 == 0:
            # Every fifth report fails unconditionally, exhausting all retries.
            raise ValueError("Unrecoverable API error")
    except Exception as e:
        log.finished_dt = arrow.now().datetime
        log.status = REPORT_STATUS.error
        log.save(update_fields=['status', 'finished_dt', ])
        raise e
    log.status = REPORT_STATUS.finished
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return complexity


@shared_task(
    name="report_chord_finished",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_finished(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_finished',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.finished
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True


@shared_task(
    name="report_chord_error",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True,
)
def report_chord_error(self, report_id):
    try:
        report = ReportChord.objects.get(pk=report_id)
    except ReportChord.DoesNotExist:
        return False
    log = report.reportchordtasklog_set.create(
        task_name='report_chord_error',
        status=REPORT_STATUS.finished,
        retry=self.request.retries
    )
    report.status = REPORT_STATUS.error
    report.save(update_fields=['status', ])
    log.finished_dt = arrow.now().datetime
    log.save(update_fields=['status', 'finished_dt', ])
    return True
Now make and apply migrations:
docker-compose run web python manage.py makemigrations
docker-compose run web python manage.py migrate
It is important to restart Celery, since it doesn't reload on our code changes. In the console where you have docker-compose up running, press CTRL-C and start it again. You might also want to run docker-compose stop if the containers didn't really stop or you don't see any changes, then start again with docker-compose up.
When the containers are up, go to the new admin page for Chord Reports at http://127.0.0.1:8060/admin/reports/reportchord/
and click "Add Chord Report".
Click "Save and continue editing" and refresh the page after a couple of seconds.
After refreshing, you'll see that one task has finished and the callback hasn't been called yet.
Finally, when all chord tasks have finished, the "finished" callback is called.
Now let's look at an example with a hard-failing report.
I created a ReportChord that got ID 15.
After 6 attempts (we have max_retries=5 in the task definition), report_chord_long gave up and the error callback report_chord_error was called, setting the whole report to failed.
Key Takeaways¶
- The callback for a chord is called when all of the tasks in the chord have finished.
- The callback is called only once per chord.
- You can rely on the chord callback to do something after the whole chord is finished.