Understanding Message Queue using Python RQ
Introduction
In this article, you will learn about the importance of message queue systems, when and why you should use them, and how to integrate them into your existing Django application. You will explore practical scenarios where message queues can enhance performance and reliability, compare popular libraries such as RQ and Celery, and get a step-by-step guide for implementation.
Problem Statement and Solution
Imagine you own a popular pizza shop with a long queue of customers waiting their turn. The queue builds up mainly because each pizza takes time to make.
Similarly, servers experience heavy loads when handling time-intensive code. A better way to manage this is by processing tasks asynchronously or using a message queue system.
A message queue system is a robust solution for handling such scenarios. It involves producers that create messages, a broker that stores and delivers them, and consumers that process them. This system helps tasks get handled efficiently and reliably, even under heavy load, by distributing the workload across multiple consumers and enabling retries in case of failures.
To reduce the waiting time at your pizza shop, there are multiple solutions:
- Increase the number of chefs (workers) to handle multiple orders simultaneously.
- Increase the number of shops to distribute customers among different shops.
- Give tokens to customers so they can sit down after placing their order and collect their pizza once it’s ready.
When & Why to Use a Message Queue System?
It suits compute-heavy or slow tasks that would otherwise block your main application thread, such as:
- Sending bulk emails: Suppose you want to mail a document to multiple users. Sending mail is time-intensive, and making the user wait until every mail is delivered hurts the user experience.
- Report generation: Generating a report is expensive because it aggregates large amounts of data. Instead of making users wait, you can mail them the report once it is ready.
- Data processing: When processing large volumes of data, the load on your main application thread degrades the experience of other users.
- Running Machine Learning (ML) models: When multiple people share the same resource, queueing each request lets them be served in turn and lets users track the status of their request.
- PDF generation: This feature is used frequently, and because generating PDFs is expensive, many simultaneous requests can hurt server health.
Introducing a queue-based system helps in the following ways:
- The requests above can be handled in the background or by a separate microservice, which reduces the load on the main application thread.
- It is easy to scale: you can increase the number of workers as the load grows.
- It decouples task generation from task processing.
- It improves reliability with features like retries and a dead letter queue (which stores failed jobs), so that job data is not lost when a failure occurs.
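The retry and dead letter queue idea can be sketched in plain Python. This is only an in-memory illustration, not how any particular broker implements it: the names `process_with_retries` and `flaky_handler` and the retry budget of 3 are assumptions made for the example.

```python
from collections import deque

MAX_RETRIES = 3  # assumed retry budget for this sketch


def process_with_retries(jobs, handler, max_retries=MAX_RETRIES):
    """Run every job; retry failures, then park them in a dead letter queue."""
    pending = deque((job, 0) for job in jobs)  # (job, retries used so far)
    finished, dead_letter = [], []
    while pending:
        job, retries_used = pending.popleft()
        try:
            handler(job)
            finished.append(job)
        except Exception:
            if retries_used < max_retries:
                # Re-enqueue at the back so other jobs are not blocked.
                pending.append((job, retries_used + 1))
            else:
                # Retry budget exhausted: keep the job for later inspection.
                dead_letter.append(job)
    return finished, dead_letter


attempts = {}  # how many times each job has been attempted


def flaky_handler(job):
    """Hypothetical task: 'report' fails once, 'broken' always fails."""
    attempts[job] = attempts.get(job, 0) + 1
    if job == "broken" or (job == "report" and attempts[job] == 1):
        raise RuntimeError(f"could not process {job}")


finished, dead_letter = process_with_retries(
    ["email", "report", "broken"], flaky_handler
)
print(finished)     # ['email', 'report']
print(dead_letter)  # ['broken']
```

The job that keeps failing is attempted once plus three retries and then set aside instead of being lost, which is what lets an operator inspect or replay it later.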
What is a message queue system?
It can be broken into 3 major parts:
- Producer: Creates messages (tasks), such as sending an email or generating a report, and enqueues them in a message queue.
- Broker: Stores each message until a consumer receives it and makes sure it is delivered for processing. Delivery order depends on the broker: many deliver in FIFO order, and some also support priorities, in which case messages of equal priority are typically served in FIFO order (e.g. Redis, RabbitMQ, AWS SQS).
- Consumer: Consumes the message and performs the task it describes. If multiple consumers listen to the same queue, they share the load, for example through round-robin or weighted load balancing. A consumer can be a background thread or a separate microservice.
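As a rough illustration of priority ordering with a FIFO tie-break, here is a toy in-memory broker built on Python's `heapq`. The `PriorityBroker` class and the convention that a lower number means higher priority are assumptions for this sketch; real brokers such as Redis or RabbitMQ each have their own ordering rules.

```python
import heapq
import itertools


class PriorityBroker:
    """Toy in-memory broker: a lower priority number is delivered first."""

    def __init__(self):
        self._heap = []
        # Monotonic counter used as a tie-breaker so that messages with the
        # same priority come out in the order they were enqueued (FIFO).
        self._sequence = itertools.count()

    def enqueue(self, message, priority=0):
        heapq.heappush(self._heap, (priority, next(self._sequence), message))

    def dequeue(self):
        """Return the next message, or None when the broker is empty."""
        if not self._heap:
            return None
        _, _, message = heapq.heappop(self._heap)
        return message


broker = PriorityBroker()
broker.enqueue("weekly report", priority=2)
broker.enqueue("password reset email", priority=1)
broker.enqueue("welcome email", priority=1)

delivered = [broker.dequeue() for _ in range(3)]
print(delivered)
# ['password reset email', 'welcome email', 'weekly report']
```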
Example: OTT platforms send subscription renewal reminder emails. Sending them all at once from the main application can degrade server performance. To avoid this, a producer takes the list of email IDs with their reminder messages and enqueues them in the message broker; an email microservice then picks them from the queue and sends the emails one by one.
If the load on the consumers increases, we can add more consumers by increasing the number of workers (threads or processes), and if that is not sufficient, multiple instances of the consumer service can be hosted.
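The reminder email example can be sketched with the standard library, using `queue.Queue` as a stand-in for the broker and threads as stand-ins for consumers. The function names, the `STOP` sentinel, and the example addresses are all hypothetical; a real setup would use a broker such as Redis and would actually send the email where this sketch only records it.

```python
import queue
import threading

email_queue = queue.Queue()   # stands in for the message broker
sent = []                     # (worker name, recipient) pairs
sent_lock = threading.Lock()  # protects the shared `sent` list
STOP = None                   # sentinel telling a consumer to exit


def producer(recipients):
    """Enqueue one message per recipient."""
    for recipient in recipients:
        email_queue.put(recipient)


def consumer(name):
    """Take messages off the queue until the STOP sentinel arrives."""
    while True:
        recipient = email_queue.get()
        if recipient is STOP:
            break
        with sent_lock:
            sent.append((name, recipient))  # a real worker would send mail here


def run(recipients, worker_count):
    workers = [
        threading.Thread(target=consumer, args=(f"worker-{i}",))
        for i in range(worker_count)
    ]
    for worker in workers:
        worker.start()
    producer(recipients)
    for _ in workers:
        email_queue.put(STOP)  # one sentinel per consumer
    for worker in workers:
        worker.join()


recipients = [f"user{i}@example.com" for i in range(10)]
run(recipients, worker_count=3)
print(len(sent))  # 10
```

Raising `worker_count` is the in-process equivalent of adding consumers: each message is still handled exactly once, but by whichever consumer is free.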
Comparing Message Queue Libraries (RQ v/s Celery)
Comparing popular Python message queue libraries RQ and Celery
Integrate into your existing Django Application
We will enqueue all orders in ORDER_QUEUE and then run multiple workers to process them in parallel, with the workers sharing the jobs between them. This is better than letting the main application thread handle every order.
1. Install a message queue library such as python-rq or Celery (we will use python-rq).
pip install rq
2. Add the Redis and queue configuration to the application settings so that it is initialised when the Django application starts.
app/settings.py
---------------------------
from redis import Redis
from rq import Queue
# Redis configs
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/"
# Redis Queue (Message Broker)
ORDER_QUEUE_NAME = "order_queue"
ORDER_QUEUE = Queue(ORDER_QUEUE_NAME, connection=Redis(REDIS_HOST, REDIS_PORT))
QUEUES = [ORDER_QUEUE_NAME]
3. Add a function that enqueues jobs in ORDER_QUEUE. enqueue takes f, a reference to the function to run, plus the keyword arguments that are passed to create_order when the worker calls it. If create_order raises an exception and retry is configured, the worker moves the job to the scheduled state and the scheduler re-enqueues it once its retry interval has elapsed; without retry, the job moves straight to the failed state. If the job still raises an error after the maximum number of retries, it is moved to the failed state. On successful execution, the job moves to the finished state.
viewset.py
----------------------
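from types import TracebackType
from typing import Type

from redis import Redis
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from rq import Retry
from rq.job import Job

from app.settings import ORDER_QUEUE

# OrderService is the project's own service class; import it from wherever it lives.


class OrderViewSet(viewsets.ViewSet):
    @action(detail=False, methods=["post"], url_path="bulk-create")
    def process_bulk_order(self, request):
        # Synchronous: the request blocks until every order has been created.
        order_cnt = request.data["order_cnt"]
        for order_id in range(order_cnt):
            OrderService.create_order(order_id)
        return Response()

    @action(detail=False, methods=["post"], url_path="enqueue")
    def enqueue_order(self, request):
        # Asynchronous: only enqueue the jobs; workers create the orders later.
        order_cnt = request.data["order_cnt"]
        for order_id in range(order_cnt):
            ORDER_QUEUE.enqueue(
                f=self.create_order,
                order_id=order_id,
                retry=Retry(max=3, interval=[10, 30, 60]),
                on_failure=self.report_failure,
            )
        return Response()

    @staticmethod
    def create_order(order_id):
        # Method executed by the worker
        pass

    @staticmethod
    def report_failure(
        job: Job,
        connection: Redis,
        exc_type: Type[Exception],
        exc_value: Exception,
        traceback: TracebackType,
    ) -> None:
        # Custom logic to handle the failed job
        pass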
There are two endpoints here. bulk-create processes all orders sequentially and has a time complexity of O(order_count * time_to_create_each_order), whereas enqueue only adds each order to the message broker and has a time complexity of O(order_count).
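The difference between the two endpoints can be felt with a small stand-alone simulation. This sketch does not use Django or RQ: `time.sleep` stands in for the cost of creating an order, a `deque` stands in for the broker, and the 0.01 s cost per order is an assumed value. A real enqueue also pays a network round trip to Redis per job, so it is cheap but not free.

```python
import time
from collections import deque

TIME_PER_ORDER = 0.01  # assumed cost of creating one order, in seconds


def create_order(order_id):
    time.sleep(TIME_PER_ORDER)  # pretend to do the slow work


def bulk_create(order_count):
    """Like bulk-create: the caller waits for every order to be created."""
    for order_id in range(order_count):
        create_order(order_id)


def enqueue_orders(order_count, order_queue):
    """Like enqueue: only record the work; a worker would do it later."""
    for order_id in range(order_count):
        order_queue.append(order_id)


def timed(func, *args):
    """Return how many seconds func(*args) took."""
    start = time.perf_counter()
    func(*args)
    return time.perf_counter() - start


order_queue = deque()
sync_seconds = timed(bulk_create, 20)
enqueue_seconds = timed(enqueue_orders, 20, order_queue)
print(f"bulk-create: {sync_seconds:.3f}s, enqueue: {enqueue_seconds:.6f}s")
```

The total work is the same in both cases; enqueueing only moves it out of the request so the caller gets a response sooner.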
4. Run workers to process the enqueued jobs. -c app.settings tells the worker to load its configuration from that module, and --with-scheduler enables the scheduler that processes scheduled jobs (RQ’s Retry object reschedules failed jobs).
Multiple workers can be run simultaneously (for example, managed by Supervisor).
rq worker -c app.settings --with-scheduler
5. We can monitor the above message queue system using rq-dashboard.
pip install rq-dashboard
rq-dashboard
Find the entire implementation of the above example in this GitHub repo: https://github.com/garvitgupta13/pizzeria