In this tutorial I will show you when to start using multiple queues in Celery and how to set them up.
Our hypothetical app will have 3 categories of Celery tasks:
- basic tasks that power the interface of the app,
- long-running tasks that process uploaded files,
- tasks that interact with a 3rd party API and come in large numbers.
As an example, it could be a tool that lets users upload contact information and then checks whether the email addresses have MX records, to see if it makes sense to send emails to them.
The first type of task must run fast so that users have a comfortable experience.
The second type can take a long time to complete and shouldn't block other types of tasks.
The third type has another characteristic: each task usually runs fast, but there can be a lot of them, which can leave other tasks stuck at the end of the queue, waiting for the whole horde to be processed.
Note: The goal of this article is to show how to work with 3 different types of Celery tasks in multiple queues:
- small in number but high priority tasks: the default queue,
- long-running tasks: the long queue,
- a huge number of small tasks: the numerous queue.
Other parts of the app are for illustration purposes and can be far from optimal.
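The core mechanism this article relies on is small: when sending a task you name the queue it should go to, and each worker is started with the list of queues it consumes. Here is a preview of the pattern we will implement later in the tutorial (the task name and upload_id are placeholders for now; the task itself is defined further down):
from celery import current_app

# Send a task to the "long" queue; only a worker started with `-Q long`
# will pick it up. The task name and kwargs are illustrative here.
current_app.send_task(
    "process_uploaded_file",
    kwargs={"upload_id": 42},
    queue="long",
)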
Let's start!
Source code
Repository with the source code for this tutorial: https://github.com/appliku/celery_multiple_queues
Set up the project
Let's start by getting a fresh Djangitos project template.
curl -sSL https://appliku.com/djangitos.zip > djangitos.zip
unzip djangitos.zip
mv djangitos-master celery_multiple_queues
cd celery_multiple_queues
cp start.env .env
Requirements
These are the packages we will be actively relying on in this tutorial:
Django==3.2.7
django-redis==5.0.0
celery==5.1.2
celery-redbeat==2.0.0
arrow==1.1.1
django-environ==0.6.0
django-extensions==3.1.3
factory-boy==3.2.0
Faker==8.14.0
You can find the rest of the requirements here: https://github.com/appliku/djangitos/blob/master/requirements.txt
You can explore the Celery setup in djangito/celeryapp.py and in the Procfile, where you can find the commands used to run the Celery worker and scheduler. By the way, for the scheduler we use celery-redbeat.
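We won't reproduce the whole file here, and the template may differ in details, but a minimal Django Celery app module generally looks along these lines (a sketch, not the exact Djangitos code):
# djangito/celeryapp.py -- simplified sketch, not the exact template code
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangito.settings')

app = Celery('djangito')
# read CELERY_* settings from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')
# discover tasks.py modules in installed apps
app.autodiscover_tasks()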
Create an app
Create an app called myapp:
docker-compose run web python manage.py startapp myapp
Add it to the PROJECT_APPS list of installed apps in djangito/settings.py:
PROJECT_APPS = [
    'usermodel',
    'ses_sns',
    'myapp',  # new
]
Create an empty file myapp/tasks.py.
Create myapp/urls.py with the following content:
from django.urls import path
from myapp import views

urlpatterns = [
]
In djangito/urls.py, add the myapp urls to the root URLConf:
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('', include('myapp.urls')),  # new
    path('sns/', include('ses_sns.urls')),
    path('admin/', admin.site.urls),
    path('ckeditor/', include('ckeditor_uploader.urls')),
]
Let's prepare the base template.
Create a file myapp/templates/myapp/base.html with this content:
<!DOCTYPE html>
<html lang="en">
<head>
    <title>{% block title %}{% endblock %}</title>
    <meta charset="utf-8">
</head>
<body>
{% block content %}{% endblock %}
</body>
</html>
Now let's start building the views.
Create models and views
Our app needs 2 main models:
- contact list uploads,
- contacts.
The contact list uploads model will be used for uploading and processing files.
The contact model will hold the email records and will be linked by a foreign key to the upload model.
Put this code into myapp/models.py:
from django.db import models

from myapp.tuples import CONTACT_UPLOAD_STATUSES


class ContactUpload(models.Model):
    STATUS_CHOICES = (
        (CONTACT_UPLOAD_STATUSES.pending, 'Pending'),
        (CONTACT_UPLOAD_STATUSES.processing, 'Processing'),
        (CONTACT_UPLOAD_STATUSES.finished, 'Finished'),
        (CONTACT_UPLOAD_STATUSES.failed, 'Failed'),
    )
    contact_file = models.FileField(upload_to='uploads/%Y/%m/%d/')
    created_dt = models.DateTimeField(auto_now_add=True)
    status = models.IntegerField(
        choices=STATUS_CHOICES,
        default=CONTACT_UPLOAD_STATUSES.pending
    )
    # set by the processing task when something goes wrong
    error_message = models.TextField(blank=True, default='')

    class Meta:
        verbose_name = 'Contact Upload'
        verbose_name_plural = 'Contact Uploads'
        ordering = ('-pk',)


class Contact(models.Model):
    email = models.EmailField()
    has_mx_records = models.BooleanField(default=None, null=True)
    upload = models.ForeignKey(ContactUpload, on_delete=models.CASCADE)

    class Meta:
        verbose_name = 'Contact'
        verbose_name_plural = 'Contacts'
        # there is no name field on this model, so order by email
        ordering = ('email',)
Create a file myapp/tuples.py where we define a namedtuple called CONTACT_UPLOAD_STATUSES:
from collections import namedtuple

CONTACT_UPLOAD_STATUSES = namedtuple(
    'CONTACT_UPLOAD_STATUSES',
    "pending processing finished failed")._make(range(4))
Let's create migrations for our new models and apply them.
docker-compose run web python manage.py makemigrations myapp
docker-compose run web python manage.py migrate
Now let's create views for uploading our contact lists.
We will need a view to list all uploads, a view for creating an upload, and a detail view where we list all the contacts.
Add this to myapp/views.py:
from django.views.generic import CreateView, DetailView, ListView

from myapp.models import ContactUpload
from myapp.tuples import CONTACT_UPLOAD_STATUSES


class ContactUploadListView(ListView):
    model = ContactUpload
    template_name = 'myapp/list.html'


class ContactUploadCreateView(CreateView):
    template_name = 'myapp/create.html'
    model = ContactUpload
    fields = ('contact_file',)


class ContactUploadDetailView(DetailView):
    template_name = 'myapp/detail.html'
    model = ContactUpload
    object: ContactUpload

    def get_context_data(self, **kwargs):
        kwargs = super().get_context_data(**kwargs)
        if self.object.status == CONTACT_UPLOAD_STATUSES.finished:
            kwargs['processing_finished'] = True
        return kwargs
One thing to note here is get_context_data in ContactUploadDetailView. We need it to decide whether to show the number of imported contacts, which I decided to do only if the upload is complete and successful. To avoid comparing against the namedtuple value in templates, we do it in the view.
The urls in myapp/urls.py will look like this:
from django.urls import path
from myapp import views

urlpatterns = [
    path("", views.ContactUploadListView.as_view(), name="contact_upload_list"),
    path("create", views.ContactUploadCreateView.as_view(), name="contact_upload_create"),
    path("detail/<pk>", views.ContactUploadDetailView.as_view(), name="contact_upload_detail"),
]
Next we need 3 templates.
The upload form, with the multipart attribute to support file uploads, goes in myapp/templates/myapp/create.html:
{% extends "myapp/base.html" %}
{% block title %}Upload new file{% endblock %}
{% block content %}
<form action="" method="post" enctype="multipart/form-data">
{{ form.as_p }}
{% csrf_token %}
<button type="submit">Upload</button>
</form>
{% endblock %}
The list of our uploads in myapp/templates/myapp/list.html:
{% extends 'myapp/base.html' %}
{% block title %}List of Uploads{% endblock %}
{% block content %}
    <a href="{% url "contact_upload_create" %}">Upload New File</a>
    <table>
        <tr>
            <th>ID</th>
            <th>FILE</th>
            <th>DT ADDED</th>
            <th>STATUS</th>
        </tr>
        {% for object in object_list %}
            <tr>
                <td><a href="{% url "contact_upload_detail" object.pk %}">{{ object.pk }}</a></td>
                <td>{{ object.contact_file.name }}</td>
                <td>{{ object.created_dt }}</td>
                <td>{{ object.get_status_display }}</td>
            </tr>
        {% endfor %}
    </table>
{% endblock %}
The details of a contact upload in myapp/templates/myapp/detail.html:
{% extends 'myapp/base.html' %}
{% block title %}Viewing Contacts Upload #{{ object.pk }}{% endblock %}
{% block content %}
    <h1>{{ object.contact_file.name }}</h1>
    <h2>Status: {{ object.get_status_display }}</h2>
    {% if processing_finished %}
        <h3>Total contacts found: {{ object.contact_set.all.count }}</h3>
    {% else %}
        <script>
            setTimeout(function () {
                window.location.reload();
            }, 3000);
        </script>
    {% endif %}
    <table>
        <tr>
            <th>Email</th>
            <th>MX Found</th>
        </tr>
        {% for contact in object.contact_set.all %}
            <tr>
                <td>{{ contact.email }}</td>
                <td>{{ contact.has_mx_records|yesno:"✅,❌,⁇" }}</td>
            </tr>
        {% endfor %}
    </table>
{% endblock %}
I used emoji here for simplicity and for readability of the "MX Found" column in the table. In a production-grade app I might use Font Awesome or something similar.
Now make sure your app is running; if not, run docker-compose up.
Open our app at http://0.0.0.0:8060/
Click on "Upload new file".
You will see a very simple form with a file upload widget.
We don't have anything to upload. Let's make a view to generate a fake contacts file.
A Django view to generate a CSV file with Faker
We will write a small view that generates fake data and responds with a FileResponse. Edit myapp/views.py and add the following imports and a new view:
from io import BytesIO

from django.http import FileResponse
from django.views import View
from faker import Faker


class GenerateFakeContactList(View):
    def generate_data(self, number_contacts):
        fake = Faker('en_US')
        memory_file = BytesIO()
        content = '\n'.join([fake.email() for i in range(number_contacts)]).encode('utf-8')
        memory_file.write(content)
        memory_file.seek(0)
        return memory_file

    def get(self, request, *args, **kwargs):
        number_contacts = 100
        if request.GET.get('number_contacts'):
            number_contacts = int(request.GET.get('number_contacts'))
        return FileResponse(self.generate_data(number_contacts), filename="output.csv", as_attachment=True)
Also add this new view to myapp/urls.py:
from django.urls import path
from myapp import views

urlpatterns = [
    path("", views.ContactUploadListView.as_view(), name="contact_upload_list"),
    path("create", views.ContactUploadCreateView.as_view(), name="contact_upload_create"),
    path("detail/<pk>", views.ContactUploadDetailView.as_view(), name="contact_upload_detail"),
    path("csv", views.GenerateFakeContactList.as_view(), name="contact_generate"),  # new
]
Now you can open this URL in the browser and the .csv file should download:
http://0.0.0.0:8060/csv
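The view also honors the number_contacts query parameter, so you can generate a bigger file, for example:
curl -o contacts.csv "http://0.0.0.0:8060/csv?number_contacts=500"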
Celery task to process an uploaded file
Now let's make 3 tasks:
- the first processes a file upon upload,
- the second processes each contact,
- the third refreshes the cache of contact lists.
Then we'll update the ContactUploadCreateView to call a Celery task when a file is uploaded.
The first task will go to our queue for long tasks, the second to the queue for numerous tasks, and the third to the default queue for the high priority tasks that power user-facing data.
Put this into the myapp/tasks.py file we created earlier:
import logging

import dns.resolver
from celery import shared_task, current_app
from django.core.cache import cache
from django.db import transaction
from django.urls import reverse

from myapp.models import ContactUpload, Contact
from myapp.tuples import CONTACT_UPLOAD_STATUSES

logger = logging.getLogger(__name__)


@shared_task(name="process_uploaded_file")
def process_uploaded_file(upload_id: int):
    try:
        contact_upload = ContactUpload.objects.get(pk=upload_id)
    except ContactUpload.DoesNotExist:
        return
    contact_upload.status = CONTACT_UPLOAD_STATUSES.processing
    contact_upload.save(update_fields=['status', ])
    try:
        file_data = contact_upload.contact_file.read()
        lines = file_data.decode('utf-8').split('\n')
        for line in lines:
            line = line.strip()
            if not line:  # skip blank lines so we don't create empty contacts
                continue
            contact = contact_upload.contact_set.create(email=line)
            transaction.on_commit(
                # bind the ID as a default argument so each callback keeps
                # its own contact instead of the last one from the loop
                lambda contact_id=contact.id: current_app.send_task(
                    'process_contact_mx_records',
                    kwargs={"contact_id": contact_id},
                    queue="numerous"))
        contact_upload.status = CONTACT_UPLOAD_STATUSES.finished
        contact_upload.save(update_fields=['status', ])
    except Exception as e:
        contact_upload.status = CONTACT_UPLOAD_STATUSES.failed
        contact_upload.error_message = str(e)
        contact_upload.save(update_fields=['error_message', 'status', ])


@shared_task(name="process_contact_mx_records")
def process_contact_mx_records(contact_id: int):
    try:
        contact = Contact.objects.get(pk=contact_id)
    except Contact.DoesNotExist:
        return False
    domain = contact.email.split('@')[1]
    try:
        nameservers = dns.resolver.resolve(domain, rdtype='MX', search=True)
    except Exception as e:
        logger.error(f"Exception while getting MX records for domain {domain}: {e}")
        contact.has_mx_records = False
        contact.save(update_fields=['has_mx_records', ])
        return
    logger.debug(f"{nameservers} {dir(nameservers)}")
    contact.has_mx_records = len(list(nameservers)) > 0
    contact.save(update_fields=['has_mx_records', ])


@shared_task(name="update_contact_lists_numbers")
def update_contact_lists_numbers():
    with cache.lock("update_contact_list_numbers", timeout=60, blocking_timeout=1):
        contact_lists = [{
            "pk": c.pk,
            "url": reverse('contact_upload_detail', args=(str(c.pk),)),
            "file": c.contact_file.name,
            "status": c.get_status_display(),
        } for c in ContactUpload.objects.all()]
        cache.set("contact_lists", contact_lists, 60 * 2)
The process_uploaded_file task will be called upon file upload with the ID of the ContactUpload instance, via the long queue.
As it goes over the rows in the file, it creates a process_contact_mx_records task with the contact ID for each row, via the numerous queue.
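If you want to trigger the file-processing task by hand, you can send it from a Django shell (the upload ID here is an assumption; use the pk of an existing ContactUpload):
# run inside: docker-compose run web python manage.py shell
from celery import current_app

# queue the task for upload #1 on the "long" queue, same as the view will do
current_app.send_task("process_uploaded_file", kwargs={"upload_id": 1}, queue="long")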
Finally, update_contact_lists_numbers should be added to the scheduled tasks in djangito/celeryapp.py, in app.conf.beat_schedule:
app.conf.beat_schedule = {
    'update_contact_lists_numbers': {
        'task': 'update_contact_lists_numbers',
        'schedule': 60,
        'options': {
            'ignore_result': True,
            'expires': 60,
            'queue': 'default',
        }
    },
}
It will be executed every 60 seconds, expire after 60 seconds if it was not processed in time, and be sent to the default queue.
Let's update our views in myapp/views.py:
from celery import current_app
from django.core.cache import cache
from django.db import transaction
from django.urls import reverse
from django.views.generic import CreateView, DetailView, TemplateView
# the ContactUpload and CONTACT_UPLOAD_STATUSES imports from before stay in place


class ContactUploadListView(TemplateView):
    template_name = 'myapp/list.html'

    def get_context_data(self, **kwargs):
        kwargs = super().get_context_data(**kwargs)
        # read the list prepared by the scheduled task; empty until it has run
        kwargs['object_list'] = cache.get('contact_lists', [])
        return kwargs


class ContactUploadCreateView(CreateView):
    template_name = 'myapp/create.html'
    model = ContactUpload
    fields = ('contact_file',)

    def get_success_url(self):
        return reverse('contact_upload_detail', args=(str(self.object.pk),))

    def form_valid(self, form):
        response = super().form_valid(form)
        # send the task only after the row is committed, so the worker can find it
        transaction.on_commit(
            lambda: current_app.send_task(
                "process_uploaded_file",
                kwargs={"upload_id": self.object.id}, queue="long"))
        return response
Now our ContactUploadListView is actually a TemplateView, and it only uses cached data instead of an ORM call.
This whole cache exercise demonstrates that we can have priority tasks that affect user experience and must run without being blocked by other tasks. Another example of such a high priority task is sending password reset emails: nobody wants to wait several hours for their password reset instructions to arrive.
In ContactUploadCreateView we have added a get_success_url method and an override of form_valid that, on DB transaction commit, sends a Celery task for the new ContactUpload instance.
Also update the myapp/templates/myapp/list.html template for the new data format:
{% extends 'myapp/base.html' %}
{% block title %}List of Uploads{% endblock %}
{% block content %}
    <a href="{% url "contact_upload_create" %}">Upload New File</a>
    <br>
    <a href="{% url "contact_generate" %}">Download Fake CSV</a>
    <br>
    <table>
        <tr>
            <th>ID</th>
            <th>FILE</th>
            <th>STATUS</th>
        </tr>
        {% for object in object_list %}
            <tr>
                <td><a href="{{ object.url }}">{{ object.pk }}</a></td>
                <td>{{ object.file }}</td>
                <td>{{ object.status }}</td>
            </tr>
        {% endfor %}
    </table>
{% endblock %}
Now that we have 2 new queues, we want separate Celery workers for them.
Open docker-compose.yml and add 2 blocks under services, celery-long and celery-numerous, so the whole file looks like this:
version: '3.3'
services:
  redis:
    image: redis
    ports:
      - "6379:6379"
  rabbitmq:
    image: rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=djangito
      - RABBITMQ_DEFAULT_PASS=djangito
      - RABBITMQ_DEFAULT_VHOST=djangito
    ports:
      - "21001:5672"
      - "21002:15672"
  db:
    image: postgres
    environment:
      - POSTGRES_USER=djangito
      - POSTGRES_PASSWORD=djangito
      - POSTGRES_DB=djangito
    ports:
      - "21003:5432"
  web:
    build: .
    restart: always
    command: python manage.py runserver 0.0.0.0:8060
    env_file:
      - .env
    ports:
      - "127.0.0.1:8060:8060"
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq
  celery:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app worker -Q default -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq
  celery-long:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app worker -Q long -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq
  celery-numerous:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app worker -Q numerous -n djangito.%%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq
  celery-beat:
    build: .
    restart: always
    command: celery -A djangito.celeryapp:app beat -S redbeat.RedBeatScheduler --loglevel=DEBUG --pidfile /tmp/celerybeat.pid
    env_file:
      - .env
    volumes:
      - .:/code
    links:
      - db
      - redis
      - rabbitmq
    depends_on:
      - db
      - redis
      - rabbitmq
Take a closer look at the commands for celery-long and celery-numerous: the -Q parameter tells the worker which queue to consume tasks from. It should match the queue parameter we pass to send_task. If no worker consumes messages from a queue, they will pile up and take space until the broker runs out of disk space.
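As a side note, if you'd rather not pass queue= at every call site, Celery can also route tasks to queues by name. A sketch of that alternative configuration in djangito/celeryapp.py (this article's code passes the queue explicitly, so this is optional):
# optional: route tasks to queues by name instead of passing queue= to send_task
app.conf.task_routes = {
    'process_uploaded_file': {'queue': 'long'},
    'process_contact_mx_records': {'queue': 'numerous'},
    'update_contact_lists_numbers': {'queue': 'default'},
}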
For the changes to take effect, stop docker-compose by pressing CTRL-C and start it again with docker-compose up.
Go to http://0.0.0.0:8060/, click the "Upload New File" link, select the demo file we generated, and click the "Upload" button.
You will be redirected to the detail page for this upload. At first it will show the status Pending or Processing (depending on whether the Celery task has managed to start yet).
Then, thanks to the little piece of JS that is rendered while processing is not finished, the page will reload and you will see all the contacts from the file.
At this stage the task that processes the file has finished, but the tasks that check the domains' MX records have not.
If you refresh the page again you will see a different picture – the column "MX Found" is full of green and red icons.
Let's look at the terminal where docker-compose up is running and make sure that the messages went into the right queues.
The latest log records will start with celery-numerous_1, which means that process_contact_mx_records went into the numerous queue.
If you scroll up and search for process_uploaded_file, you will see that it went to the long queue and that the container that ran it is called celery-long_1.
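If the logs are too noisy, you can also ask RabbitMQ directly how many messages sit in each queue (assuming the rabbitmq service name and the djangito vhost from our docker-compose.yml):
docker-compose exec rabbitmq rabbitmqctl list_queues -p djangito name messages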
Also, every minute, our scheduled task that updates the cache should show up in the logs.
Let's go to the main page and confirm that we have data in our cache and that the page is rendered from it.
The last step is to prepare for deployment.
Open the Procfile in the root of the project and add two lines for our new Celery workers. They will be used during deployment with Appliku to run the separate workers.
workerlong: celery -A djangito.celeryapp:app worker -Q long -n djangito.%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
workernumerous: celery -A djangito.celeryapp:app worker -Q numerous -n djangito.%h --without-gossip --without-mingle --without-heartbeat --loglevel=INFO --max-memory-per-child=512000 --concurrency=1
I hope this helped you understand how to work with multiple queues in Celery and why you might need them.