写文章

ActiveMQ入门实例

2018-11-29 15:59:33

2170 | 0 | 0

在开始之前需要去到网上去下载一个activeMQ,(官方网站下载:http://activemq.apache.org/),然后直接解压运行bin目录下面的activemq.bat文件,启动后登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue,到这里准备工作就完成了,然后就可以开始了。        

    第一步,创建一个普通的java项目,然后导入java包(本人导入的是activemq-all-5.9.0.jar)

    第二步、添加消息发送者

public class Sender {
	
	//连接工厂
	private static ConnectionFactory connectionFactory;
	
	static{
		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,  "tcp://localhost:61616");
	}

	public static void main(String[] args) {
		//连接
		Connection connection = null;
		//会话
		Session session = null;
		//目的地,消息要发往的地方
		Destination destination;
		//消息发送者
		MessageProducer producer = null;
		try {
			//获取消息连接
			connection = connectionFactory.createConnection();
			//开启连接
			connection.start();
			//通过连接创建会话
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//创建消息队列
			destination = session.createQueue("FirstQueue");
			//创建发送者
			producer = session.createProducer(destination);
			//发送消息
			send(session, producer);
		} catch (JMSException e) {
			e.printStackTrace();
		}finally {
			try {
				session.commit();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static void send(Session session,MessageProducer producer) throws JMSException{
		for(int i=0;i<10;i++){
			//创建消息
			TextMessage msg = session.createTextMessage("消息"+i);
			//发送消息
			producer.send(msg);
			System.out.println("发送消息:消息"+i);
		}
	}
}

最后一步,添加消息的接受者

public class consumer {
	private static ConnectionFactory connectionFactory;

	static {
		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
	}

	public static void main(String[] args) {
		Connection connection = null;
		Session session = null;
		Destination destination;
		MessageConsumer consumer = null;
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				TextMessage msg = (TextMessage)consumer.receive(30000);
				if(msg != null){
					System.out.println("接受到的消息: "+msg.getText());
				}else{
					break;
				}

			}
				
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			try {
				consumer.close();
				session.commit();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}

	}
}

注意:接受消息还可以通过MessageListener接收

public class consumerList {
	private static ConnectionFactory connectionFactory;

	static {
		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
	}
	public static void main(String[] args) {
		Connection connection = null;
		Session session = null;
		Destination destination;
		MessageConsumer consumer = null;
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			consumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					TextMessage msg = (TextMessage)message;
					try {
						System.out.println(msg.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			});
				
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			try {
				consumer.close();
				session.commit();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}

	}
}


0

收藏
分享