There are some situations in which processing a message from
RabbitMQ fails because the message depends on several other things. It
might be the business logic itself, but in other cases it can be due to
database downtime, network issues, etc.
Another possibility is that the message depends on a previous
message which has not been processed yet due to some tricky situation,
such as there not being enough time to store the first message in the
database, or the two messages arriving too close in time.
For these kinds of situations we may want to retry processing the
second message.
You may have noticed that in these cases, if the processing of the
message is not wrapped in a try/catch block, the message keeps on being
requeued non-stop until it can go through.
On the other hand, if it is wrapped in a try/catch block, you
can perform some logic in the catch code, such as logging the failure or
whatever you think is the best solution for your business case.
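To make the difference between the two cases concrete, here is a simplified, dependency-free model of what the listener container does with a delivery. All names (DeliveryOutcomeModel, deliver, guarded) are hypothetical and this is not Spring AMQP code; it only assumes the container's default of requeueing rejected messages, and it leaves out the exceptions Spring treats as fatal (such as message conversion failures), which are never requeued.

```java
import java.util.function.Consumer;

/**
 * Simplified model (hypothetical names, not Spring AMQP code) of how the
 * listener container treats a delivery. Exceptions that Spring AMQP treats
 * as fatal (e.g. message conversion failures) are not modelled here.
 */
final class DeliveryOutcomeModel {

    enum Outcome { ACK, REQUEUE, DISCARD }

    /** Listener without try/catch: any exception escapes to the container. */
    static <T> Outcome deliver(T message, Consumer<T> listener, boolean defaultRequeueRejected) {
        try {
            listener.accept(message);
            return Outcome.ACK; // processed fine: the message is acknowledged
        } catch (RuntimeException e) {
            // defaultRequeueRejected=true is the container default: the message
            // goes back to the queue and is redelivered, again and again.
            // With false it is rejected without requeue (dropped, or dead-lettered
            // if the queue has a dead letter exchange).
            return defaultRequeueRejected ? Outcome.REQUEUE : Outcome.DISCARD;
        }
    }

    /** Wraps the processing in a try/catch so failures are handled by our own code. */
    static <T> Consumer<T> guarded(Consumer<T> processing, Consumer<Exception> onFailure) {
        return message -> {
            try {
                processing.accept(message);
            } catch (RuntimeException e) {
                onFailure.accept(e); // e.g. log it; nothing escapes, so the message is acked
            }
        };
    }
}
```

With the guarded version the container always sees a successful call, which is why the message stops being redelivered but is also no longer retried.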
Because of that, let's try to find a nice solution for managing
these retries, or a requeue policy suitable for your use cases.
The following GitHub repository contains an approach for
solving these situations:
https://github.com/rafareyeslopez/demo-spring-rabbit-retry
An example of the failure scenario explained above has been
created. Let's say we have some kind of notifications we need to
process, and they are supposed to arrive sequentially: first the "new"
status of a notification (item), then its "update". Let's call the
message with the "new" status A and the update message B.
The branch "initial" contains the starting code where the
"failure" happens.
The update of the element is forced to arrive before its
creation (B before A).
Let's take a closer look at the project to explain each part,
although it is quite a simple Spring Boot project with a
producer/consumer architecture.
First of all we have the Item class, which represents our element
received through a queue called "notification". It has just 3
fields: an ID, a type indicating whether it is a "new" one or an
"update", and the value itself, which we are going to modify to
demonstrate the update.
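A minimal sketch of such a class follows. The field names and types (a Long id, a String type holding "new" or "update", a String value) and the isNew helper are assumptions for illustration; the repository's actual class may differ, for example by also being a JPA entity.

```java
/**
 * Sketch of the Item payload. Field names and types are assumptions,
 * not necessarily those used in the repository.
 */
final class Item {

    private Long id;      // correlation ID shared by the "new" and the "update" message
    private String type;  // "new" or "update"
    private String value; // the data that the "update" modifies

    public Item() { }     // no-arg constructor, handy for JSON deserialization

    public Item(Long id, String type, String value) {
        this.id = id;
        this.type = type;
        this.value = value;
    }

    /** True for the message announcing a new Item (message A). */
    public boolean isNew() { return "new".equals(type); }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
}
```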
The main class, DemoSpringRabbitRetryApplication, configures
the queue and the ObjectMapper, as we are going to use JSON for the
messages and they will be converted into our Item objects.
As a @PostConstruct action, after the queue is created, the
Producer component is called to start publishing messages.
The Producer publish method first publishes a new Item to the
queue, waits for 1 second and then publishes the update of that Item (A
then B). In this case everything works as expected.
Secondly, it first publishes the "update" of a second Item and then,
after some delay, the "new" status of that same Item (B then A, the
other way round from what would be expected).
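The publishing sequence can be sketched as below. To keep it dependency-free, the sink parameter stands in for Spring's RabbitTemplate (in the project it would presumably be something like rabbitTemplate.convertAndSend("notification", item)); ProducerSketch, Notification and the concrete values are hypothetical.

```java
import java.util.function.Consumer;

/**
 * Sketch of the Producer's publish sequence. The sink is an assumption that
 * stands in for RabbitTemplate.convertAndSend("notification", item).
 */
final class ProducerSketch {

    record Notification(long id, String type, String value) { }

    private final Consumer<Notification> sink;
    private final long delayMillis;

    ProducerSketch(Consumer<Notification> sink, long delayMillis) {
        this.sink = sink;
        this.delayMillis = delayMillis;
    }

    /** In the project this is triggered from a @PostConstruct method. */
    void publish() {
        // Case 1: expected order, A ("new") then B ("update") of Item 1
        sink.accept(new Notification(1, "new", "initial"));
        pause();
        sink.accept(new Notification(1, "update", "updated"));

        // Case 2: reversed order, B ("update") of Item 2 arrives before its A ("new")
        sink.accept(new Notification(2, "update", "updated"));
        pause();
        sink.accept(new Notification(2, "new", "initial"));
    }

    private void pause() {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}
```

Injecting the sink also makes the ordering easy to check without a broker, by collecting the published messages into a list.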
On the other side, we have the Consumer that processes the
messages placed into our queue.
It checks whether the Item is a "new" one; if so, it just stores it
in the database. Otherwise it assumes it is an "update", so it searches
for the Item with the same ID (correlation ID): if found, it updates
the Item in the database; if not found, it throws an
Exception.
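The consumer logic described above might look roughly like this. It is a sketch under two assumptions: an in-memory map replaces the PostgreSQL repository, and in the real project the process method would be a listener method (for example annotated with @RabbitListener(queues = "notification")). ConsumerSketch and storedValue are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * Sketch of the Consumer logic. Assumption: a HashMap stands in for the
 * database; in the project this would be a RabbitMQ listener method.
 */
final class ConsumerSketch {

    record Notification(long id, String type, String value) { }

    private final Map<Long, String> database = new HashMap<>(); // id -> stored value

    void process(Notification n) {
        if ("new".equals(n.type())) {
            database.put(n.id(), n.value()); // "new": just store it
            return;
        }
        // Otherwise assume it is an "update": look the Item up by its correlation ID
        if (!database.containsKey(n.id())) {
            // B arrived before A; by default the container requeues the message on this exception
            throw new NoSuchElementException("Item " + n.id() + " not found");
        }
        database.put(n.id(), n.value()); // update the stored Item
    }

    String storedValue(long id) {
        return database.get(id);
    }
}
```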
Let's now look at the application properties configuration.
Docker is used to bring up RabbitMQ and the database, PostgreSQL
in our case, with a docker compose file that sets up the
infrastructure.
Let's run our application to see the result.
Start the infrastructure by running the docker-compose up -d command
in the root of the project. Afterwards, start the Spring Boot project.
If you want, the status of the RabbitMQ queues can be checked in
the management tool at http://localhost:15672/#/queues
If you also want to check the elements stored in the database,
you can enter the PostgreSQL container by typing docker exec -it
demo-db psql -U dbuser demo; once inside, query the table "item" to see
what is being stored.
After a few seconds of running, we can check the console
logs.
As can be seen there, the first notification/Item is processed
fine because its messages arrive in the expected order. Problems come in
the second case, where the "update" arrives before the "new" (B before
A, as explained before). What happens is that the message is requeued
over and over until it is successfully processed. This might work, but
what happens if message A never arrives or is never processed for some
reason? We would end up with a log full of stack traces filling up our
disk, and a consumer trying indefinitely to process an element that
cannot be processed.
So far we have seen the problem; now let's dive into a
possible solution for it.
The first approach would be to try something really simple:
use some "magic" from the Spring configuration to perform retries
when consuming messages from RabbitMQ.
These changes are in the branch "first-approach", and they consist
of just adding these new lines to application.properties:
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=10000
spring.rabbitmq.listener.simple.retry.multiplier=2
With that configuration, what we try to achieve is a controlled
number of retries so that the problematic message can be processed.
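It helps to work out what these numbers mean in time. The helper below re-implements, for illustration only, an exponential back-off with a cap (start at initial-interval, multiply by multiplier after each wait, never exceed max-interval). It assumes, as in Spring Boot, that max-attempts counts the first delivery too; RetrySchedule and delaysMillis are hypothetical names, not Spring Retry classes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative re-implementation of a capped exponential back-off,
 * used to see the waits implied by the retry properties.
 */
final class RetrySchedule {

    static List<Long> delaysMillis(int maxAttempts, long initialInterval,
                                   double multiplier, long maxInterval) {
        List<Long> delays = new ArrayList<>();
        double interval = initialInterval;
        // max-attempts includes the first delivery, so there are maxAttempts - 1 waits
        for (int retry = 1; retry < maxAttempts; retry++) {
            delays.add((long) Math.min(interval, maxInterval)); // cap at max-interval
            interval *= multiplier;
        }
        return delays;
    }
}
```

With max-attempts=3, initial-interval=5000, multiplier=2 and max-interval=10000, delaysMillis(3, 5000, 2.0, 10000) gives waits of 5000 and 10000 ms: three attempts spread over roughly 15 seconds before the message is given up.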
Running the application with this configuration, the logs show
that the message causing the issue is retried according to the
properties above. BUT the retries of element B (the update) do not let
the other element, message A (the "new" one, which should come first),
come through: with a single consumer, the retries run inside the
listener thread, which stays blocked between attempts while A waits
behind B. So in the end the retries are exhausted and the update
(element B) is lost!
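The reason B is lost can be reproduced with a tiny single-consumer simulation. It assumes one consumer, B already ahead of A in the queue, and it omits the actual back-off sleeps; BlockingRetrySimulation, Msg and tryProcess are hypothetical names, and the "lost" list models a rejection without requeue on a queue that has no dead letter exchange.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Simplified simulation of retries performed inside a single listener thread.
 * Back-off sleeps are omitted; all attempts for one message run before the next is taken.
 */
final class BlockingRetrySimulation {

    record Msg(long id, String type) { }

    final Set<Long> stored = new HashSet<>(); // ids of Items stored in the "database"
    final List<Msg> lost = new ArrayList<>(); // messages dropped after exhausting retries

    void run(Deque<Msg> queue, int maxAttempts) {
        while (!queue.isEmpty()) {
            Msg msg = queue.poll();
            boolean done = false;
            // The thread is busy with this message for every attempt, so the next one waits
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                done = tryProcess(msg);
            }
            if (!done) {
                lost.add(msg); // rejected without requeue and, with no DLX, dropped
            }
        }
    }

    private boolean tryProcess(Msg msg) {
        if (msg.type().equals("new")) {
            stored.add(msg.id());
            return true;
        }
        return stored.contains(msg.id()); // an "update" needs its "new" to be stored first
    }
}
```

Feeding it B then A with three attempts shows B ending up in the lost list while A is stored afterwards, which matches the behaviour described above.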
Next, let's try another solution, this time using a Dead Letter
Queue.
What needs to be added to application.properties is the following line, which disables the default requeue behaviour:
spring.rabbitmq.listener.simple.default-requeue-rejected=false
And change the queues configuration so that messages rejected from the "notification" queue are sent to a dead letter queue, the "parking lot".
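The repository's exact queue configuration is not reproduced here. As a sketch, assuming a parking-lot queue named "notification.parking-lot" reached through the default exchange (both assumptions), the "notification" queue needs the x-dead-letter-exchange and x-dead-letter-routing-key arguments. The block builds those arguments and models, in a very simplified way, what the broker does with a message rejected without requeue; DeadLetterSketch and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of dead-lettering to a parking-lot queue. The queue names and the use of
 * the default exchange ("") are assumptions. In Spring AMQP the arguments could be
 * passed when declaring the queue bean, e.g.
 *   new Queue("notification", true, false, false, deadLetterArgs("notification.parking-lot"))
 * together with a second Queue bean for the parking-lot queue itself.
 */
final class DeadLetterSketch {

    static Map<String, Object> deadLetterArgs(String parkingLotQueue) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "");                 // default exchange routes by queue name
        args.put("x-dead-letter-routing-key", parkingLotQueue); // so the routing key is the target queue
        return args;
    }

    // Tiny broker model: queue contents and declaration arguments, keyed by queue name
    final Map<String, Deque<String>> queues = new HashMap<>();
    final Map<String, Map<String, Object>> arguments = new HashMap<>();

    void declare(String name, Map<String, Object> args) {
        queues.put(name, new ArrayDeque<>());
        arguments.put(name, args);
    }

    /** What happens to a message rejected with requeue=false. */
    void rejectWithoutRequeue(String queue, String message) {
        Map<String, Object> args = arguments.get(queue);
        // Simplified: only the "default exchange + explicit routing key" case is modelled
        if (args.containsKey("x-dead-letter-exchange")) {
            Deque<String> parkingLot = queues.get((String) args.get("x-dead-letter-routing-key"));
            if (parkingLot != null) {
                parkingLot.add(message); // dead-lettered: it waits in the parking lot
                return;
            }
        }
        // No dead letter exchange (or no matching queue): the broker drops the message
    }
}
```

Keep in mind that RabbitMQ refuses to redeclare an existing queue with different arguments, so an already existing "notification" queue may have to be deleted before the new arguments take effect.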
Running the application with this version, once the message is discarded because its retries are exhausted, it goes to the dead
letter queue we set up (the parking lot one) and stays there. This way
we can visit that queue and decide how to proceed with those messages,
maybe building a specific consumer for them, or whatever suits your
needs.