Monday, March 8, 2021

Spring RabbitMQ retry failed messages

There are situations where processing a message from RabbitMQ fails because the message depends on several things. It might be the business logic itself, but in other cases it can be due to database downtime, network issues, etc.

Another possibility is that the message depends on a previous message that has not been processed yet. For example, there was not enough time to store the first message in the database, or the two messages arrived too close together in time.

For these kinds of situations we may want to retry processing the second message.

You may have noticed that in these cases, if the message processing is not wrapped in a try/catch block, the message keeps being requeued non-stop until it can go through.

On the other hand, if it is wrapped in a try/catch block, you can perform some logic in the catch block, such as logging the failure or whatever you think is the best solution for your business case.

Because of that, let's try to find a good solution for managing these retries, or a requeue policy suitable for your use cases.

The following GitHub repository contains an approach for solving these situations:

https://github.com/rafareyeslopez/demo-spring-rabbit-retry

I have created an example of the failure scenario explained above. Let's say we have some kind of notifications we need to process, and they are supposed to arrive sequentially: first the "new" status of a notification (item), then its "update". Let's call the "new" status message A and the update message B.

The initial branch contains the starting code where the "failure" happens.

In it, the update of the element is forced to arrive before its creation (B before A).

Let's take a closer look at the project and explain each part, although it is a quite simple Spring Boot producer/consumer project.

First of all we have the Item class, which represents the element received through a queue called "notification". It has just 3 fields: an ID, a type indicating whether it's a "new" one or an "update", and a value that we are going to modify to demonstrate the update.
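Here is a minimal sketch of such a class. It assumes plain `String` fields and leaves out any persistence annotations the repository's version may carry, so that it compiles with the JDK alone. The field and method names are illustrative, not taken from the repository.

```java
// Simplified, framework-free sketch of the Item notification.
// Assumption: the repository's class may be a JPA entity; any annotations
// are omitted here and the names are illustrative only.
final class Item {
    private final String id;    // correlation ID shared by the "new" and "update" messages
    private final String type;  // "new" or "update"
    private String value;       // payload that the update modifies

    Item(String id, String type, String value) {
        this.id = id;
        this.type = type;
        this.value = value;
    }

    String getId() { return id; }
    String getType() { return type; }
    String getValue() { return value; }
    void setValue(String value) { this.value = value; }

    @Override
    public String toString() {
        return "Item{id=" + id + ", type=" + type + ", value=" + value + "}";
    }
}
```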




The main class, DemoSpringRabbitRetryApplication, configures the queue and the ObjectMapper. We are going to use JSON for the messages, and they will be converted into our Item objects.

As a @PostConstruct action, after the queue is created, the Producer component is called to start publishing messages.




The Producer's publish method first publishes a new Item to the queue, waits for 1 second and then publishes the update of that Item (A then B). In this case everything works as expected.

Next, it publishes the "update" of a second Item first and, after some delay, the "new" status of that second Item (B then A, the other way round from what would be expected).
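The two publishing orders can be modelled in memory to make the sequence explicit. This is a sketch under a few assumptions:

- A `BlockingQueue` stands in for the RabbitMQ "notification" queue. The real Producer publishes through Spring's `RabbitTemplate`.
- The delay is a constructor parameter, so a test does not have to wait a full second.
- `ProducerSketch` and `Notification` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory sketch of the two publishing orders (hypothetical names).
// A BlockingQueue stands in for the RabbitMQ "notification" queue.
final class ProducerSketch {

    // Minimal message shape: correlation ID, "new"/"update" type and value.
    static final class Notification {
        final String id;
        final String type;
        final String value;

        Notification(String id, String type, String value) {
            this.id = id;
            this.type = type;
            this.value = value;
        }

        @Override
        public String toString() { return type + ":" + id; }
    }

    private final BlockingQueue<Notification> queue;
    private final long delayMillis;

    ProducerSketch(BlockingQueue<Notification> queue, long delayMillis) {
        this.queue = queue;
        this.delayMillis = delayMillis;
    }

    void publish() throws InterruptedException {
        // Case 1, expected order: A ("new") then B ("update") for item 1.
        queue.put(new Notification("1", "new", "first value"));
        TimeUnit.MILLISECONDS.sleep(delayMillis);
        queue.put(new Notification("1", "update", "updated value"));

        // Case 2, reversed order: B ("update") before A ("new") for item 2.
        queue.put(new Notification("2", "update", "updated value"));
        TimeUnit.MILLISECONDS.sleep(delayMillis);
        queue.put(new Notification("2", "new", "first value"));
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Notification> queue = new LinkedBlockingQueue<>();
        new ProducerSketch(queue, 1000).publish();
        System.out.println(queue); // [new:1, update:1, update:2, new:2]
    }
}
```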





On the other side, we have the Consumer, which processes the messages placed in our queue.

It checks whether the message is a "new" Item. If so, it just stores it in the database. Otherwise it assumes the message is an "update", so it searches for the Item with the same ID (correlation ID). If the Item is found, the Consumer updates it in the database; if it is not found, it throws an Exception.
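The branching logic of the Consumer can be sketched without Spring. This sketch assumes three things:

- A `HashMap` from ID to value stands in for the database repository.
- `NoSuchElementException` represents the "not found" failure. The real project uses a Spring Data repository and may throw a different exception type.
- `ConsumerSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Framework-free sketch of the Consumer's branching logic (hypothetical names).
// A HashMap stands in for the "item" table.
final class ConsumerSketch {

    static final class Notification {
        final String id;
        final String type;
        final String value;

        Notification(String id, String type, String value) {
            this.id = id;
            this.type = type;
            this.value = value;
        }
    }

    // id -> current value
    private final Map<String, String> itemTable = new HashMap<>();

    void consume(Notification n) {
        if ("new".equals(n.type)) {
            // A: a brand new item is simply stored.
            itemTable.put(n.id, n.value);
            return;
        }
        // B: anything else is treated as an update of an existing item.
        if (!itemTable.containsKey(n.id)) {
            // In a real @RabbitListener, an exception like this makes the
            // listener container reject the message (requeued by default).
            throw new NoSuchElementException("Item " + n.id + " not found");
        }
        itemTable.put(n.id, n.value);
    }

    String valueOf(String id) {
        return itemTable.get(id);
    }
}
```

With A then B, the update finds the stored item. With B first, `consume` throws, which is the failure the rest of the post deals with.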




Let's now look at the application properties configuration.






Docker is used to bring up RabbitMQ and the database, in our case PostgreSQL. Here is the Docker Compose file to set up the infrastructure.




Let's run our application to see the result.

Start the infrastructure by running the docker-compose up -d command in the root of the project. Afterwards, start the Spring Boot project. If you want, the status of the RabbitMQ queues can be checked in the management tool at http://localhost:15672/#/queues

If you also want to check the elements stored in the database, you can enter the PostgreSQL container by typing docker exec -it demo-db psql -U dbuser demo. Once inside, you can query the "item" table to see what is being stored.

After a few seconds of running, the console shows the following logs:




As can be seen, the first notification/Item is processed fine because its messages arrive in the expected order. Problems come in the second case, where the "update" arrives before the "new" (B before A, as explained before). The message is requeued over and over until it is successfully processed. This might work, but what happens if message A never arrives or is never processed for some reason? We would end up with a log full of stack traces filling up our disk, and a consumer endlessly trying to process an element that cannot be processed.


So far we have seen the problem; now let's dive into a possible solution.

The first approach is to try something really simple: use some "magic" from the Spring configuration to perform retries when consuming messages from RabbitMQ.

These changes are in the first-approach branch.

The only change is to add these new lines to application.properties:


spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=10000
spring.rabbitmq.listener.simple.retry.multiplier=2



With this configuration, we try to perform a controlled number of retries so that the problematic message gets a chance to be processed.
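The waiting times these properties produce can be computed by hand. The sketch below assumes the exponential back-off rule of Spring Retry's `ExponentialBackOffPolicy`:

- The first wait is `initial-interval`.
- Each following wait is multiplied by `multiplier`.
- No wait exceeds `max-interval`.

It also assumes that `max-attempts` counts the first delivery. `RetrySchedule` is a hypothetical helper, not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: waits (ms) between delivery attempts under an
// exponential back-off capped at maxInterval.
final class RetrySchedule {

    static List<Long> backOffDelays(long initialInterval, double multiplier,
                                    long maxInterval, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double next = initialInterval;
        // maxAttempts includes the first try, so there are maxAttempts - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            delays.add(Math.min((long) next, maxInterval));
            next = next * multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values from the first-approach application.properties
        System.out.println(backOffDelays(5000, 2.0, 10000, 3)); // [5000, 10000]
    }
}
```

So message B gets three attempts spread over roughly 15 seconds before it is given up on.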


Running the application with these settings, we get the following result:



As can be seen, the message causing the issue is retried according to the configuration above. BUT the retries of element B (the update) do not let message A (the new one, which should come first) come through: the consumer is busy retrying B and does not pick up A in the meantime. So in the end the retries are exhausted and the update (element B) is lost!
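A small simulation shows why the update is lost. It is a model under stated assumptions, not Spring code:

- There is a single consumer, and messages are `"type:id"` strings.
- Retries run inside the consumer thread, as Spring's stateless listener retry does. Nothing else is taken from the queue until the current message succeeds or runs out of attempts.
- Exhausted messages are rejected without requeue and, with no dead-letter queue configured, dropped.
- The sleeps between attempts are omitted.

`BlockingRetrySimulation` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Model of blocking in-thread retries with a single consumer (hypothetical names).
final class BlockingRetrySimulation {

    static List<String> run(Deque<String> queue, int maxAttempts) {
        Set<String> storedIds = new HashSet<>(); // stands in for the "item" table
        List<String> log = new ArrayList<>();
        while (!queue.isEmpty()) {
            String message = queue.poll();
            String[] parts = message.split(":");
            String type = parts[0];
            String id = parts[1];
            boolean done = false;
            // All attempts happen before the next message is polled, so an
            // "A" waiting behind this "B" cannot help it succeed.
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                if (type.equals("new")) {
                    storedIds.add(id);   // A: store the new item
                    done = true;
                } else if (storedIds.contains(id)) {
                    done = true;         // B: the item exists, update applied
                } else {
                    log.add("attempt " + attempt + " failed for " + message);
                }
            }
            // Retries exhausted: rejected without requeue, i.e. dropped.
            log.add((done ? "processed " : "discarded ") + message);
        }
        return log;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(
                Arrays.asList("new:1", "update:1", "update:2", "new:2"));
        run(queue, 3).forEach(System.out::println);
    }
}
```

For item 2, the log shows three failed attempts and `discarded update:2` before `processed new:2`. The "new" message arrives in time, but only after the update has already been thrown away.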



Next, let's try another solution, this time using a Dead Letter Queue.

In the application properties, add the following to disable the default requeue behaviour:


spring.rabbitmq.listener.simple.default-requeue-rejected=false


And change the queue configuration as follows:




Running the application with this version, once the message is discarded because its retries are exhausted, it goes to the dead letter queue we set up (the parking lot one) and stays there. That way we can visit that queue and decide how to proceed with those messages, for example by building a specific consumer for them, or however suits your needs.
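The difference from the first approach can be modelled in memory. An exhausted message is moved to a parking lot instead of being dropped, and a separate consumer can replay it later. The sketch assumes the following:

- Plain `Deque`s stand in for the RabbitMQ "notification" and parking-lot queues.
- The broker's dead-letter routing is reduced to a direct `add`.
- `replayParkingLot` is a hypothetical parking-lot consumer, not code from the repository.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Model of dead-lettering to a parking-lot queue (hypothetical names).
final class ParkingLotSimulation {

    final Deque<String> parkingLot = new ArrayDeque<>();
    private final Set<String> storedIds = new HashSet<>(); // stands in for the "item" table

    // Returns true if the "type:id" message was processed within maxAttempts.
    // The attempts only mirror max-attempts: with one blocking consumer the
    // state cannot change between them.
    private boolean tryProcess(String message, int maxAttempts) {
        String[] parts = message.split(":");
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (parts[0].equals("new")) {
                storedIds.add(parts[1]);
                return true;
            }
            if (storedIds.contains(parts[1])) {
                return true;
            }
        }
        return false;
    }

    void drain(Deque<String> queue, int maxAttempts) {
        while (!queue.isEmpty()) {
            String message = queue.poll();
            if (!tryProcess(message, maxAttempts)) {
                parkingLot.add(message); // dead-lettered and kept for inspection
            }
        }
    }

    // Hypothetical parking-lot consumer: re-submits every parked message once.
    void replayParkingLot(int maxAttempts) {
        Deque<String> parked = new ArrayDeque<>(parkingLot);
        parkingLot.clear();
        drain(parked, maxAttempts);
    }

    public static void main(String[] args) {
        ParkingLotSimulation sim = new ParkingLotSimulation();
        sim.drain(new ArrayDeque<>(Arrays.asList("update:2", "new:2")), 3);
        System.out.println(sim.parkingLot); // [update:2]
        sim.replayParkingLot(3);            // item 2 now exists, so B succeeds
        System.out.println(sim.parkingLot); // []
    }
}
```

Whether replaying automatically is appropriate depends on the business case. The point of the parking lot is that B is no longer silently lost.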




