Apache activeMq结合spring应用

2016-12-30 09:54:43来源:oschina作者:1234find人点击

第七城市

自己记录点干货,关于Apache ActiveMQ的。它的消息模式主要分为两种:队列(queue)和发布/订阅(topic)。queue中的消息被消费后就会消失;topic可配置持久化,即消息来不及消费时可以持久化到硬盘上,待消费者需要时再消费,就是如此。


下面是结合spring的配置文件------------这个是消费者的配置文件,其中生产者的配置已注释掉了


<?xml version="1.0" encoding="UTF-8"?>






------------------------------------------------下面是转换器

import java.io.Serializable;


import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session;


import org.apache.activemq.command.ActiveMQObjectMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConverter;


public class NotifyMessageConverter implements MessageConverter{ private static Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);//消息转换java对象,可看源码,中有注释 @Override public Object fromMessage(Message message) throws JMSException, MessageConversionException { // TODO Auto-generated method stub return null; }


//java对象转换消息,可看源码,中有注释 @Override public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { ActiveMQObjectMessage msg = (ActiveMQObjectMessage)session.createObjectMessage(); msg.setObject((Serializable)object); return msg; } }


----------------------------------------------------------消费者


----------消费者只要实现MessageListener接口,并在配置文件中配置就行

import java.io.Serializable;


import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage;


public class TopicMessageListener implements MessageListener{


@Override public void onMessage(Message message) {


ObjectMessage objectMessage = (ObjectMessage)message;


Serializable noticeInfo = null; try {noticeInfo = (Serializable) objectMessage.getObject(); } catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace(); } System.out.println("-------------------消费者一接受消息-----------------------"+noticeInfo);


}


}


项目启动程序:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.support.ClassPathXmlApplicationContext;


/**
 * Entry point of the consumer service: boots the Spring application context and keeps the main
 * thread alive until the JVM shutdown hook has closed the context.
 */
public class ServiceLauncher {

    private static final Logger logger = LoggerFactory.getLogger(ServiceLauncher.class);

    /** Cleared by the shutdown hook to release the waiting main thread. */
    private static volatile boolean running = true;

    private static ClassPathXmlApplicationContext context;

    /**
     * Starts the service and blocks until shutdown.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility; startup failures are logged and terminate the
     *     JVM with a non-zero exit status
     */
    public static void main(String[] args) throws Exception {
        try {
            // Context creation is inside the try so that a failing startup is actually logged below.
            context = new ClassPathXmlApplicationContext(new String[] { "classpath*:conf/app-context.xml" });
            context.start();
            System.out.println("start service");
            logger.info("INFO ## the Service server is running now ......");

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        logger.info("INFO ## stop the Service server");
                        context.close();
                    } catch (Throwable e) {
                        // Pass the throwable so that the cause and stack trace are logged.
                        logger.warn("WARN ##something goes wrong when stopping Service Server:", e);
                    } finally {
                        logger.info("INFO ## Service server is down.");
                    }

                    // Wake the main thread so that it can leave its wait loop.
                    synchronized (ServiceLauncher.class) {
                        running = false;
                        ServiceLauncher.class.notify();
                    }
                }
            });
        } catch (Throwable e) {
            logger.error("ERROR ## Something goes wrong when starting up the Service Server:", e);
            // A failed startup must be reported with a non-zero exit status.
            System.exit(1);
        }

        synchronized (ServiceLauncher.class) {
            // Loop guards against spurious wake-ups; only the shutdown hook clears the flag.
            while (running) {
                try {
                    ServiceLauncher.class.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop waiting instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

--------------config.properties


mqUrl=tcp://localhost:61616


---------------log4j.xml -----------建议自写,这个另一个项目,拿过来用了下


<?xml version="1.0" encoding="UTF-8" ?>








--------------------------------------------------------以上是完整的消费者客户端,采用订阅者模式接收消息;若采用queue,只需要写一个相关类实现MessageListener就行。其他实现大致雷同,有兴趣的朋友可以试试。

----------------------------------------------------------------------------------------


-----------------------------------------------------下面是生产者的配置文件,app-context-mq.xml


<?xml version="1.0" encoding="UTF-8"?>



--------------------------------------------------------订阅者生产者


import javax.jms.Destination;


import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.converter.MessageConverter;


public class TopicMessageProducer implements TopicMessageProducerManager{ private JmsTemplate jmsTemplate; private Destination notifyQueue; private Destination notifyTopic; public Destination getNotifyTopic() { return notifyTopic; } @Override public void setNotifyTopic(Destination notifyTopic) { this.notifyTopic = notifyTopic; } private MessageConverter messageConverter; @Override public void sendTopic(T noticeInfo){ sendMessage(noticeInfo); } private void sendMessage(T noticeInfo) { // TODO Auto-generated method stub jmsTemplate.setMessageConverter(messageConverter); //jmsTemplate.setPubSubDomain(false); jmsTemplate.convertAndSend(notifyTopic,noticeInfo); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getNotifyQueue() { return notifyQueue; } public void setNotifyQueue(Destination notifyQueue) { this.notifyQueue = notifyQueue; } public MessageConverter getMessageConverter() { return messageConverter; } public void setMessageConverter(MessageConverter messageConverter) { this.messageConverter = messageConverter; } }


-----------------------------------------------生产者接口

import javax.jms.Destination;


public interface TopicMessageProducerManager {public void setNotifyTopic(Destination destination);public void sendTopic(T noticeInfo); }


-------------------------------------------------------队列模式生产者

import javax.jms.Destination;


import org.springframework.jms.core.JmsTemplate;


public class QueueMessageProducer { private JmsTemplate jmsTemplate; private Destination notifyQueue; private NotifyMessageConverter messageConverter; public void sendQueue(PhoneNoticeInfo noticeInfo){ sendMessage(noticeInfo); } private void sendMessage(PhoneNoticeInfo noticeInfo) { // TODO Auto-generated method stub jmsTemplate.setMessageConverter(messageConverter); jmsTemplate.setPubSubDomain(false); jmsTemplate.convertAndSend(notifyQueue,noticeInfo); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getNotifyQueue() { return notifyQueue; } public void setNotifyQueue(Destination notifyQueue) { this.notifyQueue = notifyQueue; } public NotifyMessageConverter getMessageConverter() { return messageConverter; } public void setMessageConverter(NotifyMessageConverter messageConverter) { this.messageConverter = messageConverter; } }


----------------------------------------------testcase


import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;


/**
 * Manual integration test: loads the Spring context and publishes ten messages to the topic
 * through the {@code topicMessageProducer} bean.
 */
public class Test {

    /** Publishes ten text payloads to the topic and closes the context afterwards. */
    @org.junit.Test
    public void sendTest() {
        ClassPathXmlApplicationContext act =
                new ClassPathXmlApplicationContext(new String[] { "classpath*:conf/app-context.xml" });
        try {
            TopicMessageProducer qmp = (TopicMessageProducer) act.getBean("topicMessageProducer");
            System.out.println("sad");
            for (int i = 0; i < 10; i++) {
                // A plain String payload is sent here; the unused PhoneNoticeInfo local was removed.
                qmp.sendTopic("dfsdfsadfsadfsad");
                System.out.println("topic信息已发送----》+" + i + "");
            }
        } finally {
            // Close the context explicitly instead of suppressing the resource-leak warning.
            act.close();
        }
    }
}


-----------------------------------------------------PhoneNoticeInfo


import java.io.Serializable;


/**
 * Serializable payload describing a notice to be delivered to a phone.
 *
 * <p>The fields remain public for backward compatibility with existing callers; new code should
 * use the accessors.
 */
public class PhoneNoticeInfo implements Serializable {

    /** Explicit version id so that the serialized form does not depend on compiler details. */
    private static final long serialVersionUID = 1L;

    /** Notice title. */
    public String noticeTitle;

    /** Notice content. */
    public String noticeContent;

    /** Recipient. */
    public String receiver;

    /** Recipient phone number. */
    public String receiverPhone;

    public String getNoticeTitle() {
        return noticeTitle;
    }

    public void setNoticeTitle(String noticeTitle) {
        this.noticeTitle = noticeTitle;
    }

    public String getNoticeContent() {
        return noticeContent;
    }

    public void setNoticeContent(String noticeContent) {
        this.noticeContent = noticeContent;
    }

    public String getReceiver() {
        return receiver;
    }

    public void setReceiver(String receiver) {
        this.receiver = receiver;
    }

    public String getReceiverPhone() {
        return receiverPhone;
    }

    public void setReceiverPhone(String receiverPhone) {
        this.receiverPhone = receiverPhone;
    }

    /**
     * Returns a readable representation listing all four fields, so that printing a received
     * notice shows its content rather than an identity hash.
     */
    @Override
    public String toString() {
        return "PhoneNoticeInfo{noticeTitle=" + noticeTitle
                + ", noticeContent=" + noticeContent
                + ", receiver=" + receiver
                + ", receiverPhone=" + receiverPhone + "}";
    }
}

----------------------------------------------


消息转换器

import java.io.Serializable;


import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session;


import org.apache.activemq.command.ActiveMQObjectMessage; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConverter;


public class MessageConvertForSys implements MessageConverter {public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {System.out.println("[toMessage]");ObjectMessage objectMessage = session.createObjectMessage();//objectMessage.setObject((Serializable) object);return objectMessage;}


public Object fromMessage(Message message) throws JMSException,MessageConversionException { // TODO Auto-generated method stub return null;}}

第七城市

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台