When running background jobs, for example with Celery, you might need to prioritize execution of some tasks over others.
Let's take an example: you have free and premium users, and you want to prioritize tasks for premium users over tasks for free users.
While Celery has some functionality for implementing prioritization, it doesn't work with all brokers.
Also, I prefer solutions that are agnostic of specific tooling if possible, so let's discard that built-in feature right away.
The approach to prioritizing tasks that I want to show is agnostic of specific message brokers (RabbitMQ vs. Redis, etc.) and will also work with any other background task tool that supports separate queues.
Model Structure and Sending a task with priority
Let's say you receive a request for a task from a user.
The task is to generate a report, which takes a while to complete, hence the need for a background task.
The Report model will have a foreign key to the user.
The user model will have an is_premium flag. If it is set to True, the user is premium; otherwise the user is free. This flag dictates the chosen priority.
Here is the overall structure of our models.
from celery import current_app
from django.contrib.auth.models import AbstractUser
from django.db import models, transaction


class User(AbstractUser):
    is_premium = models.BooleanField(default=False)
    # ... the rest of the user model


class Report(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Set to True by the task once the report is complete.
    is_complete = models.BooleanField(default=False)
    # The result of the task goes here.
    content = models.TextField()
    # ... the rest of the report model

    def save(self, *args, **kwargs):
        # A report without a primary key has not been saved yet.
        need_to_send_task = not self.pk
        super().save(*args, **kwargs)
        if need_to_send_task:
            queue = "premium" if self.user.is_premium else "free"
            # Send the task only after the surrounding transaction commits.
            transaction.on_commit(
                lambda: current_app.send_task(
                    "generate_report",
                    kwargs={"report_id": self.pk},
                    queue=queue,
                )
            )
Please keep in mind that only fields that are necessary for the purpose of this article are listed here.
A few important things to pay attention to:
- When an instance of Report is created, we send a task to Celery.
- Before sending the task, we pick a queue depending on the tier the user belongs to.
- We use transaction.on_commit to make sure the task is sent only after the report record is committed to the DB. Otherwise the task could start executing before the record is visible outside the transaction.
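The queue choice can also be pulled out into a small helper, so the tier-to-queue rule lives in one place and can be checked without a broker. This is only a sketch: pick_queue and build_send_task_args are hypothetical names, not part of Celery or of the model above, and the queue names are the "premium" and "free" used in this article.

```python
# Queue names taken from this article's example; adjust to your own setup.
PREMIUM_QUEUE = "premium"
FREE_QUEUE = "free"


def pick_queue(is_premium: bool) -> str:
    """Return the queue name for a user's tier (hypothetical helper)."""
    return PREMIUM_QUEUE if is_premium else FREE_QUEUE


def build_send_task_args(report_id: int, is_premium: bool) -> dict:
    """Build keyword arguments for send_task() without touching a broker."""
    return {
        "name": "generate_report",
        "kwargs": {"report_id": report_id},
        "queue": pick_queue(is_premium),
    }


task_args = build_send_task_args(report_id=42, is_premium=True)
print(task_args["queue"])  # premium
```

Inside save(), the resulting dictionary could then be passed along as current_app.send_task(**task_args) from the on_commit callback.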
Since the Report model sends the task from its save method, we don't need to modify any views. Creating a report is as easy as calling Report.objects.create(user=request.user), or using a CreateView or CreateAPIView.
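The worker side of generate_report is not shown in this article. As a rough sketch of what the task body might do, the function below fills in the report and marks it complete. The names complete_report and build_content are hypothetical, and a stand-in object replaces the Report model so the snippet runs without Django or a broker.

```python
from types import SimpleNamespace


def complete_report(report, build_content):
    """Fill in the report and mark it complete.

    `report` is anything with content, is_complete and save(), such as a
    Report instance. `build_content` is a hypothetical callable that does
    the slow work and returns the report text.
    """
    report.content = build_content(report)
    report.is_complete = True
    # The row already has a primary key at this point, so a save() that only
    # sends the task for new rows will not send a second one.
    report.save()


# In a Django project this would be wrapped in a Celery task registered
# under the name passed to send_task(), for example:
#
#   @shared_task(name="generate_report")
#   def generate_report(report_id):
#       complete_report(Report.objects.get(pk=report_id), build_content)

# Stand-in for a Report instance, for illustration only.
save_calls = []
fake_report = SimpleNamespace(content="", is_complete=False)
fake_report.save = lambda: save_calls.append(True)

complete_report(fake_report, lambda report: "report body")
print(fake_report.is_complete)  # True
```

Keeping the slow work in a plain function like this also makes it easy to test without starting a worker.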
Setting up Celery for prioritizing tasks
Since we rely on multiple queues, we need to make Celery aware of them.
We need to run two Celery workers so that the premium workload is not affected by tasks from free users.
If you want to keep them 100% separate, the first option is very simple:
Prioritizing by complete isolation
You run 2 workers and assign them their own queues.
celery worker -A project.celeryapp:app -Q premium
celery worker -A project.celeryapp:app -Q free
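(Please refer to the Celery docs for this command; it may differ depending on the version of Celery you are using. In Celery 5 and later, for example, the -A option goes before the worker subcommand: celery -A project.celeryapp:app worker -Q premium.)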
In this example each worker will only process tasks from the queue assigned to it.
Not only is this simple, but you can also run the premium worker on more powerful hardware or a bigger server instance, giving even more juice to premium users.
You can even give the premium worker more concurrency by adding this argument: --concurrency=10
Prioritizing but partially mixing them
If you run them on a single server, or if the hardware is the same and it doesn't matter where premium tasks are processed, then it might make sense to allow the worker for free tasks to also pick up premium tasks.
celery worker -A project.celeryapp:app -Q premium
celery worker -A project.celeryapp:app -Q free,premium
Since we are already using some RAM for the free worker, why not allow it to process premium tasks as well? This way, if there isn't much free workload, the free worker will help with premium tasks too.
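To make the trade-off between the two setups concrete, here is a toy model in plain Python. It is not how Celery schedules work: it assumes every task takes one tick, each worker handles one task per tick, and a worker always takes from the first non-empty queue in its list. Real consumption order depends on the broker. The function name and the numbers are made up for illustration.

```python
from collections import deque


def ticks_to_drain_premium(premium_tasks, free_tasks, worker_queues):
    """Count ticks until the premium queue is empty in the toy model."""
    queues = {
        "premium": deque(range(premium_tasks)),
        "free": deque(range(free_tasks)),
    }
    ticks = 0
    while queues["premium"]:
        ticks += 1
        for listens_to in worker_queues:
            # Each worker takes one task from the first non-empty queue.
            for name in listens_to:
                if queues[name]:
                    queues[name].popleft()
                    break
    return ticks


isolated = [["premium"], ["free"]]
mixed = [["premium"], ["free", "premium"]]

print(ticks_to_drain_premium(10, 0, isolated))  # 10
print(ticks_to_drain_premium(10, 0, mixed))     # 5
print(ticks_to_drain_premium(10, 4, mixed))     # 7
```

In this model an idle free worker halves the time needed to clear ten premium tasks, and it still helps once its four free tasks are done.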
Conclusion
This approach is simple, it is agnostic of the message broker, and it can easily be carried over to any other background task processing tool.
Hope it was useful :)