If your app makes calls to a third-party API from Celery tasks, you might want to have some rate limiting.

Another case where you might need rate limiting is a set of business rules that requires you to limit the rate of calls to an external service.

Celery doesn't provide a good solution for global rate limits on tasks of a certain type: its built-in rate_limit option is enforced per worker, not globally.

There are hacks like using a single queue, but they are still hard to manage and they depend on the infrastructure.

In this article I want to show how you can manage rate limiting according to your business rules and leave Celery to be just an execution layer.

This article is not a tutorial that ends with a finished project you can look at, but rather a guide on how to build a rate limiting solution.

Objective

Illustrate building an abstraction layer on top of Celery tasks that manages sending API-calling tasks according to business rules.

I will show how to build a database-based scheduler that sends Celery tasks when the time is right, with the help of a scheduled Celery task.

Then I will show what advanced business rules you can implement once you have such a layer in place.

Project Setup

Setting up a Django and Celery project is outside the scope of this article.

I suggest using the SpeedPy.Com Django SaaS Boilerplate to quickly start the project.

General approach to scheduling

The approach I want to show boils down to not using Celery to do the rate limiting.

Celery will only execute background tasks from queues and run scheduled tasks.

The scheduling part will be done with Django models and some Python code that fetches the list of API calls to make, filters it according to business rules, and sends what is needed to Celery.

API call

Let's say we are making calls to some third-party API.

Such an API can have rate limits of two types:

  • per IP address
  • per account used to access the API

If we have only one Celery worker, or several but all running on a single machine, then we should treat the per-IP limit as a global limit.

If we need to work around the per-IP limit, we should put workers on different machines and assign them to separate queues so the scheduler can distribute tasks across queues.

Task Dispatcher

To avoid ambiguity, let's call the part responsible for business rules and rate limiting the Task Dispatcher.

How the Task Dispatcher works:

  • receives a task that should be executed
  • records it to a DB model with the arguments for an API call
  • on a set schedule (once a minute/hour/day), fetches a predefined number of records (staying under the API rate limit) and sends Celery tasks that make the calls in the background

If Celery had a solution for rate limiting

If there were a good tool to make this work in Celery, out of the box or with an additional package, and you didn't have any extra business logic, then it would be great to use it.

A simple way to implement it with only Celery is to have a cache key that counts the number of calls and is reset every minute. When a task executes, it checks whether it is under the rate limit (the number of executions in cache < threshold): if so, it proceeds; if not, the task quits and retries after a cooldown. I don't like this solution because Celery workers will be busy grabbing tasks from the queue just to requeue them over and over while the threshold is reached. In addition, if you are using a managed queue service (for example, SQS), you are charged for its usage, so this solution will send your infrastructure costs through the roof.
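For illustration, here is a minimal sketch of that cache-counter approach, assuming a Django cache backend with atomic incr (e.g. Redis); do_the_api_call is a hypothetical helper:

from celery import shared_task
from django.core.cache import cache

RATE_LIMIT = 100       # assumed limit: 100 calls per window
WINDOW_SECONDS = 60    # rate limit window


@shared_task(bind=True, max_retries=None)
def rate_limited_call(self, call_data):
    # add() only creates the key (with the window TTL) if it doesn't exist yet
    cache.add('api_call_count', 0, timeout=WINDOW_SECONDS)
    if cache.incr('api_call_count') > RATE_LIMIT:
        # Over the limit: requeue ourselves with a cooldown. This is the
        # wasteful part: workers and the broker keep cycling these retries.
        raise self.retry(countdown=WINDOW_SECONDS)
    do_the_api_call(call_data)  # hypothetical helper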

How the Task Dispatcher works in detail

The option of making your own task dispatcher takes longer, but there are advantages:

  • a lot of flexibility in rules (more granular rate limiting)
  • you don't send tasks to Celery until they are ready to be executed
  • you don't overload workers, compared to rate limiting on workers per specific Celery queue
  • you don't depend on the infrastructure and the number of running workers

Let's get into more detail.

Call Queue Model

First you will need a Django model to store the queue of tasks. Let's call it CallQueue and it can look like this:

# models.py
from collections import namedtuple

from django.db import models

from .managers import CallQueueManager


CALL_STATUSES = namedtuple('CALL_STATUSES', 'new dispatched error done')._make(range(4))


class CallQueue(models.Model):
    CALL_STATUS_CHOICES = (
        (CALL_STATUSES.new, 'New'),
        (CALL_STATUSES.dispatched, 'Dispatched'),
        (CALL_STATUSES.done, 'Done'),
        (CALL_STATUSES.error, 'Error'),
    )
    created_dt = models.DateTimeField(auto_now_add=True, db_index=True)
    status = models.IntegerField(choices=CALL_STATUS_CHOICES, default=CALL_STATUSES.new, db_index=True)
    dispatched_at_dt = models.DateTimeField(blank=True, null=True, db_index=True)
    finished_at_dt = models.DateTimeField(blank=True, null=True, db_index=True)  # date & time when the task was processed, in any status
    call_data = models.JSONField(default=dict)
    failed_count = models.IntegerField(default=0, db_index=True)
    error_message = models.TextField(blank=True, null=True)

    objects = CallQueueManager()

    class Meta:
        ordering = ('created_dt',)
# managers.py
from django.db import models

import arrow


class CallQueueManager(models.Manager):
    def send(self, call_data):
        # A great place to process the call and set additional fields such as
        # queue or priority (not covered in this example), or even discard the
        # call completely.
        return self.create(call_data=call_data)

    def under_threshold(self):
        """
        Checks if in the last X minutes we have sent fewer tasks than the threshold.
        It is best to move these numbers into your project settings.
        """
        threshold_dt = arrow.now().shift(minutes=-60).datetime
        threshold = 100  # API rate limit: 100 requests per hour
        return self.filter(dispatched_at_dt__gte=threshold_dt).count() < threshold

Send a call to the Call Queue

This structure allows you to queue a call to the third-party API by running this:

CallQueue.objects.send(call_data={"method": "send_email", params:{"to": 'user@example.com', 'subject': 'Welcome', 'message': 'Welcome to our app!'}}) # send an email
CallQueue.objects.send(call_data={"method": "post", params:{"url": 'https://example.com',  'data': {'some_field':'some_value'}}}) # send a API call

Dispenser task fetches tasks from the Call Queue

In order to process those tasks, create a dispenser Celery task that will be scheduled to run every minute.

It will fetch only as many tasks from the CallQueue as fit under the limit, checking how many were sent recently. This makes it safe to call as often as you want without breaking the logic.

It might even be a better idea to run it more often than the rate limit reset cycle. E.g. if the API rate limit is per day or per hour, you should still run the dispenser more often, say every minute, so it grabs new tasks as they come in while still sending only those that fit under the rate limit, taking into account the ones already sent in previous invocations.
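Scheduling the dispenser itself is a job for Celery beat. Here is a minimal sketch, assuming your Celery app reads its configuration from Django settings with the CELERY namespace and that the task lives in a hypothetical yourapp/tasks.py:

# settings.py
CELERY_BEAT_SCHEDULE = {
    'run-dispenser': {
        'task': 'yourapp.tasks.dispenser',  # hypothetical module path
        'schedule': 60.0,  # every 60 seconds
    },
}

The dispenser task itself can look like this: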

from celery import current_app, shared_task
import arrow

from .models import CallQueue, CALL_STATUSES


@shared_task()
def dispenser():
    qs = CallQueue.objects.filter(status=CALL_STATUSES.new)

    for call_record in qs.iterator():  # iterator() helps performance when the queue holds a lot of records
        if not CallQueue.objects.under_threshold():
            break
        current_app.send_task("make_api_call", kwargs={"call_queue_id": call_record.id, "call_data": call_record.call_data})
        call_record.status = CALL_STATUSES.dispatched
        call_record.dispatched_at_dt = arrow.now().datetime
        call_record.save(update_fields=['status', 'dispatched_at_dt'])

In this task we check that we are under the threshold before sending each task to Celery. This is the place to start optimizing performance if you ever need to, because this implementation hits the DB on every check. One improvement is to compute up front how many requests we can still make before reaching the threshold and compare a counter inside the for loop against that number.
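Here is a minimal sketch of that optimization, using the same 100-requests-per-hour numbers as under_threshold:

@shared_task()
def dispenser():
    threshold_dt = arrow.now().shift(minutes=-60).datetime
    # One DB hit up front instead of one per record:
    already_sent = CallQueue.objects.filter(dispatched_at_dt__gte=threshold_dt).count()
    budget = max(0, 100 - already_sent)

    qs = CallQueue.objects.filter(status=CALL_STATUSES.new)[:budget]
    for call_record in qs.iterator():
        current_app.send_task("make_api_call", kwargs={"call_queue_id": call_record.id, "call_data": call_record.call_data})
        call_record.status = CALL_STATUSES.dispatched
        call_record.dispatched_at_dt = arrow.now().datetime
        call_record.save(update_fields=['status', 'dispatched_at_dt'])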

Back to the basic loop: if we are under the threshold, we can send the task, so we send it to a Celery task named make_api_call with the call_data in kwargs. You can also pass the queue argument to send_task; it can be a static, predefined queue for this kind of task, or it can change depending on rules. An example is sending certain types of requests from a different worker on a different machine whose IP address is whitelisted by the API provider.
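For example, a sketch of rule-based routing inside the dispenser loop; the queue names and the needs_whitelisted_ip flag are made up for illustration:

# Route calls that must come from a whitelisted IP to workers on that machine.
queue = 'whitelisted_host' if call_record.call_data.get('needs_whitelisted_ip') else 'default'
current_app.send_task(
    "make_api_call",
    kwargs={"call_queue_id": call_record.id, "call_data": call_record.call_data},
    queue=queue,
)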

Retry failed calls

Sometimes API calls can still fail, and you might want to retry them.

In this case you can use the failed_count field and retry them a set number of times.

You need to pick them up in the dispenser's CallQueue queryset.

from celery import current_app, shared_task
from django.db.models import Q
import arrow

from .models import CallQueue, CALL_STATUSES


@shared_task()
def dispenser():
    qs = CallQueue.objects.filter(Q(status=CALL_STATUSES.new) | Q(status=CALL_STATUSES.error, failed_count__lt=3))
    # ... the rest of the loop stays the same

This will pick up tasks that are new or have previously failed fewer than 3 times.

IMPORTANT: we can't use Celery's built-in retry functionality here, because then we wouldn't be able to keep track of the number of calls made. Celery-based retries count toward the API rate limit, but we wouldn't see those calls in CallQueue. As a result we would exceed the real rate limit while still sending tasks.

Celery task for the Call Queue

Besides making the actual API call, the task must also handle errors and save the call queue status.

import arrow
import requests
from celery import shared_task

from .models import CallQueue, CALL_STATUSES


@shared_task(name='make_api_call')
def make_api_call(call_queue_id, call_data):
    call_record = CallQueue.objects.get(pk=call_queue_id)
    try:
        getattr(requests, call_data['method'])(**call_data['params'])
        call_record.status = CALL_STATUSES.done
        call_record.finished_at_dt = arrow.now().datetime
        call_record.save(update_fields=['status', 'finished_at_dt'])
    except Exception as e:
        call_record.status = CALL_STATUSES.error
        call_record.finished_at_dt = arrow.now().datetime
        call_record.error_message = str(e)
        call_record.failed_count += 1  # tracked so the dispenser knows when to stop retrying
        call_record.save(update_fields=['status', 'finished_at_dt', 'error_message', 'failed_count'])

Clean up the call queue

You might want to have a retention window for these records, so that your database doesn't grow out of control.

Tasks that are outside the retention window, for example processed tasks more than 10 days old, should be deleted.

To do this, let's make another task that we will run daily or weekly. It's best to schedule it at a time when app usage is at its lowest.

import arrow
from celery import shared_task

from .models import CallQueue, CALL_STATUSES


@shared_task(name='cleanup_call_queue')
def cleanup_call_queue():
    retention_threshold_dt = arrow.now().shift(days=-7).datetime  # datetime threshold: one week ago

    CallQueue.objects.filter(dispatched_at_dt__lte=retention_threshold_dt, status__in=[CALL_STATUSES.done, CALL_STATUSES.error]).delete()
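As with the dispenser, this can be scheduled with Celery beat; a sketch, assuming 3:00 AM is a quiet hour for your app:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE['cleanup-call-queue'] = {
    'task': 'cleanup_call_queue',  # matches the explicit task name above
    'schedule': crontab(hour=3, minute=0),  # daily at 3:00 AM
}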

If you are using Postgres as your database you also need to be aware of how it handles tables with heavy UPDATE/DELETE traffic. Postgres doesn't remove data from disk right away on DELETE; it just marks rows as deleted. Likewise, UPDATE doesn't modify data in place; it marks the previous row version as dead and writes a new, up-to-date one.

So if a table gets a lot of update/delete operations, it is your job to run the VACUUM command every now and then to reclaim the dead space. This also helps Postgres performance, since queries won't have to skip over dead rows.

I would suggest running VACUUM FULL every now and then, but keep in mind it takes an exclusive lock on the table until it completes, and depending on the amount of data it can mean anywhere from a few seconds to several minutes of downtime.

For example, on 24 GB of data VACUUM FULL took 7 minutes, but it freed up half of that space, and database performance after the cleanup was way better :)

Conclusion

I've used the approach described here on several projects.

The biggest one had close to a million calls per day queued, processed by a dozen or two business rules, and dispatched to 24 different Celery queues.

The Postgres database grew to 24 GB, and due to spikes in sent Celery tasks we had to upgrade the server running our RabbitMQ instance.

So while critics might say that building such a queue in the DB is a performance disaster, and that queues are built with RabbitMQ or similar tools for a reason, in reality this approach should serve you well for a long time and handle a lot of tasks for you :)

You can add additional columns to CallQueue and use that data to prioritize tasks, spread them between queues and hosts, call different services, or even rate limit per customer.

If you don't want to learn DevOps and want to focus on building your app, sign up for Appliku and it will take care of your servers, web server configuration and building & deploying your apps: https://app.appliku.com/