RabbitMQ Delay Scheduler and TTL Implementation

This document outlines the steps and concepts involved in implementing a delay scheduler and Time-To-Live (TTL) for messages in RabbitMQ. By the end of this guide, you will understand how to configure RabbitMQ to handle delayed message scheduling and TTL effectively.


Prerequisites

  1. RabbitMQ Installed:

    • Ensure RabbitMQ is installed locally or running as a Docker container.

    • Enable the RabbitMQ Management Plugin for monitoring and debugging.

  2. RabbitMQ Dockerfile (for custom plugins):

     FROM rabbitmq:management
     COPY ./rabbitmq_delayed_message_exchange-4.0.2.ez /opt/rabbitmq/plugins/
     RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
  3. Docker Compose Setup:

     version: '3.8'
    
     services:
    
       rabbitmq:
         build: .
         ports:
           - "5672:5672"
           - "15672:15672"
         environment:
           RABBITMQ_DEFAULT_USER: guest
           RABBITMQ_DEFAULT_PASS: guest
    

Concepts

1. Delay Scheduler in RabbitMQ

RabbitMQ does not natively support delayed message scheduling out of the box. However, with the rabbitmq_delayed_message_exchange plugin, you can achieve this functionality. This plugin introduces a new exchange type, x-delayed-message, which allows messages to be delayed before being routed to a queue.

2. Time-To-Live (TTL)

TTL defines the lifespan of a message in RabbitMQ. If a message exceeds its TTL without being consumed, it is either dead-lettered (if a dead-letter exchange is configured) or dropped.


Implementation Steps

Step 1: Enable rabbitmq_delayed_message_exchange (In our Case, we have done this through Docker, just run Docker compose up —» docker-compose-up —build)

  1. Copy the plugin file rabbitmq_delayed_message_exchange-4.0.2.ez into the /opt/rabbitmq/plugins/ directory of your RabbitMQ container.

  2. Enable the plugin:

     rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
  3. Restart the RabbitMQ service.

or

If we use Docker , then

  1. Copy the plugin file rabbitmq_delayed_message_exchange-4.0.2.ez into the project directory any where.

  2. Enable the plugin:

     docker-compose-up —build
    

Step 2: Create a Custom Service for RabbitMQ in Your Application

Below is an example RabbitMQService implementation using Node.js and NestJS:

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as amqp from 'amqplib';

@Injectable()
export class RabbitMQService implements OnModuleInit, OnModuleDestroy {
  private connection: amqp.Connection;
  private channel: amqp.Channel;
  private ready = false;

  async onModuleInit() {
    await this.connect();
  }

  private async connect() {
    try {
      this.connection = await amqp.connect('amqp://localhost:5672');
      this.channel = await this.connection.createChannel();
      this.ready = true;
      console.log('RabbitMQ connection and channel established');
    } catch (error) {
      console.error('Error connecting to RabbitMQ', error);
    }
  }

  private async checkConnection() {
    if (!this.ready) {
      console.log('Connection lost. Reconnecting...');
      await this.reconnect();
    }
  }

  private async reconnect() {
    try {
      await this.connect();
    } catch (error) {
      console.error('Reconnection failed', error);
    }
  }

  public async sendMessage(queue: string, message: string) {
    await this.checkConnection(); // Ensure connection before sending the message

    if (!this.ready) {
      throw new Error('RabbitMQ channel is not initialized.');
    }

    //For delay we used exchange
    const exchange = 'delayed_exchange';
    await this.channel.assertExchange(exchange, 'x-delayed-message', {
      arguments: {
        'x-delayed-type': 'direct',
      },
    });
    await this.channel.assertQueue(queue, { durable: false });

    //for exchange we will not use sendToQueue
    // this.channel.sendToQueue(queue, Buffer.from(message));

    // and we will use bind and publish for exchange

    await this.channel.bindQueue(queue, exchange, '');
    this.channel.publish(exchange, '', Buffer.from(message), {
      headers: { 'x-delay': 10000 },
    });
    console.log(`Message sent to queue ${queue}: ${message}`);
  }

  async close() {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }

  async onModuleDestroy() {
    await this.close();
  }
}

Step 3: Configure TTL in RabbitMQ Queues

TTL can be set when declaring a queue. Example:

await this.channel.assertQueue('my_queue', {
  durable: true,
  arguments: { 'x-message-ttl': 60000 }, // TTL of 60 seconds
});

Step 4: Run RabbitMQ and Application

  1. Start RabbitMQ with Docker Compose:

     docker-compose up --build
    
  2. Start your Node.js application.


Testing the Implementation

Delay Scheduler

  1. Send a delayed message:

     await rabbitMQService.sendDelayedMessage('test_queue', 'Hello after 10 seconds!', 10000);
    
  2. Observe the message appearing in the queue after the specified delay.

TTL

  1. Set up a queue with a short TTL:

     await this.channel.assertQueue('ttl_queue', {
       arguments: { 'x-message-ttl': 5000 },
     });
    
  2. Publish a message to the queue and observe it being removed after 5 seconds.


Debugging Tips

  • Use the RabbitMQ Management UI (http://localhost:15672) to monitor queues, exchanges, and messages.

  • Check logs for errors related to exchange or queue declarations.


Conclusion

By enabling the rabbitmq_delayed_message_exchange plugin and configuring TTL, RabbitMQ can effectively handle delayed message scheduling and message expiration. This setup ensures more control over message routing and processing in your applications.