Friday, July 10, 2015

How to implement a custom message processor for WSO2 ESB


Storing and forwarding messages with WSO2 ESB is a widely adopted approach. We call this the Message Store and Message Forward (MSMF) pattern.

The ESB has two components to achieve this:

Message Store

Stores the messages coming into the ESB in a JMS message queue.

Message Processor

Polls the JMS queue at a predefined interval and sends each message to the target endpoint.
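To make the interaction between the two components concrete, here is a minimal plain-Java sketch of the store-and-forward idea. It does not use the ESB or JMS APIs: `InMemoryStore` and `pollOnce` are hypothetical names, the store is an in-memory queue standing in for the JMS queue, and the endpoint is modelled as a function that returns true when delivery succeeds. The real processor polls at a scheduled interval; the sketch simply calls the poll in a loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

public class StoreAndForwardSketch {

    /** Stand-in for a message store; in the ESB this would be backed by a JMS queue. */
    static final class InMemoryStore {
        private final Queue<String> queue = new ArrayDeque<>();

        void store(String message) { queue.add(message); }
        String peek() { return queue.peek(); }
        String poll() { return queue.poll(); }
        int size() { return queue.size(); }
    }

    /**
     * One polling cycle of a forwarding processor: look at the message at the
     * head of the store, try to deliver it, and remove it only on success.
     * Returns true if a message was delivered.
     */
    static boolean pollOnce(InMemoryStore store, Predicate<String> endpoint) {
        String head = store.peek();
        if (head == null) {
            return false;          // nothing to forward
        }
        if (endpoint.test(head)) {
            store.poll();          // delivered, so remove it from the store
            return true;
        }
        return false;              // delivery failed; message stays for the next cycle
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.store("IBM");
        store.store("MSFT");

        List<String> delivered = new ArrayList<>();
        // Hypothetical endpoint that is always available and records what it receives
        while (pollOnce(store, delivered::add)) {
            // keep polling until the store is empty
        }
        System.out.println(delivered); // prints "[IBM, MSFT]"
    }
}
```

The point of the split is that the sender only has to reach the store; delivery to the back end is decoupled and can be retried later.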


By default, two types of message processors are supported out of the box.
 
  Scheduled Message Forwarding Processor

       This message processor processes messages from the queue and forwards them to a target endpoint. The implementation can be found at the following location.

  Sampling Processor

       This message processor processes messages from the queue and forwards them to a target sequence. The implementation can be found at the following location.

You can define the maximum retry count in the message processor. Once the retry count exceeds that limit, the message processor can be configured to do one of the following.

1. The message processor can deactivate itself.
2. It can drop that specific message from the message queue and continue processing the rest of
    the messages in the queue.
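The two options can be illustrated with a small plain-Java model. This is only a sketch of the behaviour described above, not the ESB implementation: the `Processor` class, the `OnMaxAttempts` enum and the `tick` method are hypothetical names, and the endpoint is again a function that returns true on successful delivery.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

public class RetryPolicySketch {

    /** The two reactions to reaching the maximum retry count. */
    enum OnMaxAttempts { DEACTIVATE, DROP_MESSAGE }

    static final class Processor {
        final Deque<String> queue = new ArrayDeque<>();
        private final int maxAttempts;
        private final OnMaxAttempts policy;
        private int attempts = 0;
        boolean active = true;

        Processor(int maxAttempts, OnMaxAttempts policy) {
            this.maxAttempts = maxAttempts;
            this.policy = policy;
        }

        /** One delivery attempt for the message at the head of the queue. */
        void tick(Predicate<String> endpoint) {
            if (!active || queue.isEmpty()) {
                return;
            }
            if (endpoint.test(queue.peek())) {
                queue.poll();       // delivered: remove it and reset the counter
                attempts = 0;
                return;
            }
            attempts++;
            if (attempts >= maxAttempts) {
                if (policy == OnMaxAttempts.DEACTIVATE) {
                    active = false; // option 1: stop; the message stays queued
                } else {
                    queue.poll();   // option 2: drop this message and carry on
                    attempts = 0;
                }
            }
        }
    }

    public static void main(String[] args) {
        Processor p = new Processor(2, OnMaxAttempts.DROP_MESSAGE);
        p.queue.add("bad");
        p.queue.add("good");
        // Hypothetical endpoint that rejects the message "bad"
        for (int i = 0; i < 3; i++) {
            p.tick(m -> !m.equals("bad"));
        }
        System.out.println(p.queue.isEmpty() + " " + p.active); // prints "true true"
    }
}
```

With `DEACTIVATE` the same run would leave both messages in the queue and the processor inactive after the second failed attempt.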

My requirement is a message processor that is an extension of the second option above.

Use case

I have set up two ActiveMQ message queues: stock_message_queue and failed_stock_message_queue.

Incoming messages are initially placed in stock_message_queue. When a message still cannot be sent to the back end after the configured number of retries, it must be moved to failed_stock_message_queue rather than dropped.

As a solution, we can implement a custom message processor. A custom message processor can be implemented using the MessageProcessor interface. Since a message processor goes through several life cycle stages, the implementation has to be developed with great care so that it behaves correctly under high load.

I will extend the existing ScheduledMessageForwardingProcessor class. In the "checkAndDeactivateProcessor" method, I will add a check that invokes a sequence once the maximum attempt count is reached. The invoked sequence moves the message from stock_message_queue to failed_stock_message_queue.

These are the code snippets changed from the original implementation of the ScheduledMessageForwardingProcessor class, together with the Synapse configuration.
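The idea can also be illustrated independently of the Synapse classes. The following is a minimal plain-Java sketch, not the actual ScheduledMessageForwardingProcessor code: `ForwardingProcessor`, `tick` and `failureSequence` are hypothetical names, the two queues are in-memory stand-ins for the ActiveMQ queues, and the callback plays the role of the sequence that is invoked once the maximum attempt count is reached.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class FailedQueueSketch {

    static final class ForwardingProcessor {
        private final Deque<String> source;
        private final int maxAttempts;
        private final Consumer<String> failureSequence; // stand-in for the invoked sequence
        private int attempts = 0;

        ForwardingProcessor(Deque<String> source, int maxAttempts,
                            Consumer<String> failureSequence) {
            this.source = source;
            this.maxAttempts = maxAttempts;
            this.failureSequence = failureSequence;
        }

        /** One delivery attempt for the message at the head of the source queue. */
        void tick(Predicate<String> backend) {
            String head = source.peek();
            if (head == null) {
                return;
            }
            if (backend.test(head)) {
                source.poll();      // delivered: remove it and reset the counter
                attempts = 0;
                return;
            }
            attempts++;
            if (attempts >= maxAttempts) {
                // Instead of dropping the message, hand it to the failure sequence
                failureSequence.accept(source.poll());
                attempts = 0;
            }
        }
    }

    public static void main(String[] args) {
        Deque<String> stockQueue = new ArrayDeque<>();
        Deque<String> failedStockQueue = new ArrayDeque<>();
        stockQueue.add("order-1");
        stockQueue.add("order-2");

        ForwardingProcessor processor =
                new ForwardingProcessor(stockQueue, 3, failedStockQueue::add);

        // Hypothetical back end that only accepts "order-2"
        for (int i = 0; i < 4; i++) {
            processor.tick(m -> m.equals("order-2"));
        }
        System.out.println(stockQueue + " " + failedStockQueue); // prints "[] [order-1]"
    }
}
```

Passing the failure handling in as a callback keeps the retry logic unchanged and mirrors the choice of invoking a configurable sequence rather than hard-coding the target queue in the processor.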








The complete Synapse configuration, custom code and CApp project can be found here. The queued messages can be checked through the ActiveMQ web console.


Build the custom message processor and place the JAR in /repository/components/lib

Restart the ESB server

Upload the CApp from the ESB management console.
