RabbitMQ Delay Scheduler and TTL Implementation
This document outlines the steps and concepts involved in implementing a delay scheduler and Time-To-Live (TTL) for messages in RabbitMQ. By the end of this guide, you will understand how to configure RabbitMQ to handle delayed message scheduling and TTL effectively.
Prerequisites
RabbitMQ Installed:
Ensure RabbitMQ is installed locally or running as a Docker container.
Enable the RabbitMQ Management Plugin for monitoring and debugging.
RabbitMQ Dockerfile (for custom plugins):
FROM rabbitmq:management COPY ./rabbitmq_delayed_message_exchange-4.0.2.ez /opt/rabbitmq/plugins/ RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Docker Compose Setup:
version: '3.8' services: rabbitmq: build: . ports: - "5672:5672" - "15672:15672" environment: RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest
Concepts
1. Delay Scheduler in RabbitMQ
RabbitMQ does not natively support delayed message scheduling out of the box. However, with the rabbitmq_delayed_message_exchange
plugin, you can achieve this functionality. This plugin introduces a new exchange type, x-delayed-message
, which allows messages to be delayed before being routed to a queue.
2. Time-To-Live (TTL)
TTL defines the lifespan of a message in RabbitMQ. If a message exceeds its TTL without being consumed, it is either dead-lettered (if a dead-letter exchange is configured) or dropped.
Implementation Steps
Step 1: Enable rabbitmq_delayed_message_exchange
(In our Case, we have done this through Docker, just run Docker compose up —» docker-compose-up —build)
Copy the plugin file
rabbitmq_delayed_message_exchange-4.0.2.ez
into the/opt/rabbitmq/plugins/
directory of your RabbitMQ container.Enable the plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Restart the RabbitMQ service.
or
If we use Docker , then
Copy the plugin file
rabbitmq_delayed_message_exchange-4.0.2.ez
into the project directory any where.Enable the plugin:
docker-compose-up —build
Step 2: Create a Custom Service for RabbitMQ in Your Application
Below is an example RabbitMQService
implementation using Node.js and NestJS:
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as amqp from 'amqplib';
@Injectable()
export class RabbitMQService implements OnModuleInit, OnModuleDestroy {
private connection: amqp.Connection;
private channel: amqp.Channel;
private ready = false;
async onModuleInit() {
await this.connect();
}
private async connect() {
try {
this.connection = await amqp.connect('amqp://localhost:5672');
this.channel = await this.connection.createChannel();
this.ready = true;
console.log('RabbitMQ connection and channel established');
} catch (error) {
console.error('Error connecting to RabbitMQ', error);
}
}
private async checkConnection() {
if (!this.ready) {
console.log('Connection lost. Reconnecting...');
await this.reconnect();
}
}
private async reconnect() {
try {
await this.connect();
} catch (error) {
console.error('Reconnection failed', error);
}
}
public async sendMessage(queue: string, message: string) {
await this.checkConnection(); // Ensure connection before sending the message
if (!this.ready) {
throw new Error('RabbitMQ channel is not initialized.');
}
//For delay we used exchange
const exchange = 'delayed_exchange';
await this.channel.assertExchange(exchange, 'x-delayed-message', {
arguments: {
'x-delayed-type': 'direct',
},
});
await this.channel.assertQueue(queue, { durable: false });
//for exchange we will not use sendToQueue
// this.channel.sendToQueue(queue, Buffer.from(message));
// and we will use bind and publish for exchange
await this.channel.bindQueue(queue, exchange, '');
this.channel.publish(exchange, '', Buffer.from(message), {
headers: { 'x-delay': 10000 },
});
console.log(`Message sent to queue ${queue}: ${message}`);
}
async close() {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
}
async onModuleDestroy() {
await this.close();
}
}
Step 3: Configure TTL in RabbitMQ Queues
TTL can be set when declaring a queue. Example:
await this.channel.assertQueue('my_queue', {
durable: true,
arguments: { 'x-message-ttl': 60000 }, // TTL of 60 seconds
});
Step 4: Run RabbitMQ and Application
Start RabbitMQ with Docker Compose:
docker-compose up --build
Start your Node.js application.
Testing the Implementation
Delay Scheduler
Send a delayed message:
await rabbitMQService.sendDelayedMessage('test_queue', 'Hello after 10 seconds!', 10000);
Observe the message appearing in the queue after the specified delay.
TTL
Set up a queue with a short TTL:
await this.channel.assertQueue('ttl_queue', { arguments: { 'x-message-ttl': 5000 }, });
Publish a message to the queue and observe it being removed after 5 seconds.
Debugging Tips
Use the RabbitMQ Management UI (
http://localhost:15672
) to monitor queues, exchanges, and messages.Check logs for errors related to exchange or queue declarations.
Conclusion
By enabling the rabbitmq_delayed_message_exchange
plugin and configuring TTL, RabbitMQ can effectively handle delayed message scheduling and message expiration. This setup ensures more control over message routing and processing in your applications.