Wednesday, February 27, 2013

JMS using ActiveMQ


We discussed and learnt about JMS using EJB3.0 in our previous post Message Driven Bean in EJB3.0. Now in this post, we will explore plain java based JMS implementation using  Apache ActiveMQ.


Java Message Service (JMS) is a Java Message Oriented Middleware (MOM) API for sending messages between two or more clients. 
There are four main components which make the JMS system.



1. Client - A JMS client is an application that uses the services of the message broker. There are two types   of clients, Consumer and Producer. Producers create messages and send or publish them to the broker for delivery to a specified destination.Consumers retrieve messages from a destination.
2. Message Broker(Server) - A JMS Broker is a system which provides clients with connectivity and message storage/delivery services. It maintains a list of queues and topics for which applications can connect to and send and receive messages.
3. Destination - Destinations are the place where message get stored for clients. They can be either queues or topics. 
4. Message - A messages is an object that contains the required heading fields, optional properties, and data payload being transferred between JMS clients.

In JMS, a producer connects to an MQ server and sends a message to a destination(queue or topic). A consumer also connects to the MQ server and listens to destination(a queue or topics) for messages of interest.

To develop and test a JMS application, Message Broker is required. For our example, we will use the Apache ActiveMQ server (activemq.apache.org), which is an open source MQ server. 


Setting Up ActiveMQ 
Step 1: Download the latest release from ActiveMQ web site (activemq.apache.org/download.html) and extract the archive into a folder on your computer.

Step 2: Upon extraction, navigate to the bin folder, and run the activemq command  as
                                     apache-activemq-5.5.1\bin>activemq.bat                                          
The server will be started, and upon completion, you will see that the ActiveMQ server is listening to port 61616 for a JMS connection.

Step 3: Open your browser and type the following address http://localhost:8161/admin/
             Browser will open admin page of ActiveMQ as below image

Step 4: We can create queue and topic using ActiveMQ admin page. Click on Queues menu visible after Home|Queues. Then type the queue name in text box and click on Create button.
A queue should be visible as shown in the below figure.                



How To DO
To send or receive a JMS message, we have to perform the following tasks in our code.
  •  Create a JMS connection factory on a message broker.
  •  Create a JMS destination, which can be either a queue or a topic.
  •  Open a JMS connection from the connection factory.
  •  Obtain a JMS session from the connection.
  •  Send/receive the JMS message with a message producer/consumer.
  •  Handle JMSException, which is a checked exception that must be handled.
  •  Close the JMS session and connection.

To send/receive JMS messages to/from a JMS message broker, we have to include the library of the message broker in our classpath. download


MessageProducerTest.java
package com.sarf.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import com.sarf.data.MessageObject;

public class MessageProducerTest{

  public void sendMessage(MessageObject mail) {
    //Step 1: Create Connection Factory
    ConnectionFactory cf = 
       new ActiveMQConnectionFactory("tcp://localhost:61616");
    //Step 2: Define Destination
    Destination destination = new ActiveMQQueue("mail.queue");
    Connection conn = null;
    try{
 //Step 3: Create connection from connection factory
 conn = cf.createConnection();
 //Step 4: Create session
 Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
 //Step 5: Create Message Producer for destination queue
 MessageProducer producer = session.createProducer(destination);
 //Step 6: Create Message
 MapMessage message = session.createMapMessage();
 message.setString("mailId", mail.getMailId());
 message.setString("message",mail.getMessage());
    //Step 7: Send message
 producer.send(message);
 session.close();
 }catch(Exception e) {
   e.printStackTrace();
 }
   }
}
In the preceding sendMessage() method, we are creating ConnectionFactory using message broker URL and Destination objects for user created queue (mail.queue). Then we are creating a connection, session, and message producer to send message to destination queue. In step 6, we are creating MapMessage object which contains our actual message to be sent. At the final step 7, we are sending the message.


MessageConsumerTest.java
package com.sarf.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import com.sarf.data.MessageObject;

public class MessageConsumerTest{
 public MessageObject receiveMessage() {
 ConnectionFactory cf =
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        Destination destination = new ActiveMQQueue("mail.queue");
        Connection conn = null;
        try {
            conn = cf.createConnection();
            Session session =
                conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(destination);
            conn.start();
            MapMessage message = (MapMessage) consumer.receive();
            MessageObject mail = new MessageObject();
            mail.setMailId(message.getString("mailId"));
            mail.setMessage(message.getString("message"));
            session.close();
            return mail;
        }catch(Exception e) {
          e.printStackTrace();
 }finally{
      if(conn != null) {
      try{conn.close();}catch(JMSException e){}
            }
 }
      return null;
   }
}
In the preceding receiveMessage() method, we are creating ConnectionFactory using message broker URL and Destination objects for user created queue (mail.queue). Then we are creating a connection, session, and message consumer to consume message from destination queue.

MessageObject.java
This class represent POJO message object that we are sending to destination and consuming it using consumer.
package com.sarf.data; 

public class MessageObject {
  private String mailId;
  private String message;
    
  public MessageObject(){}; 
  public MessageObject(String mailId, String message) {
   super();
   this.mailId = mailId;
   this.message = message;
  }
  public String getMailId() {
   return mailId;
  }
  public void setMailId(String mailId) {
   this.mailId = mailId;
  }
  public String getMessage() {
   return message;
  }
  public void setMessage(String message) {
   this.message = message;
  }
}