

Hi All,

In this post we will see how to use the Request/Reply model in JMS.

At a broad level, JMS supports two messaging models : Point-to-Point (PTP) and Publish-Subscribe. These are easy to set up and many examples are available. Here we will look at a third kind of model, called the Request-Reply model. This is an “overlay” model, since it is implemented on top of the other two models.

Let us see how to create this model in JMS. We will use the PTP model as the base. When sending a message, the Producer/Sender can specify a Reply To queue. The Receiver/Consumer then uses this Reply To queue to send a reply back to the Producer.

To create a Reply To queue in the producer :

Destination replyQueue = session.createTemporaryQueue();
message.setJMSReplyTo(replyQueue);

To get the Reply To queue in the consumer :

Destination replyDestination = message.getJMSReplyTo();
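
To make the flow concrete before the full listings, here is a small in-memory sketch of the same idea. It does not use the JMS API at all: plain BlockingQueue objects stand in for the request queue and the temporary reply queue, and the Msg class with its replyTo field is a hypothetical stand-in for a JMS message and its JMSReplyTo header. Treat it as an illustration of the pattern only, not as provider code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory illustration only; none of these types belong to the JMS API.
class RequestReplySketch {

    // Hypothetical message: a body plus the queue the reply should go to
    // (the role played by the JMSReplyTo header in JMS).
    static final class Msg {
        final String body;
        final BlockingQueue<Msg> replyTo;

        Msg(String body, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Sends one request and waits up to 5 seconds for the reply body.
    // Returns null if no reply arrives in time.
    static String request(String name) {
        BlockingQueue<Msg> requestQueue = new LinkedBlockingQueue<>();
        // Plays the role of the temporary reply queue created by the producer.
        BlockingQueue<Msg> replyQueue = new LinkedBlockingQueue<>();

        // Consumer: takes the request and answers on whatever queue it names.
        Thread consumer = new Thread(() -> {
            try {
                Msg req = requestQueue.take();
                req.replyTo.put(new Msg("Hello " + req.body, null));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        try {
            // Producer: attach the reply queue to the request, then wait for the answer.
            requestQueue.put(new Msg(name, replyQueue));
            Msg reply = replyQueue.poll(5, TimeUnit.SECONDS);
            return reply == null ? null : reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(request("JMS"));
    }
}
```

The consumer never needs to know the reply queue in advance; it simply answers on the destination carried by the request, which is the point of the Reply To header.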

Now let us see the full code for Producer :

// connectionFactory, ctx, replyConsumer and the two *Name values are used here
// without being declared, so they are assumed to be fields of the enclosing class.
public void sendMessage(String name) throws JustJavaRuntimeException {
	try {
		Connection connection = null;
		Session session = null;
		Destination destination = null;
		MessageProducer messageProducer = null;
		connectionFactory = (ConnectionFactory) ctx.lookup(connectionFactoryName);
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		destination = (Destination) ctx.lookup(requestQueueName);
		messageProducer = session.createProducer(destination);
		TextMessage message = session.createTextMessage(name);

		//Start delivery of incoming messages, so that the reply can be received
		connection.start();

		//Create Reply To Queue and listen on it for the reply
		Destination replyQueue = session.createTemporaryQueue();
		message.setJMSReplyTo(replyQueue);
		replyConsumer = session.createConsumer(replyQueue);
		replyConsumer.setMessageListener(new ResponseListener());

		//Persistent delivery, default priority, time-to-live of 30 minutes (1800000 ms)
		messageProducer.send(message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 1800000);
		System.out.println("++++++++++++++++++++++++++++++++++++++++++");
		System.out.println("Message sent to Bean");
		System.out.println("ReplyQueue name is : "+replyQueue.toString());
		System.out.println("++++++++++++++++++++++++++++++++++++++++++");

		//Keep the connection open for up to 5 seconds so that the reply can arrive
		synchronized (this) {
			try {
				wait(5000);
			} catch (InterruptedException e) {
				throw new JustJavaRuntimeException(e);
			}
		}

		messageProducer.close();
		session.close();
		connection.close();

	} catch (JMSException je) {
		throw new JustJavaRuntimeException(je);
	} catch (NamingException ne) {
		throw new JustJavaRuntimeException(ne);
	}
}

One thing you might notice here :

replyConsumer.setMessageListener(new ResponseListener());

Here replyConsumer is a MessageConsumer that is assigned a message listener. The listener receives the reply that the original Consumer sends back.

A few things are necessary for building the Request/Reply model using JMS :

connection.start();
.....
synchronized (this) {
	try {
		wait(5000);
	} catch (InterruptedException e) {
		throw new JustJavaRuntimeException(e);
	}
}

You need to call connection.start(). This starts the connection’s delivery of incoming messages; until it is called, the reply listener receives nothing.

Also, we are using wait(timeout) here. We need to wait because otherwise the connection would be closed right after the message is sent to the original Consumer, and closing the connection also deletes the temporary reply queue. We don’t want the connection to close until we have received the reply from the Consumer, so I have added a delay here.

Alternatively, we can use the MessageConsumer’s receive() method to receive the reply synchronously instead of registering a listener.
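
A fixed five-second delay either wastes time or is too short. One possible refinement is to block only until the reply actually arrives, with the delay acting as an upper bound. The sketch below shows that idea with a CountDownLatch and uses only the JDK, so it runs without a JMS provider. LatchListener, onReply and await are hypothetical names; in the real producer, the body of onReply would sit inside the listener’s onMessage(Message) method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only sketch; the reply is simulated by a background thread.
class ReplyLatchSketch {

    // Hypothetical listener that records the reply and releases the waiting producer.
    static final class LatchListener {
        private final CountDownLatch done = new CountDownLatch(1);
        private final AtomicReference<String> reply = new AtomicReference<>();

        // Would be called from onMessage(...) in a real MessageListener.
        void onReply(String text) {
            reply.set(text);
            done.countDown();
        }

        // Blocks until a reply arrives or the timeout expires; null means timeout.
        String await(long timeoutMillis) {
            try {
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS) ? reply.get() : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    // Simulates a consumer that replies after replyDelayMillis,
    // while the producer waits at most timeoutMillis.
    static String demo(long replyDelayMillis, long timeoutMillis) {
        LatchListener listener = new LatchListener();
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(replyDelayMillis);
                listener.onReply("ABCD");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        return listener.await(timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(demo(50, 5000));   // reply arrives well before the timeout
        System.out.println(demo(2000, 50));   // timeout expires first
    }
}
```

With this approach the producer can close its connection as soon as await returns, instead of always holding it open for the full delay.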

Here is the code for ResponseListener :

// Named ResponseListener to match the class instantiated in the producer
class ResponseListener implements MessageListener {
	public void onMessage(Message m) {
		try {
			if (m instanceof TextMessage) {
				TextMessage replyMessage = (TextMessage) m;
				System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++");
				System.out.println("Received reply in ResponseListener");
				System.out.println("\tTime:       " + System.currentTimeMillis() + " ms");
				System.out.println("\tMessage ID: " + replyMessage.getJMSMessageID());
				System.out.println("\tCorrel. ID: " + replyMessage.getJMSCorrelationID());
				System.out.println("\tReply to:   " + replyMessage.getJMSReplyTo());
				System.out.println("\tContents:   " + replyMessage.getText());
				System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++");
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

Now let us see the code for the Consumer. I have used a Message-Driven Bean as the Consumer.

@MessageDriven(name = "AsyncMDBean", activationConfig = {
	@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
	@ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/RequestQueue")
})
public class AsyncMDBean implements MessageListener {

	Context ctx;

	public AsyncMDBean() {
		try {
			ctx = new InitialContext();
		} catch (NamingException e) {
			throw new JustJavaRuntimeException(e);
		}
	}

	public void onMessage(Message message) {
		String text = null;
		Connection connection = null;
		try {

			if (message instanceof TextMessage) {

				TextMessage textMessage = (TextMessage) message;
				text = textMessage.getText();
				System.out.println("****************************************************");
				System.out.println("Received message in AsyncMDBean. Name: "+text);
				System.out.println("****************************************************");

				//Send the reply to the queue named in the request's Reply To header
				Destination replyDestination = message.getJMSReplyTo();
				ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
				connection = connectionFactory.createConnection();
				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
				MessageProducer replyProducer = session.createProducer(replyDestination);
				TextMessage replyMessage = session.createTextMessage();
				replyMessage.setText("ABCD");
				//Copy the request's message ID so the producer can match reply to request
				replyMessage.setJMSCorrelationID(message.getJMSMessageID());
				replyProducer.send(replyMessage);

				System.out.println("****************************************************");
				System.out.println("Sent reply");
				System.out.println("\tTime:       " + System.currentTimeMillis() + " ms");
				System.out.println("\tMessage ID: " + replyMessage.getJMSMessageID());
				System.out.println("\tCorrel. ID: " + replyMessage.getJMSCorrelationID());
				System.out.println("\tReply to:   " + replyMessage.getJMSReplyTo());
				System.out.println("\tContents:   " + replyMessage.getText());
				System.out.println("****************************************************");

			} else {
				System.err.println("Expected a Text Message");
			}

			System.out.println("*******************************************");
			System.out.println("Leaving AsyncMDBean. Name: "+text );
			System.out.println("*******************************************");
		} catch (Throwable t) {
			t.printStackTrace();
		} finally {
			//Closing the connection also closes its session and producer
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
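
Both listings print the correlation ID of the reply. That header matters once several requests share one reply queue: a common convention is for the consumer to copy the request’s message ID into the reply’s correlation ID, so the producer can tell replies apart. Below is a minimal sketch of the producer-side bookkeeping, again using only the JDK. CorrelationSketch, expectReply and onReply are hypothetical names, and the string IDs stand in for the values that getJMSMessageID() and getJMSCorrelationID() would return.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// JDK-only sketch of matching replies to requests by correlation ID.
class CorrelationSketch {

    // Requests that are still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the producer after sending a request with the given ID.
    CompletableFuture<String> expectReply(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called by the reply listener. Returns false when no request is
    // waiting for this ID, for example because the reply arrived too late.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        CompletableFuture<String> first = sketch.expectReply("req-1");
        CompletableFuture<String> second = sketch.expectReply("req-2");

        // Replies may arrive in any order; each one completes only its own request.
        sketch.onReply("req-2", "reply for second");
        sketch.onReply("req-1", "reply for first");

        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```

With one temporary queue per request, as in the producer above, this bookkeeping is not strictly needed; it becomes useful when the reply queue is created once and reused.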

Well, that’s it !!!

Please let me know how you find this code. I have deployed it successfully on JBoss 4.2.3; there were some issues on Glassfish.
