Thursday, March 5, 2009

JMS using Apache ActiveMQ

1)First download apache ActiveMQ latest from http://activemq.apache.org/download.html.
then unzip the file and in that bin start the activemq server using /bin/activemq.bat.

2)Compile the following classes ,and in one command prompt execute the Publisher class and in the other command prompt execute Consumer class.(execute the publisher class many times , that message will get in the consumer class console).


The Consumer class is given below:

package jmsclient;
import java.io.IOException;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import dto.MessageVO;

/**
* This class is used to Receive message
* @author jijo mathew
*
*/


public class Consumer implements MessageListener, ExceptionListener {

private boolean running;

private Session session;
private Destination destination;
private MessageProducer replyProducer;
private boolean pauseBeforeShutdown;
private boolean verbose = true;
private int maxiumMessages;
//private String subject = "TOOL.DEFAULT";
private String subject = "TESTMSG"; //TOPIC name -- consumers focus this TOPIC
private boolean topic=true; //true means TOPIC false means QUEUE
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private boolean transacted=false; //transaction enabled or not
private boolean durable=true; //enable persistance
private String clientId="jijomathew"; //this should be unique
private int ackMode = Session.AUTO_ACKNOWLEDGE;
private String consumerName = "jijomathew";
private long sleepTime;
private long receiveTimeOut; // keep messages for this time period
public static void main(String[] args) {
Consumer consumer = new Consumer();
consumer.run();

}
public void run() {
try {
running = true;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection("system","manager");
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
connection.setClientID(clientId);
}
connection.setExceptionListener(this);
connection.start();
System.out.println("Connected");

session = connection.createSession(transacted, ackMode);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}

replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic)destination, consumerName);
} else {
consumer = session.createConsumer(destination);
}

if (maxiumMessages > 0) {
consumeMessagesAndClose(connection, session, consumer);
} else {
if (receiveTimeOut == 0) {
consumer.setMessageListener(this);
} else {
consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
}
}

} catch (Exception e) {
e.printStackTrace();
}
}

public void onMessage(Message message) {
try {

if (message instanceof TextMessage) {
//TextMessage txtMsg = (TextMessage)message;
ObjectMessage objMsg = (ObjectMessage)message;
if (verbose) {
MessageVO messageVO = (MessageVO)objMsg.getObject();
String msg = messageVO.getMessage();
System.out.println(msg);
}
} else {
ObjectMessage objMsg = (ObjectMessage)message;
if (verbose) {
MessageVO messageVO = (MessageVO)objMsg.getObject();
String msg = messageVO.getMessage();
System.out.println("Received: " + msg);
}
}

if (message.getJMSReplyTo() != null) {
replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
}

if (transacted) {
session.commit();
} else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
}

} catch (JMSException e) {
e.printStackTrace();
} finally {
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
}
}
}
}

public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured."+ex);
running = false;
}

synchronized boolean isRunning() {
return running;
}

protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
for (int i = 0; i < maxiumMessages && isRunning();) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("connection is closing");
consumer.close();
session.close();
connection.close();
}

protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
Message message;
while ((message = consumer.receive(timeout)) != null) {
onMessage(message);
}

System.out.println("connection is closing");
consumer.close();
session.close();
connection.close();
}

public void setAckMode(String ackMode) {
if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.CLIENT_ACKNOWLEDGE;
}
if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.AUTO_ACKNOWLEDGE;
}
if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
}
if ("SESSION_TRANSACTED".equals(ackMode)) {
this.ackMode = Session.SESSION_TRANSACTED;
}
}

public void setClientId(String clientID) {
this.clientId = clientID;
}

public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}

public void setDurable(boolean durable) {
this.durable = durable;
}

public void setMaxiumMessages(int maxiumMessages) {
this.maxiumMessages = maxiumMessages;
}

public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
this.pauseBeforeShutdown = pauseBeforeShutdown;
}

public void setPassword(String pwd) {
this.password = pwd;
}

public void setReceiveTimeOut(long receiveTimeOut) {
this.receiveTimeOut = receiveTimeOut;
}

public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}

public void setSubject(String subject) {
this.subject = subject;
}

public void setTopic(boolean topic) {
this.topic = topic;
}

public void setQueue(boolean queue) {
this.topic = !queue;
}

public void setTransacted(boolean transacted) {
this.transacted = transacted;
}

public void setUrl(String url) {
this.url = url;
}

public void setUser(String user) {
this.user = user;
}

public void setVerbose(boolean verbose) {
this.verbose = verbose;
}

}

The Message class is given:
package dto;


import java.io.Serializable;
/**
* @author jijo mathew
*
*/
public class MessageVO implements Serializable{

private static final long serialVersionUID = 1L;
/**
*
*/
private String message="";

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}







The publisher class is given below:
package publisher;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;

import dto.MessageVO;

/**
* This class is used to send message
* @author jijo mathew
*
*/

public class Publisher {

private Destination destination;
private int messageCount = 10;
private long sleepTime;
private boolean verbose = true;
private int messageSize = 255;
private long timeToLive;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TESTMSG";
private boolean topic=true;
private boolean transacted;
private boolean persistent=true;
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.start();
}
public void start() {
Connection connection = null;

try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection("system","manager");
connection.start();
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
MessageProducer producer = session.createProducer(destination);
if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
if (timeToLive != 0) {
producer.setTimeToLive(timeToLive);
}
sendMessage(session, producer);
System.out.println("Messages sent.");
ActiveMQConnection c = (ActiveMQConnection)connection;
c.getConnectionStats().dump(new IndentPrinter());

} catch (Exception e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
ignore.printStackTrace();
}
}
}

protected void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 0; i < messageCount || messageCount == 0; i++) {
String message = "Message from jijo:"+messageCount;
MessageVO messageVO = new MessageVO();
messageVO.setMessage(message);
ObjectMessage objmessage = session.createObjectMessage(messageVO);
producer.send(objmessage);
if (transacted) {
session.commit();
}

Thread.sleep(sleepTime);

}

}

public void setPersistent(boolean durable) {
this.persistent = durable;
}

public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}

public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}

public void setPassword(String pwd) {
this.password = pwd;
}

public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}

public void setSubject(String subject) {
this.subject = subject;
}

public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}

public void setTopic(boolean topic) {
this.topic = topic;
}

public void setQueue(boolean queue) {
this.topic = !queue;
}

public void setTransacted(boolean transacted) {
this.transacted = transacted;
}

public void setUrl(String url) {
this.url = url;
}

public void setUser(String user) {
this.user = user;
}

public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
}


The following jar files is needed to build these classes:



activation-1.1.jar
activeio-core-3.1.0.jar
activemq-camel-5.2.0.jar
activemq-console-5.2.0.jar
activemq-core-5.2.0.jar
activemq-jaas-5.2.0.jar
activemq-jmdns_1.0-5.2.0.jar
activemq-optional-5.2.0.jar
activemq-pool-5.2.0.jar
activemq-web-5.2.0.jar
activemq-xmpp-5.2.0.jar
ant-1.6.2.jar
axis.jar
axis-ant.jar
camel-core-1.5.0.jar
camel-jetty-1.5.0.jar
camel-jms-1.5.0.jar
camel-spring-1.5.0.jar
commons-beanutils-1.7.0.jar
commons-collections-3.1.jar
commons-dbcp-all-1.3.jar
commons-digester-1.6.jar
commons-discovery-0.4.jar
commons-el-1.0.jar
commons-httpclient-3.0.1.jar
commons-io-1.3.2.jar
commons-lang-2.1.jar
commons-logging-1.1.jar
commons-pool-1.4.jar
crimson.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar
jaxb-api-2.0.jar
jaxb-impl-2.0.3.jar
jetty-util-6.1.0.jar
jms.jar
jstl.jar
log4j-1.2.15.jar
quartz-all-1.5.2.jar
spring.jar
spring-aop-2.5.5.jar
spring-beans-2.5.5.jar
spring-context-2.5.5.jar
spring-core-2.5.5.jar
spring-jms-2.5.5.jar
spring-tx-2.5.5.jar
spring-web-2.5.5.jar
spring-webmvc-2.5.5.jar
standard.jar
stax-1.2.0.jar
stax-api-1.0.jar
transaction-api-1.1.jar
xalan.jar
xerces-J_1.4.0.jar
xstream-1.3.jar

You can get the jar from the activemq folder /lib directory.Also download the jms.jar .


For more details try the following links:
http://activemq.apache.org/version-5-getting-started.html
http://activemq.apache.org/web-samples.html
http://activemq.apache.org/version-5-examples.html

2 comments:

Anonymous said...

Good post and this fill someone in on helped me alot in my college assignement. Thank you on your information.

Anonymous said...

Brim over I to but I about the collection should acquire more info then it has.