Hi All,
In this blog we will see how to use Request/Reply model in JMS.
On a broad level,JMS supports two messaging models : Point-to-Point (PTP) and Publish-Subscribe. These are easier to create and there are many examples available.Here we will see a third kind of model called as Request-Reply model. This is an “overlay” model since it is implemented on the top of other two models.
Let us see how to create this model in JMS : We will use PTP model as the basic model. While sending the message, the Producer/Sender can specify the Reply To queue . Using this Reply To queue the Receiver/Consumer can send a reply back to the Producer.
To create a Reply To queue in the producer :
Destination replyQueue = session.createTemporaryQueue(); message.setJMSReplyTo(replyQueue);To get the Reply To queue in the consumer :
Destination replyDestination = message.getJMSReplyTo();Now let us see the full code for Producer :
public void sendMessage(String name) throws JustJavaRuntimeException { try { Connection connection = null; Session session = null; Destination destination = null; MessageProducer messageProducer = null; connectionFactory = (ConnectionFactory) ctx.lookup(connectionFactoryName); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination =(Destination) ctx.lookup(requestQueueName); messageProducer = session.createProducer(destination); TextMessage message = session.createTextMessage(name); //Create Reply To Queue connection.start(); Destination replyQueue = session.createTemporaryQueue(); message.setJMSReplyTo(replyQueue); replyConsumer = session.createConsumer(replyQueue); replyConsumer.setMessageListener(new ResponseListener()); messageProducer.send(message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 1800000); System.out.println("++++++++++++++++++++++++++++++++++++++++++"); System.out.println("Message sent to Bean"); System.out.println("ReplyQueue name is : "+replyQueue.toString()); System.out.println("++++++++++++++++++++++++++++++++++++++++++"); synchronized (this) { try { wait(5000); } catch (InterruptedException e) { throw new JustJavaRuntimeException(e); } } messageProducer.close(); session.close(); connection.close(); } catch (JMSException je) { throw new JustJavaRuntimeException(je); } catch (NamingException ne) { throw new JustJavaRuntimeException(ne); } }One thing you might notice here :
replyConsumer.setMessageListener(new ResponseListener());Well replyConsumer is a MessageConsumer which is assigned a message listener.This is for listening back the reply we get from the original Consumer.
Few things which are necessary for building Request/Reply model using JMS are :
//Create Reply To Queue connection.start(); ..... synchronized (this) { try { wait(5000); } catch (InterruptedException e) { throw new JustJavaRuntimeException(e); }Well you need to do connection.start() ; This starts a connection’s delivery of incoming messages.
Also we are usiing wait(timeout) here . We need to wait since otherwise the connection would close after sending the message to the original Consumer.We don’t want connection to close till we receive a message from the Consumer back.So I have kept a delay here.
We can also use the MessageConsumer’s receive() method for synchronous receive.
Here is the code for ResponseListener :
class ResponseQueueListener implements MessageListener { public void onMessage(Message m) { try { if (m instanceof TextMessage) { TextMessage replyMessage = (TextMessage) m; System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++"); System.out.println("Received reply ResponseQueueListener"); System.out.println("\tTime: " + System.currentTimeMillis() + " ms"); System.out.println("\tMessage ID: " + replyMessage.getJMSMessageID()); System.out.println("\tCorrel. ID: " + replyMessage.getJMSCorrelationID()); System.out.println("\tReply to: " + replyMessage.getJMSReplyTo()); System.out.println("\tContents: " + replyMessage.getText()); System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++"); } } catch (JMSException e) { e.printStackTrace(); } } }Now let us see the code for Consumer. I have used MessageDrivenBean as Consumer.
@MessageDriven(name = "AsyncMDBean", activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/RequestQueue") }) public class AsyncMDBean implements MessageListener { private Session session; Context ctx; public AsyncMDBean() { try { ctx = new InitialContext(); } catch (NamingException e) { throw new JustJavaRuntimeException(e); } } public void onMessage(Message message) { String text = null; try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; text = textMessage.getText(); System.out.println("****************************************************"); System.out.println("Received message in AsyncMDBean. Name: "+text); System.out.println("****************************************************"); //Send the reply Destination replyDestination = message.getJMSReplyTo(); Connection connection = null; ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory"); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer replyProducer = session.createProducer(replyDestination); TextMessage replyMessage = session.createTextMessage(); replyMessage.setText("ABCD"); replyProducer.send(replyMessage); System.out.println("****************************************************"); System.out.println("Sent reply"); System.out.println("\tTime: " + System.currentTimeMillis() + " ms"); System.out.println("\tMessage ID: " + replyMessage.getJMSMessageID()); System.out.println("\tCorrel. ID: " + replyMessage.getJMSCorrelationID()); System.out.println("\tReply to: " + replyMessage.getJMSReplyTo()); System.out.println("\tContents: " + replyMessage.getText()); System.out.println("****************************************************"); } else { System.err.println("Expected a Text Message"); } System.out.println("*******************************************"); System.out.println("Leaving AsyncMDBean. Name: "+text ); System.out.println("*******************************************"); } catch (Throwable t) { t.printStackTrace(); } } }Well thats it !!!
Please let me how did you find this code ….. I have deployed it successfully on JBoss 4.2.3 … There were some issues on Glassfish.
10 Comments
HI,
I am also looking at the same issue, but in my case i am sending the message from SLSB EJB, container managed transaction.
getting some exception becuase of the same transaction.
the sequence is as follows…
1.Servlet controller calls SLSB
2.SLSB sends a message (setting with temporary queue as reply queue).
3.MDB consumes the message and sends a reply message.
4.SLSB receives the reply message and will do some business logic.
so could you please post me some POC with this case.
advance thanks
conti…it is working in a JUnit test case but not working as in application.
I am using Jboss4.2.2
Given Exampple of JMS Request Reply Pattern will not work in CMT of EJB3…
why?
Sesh,
What exception you are getting ? Can you please give the details ?
Rahul,
Can you post your code ?
hi justjava,
i am creating a similar request jms reply pattern with MDB as a consumer. What I want to ask you is if jndi.properties need to be set in MDB just like normal jms producer. I see that you just use the normal
Context ctx = new InitialContext();
please let me know.
Hi wangdu,
I used glassfish for all these examples and didn’t do anything with jndi properties . What is the issue u r facing . I need to search for the example again and try to run it .
Thanks for the reply. The issue i am facing is that the MDB (which is replier/responser) is not able to get the connection factory. I am using Jboss AS. I have tried:
1. no properties when instantiating the InitialContext.
2. with properties i.e new InitialContext(env);
3. then finally i tried to use also the activconfig annotation where i have set the propertyName to “connectionFactoryJndiName” which gives error: no such property found.
Could you try to do the same example on jboss and let me know how u did it? thanks
The response message is sended after commit? How can i do to ensure the message is sended if commit is executed and not sended if rollback?
Thanks
Hi
Nice JMS article. Could you have this available as a download?