Sunday, April 14, 2019

RabbitMQ Retry mechanism using Dead Letter Exchanges


A Dead Letter Exchange (DLX) receives messages that have expired, been rejected, or been dropped because a queue exceeded its maximum length.

Creating a Dead Letter Exchange:

Steps:
- Create a new fanout exchange named "dead.letter.exchange".
- Create a new queue "dead.letter.queue".
- Bind "dead.letter.queue" to "dead.letter.exchange".
- Create a new queue "test1" with its dead letter exchange set to "dead.letter.exchange".
- Send a message to "test1".
- Nack the message in "test1" with requeue = false. The message is then routed through "dead.letter.exchange" into "dead.letter.queue".

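import java.util.HashMap;
import java.util.Map;
// Declare the exchange that will receive dead-lettered messages.
channel.exchangeDeclare("some.exchange.name", "direct");
// Point the queue at that exchange through the x-dead-letter-exchange argument.
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);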

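Optionally, also set the routing key to use when a message is dead-lettered. Add it to args before calling queueDeclare; if it is not set, the message's own routing keys are used:
args.put("x-dead-letter-routing-key", "some-routing-key");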
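The two queue arguments above can be wrapped in a small helper so that every queue is declared the same way. This is only a sketch: DeadLetterArgs and forExchange are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of the RabbitMQ client) that builds queue arguments. */
final class DeadLetterArgs {
    private DeadLetterArgs() {}

    /**
     * Returns queue arguments that send dead-lettered messages to the given exchange.
     * routingKey may be null, in which case the argument is left out and the
     * message's own routing keys are used.
     */
    static Map<String, Object> forExchange(String exchange, String routingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", exchange);
        if (routingKey != null) {
            args.put("x-dead-letter-routing-key", routingKey);
        }
        return args;
    }
}
```

The returned map would be passed as the last parameter of queueDeclare, for example channel.queueDeclare("test1", false, false, false, DeadLetterArgs.forExchange("dead.letter.exchange", null)).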

Dead-lettering a message modifies its headers:
- The exchange name is replaced with that of the latest dead-letter exchange.
- The routing key may be replaced with the one specified in the queue performing the dead-lettering.
- If the above happens, the CC header is also removed.
- The BCC header is removed as per Sender-selected distribution.

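Spring retry: applies to consumers (listeners). It can be enabled through application properties (Spring Boot 2.x and later expect these keys under spring.rabbitmq.listener.simple.retry):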
spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        initial-interval: 2000
        max-attempts: 2
        multiplier: 1.5
        max-interval: 5000
       
Via Java configuration (@Bean definitions):

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // Wrap every listener invocation in the retry interceptor.
    factory.setAdviceChain(workMessagesRetryInterceptor());
    return factory;
}

@Bean
public SimpleRetryPolicy rejectionRetryPolicy() {
    // true = retry on this exception, false = do not retry.
    Map<Class<? extends Throwable>, Boolean> exceptionsMap = new HashMap<>();
    exceptionsMap.put(IllegalStateException.class, true);
    exceptionsMap.put(ArithmeticException.class, false);

    // At most 5 attempts; the last argument (traverseCauses) also inspects exception causes.
    return new SimpleRetryPolicy(5, exceptionsMap, true);
}

@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .retryPolicy(rejectionRetryPolicy())
            // Initial interval 1000 ms, multiplier 2, maximum interval 10000 ms.
            .backOffOptions(1000, 2, 10000)
            .build();
}
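To see what the back-off settings mean in practice, the waits between attempts can be computed by hand. The sketch below is plain Java that imitates an exponential back-off (it is not Spring Retry's own code), and BackOffSchedule is a hypothetical name. It assumes one wait before each retry, so maxAttempts - 1 waits in total.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the waits of an exponential back-off. */
final class BackOffSchedule {
    private BackOffSchedule() {}

    /** Returns the waits in milliseconds between attempts (maxAttempts - 1 entries). */
    static List<Long> waits(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        long interval = Math.min(initialInterval, maxInterval);
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add(interval);
            // Grow the interval for the next retry, but never beyond the cap.
            interval = Math.min((long) (interval * multiplier), maxInterval);
        }
        return waits;
    }
}
```

With the builder values above (1000, 2, 10000) and 5 attempts, waits(1000, 2.0, 10000, 5) gives 1000, 2000, 4000 and 8000 ms. With the property values (2000, 1.5, 5000) and 2 attempts there is a single wait of 2000 ms.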

Each dead-lettering event is recorded in the message's x-death header. The reason field of an entry is a name describing why the message was dead-lettered and is one of the following:

- rejected: the message was rejected with requeue=false,
- expired: the TTL of the message expired, or
- maxlen: the maximum allowed queue length was exceeded.
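RabbitMQ records dead-lettering events in the message's x-death header, a list of entries that carry, among other fields, the queue, the reason and a count. A consumer can read that header to find out how often a message has already been dead-lettered. The helper below is a sketch under that assumption; XDeath and count are hypothetical names, and the headers map stands for what the client exposes as the message headers.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reads dead-lettering counts from message headers. */
final class XDeath {
    private XDeath() {}

    /** Sums the "count" fields of the x-death entries matching the queue and reason. */
    static long count(Map<String, Object> headers, String queue, String reason) {
        if (headers == null) {
            return 0;
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return 0;
        }
        long total = 0;
        for (Object item : (List<?>) xDeath) {
            if (!(item instanceof Map)) {
                continue;
            }
            Map<?, ?> entry = (Map<?, ?>) item;
            // String.valueOf copes with plain Strings and with string-like wrapper types.
            boolean matches = queue.equals(String.valueOf(entry.get("queue")))
                    && reason.equals(String.valueOf(entry.get("reason")));
            Object count = entry.get("count");
            if (matches && count instanceof Number) {
                total += ((Number) count).longValue();
            }
        }
        return total;
    }
}
```

A consumer could, for instance, stop retrying once XDeath.count(headers, "test1", "rejected") reaches a chosen limit.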

Failure of Messages:
Keep an integer retry counter in a message header or property. On every failure or retry, increment the counter. Once it exceeds a chosen limit, stop processing the message.
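A minimal sketch of such a counter is shown below. It works on a plain headers map; the header name "x-retry-count" and the class RetryCounter are assumptions made for illustration, not RabbitMQ conventions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical application-level retry counter kept in the message headers. */
final class RetryCounter {
    /** Assumed header name; any application-specific name would work. */
    static final String HEADER = "x-retry-count";

    private RetryCounter() {}

    /** Reads the current count, treating a missing or non-numeric header as 0. */
    static int current(Map<String, Object> headers) {
        Object value = headers == null ? null : headers.get(HEADER);
        return value instanceof Number ? ((Number) value).intValue() : 0;
    }

    /** Returns a copy of the headers with the counter increased by one. */
    static Map<String, Object> incremented(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>();
        if (headers != null) {
            copy.putAll(headers);
        }
        copy.put(HEADER, current(headers) + 1);
        return copy;
    }

    /** True once the message has already failed maxRetries times. */
    static boolean exhausted(Map<String, Object> headers, int maxRetries) {
        return current(headers) >= maxRetries;
    }
}
```

Because the headers of a delivered message cannot be changed in place, the consumer would typically republish the message with the incremented headers and acknowledge the original delivery.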
Negative Acknowledgment:
The Channel interface of the Java client declares:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
- deliveryTag: the delivery tag of the received message.
- multiple: true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
- requeue: true if the rejected message(s) should be requeued rather than discarded/dead-lettered.
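The effect of the multiple flag is easiest to see on a toy model of a channel's unacknowledged deliveries. The class below is purely illustrative: it is not the RabbitMQ client API, and UnackedDeliveries is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Toy model of unacknowledged delivery tags on one channel; not the client API. */
final class UnackedDeliveries {
    private final SortedSet<Long> unacked = new TreeSet<>();

    /** Records a delivery that has not been acknowledged yet. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Returns the tags settled by a nack and removes them from the unacked set. */
    List<Long> nack(long deliveryTag, boolean multiple) {
        List<Long> settled = new ArrayList<>();
        if (multiple) {
            // headSet is exclusive, so deliveryTag + 1 includes the tag itself.
            settled.addAll(unacked.headSet(deliveryTag + 1));
        } else if (unacked.contains(deliveryTag)) {
            settled.add(deliveryTag);
        }
        unacked.removeAll(settled);
        return settled;
    }
}
```

With tags 1 to 4 outstanding, nack(2, false) settles only tag 2, while a following nack(3, true) settles tags 1 and 3 and leaves tag 4 outstanding.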


RabbitMQ : High Availability (HA) Queues


In this blog, we will talk about a very important feature provided by RabbitMQ: High Availability using data replication.

By default, the content of a queue is located on only one node.
- Exchanges and bindings are present on all nodes.
- A queue can be mirrored so that it has one master, with the remaining replicas acting as mirrors.
- If the node that hosts the queue master fails, the mirror with the most up-to-date data becomes the new master.

Options to control mirroring:
  • ha-mode, with its ha-params value:
    • exactly -> count (the number of queue replicas)
    • all -> none
    • nodes -> list of node names
Mirroring to all nodes puts additional load on the cluster nodes. This results in higher network I/O, disk I/O and disk space usage.

Recommended replication factor: 2 in a 3-node cluster and 3 in a 5-node cluster.
The dashboard page for a queue shows the master, the mirrors and the policy details.
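The recommended replication factors above (2 of 3, 3 of 5) match a quorum of N/2 + 1 nodes. The sketch below generalises them with that formula, which is an assumption beyond the two values given here, and builds the JSON definition of an "exactly" policy. HaPolicy and its methods are hypothetical names.

```java
/** Hypothetical helper for sizing and describing a mirroring policy. */
final class HaPolicy {
    private HaPolicy() {}

    /** Quorum-sized replica count: 2 for a 3-node cluster, 3 for a 5-node cluster. */
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** JSON definition of a policy that keeps exactly the given number of replicas. */
    static String exactlyDefinition(int replicas) {
        return "{\"ha-mode\":\"exactly\",\"ha-params\":" + replicas + "}";
    }
}
```

The resulting definition is the kind of value that is supplied when the policy is created, for example through the management dashboard or rabbitmqctl set_policy.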




Saturday, April 6, 2019

RabbitMQ Federation Support


RabbitMQ  federation


Use Cases for RMQ Federation:

RMQ -> Exchanges -> (based on some rules) Queue


I. Multiple RabbitMQ clusters at various geographical locations:
Consumers should only have to listen to a central RabbitMQ cluster. A consumer that listens to multiple remote clusters suffers from latency problems.
Federation: move all the messages from the different clusters to the central cluster.

II. Distributing load from one queue to multiple clusters:
When the consumers are at different locations but the data is pushed to a single queue, federation can make the same messages available on all the clusters.

III. Migrating data from one cluster to a second cluster without any downtime.

IV. Reducing latency for message consumption:
Consume messages from a local cluster set up where the consumer resides.

Type of Federation:
Exchange federation:
Messages can be distributed to other clusters. Messages published to a federated exchange are delivered locally and to the downstream server as well. This is mainly used when the same message has to be consumed in multiple places.
With a federated exchange, consumers can be connected to the queue on the upstream (source) node. In addition, an exchange on the downstream (destination) node receives a copy of the messages that are published to the upstream node.

Federated exchanges are similar to exchange-to-exchange bindings in that they can (optionally) subscribe to a limited set of messages from an upstream exchange.


Queue Federation:
Messages are distributed over two clusters and are not duplicated. This is useful for distributing load or for migrating between clusters.
With a federated queue, consumers can be connected to the queue on both the upstream (source) and downstream (destination) nodes.


Upstream server: the server where the producer publishes the data.
Downstream server: the destination server to which messages are forwarded from the upstream server.

Prerequisites:
  1. Two RMQ servers/clusters
  2. Management plugin enabled for the dashboard
  3. Connectivity between the two clusters/servers over the AMQP protocol (default port 5672; port 15672 is the management dashboard's HTTP port)
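Once the prerequisites are met, the downstream server needs an upstream definition that points at the upstream server, plus a policy that selects what to federate. The sketch below only builds the two JSON values. The host name is a placeholder, the default AMQP port 5672 is assumed, and FederationConfig is a hypothetical name.

```java
/** Hypothetical helper that builds federation definitions as JSON strings. */
final class FederationConfig {
    private FederationConfig() {}

    /** AMQP URI of the upstream server, e.g. amqp://upstream-host:5672. */
    static String upstreamUri(String host, int port) {
        return "amqp://" + host + ":" + port;
    }

    /** Value of a federation-upstream parameter pointing at the given URI. */
    static String upstreamDefinition(String uri) {
        return "{\"uri\":\"" + uri + "\"}";
    }

    /** Policy definition that federates with all configured upstreams. */
    static String policyDefinition() {
        return "{\"federation-upstream-set\":\"all\"}";
    }
}
```

On the downstream server, the first value would be registered as a federation-upstream parameter and the second as a policy applied to the exchanges or queues to federate, either through the management dashboard or with rabbitmqctl.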

References:
https://www.rabbitmq.com/federation-reference.html
https://www.rabbitmq.com/federated-exchanges.html
https://jasonrowe.com/2018/01/26/rabbitmq-federation-examples/
https://jee-appy.blogspot.com/2018/08/setup-rabbitmq-exchange-federation.html
https://www.youtube.com/watch?v=-ysZPO_3HF0