Wednesday, March 17, 2021

Calculating the average time users are active with Spark in Java

Here we are going to start a series on using Apache Spark with Java to perform calculations and get some analytics.

In this first one we are going to calculate the average time a user is subscribed to some service. This is quite an easy task to perform when using a relational database, but it may become complicated or quite time consuming if the dataset is very large or if the data is stored in another type of data source.

To start with an easy example we are going to get the data from a MySQL table. The point is that the data can be gathered from wherever we want: a large file in CSV format, Apache Cassandra...

As we are using a relational database, the query for getting the desired information would be a simple GROUP BY statement, but let's see how it would be done using Spark.

The source code of the example is in the following repository:

https://github.com/rafareyeslopez/spark-demo


To achieve the result, we first run a map function to get the data into the required format: we build a pair RDD where the key is the user ID and the value is the difference between the timestamp when the user left the service and the timestamp when the user joined it, converted into days. If the user has not left yet, the current time is used for the subtraction.

// Build a pair RDD of (user ID, days subscribed); a null "left" timestamp means
// the user is still active, so the current time is used instead
final JavaPairRDD mapToPair = usersMap.mapToPair(row -> new Tuple2<>(row._1(),
        TimeUnit.MILLISECONDS.toDays(row._3() == null
                ? Calendar.getInstance().getTimeInMillis() - row._2().getTime()
                : row._3().getTime() - row._2().getTime())));


Next, the number of users is counted; this is an easy one:

// Count the number of users
final long count = mapToPair.count();


And lastly the reduce is done; all that is left is to calculate the average.

// Get the sum of days in service
final Long reduce = mapToPair.values().reduce((value1, value2) -> value1 + value2);

// Print out the average (note: this is integer division, so the result is truncated to whole days)
System.out.println("Average days subscribed " + reduce / count);


Just to note, Spark also has groupBy / groupByKey functions, BUT they may cause some issues: it is a well-known caveat that grouping shuffles every value across the cluster and can run out of memory on large datasets, so it needs to be taken into account.
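For illustration, here is a hypothetical variant (not part of the example repository): if each user could have several subscription periods, aggregating with reduceByKey combines values on each partition before shuffling, whereas groupByKey would move every single value over the network first.

// Hypothetical: assumes mapToPair is typed as JavaPairRDD<Long, Long> (user ID -> days)
final JavaPairRDD<Long, Long> totalDaysPerUser = mapToPair.reduceByKey(Long::sum);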

Moreover, this calculation has been done using map / reduce; in the future an example using Spark SQL will be shown.

Here is the class that runs the Spark job to perform the calculation.
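What follows is a minimal, self-contained sketch of such a job; it is not necessarily identical to the class in the repository, and the table name, column layout (a BIGINT id plus join / leave TIMESTAMP columns, in that order) and JDBC settings are assumptions (a MySQL driver is expected on the classpath):

import java.sql.Timestamp;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import scala.Tuple3;

public class AverageSubscriptionJob {

    public static void main(final String[] args) {
        final SparkSession spark = SparkSession.builder()
                .appName("average-subscription-time")
                .master("local[*]")
                .getOrCreate();

        // Load the users table over JDBC; URL and credentials are placeholders
        final Dataset<Row> users = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://localhost:3306/demo")
                .option("dbtable", "users")
                .option("user", "your_username")
                .option("password", "your_password")
                .load();

        // (user ID, joined timestamp, left timestamp); left is null for active users
        final JavaRDD<Tuple3<Long, Timestamp, Timestamp>> usersMap = users.javaRDD()
                .map(row -> new Tuple3<>(row.getLong(0), row.getTimestamp(1), row.getTimestamp(2)));

        // (user ID, days subscribed), using the current time for still-active users
        final JavaPairRDD<Long, Long> mapToPair = usersMap.mapToPair(row -> new Tuple2<>(row._1(),
                TimeUnit.MILLISECONDS.toDays(row._3() == null
                        ? Calendar.getInstance().getTimeInMillis() - row._2().getTime()
                        : row._3().getTime() - row._2().getTime())));

        // Average = total days in service divided by the number of users
        final long count = mapToPair.count();
        final Long reduce = mapToPair.values().reduce(Long::sum);
        System.out.println("Average days subscribed " + reduce / count);

        spark.stop();
    }
}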

Monday, March 8, 2021

Spring RabbitMQ: retrying failed messages

There are situations where processing a message from RabbitMQ fails, and it can happen for several reasons. It might be the business logic itself, but in other cases it can be due to some database downtime, network issues, etc.

Another possibility is that the message depends on a previous message which has not been processed yet due to some tricky situation, such as there not being enough time to store the first message in the database, or the two messages arriving too close in time.

For these kinds of situations we may want to retry processing the second message.

It may have been seen that in these cases, if the processing of the message is not within a try / catch block, it keeps being requeued non-stop until the message can go through.

On the other hand, if it is surrounded by a try / catch block, you may perform some logic in the catch clause, such as logging the failure or whatever you think is the best solution for your business case.

Because of that, let's try to find a nice solution for managing these retries, or a requeue policy suitable for your use cases.

The following GitHub repository contains an approach for solving these situations.

https://github.com/rafareyeslopez/demo-spring-rabbit-retry

An example of the failure scenario explained above has been created. Let's say we have some kind of notifications to process, and they are supposed to arrive sequentially: first the "new" status of a notification (item), and secondly its "update". Let's call the "new" status message A and the update message B.

The starting code, where the "failure" happens, is in the initial branch.

The update of the element has been forced to arrive before its creation (B before A).

Let's take a closer look at the project to explain each part, although it is a quite simple Spring Boot producer / consumer project.

First of all we have the Item class, which represents the element received through a queue called "notification". It has just 3 fields: an ID, a type representing whether it is a "new" one or an "update", and a value that we are going to modify to demonstrate the update.
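A sketch of what Item could look like (a JPA entity; the exact annotations in the repository may differ, and the field names just follow the description above):

import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class Item {

    @Id
    private Long id;       // correlation ID shared by the "new" and "update" messages

    private String type;   // "new" or "update"

    private String value;  // the payload we modify to demonstrate the update

    protected Item() {
        // default constructor required by JPA / Jackson
    }

    public Item(final Long id, final String type, final String value) {
        this.id = id;
        this.type = type;
        this.value = value;
    }

    public Long getId() { return id; }
    public void setId(final Long id) { this.id = id; }

    public String getType() { return type; }
    public void setType(final String type) { this.type = type; }

    public String getValue() { return value; }
    public void setValue(final String value) { this.value = value; }
}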

In the main class DemoSpringRabbitRetryApplication, the queue and the ObjectMapper are configured, as we are going to use JSON for the messages and they will be converted into our Item objects.

As a @PostConstruct action, after the queue has been created, the Producer component is called to start publishing messages.
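One way this could be wired up (a sketch; Spring Boot picks up a single MessageConverter bean for both the template and the listeners, and Jackson2JsonMessageConverter uses an ObjectMapper underneath; bean and method names are assumptions):

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DemoSpringRabbitRetryApplication {

    @Autowired
    private Producer producer;

    public static void main(final String[] args) {
        SpringApplication.run(DemoSpringRabbitRetryApplication.class, args);
    }

    // Declare the queue the notifications go through
    @Bean
    public Queue notificationQueue() {
        return new Queue("notification");
    }

    // Serialize / deserialize messages as JSON so they map onto Item objects
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Once the context is up, start publishing the test messages
    @PostConstruct
    public void startProducing() {
        producer.publish();
    }
}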

The Producer publish method first publishes a new Item to the queue, waits for 1 second and then publishes the update of that Item (A then B). In that use case everything works as expected.

Secondly, it first publishes the "update" of a second Item and afterwards, after some delay, its "new" status (B then A, the other way round from what would be expected).
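A sketch of the Producer covering both scenarios (the IDs and values are illustrative):

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private final RabbitTemplate rabbitTemplate;

    public Producer(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void publish() {
        // First item: "new" then "update" (A then B) -- processed as expected
        rabbitTemplate.convertAndSend("notification", new Item(1L, "new", "initial value"));
        pause();
        rabbitTemplate.convertAndSend("notification", new Item(1L, "update", "updated value"));

        // Second item: "update" before "new" (B then A) -- triggers the failure
        rabbitTemplate.convertAndSend("notification", new Item(2L, "update", "updated value"));
        pause();
        rabbitTemplate.convertAndSend("notification", new Item(2L, "new", "initial value"));
    }

    private void pause() {
        try {
            Thread.sleep(1000);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}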

On the other side, we have the Consumer to process the messages placed into our queue.

It checks whether it is a "new" Item; if so, it just stores it in the database. Otherwise it assumes it is an "update", therefore it searches for the Item with the same id (correlation ID) and, in case it is found, updates the Item in the database; in case it is not found, it raises an Exception.
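A sketch of the Consumer; ItemRepository here is a hypothetical Spring Data repository for Item (in the real project it would live in its own file):

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;

// Hypothetical Spring Data repository for Item
interface ItemRepository extends JpaRepository<Item, Long> {
}

@Component
public class Consumer {

    private final ItemRepository itemRepository;

    public Consumer(final ItemRepository itemRepository) {
        this.itemRepository = itemRepository;
    }

    @RabbitListener(queues = "notification")
    public void receive(final Item item) {
        if ("new".equals(item.getType())) {
            // A "new" item is simply stored
            itemRepository.save(item);
        } else {
            // An "update" needs its "new" counterpart to exist already
            final Item existing = itemRepository.findById(item.getId())
                    .orElseThrow(() -> new IllegalStateException(
                            "No item found for id " + item.getId() + ", cannot apply update"));
            existing.setValue(item.getValue());
            itemRepository.save(existing);
        }
    }
}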

Let's now see the application properties configuration.
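Something along these lines (standard Spring Boot keys; the exact values are assumptions, although the database user and name match the docker commands used later in the post):

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.datasource.url=jdbc:postgresql://localhost:5432/demo
spring.datasource.username=dbuser
spring.datasource.password=dbpassword
spring.jpa.hibernate.ddl-auto=update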

Docker is used to bring up RabbitMQ and the database, in our case PostgreSQL. Here is the docker-compose file to set up the infrastructure.
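A sketch of what the docker-compose.yml could contain (image tags and the password are assumptions; the demo-db container name and dbuser user match the commands below):

version: '3'
services:
  rabbitmq:
    image: rabbitmq:management
    ports:
      - "5672:5672"
      - "15672:15672"
  db:
    image: postgres:alpine
    container_name: demo-db
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: dbuser
      POSTGRES_PASSWORD: dbpassword
      POSTGRES_DB: demo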

Let's run our application to see the result.

Start the infrastructure by running the docker-compose up -d command in the root of the project. Afterwards, start the Spring Boot project. If you want, the status of the RabbitMQ queues can be checked in the management tool at http://localhost:15672/#/queues

In case you also want to check the elements stored in the database, you can enter the PostgreSQL container by typing docker exec -it demo-db psql -U dbuser demo; once in, we can query the "item" table to see what is being stored inside.

So after some seconds of running, the console shows the following logs:




As can be seen, the first notification / Item is processed fine, as its messages arrive in the expected order. Problems come in the second case, where the "update" arrives before the "new", B before A as explained before. What happens is that the message is requeued over and over until it is successfully processed. This might work, but what happens if message A never arrives or is never processed for some reason? We would end up with a log full of stack traces filling up our disk, and our consumer trying indefinitely to process an element that cannot be processed.


Up to here we have seen the problem; now let's dive into a possible solution for it.

The first approach is to try something really simple, which is to use some "magic" from the Spring configuration in order to perform retries when consuming messages from RabbitMQ.

These changes are in the first-approach branch.

The changes are just these new lines in application.properties:


spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=10000
spring.rabbitmq.listener.simple.retry.multiplier=2



With that configuration we try to perform some controlled retries so the problematic message can eventually be processed: the first retry comes after 5 seconds, the interval is doubled on each attempt up to a maximum of 10 seconds, and after 3 attempts in total the message is rejected.


Having done so and running the application, we get the following result:



As can be seen, the message causing the issue is retried as per the configuration previously mentioned. BUT the retries of element B (the update) happen on the consumer thread, so they do not let message A (the "new" one that should have come first) come through. So in the end the retries are exhausted and the update (element B) is lost!



Next, let's try another solution, using in this case a Dead Letter Queue.

What needs to be added in the application properties is the following, in order to disable the default requeue behaviour:


spring.rabbitmq.listener.simple.default-requeue-rejected=false


And change the queue configuration as follows:
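A sketch of the updated declarations, where the main queue dead-letters rejected messages through the default exchange into a "parking lot" queue (QueueBuilder comes from org.springframework.amqp.core; the queue names are assumptions):

// The main queue now sends rejected messages to the parking lot queue
@Bean
public Queue notificationQueue() {
    return QueueBuilder.durable("notification")
            .withArgument("x-dead-letter-exchange", "") // default exchange
            .withArgument("x-dead-letter-routing-key", "notification.parking-lot")
            .build();
}

// Queue where exhausted messages end up for later inspection
@Bean
public Queue parkingLotQueue() {
    return QueueBuilder.durable("notification.parking-lot").build();
}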

Running the application with this version, what happens is: after the message is discarded because the retries are exhausted, it goes to the dead letter queue we set up, the parking lot one, and stays there. That way we can visit that queue and decide how to proceed with those messages, maybe building some specific consumer for them, or whatever suits your needs.



References:

https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR3/reference/html/_rabbitmq_binder.html

https://www.baeldung.com/spring-amqp-error-handling

https://programmerfriend.com/rabbit-mq-retry


Tuesday, February 2, 2021

Loading all CSV files from a directory into MySQL

For loading multiple CSV files into a table in MySQL, the following command can be used in a Linux shell:

for f in *.csv; do
    mysql -e "LOAD DATA LOCAL INFILE '$f' INTO TABLE schema.my_table" \
        -u your_username --password=your_password
done

Monday, September 23, 2019

Expose entity IDs using Spring Data REST

In case the IDs of an entity need to be exposed via Spring Data REST, the following configuration class can be used:

import org.springframework.context.annotation.Configuration;
import org.springframework.data.rest.core.config.RepositoryRestConfiguration;
import org.springframework.data.rest.webmvc.config.RepositoryRestConfigurer;

@Configuration
public class RestConfig implements RepositoryRestConfigurer {

    @Override
    public void configureRepositoryRestConfiguration(final RepositoryRestConfiguration config) {
        // Expose the ID values of MyEntity in the REST representations
        config.exposeIdsFor(MyEntity.class);
    }

}

Friday, August 30, 2019

Deployment of MySQL in Kubernetes

Here is a simple example of a deployment for a MySQL database instance in Kubernetes.

https://github.com/rafareyeslopez/mysql-kubernetes

Deployment of PostgreSQL in Kubernetes

Here is a simple example of a deployment for a PostgreSQL database instance in Kubernetes.

As said, this is a simple example; some things can be improved, like using Secrets for the credentials (see the sketch after the link below).

https://github.com/rafareyeslopez/postgresql-kubernetes
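As a rough idea of what such a deployment contains, here is a minimal sketch (not the exact manifests from the repository; all names and values are illustrative, and as mentioned the password would ideally come from a Secret):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:alpine
          ports:
            - containerPort: 5432
          env:
            - name: POSTGRES_USER
              value: user
            - name: POSTGRES_PASSWORD
              value: password   # improvement: reference a Secret instead of a plain value
            - name: POSTGRES_DB
              value: database
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
spec:
  selector:
    app: postgres
  ports:
    - port: 5432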

Thursday, March 7, 2019

Using PostgreSQL instead of MySQL when low memory is required (Docker images)

In case a relational database is needed for testing purposes and there is not a lot of free memory, I have done a test using the MySQL Docker image and the PostgreSQL one. PostgreSQL has an Alpine image, which MySQL at the moment does not.

The memory consumption is much lower for PostgreSQL, as can be seen below:

[Screenshots: memory consumption of the MySQL container vs. the PostgreSQL container]

To run PostgreSQL with the docker command, execute:

 docker run --name some-postgres -p 5432:5432 -e POSTGRES_PASSWORD=password -e POSTGRES_USER=user -e POSTGRES_DB=database -d postgres:alpine