Sunday, April 14, 2019

RabbitMQ Retry mechanism using Dead Letter Exchanges


Dead Letter Exchange takes care of Expired and Rejected Messages

Creating a Dead Letter Exchange:

Steps:
-Create new FANOUT exchange with the name "dead.letter. exchange".
-Create new queue "dead.letter.queue".
- Bind "dead.letter.queue" to "dead.letter. exchange".
Create new queue "test1" with the dead letter exchange set to "dead.letter. queue"
Send a message into "test1"
-Nack (with requeue = false) the message in "test1"

channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

Or through property
args.put("x-dead-letter-routing-key", "some-routing-key");

Dead-lettering a message modifies its headers:
-The exchange name is replaced with that of the latest dead-letter exchange,
-The routing key may be replaced with that specified in a queue performing dead lettering,
-If the above happens, the CC header will also be removed, and
-The BCC header will be removed as per Sender-selected distribution.

Spring retry template: Applicable for the consumers
spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        initial-interval: 2000
        max-attempts: 2
        multiplier: 1.5
        max-interval: 5000
       
Via Annotation

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAdviceChain(workMessagesRetryInterceptor());
        return factory;
    }
    @Bean
    public SimpleRetryPolicy rejectionRetryPolicy(){
        Map<Class<? extends Throwable> , Boolean> exceptionsMap = new HashMap<>();
        exceptionsMap.put(IllegalStateException.class, true);
        exceptionsMap.put(ArithmeticException.class, false);

        return new SimpleRetryPolicy(5 , exceptionsMap, true);
    }
    @Bean
    public RetryOperationsInterceptor workMessagesRetryInterceptor() {
        return RetryInterceptorBuilder.stateless()
                .retryPolicy(rejectionRetryPolicy())
                .backOffOptions(1000, 2, 10000)
                .build();
    }

The reason is a name describing why the message was dead-lettered and is one of the following:

rejected - the message was rejected with requeue=false,
expired - the TTL of the message expired; or
maxlen - the maximum allowed queue length was exceeded.

Failure of Messages:
You can keep some integer variable in the header or property. In case of failure or retry, increase the error count . Once it exceeds stop processing
Negative Acknowledgment:
public interface Channel 
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
 deliveryTag : the tag from the received
multiple : true to reject all messages up to and including
the supplied delivery tag; false to reject just the supplied
delivery tag.
requeue: true if the rejected message(s) should be requeued rather
than discarded/dead-lettered


No comments:

Post a Comment