Simple Django Background Task Management

I’ve been coding Django since v1.2 (no, the dinosaurs were not still around), and Celery has been around for about as long as the standard way to shove tasks into the background. It’s just, well, emm. I don’t like Celery. It seems to make life a little difficult. Some libraries and applications try to put a buffer between you and anything complex, and the result is a more complex solution.

In a previous post, I talked about message brokers and what they can do. In this article, I’m going to show how you can use the message broker directly, which makes background tasks easier to reason about and, in many ways, easier to run.

Workers

If you want tasks to run outside of Django, in the background, then they need to run in a separate process, preferably a Python one, and maybe with access to the ORM and Django’s settings.

A Worker is just a process that waits around for a message, performs a task and then goes back to waiting.

Here is an outline of a worker that will be able to access Django and the ORM:

import os
import django
import time

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
django.setup()


while True:
    try:
        # your task goes here
        pass
    except Exception as e:
        # log the exception, wait, then restart
        time.sleep(30)

You can start the worker on systemd-based Linux systems (such as Ubuntu) by setting up a service in /etc/systemd/system/ looking like this:

[Unit]
Description=worker-one
After=network.target

[Service]
User=webapps
Group=webapps
WorkingDirectory=/path/to/your/application
ExecStart=/path/to/your/venv/bin/python /path/to/your/worker/worker_one.py

[Install]
WantedBy=multi-user.target

All of this is a pretty broad explanation, but from it you can see that it’s quite simple really to create and start Python processes to run your tasks. You can create one type of worker to run all tasks, or different types to run different tasks. So you can create a worker only to send emails, with a different worker to send SMS messages.
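Assuming the unit file above is saved as /etc/systemd/system/worker-one.service (the name is just an example), it can be enabled and started like any other systemd service:

```shell
# Reload systemd so it picks up the new unit file
sudo systemctl daemon-reload

# Start the worker now, and on every boot
sudo systemctl enable --now worker-one.service

# Follow the worker's log output
journalctl -u worker-one -f
```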

You can also create workers that run on different machines with no access to Django, using one coded like this:

import time


while True:
    try:
        # your task goes here
        pass
    except Exception as e:
        # log the exception, wait, then restart
        time.sleep(30)

Getting your Worker to listen for Tasks

To distribute task messages, I’m using RabbitMQ. Unless you want to go really big on message volume, this is the best choice. There are a few choices of Python libraries for working with RabbitMQ. Pika is the default, but I’m going async, so it’s aio-pika.

import asyncio
import aio_pika
import json
import os
import django
import time

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_application.settings')
django.setup()


async def process_message(message: aio_pika.abc.AbstractIncomingMessage) -> None:
    async with message.process():
        msg = message.body.decode()
        data = json.loads(msg)
        print(data)
        await asyncio.sleep(1)


async def main() -> None:
    conn = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
    channel = await conn.channel()

    queue = await channel.declare_queue()
    exchange = await channel.declare_exchange("my_exchange", type="topic")
    await queue.bind(exchange, routing_key="default")
    await queue.consume(process_message)

    try:
        await asyncio.Future()  # run forever
    finally:
        await conn.close()


while True:
    try:
        asyncio.run(main())
    except Exception as e:
        # log the exception, wait, then restart
        time.sleep(30)

main() is started by asyncio.run(), which creates an event loop and runs main() inside it. await queue.consume(process_message) registers the function to run when a message comes in. Here, I’m just printing it.

Also note that with this code, message.process() acknowledges the message automatically once the handler finishes. See my previous post on what that means, but in most cases, you will not want a message acknowledged until its processing has completed successfully.

That’s it. Your simple worker will sit there and wait for messages.

Publishing messages

Getting a Django view to publish a message is a little more complex when using aio-pika, since it needs to run in an async coroutine.

Create a function somewhere in your Django project looking like this:

import aio_pika


async def publish_task(message, routing_key="default") -> None:
    conn = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")

    async with conn:
        channel = await conn.channel()
        exchange = await channel.declare_exchange("my_exchange", type="topic")
        await exchange.publish(
            aio_pika.Message(body=message.encode()),
            routing_key=routing_key,
        )

The message passed must be a string. If you have more complex data, serialize it with json before sending. The routing_key and exchange name will need to match the ones set in your worker. You don’t need lots of exchanges, but if you want certain workers to process specific tasks, assign them a specific routing key and publish to that key.
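For example, a task payload can be serialized to a JSON string before publishing, and decoded back into a dict in the worker (the field names here are made up for illustration):

```python
import json

# Hypothetical task payload; any JSON-serializable structure works.
payload = {"task": "send_email", "user_id": 42, "template": "welcome"}

# Serialize before publishing...
message = json.dumps(payload)

# ...and in the worker, decode the message body back into a dict.
data = json.loads(message)
print(data["task"])
```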

Now, in your Django view, just add:

asyncio.run(publish_task(message, routing_key))

That simple line publishes a message, and then the right workers will pick it up and process it.
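Put together, a view might look something like this sketch. The view name and payload fields are hypothetical, and publish_task is assumed to be importable from wherever you defined it (stubbed out here so the example is self-contained):

```python
import asyncio
import json

# Stand-in for the publish_task coroutine shown above; in a real
# project you would import it from your own module instead.
async def publish_task(message, routing_key="default") -> None:
    print(f"published {message!r} with routing key {routing_key!r}")

def signup_view(request):
    # Hypothetical view: queue a welcome email instead of sending it inline.
    payload = json.dumps({"task": "send_email", "user_id": 42})
    asyncio.run(publish_task(payload, routing_key="default"))
    return "ok"  # in Django this would be an HttpResponse
```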

Why I Like Doing Task Management Like This

For me, I like libraries that help me do a task, not take over it. Using this approach, I can start a few or many workers, and I can distribute tasks across multiple machines. I also like the separation between the code bases: the task that sends emails has no real need to be part of Django. Django just needs to say “send this” and forget about it.

You can also get better performance if you play with the worker and have the main() function start multiple coroutine workers (async tasks), allowing them to process messages concurrently. So if you have lots of emails being sent out, with lots of IO time spent waiting for responses from servers, it’s really easy to create a worker that can deal with those.
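The multiple-coroutine idea can be sketched with plain asyncio, using an asyncio.Queue as a stand-in for the broker queue (with aio-pika you would instead raise the prefetch with channel.set_qos(prefetch_count=...) and attach several consumers). All names here are illustrative:

```python
import asyncio

async def consumer(name: str, queue: asyncio.Queue, done: list) -> None:
    # Each consumer pulls messages and "sends" them, overlapping the IO waits.
    while True:
        msg = await queue.get()
        await asyncio.sleep(0.01)  # simulated IO, e.g. waiting on an SMTP server
        done.append((name, msg))
        queue.task_done()

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    done: list = []
    # Start several coroutine workers inside one process.
    workers = [asyncio.create_task(consumer(f"w{i}", queue, done)) for i in range(3)]
    for i in range(9):
        queue.put_nowait(f"email-{i}")
    await queue.join()  # wait until every message has been processed
    for w in workers:
        w.cancel()
    print(f"processed {len(done)} messages with 3 consumers")

asyncio.run(main())
```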

The same approach will also work with Flask, FastAPI, and all other web frameworks.