Background tasks with Celery without pain

by Published: Last updated: June 23rd, 2020

Almost every application needs to run certain tasks outside of the request-response cycle.

Simplest examples can be sending an email, sending a request to an external service, processing large amounts of data.

When working on Python/Django apps the most reliable and popular choice is to use Celery.

Some might say that Celery is too much for small projects or some simple library would be a better choice.

After 12+ years of building apps and sites with Django I made the following conclusions:

- Projects tend to evolve: What was simple, became complex and now screams for a better solution. Not only you will have to re-implement the existing code with a more powerful tool, but also spend time on the migration process.

- The amount of data and number of visitors grow, so your app must be ready for this.

- Simple tools may be a fit for the current stage of a particular project, but they still require learning how to use them, wasting your precious developer's time that can be spent on building actual product features.

- Using different tools on different projects creates more mental overhead when jumping between projects. And if you have to hire an additional developer they will have to learn several tools for a single task.

My choice is to have Celery in all projects by default

Benefits of this approach:
 - Similar setup across all projects.

 - Always ready to start using background tasks and not thinking of the "stage" of the project.

 - No different tools across several projects: always the same setup, no additional time needed to acquire the required knowledge.

 - No time wasted on deciding what tools to pick for the next project.

Celery is very a powerful tool.

This is a highly practical and very opinionated tutorial on how to use Celery effectively and without shooting yourself in the foot.

Not only Celery became my default tool of choice when it comes to running background tasks, but I also have made choices on how to use certain features of it.

First, in this tutorial, I will talk briefly about each important part/feature of Celery and the best way to use it.

The second part of the tutorial will be writing an app with Celery tasks.

The knowledge I share here is based solely on my experience and your mileage may vary.

Celery tips

Always use the name argument in @shared_task decorator

To avoid confusion with how Celery automatically detects tasks and names them I found that always using the name decorator in @shared_task decorator is a good practice. Not only you don't need to think of how to reference your task, but it will remain the same if you refactor your code in the future and move functions around.

Also, I recommend adding a prefix to tasks to group them by purpose. When you started using the name argument you gave up dotted names for tasks. To avoid a mess when the number of tasks grows – you should pick some prefixes. It is also useful in case when you add another (micro)service (another app) that uses the same Celery instance.

Let's have an example here.

Here we add a task to send an email:

@shared_task("main_send_email")
def send_email(to, subject, content):
    pass

Here, in another module or even project we create a task for sending telegram notifications:

@shared_task("tg_notify_user")
def notify_user_by_id(user_id, message):
    pass

Use current_app.send_task instead of apply_async or delay

If we check StackOverflow, we can find that a very common issue for new Celery users is circular imports. It happens when tasks.py imports something from models.py and at the same time, developers want to call Celery tasks from tasks.py.

People share a lot of tricks to work around this problem.

I found that the easiest way is to stay away from importing tasks completely and use current_app.send_task instead.

In this example, in models.py we call a task "main_send_email" without importing it.

from celery import current_app
class Order(models.Model):
    # ...
    def notify_buyer(self):
        current_app.send_task(
            "main_send_email",
            args=(self.user.email, "Your order", "Is created", ),
            queue="default",
        )

In this case, you don't have to think how to import it, thanks to our naming convention – you know exactly how this task is called.

Bonus points: You can decouple this "main_send_email" task into another project/service and it will keep working.

Downsides:

This convenience comes with a price.

Your IDE will not be able to help you with argument names and the name of the function if you have a typo there.

When you run tests setting CELERY_ALWAYS_EAGER will not get your task called.

So it gives you the advantage to avoid tasks import, but you have to be extra careful with making the call right.

If you fail here, you will only learn about the problem from celery worker logs.

Explicitly specify the queue for the task

I noticed that you have several queues and you call a task without specifying the queue – it doesn't get executed.

current_app.send_task(
    "main_send_email",
    args=(self.user.email, "Your order", "Is created", ),
    queue="default",
)

Ignore result unless you really need it

I found that I hardly ever need a result from the task. But RESULT_BACKEND is hit every time you send a task.

I learned it the hard way when some tasks started failing due to connection failure to Redis.

So to avoid unnecessary connections to Redis add ignore_result=True when sending a task.

current_app.send_task(
    "main_send_email",
    args=(self.user.email, "Your order", "Is created", ),
    queue="default",
    ignore_result=True,
)

Don't rely on RESULT_BACKEND to retrieve the task result

When you need to receive results for a task Celery gives you a way to retrieve it via RESULT_BACKEND.

It is useful when you are chaining tasks when one task needs to use the result of the previous task as its argument.

In most cases, I found that it is not needed, and trying to remember what code is needed for retrieving the result is not worth the effort.

Let's go over an example.

Imagine, that you need to retrieve information about an object, which ID you know from a 3rd party service.

It is better done outside the request-response cycle because it can take a while to finish or can fail from the first try.

One option is to store the task ID and check if it has finished and then retrieve the result.

An easier way might be to check the existence of this information in a cache.

result = cache.get(f"server_{server_id}")
if not result:
    return "Retrieving the information, please wait"
return result

If it is empty, the task hasn't finished and we don't have information.

Task connects to the 3rd party service, retrieves information and stores it in the cache with a reasonable TTL like this:

cache.set(f"server_{server_id}", value_to_store, 60 * 5)

Use locking to avoid race condition

The most common case when I needed locking was scheduled task that runs every several minutes and it could take a long time to complete – longer than the interval between tasks.

When a task takes longer to finish than interval, they overlap and execute in parallel.

Most of the time this is an undesirable situation that can lead to bad things and system overload.

For locking, I use django-redis https://github.com/jazzband/django-redis#locks

Let's say you run a task every 10 minutes to prepare a report on data.

Task usually takes a couple of minutes to finish but can take longer if you have an unusually big amount of data to process.

You also want the lock to expire eventually if a celery worker executing that task silently went down without releasing the lock.

Also, you want to avoid unhandled exceptions in Sentry every now and then when tasks tried to overlap, but locking prevented that.

And to make it even more interesting, let's say we run a report generation on some piece of data that is related to an object ID of some kind. We need it for the sake of illustrating more complex locking.

Here is an example of a task with locking, meeting our requirements:

from django.core.cache import cache
@shared_task(name="main_prepare_a_massive_report")
def prepare_a_massive_report(data_id):
    try:
        with cache.lock(
            f"lock_report_{data_id}",  # use data_id in lock key to have per object task locking
            timeout=20*60,  # allow it to keep lock up to 20 minutes
            blocking_timeout=1,  # another task will wait and try to acquire the lock for 1 second
        ):
            prepare_report(data_id)
    except LockError as err:
        print(f"prepare_a_massive_report exception: {err}")

Of course, tasks will not be killed after 20 minutes if it is still running, and you might need to take additional steps to prevent it from running longer.

But if the worker silently died without releasing the lock, other tasks will not be able to execute for only up to 20 minutes.

Separate celery task from the actual logic

Now that you see how much code is needed for a Celery task here is the advice: make a separate file where you have your actual logic for the file.

Just like you saw in the example with locking, it calls "prepare_report(data_id)".

Not only it is good to prevent your tasks.py grow in the number of lines.

Keeping actual logic separate from the Celery task overhead helps writing tests for that logic.

You don't want to deal with locks, timeouts, and so on while testing the primary logic.

 

 


Recent posts

How to Deploy Django Application from GitHub Repository to EC2 With SSL


How to deploy your app without pain on EC2 with Appliku