Asynchronous processing of database events in a robust and lightweight manner using django-pgpubsub.

Paul Gilmartin · Published in Level Up Coding · May 13, 2022

A common pattern in modern web development is the requirement to process data asynchronously after some user action or database event. In this article, we’ll use a concrete example to describe a traditional approach to solving this problem for a Django/Postgres-based application using Django signals and Celery. We’ll then discuss some of the shortcomings of this approach and demonstrate how django-pgpubsub can offer a lighter-weight and more robust solution.

Easy: a platform for sharing and reviewing scientific articles

Easy is a platform on which users can write and post scientific articles, and other users can leave comments on those articles. The product owners at Easy want to introduce a new feature: whenever a comment is left on an article, the author of that article receives an email to let them know about the new comment.

At this point, Easy’s technical infrastructure consists of a single Django instance backed by a single Postgres database. Given this, let’s explore how to add support for the new requirement.

The Set-up

We have two simple Django models:

from django.contrib.auth.models import User
from django.db import models


class Article(models.Model):
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)


class Comment(models.Model):
    content = models.TextField()
    article = models.ForeignKey(Article, on_delete=models.CASCADE)
    author = models.ForeignKey(User, on_delete=models.CASCADE)

Whenever a user leaves a comment on an article, a Comment object is created referencing the logged-in user and the Article instance on which the comment was left. Thus, to meet the new requirement, we need to find a way to send an email to the correct User whenever a Comment object is inserted into the database.
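For illustration, a comment-creation view might look something like the following sketch; the view itself is hypothetical and not taken from Easy’s codebase:

# views.py (hypothetical sketch)
from django.contrib.auth.decorators import login_required
from django.http import HttpResponse
from django.shortcuts import get_object_or_404

from .models import Article, Comment


@login_required
def leave_comment(request, article_id):
    article = get_object_or_404(Article, pk=article_id)
    # Creating the Comment row is the database event we want to react to.
    Comment.objects.create(
        content=request.POST['content'],
        article=article,
        author=request.user,
    )
    return HttpResponse(status=201)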

Signals

Django’s Signals allow us to invoke a callback when an action occurs. In our case, the action is the creation of a comment and the callback is a function to email the author of the article on which the comment was left. Django’s built-in post_save signal is perfect for this:

from django.db.models.signals import post_save
from django.dispatch import receiver
from email_utils import email

from .models import Comment


@receiver(post_save, sender=Comment)
def email_article_author(sender, instance, **kwargs):
    # `instance` is the newly saved Comment; email the article's author.
    article_author = instance.article.author
    email(article_author)

With this simple addition, we have actually already designed a solution to the new requirement. This solution, however, has one major flaw: the email is sent in the same thread as the comment creation. This means the user who commented must wait for the email to be sent before they can be notified that their comment has been saved. Moreover, it introduces another point of failure when creating comments: if comments happen to be created in an atomic transaction, then any error encountered when attempting to send the email will roll back the creation of the comment.
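To make that last point concrete, here is a rough sketch, assuming the comment is created inside Django’s transaction.atomic() (the function name is hypothetical), of how a failure in the email step would also discard the comment:

from django.db import transaction

from .models import Comment


def create_comment(article, user, content):
    # Hypothetical sketch: the comment is created inside an atomic block.
    with transaction.atomic():
        comment = Comment.objects.create(
            content=content, article=article, author=user,
        )
        # The post_save receiver above runs synchronously right here, still
        # inside the atomic block. If email() raises (say, the mail server
        # is unreachable), the exception escapes the block and the INSERT
        # is rolled back, so the comment is lost along with the email.
        return comment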

Emailing Asynchronously Using Celery

We need to move the email logic out of the user’s thread and have it happen asynchronously. One could achieve this using Python Thread objects, but that quickly eats up resources. A common alternative is to use some sort of distributed message processing framework, the most popular choice for Django applications being Celery. Celery allows us to define a secondary process dedicated to acting on messages it receives from the user’s thread. We won’t go into the implementation in detail here, but it could look something like this:

# tasks.py
from celery import Celery
from django.contrib.auth.models import User
from email_utils import email

app = Celery('tasks', broker='amqp://localhost//')  # RabbitMQ as the broker


@app.task
def email_user(user_id):
    # Model instances aren't JSON-serializable, so we pass the primary
    # key and re-fetch the user inside the worker process.
    email(User.objects.get(pk=user_id))


# signals.py (imports as in the previous example)
@receiver(post_save, sender=Comment)
def email_article_author(sender, instance, **kwargs):
    article_author = instance.article.author
    # Send a message to the email_user task running in another process.
    email_user.delay(article_author.pk)

With this solution in place, our email logic is now decoupled from the comment creation logic, solving the aforementioned issues with the synchronous solution. There are, however, still some serious flaws with the above approach:

  • Signals can be missed: a post_save signal will always be fired after a regular Comment.objects.create call. It will not, however, be fired when we create in bulk via Comment.objects.bulk_create. Thus, if at some point Easy needed to create comments in bulk (perhaps, for example, they wished to leave automated comments on articles which have been inactive for a certain period of time), those comments would be created without any email being sent to the authors of the articles.
  • Communicating with Celery adds another point of failure: whilst we removed emailing as a point of failure when adding a comment, we introduced communicating with Celery (or rather the broker) as one.
  • Adding Celery and a broker is operationally heavy: setting up and maintaining the Celery framework can be operationally expensive. It may be considered excessive for a task as simple as sending an email.
  • Communicating with Celery is not atomic with respect to the comment transaction: if our comment is created in an atomic transaction and that transaction happens to roll back after our email_user task is asynchronously invoked, we end up in a scenario where we’ve emailed the author when there is no new comment on their article. This can be solved using Django’s transaction.on_commit function (see the sketch after this list), but it’s very easy to forget to include it.
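As a rough sketch of that last point, one possible fix (not necessarily the one Easy would ship) is to defer the Celery call until the surrounding transaction commits:

# signals.py
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

from .models import Comment
from .tasks import email_user


@receiver(post_save, sender=Comment)
def email_article_author(sender, instance, **kwargs):
    article_author = instance.article.author
    # Only enqueue the Celery task once the enclosing transaction commits;
    # if the transaction rolls back, no email is ever sent.
    transaction.on_commit(lambda: email_user.delay(article_author.pk))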

Emailing Asynchronously Using django-pgpubsub

Let’s now explore how Easy could have added this new feature using django-pgpubsub (pgpubsub), which provides a framework for building an asynchronous and distributed message processing network on top of a Django application using a PostgreSQL database. This is achieved by leveraging Postgres’ LISTEN/NOTIFY protocol to build a message queue at the database layer.
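To get a feel for what that protocol looks like at the database level, here is a small, purely illustrative sketch using psycopg2 (the connection string and channel name are made up, and this is not how pgpubsub is wired up internally): one session LISTENs on a channel, and any other session, or a database trigger, can NOTIFY it.

import select

import psycopg2
import psycopg2.extensions

# Illustrative only: connection details and channel name are invented.
conn = psycopg2.connect('dbname=easy')
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

curs = conn.cursor()
curs.execute('LISTEN comments;')  # subscribe to the 'comments' channel

# Another session (or a trigger) can now run: NOTIFY comments, 'payload';
while True:
    if select.select([conn], [], [], 5.0) != ([], [], []):
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            print('Got NOTIFY:', notify.channel, notify.payload)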

Since Easy are already running Django on Postgres, there is no additional operational work required to start using pgpubsub (after installing via pip install django-pgpubsub). This means we can get straight into writing the business logic (for a more detailed explanation of the objects and terminology used here, see the docs):

from dataclasses import dataclass

import pgpubsub

from email_utils import email
from .models import Comment


# channels.py
@dataclass
class CommentChannel(pgpubsub.TriggerChannel):
    model = Comment
    lockable_notifications = True


# listeners.py
@pgpubsub.post_insert_listener(CommentChannel)
def email_user(old: Comment, new: Comment):
    # `new` is the freshly inserted Comment row; email the article's author.
    email(new.article.author)

Note that we have now ditched the post_save signal from the previous solution. As we pointed out, signals can easily be missed (e.g. by a bulk_create). Instead, we are using a Postgres trigger: defining the listener as above makes use of the django-pgtrigger library to write a Postgres trigger to our database, the job of which is to notify our channel whenever a Comment is inserted into the database. Triggers are far more robust than signals for detecting database write events; application-level signals can easily be missed, whereas database triggers will always be executed.
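As an illustration of that robustness, a hypothetical bulk operation like the one mentioned earlier would bypass post_save entirely, yet still produce one notification per inserted row (the function name, article selection and comment content below are made up):

from .models import Comment


def bump_inactive_articles(inactive_articles, bot_user):
    # bulk_create bypasses post_save signals entirely, but the row-level
    # trigger installed via pgpubsub/pgtrigger still fires once per inserted
    # row, so a notification (and hence an email) is produced for every
    # new Comment.
    Comment.objects.bulk_create(
        [
            Comment(
                content='Any recent updates on this article?',
                article=article,
                author=bot_user,
            )
            for article in inactive_articles
        ]
    )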

The above is all the code we need for our desired functionality. Now all we need to do is start our listening process, the job of which is to listen to notifications coming from our trigger via our CommentChannel channel and hand them off to be processed by our email_user listener function. The devs at Easy decide they would like to dedicate two of the server’s processes to listening for and processing emails. This is achieved via the command

./manage.py listen --processes 2

which uses python’s multiprocessing library to spin up two processes dedicated to listening for notifications and sending emails.

Some highlights of this solution:

  • django-pgpubsub’s exactly-once messaging means we can have two processes listening to the same channel in parallel without fear of the same email ever being sent twice.
  • Since the Postgres NOTIFY protocol respects the atomicity of the transaction in which it is invoked, an email will be sent if and only if a Comment is successfully saved to the database.
  • The built-in recovery option allows us to later replay any emails which failed to be sent.
  • We have not introduced another point of failure or technology; we’re still using only Django and Postgres.
  • As previously explained, we no longer have to worry about potentially missing signals, since we’re using a Postgres trigger instead.

For more of my work see https://github.com/PaulGilmartin/.
