Loading...

What is Appliku?

Icon
Simplest way to deploy Python/Django apps

Push code to Git repo, Appliku will build & deploy the app to your cloud servers.

Learn more .

Start Deploying
Icon
SpeedPy.com
Django SaaS Project Template

Start building your app right from what makes your app unique

Get SpeedPy.Com
Icon
Appliku SaaS Discord Community

The place where you can talk to like minded individuals who are at different stages of building their SaaS or other apps.

Join Community

Celery shared_task

Share post:

The "shared_task" decorator allows creation of Celery tasks for reusable apps as it doesn't need the instance of the Celery app.

It is also easier way to define a task as you don't need to import the Celery app instance.

shared_task

Shared task has a lot of parameters and you have to decide which one to use depending on type of task.

Some things to consider:

  • is it okay if task is being executed multiple times in certain conditions or should it run only once and risk not being executed?
  • should task retry, in which cases and how many times?
  • do you need to store result of the task?
  • Do you need access to task status outside the task?
  • Do you need to know if the task actually started?

Answering these questions will help you figure out the proper set of parameters you need to pass to @shared_task decorator.

One parameter I recommend to have always set is the explicit name for the task.

Naming advice is to have some prefix for module or purpose of the task so that very common name will not clash between different parts of the app or with other libraries.

# our_api/tasks.py
@shared_task(name="api_check_availability")
def check_availability():
    pass

This task can be called via import like this:

# our_api.tasks.py
from our_api.tasks import check_availability

check_availability.delay()
check_availability.apply_async()

If you are accessing it from models.py and tasks.py is importing anything from models – you will get a circular import error and tasks will not get initialized. Also if you are sending task from a totally different app, working with the same Celery instance – you can't use import and thus can't do .delay or .apply_async

For this case you can call the task by name.

Let's see the example for the calling task from within the models.py

# our_api/models.py
from celery import current_app
from django.db import transaction

class Post(models.Model):
    title = models.CharField(max_length=255)

    def generate_og_images(self):
        pass

    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        transaction.on_commit(lambda: current_app.send_task("content_generate_og_images", kwargs={"post_pk": self.pk})



# our_api/tasks.py

from our_api.models import Post


@shared_task(name="content_generate_og_images")
def content_generate_og_images(post_pk):
    try:
            post = Post.objects.get(pk=post_pk)
    except Post.DoesnNotExist():
            return False

In this case we are importing the Post model from tasks. We can't import task from models to use .delay() or .apply_async() because this would lead to circular import, so we are just sending the task by name.

We will explore what arguments to set for each typical case.

The simplest shared_task

If you have a task which result you don't really care about and you would love to reduce the performance impact of running it you can define it like this:

@shared_task(
    name="simple_task",
        ignore_result=True)
def simple_task():
    pass

For such task you can't have retry strategy, since it is doesn't have bind=True, and thanks to ignore_result=True it will not even connect to the result backend since it is not needed.

shared_task with retry policy

This example is suitable for calling an external API. Since API can be down it is a good idea to retry it.

Also, this should be an idempotent task, like read operation. This implies that we are okay with potentially multiple calls to that API in case of failure on either side.

@shared_task(
    name="read_from_external_api",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=500,
    retry_jitter=True)
def read_from_external_api(url):
    result = requests.get(url)
        return result.json()

Here we have acks_late=True in case if task fails due to worker failure, retry on any Exception and retry up to 5 times.

shared_task with that must run at once at most

In this example we are sending a request that is very costly or for some other reason absolutely must not happen more than once.

@shared_task(
    name="expensive_api_call",
    bind=True,
    acks_late=False,)
def read_from_external_api(url):
    result = requests.post(url, {})
        return result.json()

Worker will acknoledge the task before completing it, so if the task fails due to worker failure, the request will not happen again. If the call to the API (the objective of the task) still needs to happen it should be done by other means, maybe manually by the user.

Share post:
Top