Dead Letter Exchange takes care of Expired and Rejected Messages
Creating a Dead Letter Exchange:
Steps:
-Create new FANOUT exchange with the name "dead.letter. exchange".
-Create new queue "dead.letter.queue".
- Bind "dead.letter.queue" to "dead.letter. exchange".
Create new queue "test1" with the dead letter exchange set to "dead.letter. queue"
Send a message into "test1"
-Nack (with requeue = false) the message in "test1"
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
Or through property
args.put("x-dead-letter-routing-key", "some-routing-key");
Dead-lettering a message modifies its headers:
-The exchange name is replaced with that of the latest dead-letter exchange,
-The routing key may be replaced with that specified in a queue performing dead lettering,
-If the above happens, the CC header will also be removed, and
-The BCC header will be removed as per Sender-selected distribution.
Spring retry template: Applicable for the consumers
spring:
rabbitmq:
listener:
retry:
enabled: true
initial-interval: 2000
max-attempts: 2
multiplier: 1.5
max-interval: 5000
Via Annotation
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(workMessagesRetryInterceptor());
return factory;
}
@Bean
public SimpleRetryPolicy rejectionRetryPolicy(){
Map<Class<? extends Throwable> , Boolean> exceptionsMap = new HashMap<>();
exceptionsMap.put(IllegalStateException.class, true);
exceptionsMap.put(ArithmeticException.class, false);
return new SimpleRetryPolicy(5 , exceptionsMap, true);
}
@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
return RetryInterceptorBuilder.stateless()
.retryPolicy(rejectionRetryPolicy())
.backOffOptions(1000, 2, 10000)
.build();
}
The reason is a name describing why the message was dead-lettered and is one of the following:
rejected - the message was rejected with requeue=false,
expired - the TTL of the message expired; or
maxlen - the maximum allowed queue length was exceeded.
Failure of Messages:
You can keep some integer variable in the header or property. In case of failure or retry, increase the error count . Once it exceeds stop processing
Negative Acknowledgment:
public interface Channel
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag : the tag from the received
multiple : true to reject all messages up to and including
the supplied delivery tag; false to reject just the supplied
delivery tag.
requeue: true if the rejected message(s) should be requeued rather
than discarded/dead-lettered