最近公司做了一個(gè)以信息安全為主的項(xiàng)目,其中有一個(gè)業(yè)務(wù)需求就是,項(xiàng)目定時(shí)監(jiān)控操作用戶的行為,對(duì)于一些違規(guī)操作嚴(yán)重的行為,以發(fā)送郵件(FoxMail)的形式進(jìn)行郵件告警,可能是多人,也可能是一個(gè)人,第一次是以單人的形式,,直接在業(yè)務(wù)層需要告警的地方發(fā)送郵件即可,可是后邊需求變更了,對(duì)于某些告警郵件可能會(huì)發(fā)送多人,這其中可能就會(huì)有阻塞發(fā)郵件的可能,直到把所有郵件發(fā)送完畢后再繼續(xù)做下邊的業(yè)務(wù),領(lǐng)導(dǎo)說這樣會(huì)影響用戶體驗(yàn),發(fā)郵件的時(shí)候用戶一直處于等待狀態(tài),不能干別的事情。最后研究說用消息隊(duì)列,當(dāng)有需要發(fā)送郵件告警的時(shí)候,就向隊(duì)列中添加一個(gè)標(biāo)識(shí)消息,ActiveMQ通過監(jiān)聽器的形式,實(shí)時(shí)監(jiān)聽隊(duì)列里邊的小時(shí),收到消息后,判斷是不是需要發(fā)送告警的標(biāo)識(shí),是的話就自行就行發(fā)送郵件!這是就研究的消息隊(duì)列ActiveMQ,下邊就是具體內(nèi)容:
在辛集等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、成都外貿(mào)網(wǎng)站建設(shè)公司 網(wǎng)站設(shè)計(jì)制作按需網(wǎng)站制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),成都全網(wǎng)營銷推廣,成都外貿(mào)網(wǎng)站建設(shè)公司,辛集網(wǎng)站建設(shè)費(fèi)用合理。
?ActiveMQ是Apache所提供的一個(gè)開源的消息系統(tǒng),完全采用Java來實(shí)現(xiàn),因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務(wù))規(guī)范。JMS是一組Java應(yīng)用程序接口,它提供消息的創(chuàng)建、發(fā)送、讀取等一系列服務(wù)。JMS提供了一組公共應(yīng)用程序接口和響應(yīng)的語法,類似于Java數(shù)據(jù)庫的統(tǒng)一訪問接口JDBC,它是一種與廠商無關(guān)的API,使得Java程序能夠與不同廠商的消息組件很好地進(jìn)行通信。
JMS支持兩種消息發(fā)送和接收模型。
類似送快遞,快遞員(producer)將快遞(Message)放到指定地點(diǎn)(destination)后,就可以離開了,拿快遞的人(customer)在接收到通知后,到指定地點(diǎn)(destination)去取快遞(Message)就可以了。當(dāng)然,取快遞時(shí)可能要進(jìn)行身份驗(yàn)證,這就涉及到創(chuàng)建連接(connection)時(shí),需要指定用戶名和密碼了。還有就是,實(shí)際生活中,當(dāng)快遞員把快遞放好之后,照理說應(yīng)該通知客戶去哪里取快遞,而ActiveMq幫我們做好了一切,通知的工作Activemq會(huì)幫我們實(shí)現(xiàn),而無需我們親自編碼通知消費(fèi)者,生產(chǎn)者只需要將Message放到Mq中即可,通知消費(fèi)者的工作,mq會(huì)幫我們處理
用途就是用來處理消息,也就是處理JMS在大型電子商務(wù)類網(wǎng)站,如京東、淘寶、去哪兒等網(wǎng)站有著深入的應(yīng)用,隊(duì)列的主要作用是消除高并發(fā)訪問高峰,加快網(wǎng)站的響應(yīng)速度。
在不使用消息隊(duì)列的情況下,用戶的請(qǐng)求數(shù)據(jù)直接寫入數(shù)據(jù)庫,高發(fā)的情況下,會(huì)對(duì)數(shù)據(jù)庫造成巨大的壓力,同時(shí)也使得系統(tǒng)響應(yīng)延遲加劇,但使用隊(duì)列后,用戶的請(qǐng)求發(fā)給隊(duì)列后立即返回。
/apache-activemq-5.15.3/bin/win64/目錄下雙擊activemq.bat文件,在瀏覽器中輸入http://localhost:8161/admin/, 用戶名和密碼輸入admin即可
1
2
4 4.0.0
5
6 www.cnblogs.com.hongmoshu
7 test_actmq
8 0.0.1-SNAPSHOT
9 war
10 test_actmq Maven Webapp
11 http://www.example.com
12
13
14
15 4.1.8.RELEASE
16
17
18
19
20
21
22
23 junit
24 junit
25 4.11
26 test
27
28
29
30
31 jstl
32 jstl
33 1.2
34
35
36 javax.servlet
37 servlet-api
38 provided
39 2.5
40
41
42
43
44 org.springframework
45 spring-core
46 ${springframework}
47
48
49 org.springframework
50 spring-context
51 ${springframework}
52
53
54 org.springframework
55 spring-tx
56 ${springframework}
57
58
59 org.springframework
60 spring-webmvc
61 ${springframework}
62
63
64 org.springframework
65 spring-jms
66 ${springframework}
67
68
69
70
71 org.apache.xbean
72 xbean-spring
73 3.16
74
75
76
77
78 org.apache.activemq
79 activemq-core
80 5.7.0
81
82
83 org.apache.activemq
84 activemq-pool
85 5.12.1
86
87
88
89
90 com.google.code.gson
91 gson
92 1.7.1
93
94
95
96
97 net.sf.json-lib
98 json-lib
99 2.4
100 jdk15
101
102
103
104
105
106 test_actmq
107
108
109
1
2
20
21
22
23
24
25
29
30
31
33
34
35
36
37
38
39
40
41 Jaycekon
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
66
67
68
69
70
71
1
2
12
13
14
15
16
18
19
20
21
22
1
2
6 mydemo
7
8
9 index.jsp
10
11
12
13
14 contextConfigLocation
15
16 classpath:spring-mvc.xml;
17 classpath:ActiveMQ.xml;
18
19
20
21
22
23 org.springframework.web.context.ContextLoaderListener
24
25
26
27 springMVC
28 org.springframework.web.servlet.DispatcherServlet
29
30 contextConfigLocation
31 classpath:spring-mvc.xml
32
33 1
34
35
36 springMVC
37 /
38
39
40
41
42 characterEncodingFilter
43 org.springframework.web.filter.CharacterEncodingFilter
44
45 encoding
46 UTF-8
47
48
49 forceEncoding
50 true
51
52
53
54 characterEncodingFilter
55 /*
56
57
58
1 package com.svse.entity;
2 import java.io.Serializable;
3
4 public class Users implements Serializable{
5
6 private String userId;
7 private String userName;
8 private String sex;
9 private String age;
10 private String type;
11
12
13 public Users() {
14 super();
15 }
16 public Users(String userId, String userName, String sex, String age,
17 String type) {
18 super();
19 this.userId = userId;
20 this.userName = userName;
21 this.sex = sex;
22 this.age = age;
23 this.type = type;
24 }
25 public String getUserId() {
26 return userId;
27 }
28 public void setUserId(String userId) {
29 this.userId = userId;
30 }
31 public String getUserName() {
32 return userName;
33 }
34 public void setUserName(String userName) {
35 this.userName = userName;
36 }
37 public String getSex() {
38 return sex;
39 }
40 public void setSex(String sex) {
41 this.sex = sex;
42 }
43 public String getAge() {
44 return age;
45 }
46 public void setAge(String age) {
47 this.age = age;
48 }
49 public String getType() {
50 return type;
51 }
52 public void setType(String type) {
53 this.type = type;
54 }
55 @Override
56 public String toString() {
57 return "Users [userId=" + userId + ", userName=" + userName + ", sex="
58 + sex + ", age=" + age + ", type=" + type + "]";
59 }
60
61
62 }
1 package com.svse.service;
2
3 import javax.annotation.Resource;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.Message;
7 import javax.jms.Session;
8
9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12
13 import com.svse.entity.Users;
14
15 @Service
16 public class ProducerService {
17
18 @Resource(name="jmsTemplate")
19 private JmsTemplate jmsTemplate;
20
21
22 /**
23 * 向指定隊(duì)列發(fā)送消息 (發(fā)送文本消息)
24 */
25 public void sendMessage(Destination destination,final String msg){
26
27 jmsTemplate.setDeliveryPersistent(true);
28
29 System.out.println(Thread.currentThread().getName()+" 向隊(duì)列"+destination.toString()+"發(fā)送消息---------------------->"+msg);
30 jmsTemplate.send(destination, new MessageCreator() {
31 public Message createMessage(Session session) throws JMSException {
32 return session.createTextMessage(msg);
33 }
34 });
35 }
36
37 /**
38 * 向指定隊(duì)列發(fā)送消息以對(duì)象的方式 (發(fā)送對(duì)象消息)
39 */
40 public void sendMessageNew(Destination destination,Users user){
41 System.out.println(Thread.currentThread().getName()+" 向隊(duì)列"+destination.toString()+"發(fā)送消息---------------------->"+user);
42 jmsTemplate.convertAndSend(user);
43 }
44
45 /**
46 * 向默認(rèn)隊(duì)列發(fā)送消息
47 */
48 public void sendMessage(final String msg){
49 String destination = jmsTemplate.getDefaultDestinationName();
50 System.out.println(Thread.currentThread().getName()+" 向隊(duì)列"+destination+"發(fā)送消息---------------------->"+msg);
51 jmsTemplate.send(new MessageCreator() {
52 public Message createMessage(Session session) throws JMSException {
53 return session.createTextMessage(msg);
54 }
55 });
56 }
57 }
1 package com.svse.service;
2
3 import javax.annotation.Resource;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.ObjectMessage;
7 import javax.jms.TextMessage;
8
9 import net.sf.json.JSONObject;
10
11 import org.springframework.jms.core.JmsTemplate;
12 import org.springframework.stereotype.Service;
13
14 import com.svse.entity.Users;
15
16 @Service
17 public class ConsumerService {
18
19 @Resource(name="jmsTemplate")
20 private JmsTemplate jmsTemplate;
21 //接收文本消息
22 public TextMessage receive(Destination destination){
23 TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
24 try{
25 JSONObject json=JSONObject.fromObject(textMessage.getText());
26 System.out.println("name:"+json.getString("userName"));
27 System.out.println("從隊(duì)列" + destination.toString() + "收到了消息:\t"
28 + textMessage.getText());
29 } catch (JMSException e) {
30 e.printStackTrace();
31 }
32 return textMessage;
33 }
34 //接收對(duì)象消息
35 public ObjectMessage receiveNew(Destination destination){
36 ObjectMessage objMsg=(ObjectMessage) jmsTemplate.receive(destination);
38 try{
39 Users users= (Users) objMsg.getObject();
44 System.out.println("name:"+users.getUserName());
47 System.out.println("從隊(duì)列" + destination.toString() + "收到了消息:\t"
48 + users);
49 } catch (JMSException e) {
50 e.printStackTrace();
51 }
52 return objMsg;
53 }
54 }
1 package com.svse.controller.mq;
2
3 import java.io.IOException;
4 import java.text.SimpleDateFormat;
5 import java.util.Date;
7 import javax.annotation.Resource;
8 import javax.jms.DeliveryMode;
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.ObjectMessage;
12 import javax.jms.TextMessage;
13 import javax.management.MBeanServerConnection;
14 import javax.management.remote.JMXConnector;
15 import javax.management.remote.JMXConnectorFactory;
16 import javax.management.remote.JMXServiceURL;
18 import org.springframework.stereotype.Controller;
19 import org.springframework.web.bind.annotation.RequestMapping;
20 import org.springframework.web.bind.annotation.RequestMethod;
21 import org.springframework.web.bind.annotation.RequestParam;
22 import org.springframework.web.servlet.ModelAndView;
24 import com.google.gson.Gson;
25 import com.svse.entity.Users;
26 import com.svse.service.ConsumerService;
27 import com.svse.service.ProducerService;
28
29 @Controller
30 public class DemoController {
35
36 //隊(duì)列名Jaycekon (ActiveMQ中設(shè)置的隊(duì)列的名稱)
37 @Resource(name="demoQueueDestination")
38 private Destination demoQueueDestination;
39
40 //隊(duì)列消息生產(chǎn)者
41 @Resource(name="producerService")
42 private ProducerService producer;
43
44 //隊(duì)列消息消費(fèi)者
45 @Resource(name="consumerService")
46 private ConsumerService consumer;
47
48 /*
49 * 準(zhǔn)備發(fā)消息
50 */
51 @RequestMapping(value="/producer",method=RequestMethod.GET)
52 public ModelAndView producer(){
53 System.out.println("------------go producer");
54
55 Date now = new Date();
56 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
57 String time = dateFormat.format( now );
58 System.out.println(time);
59
60 ModelAndView mv = new ModelAndView();
61 mv.addObject("time", time);
62 mv.setViewName("producer");
63 return mv;
64 }
65
66 /*
67 * 發(fā)消息
68 */
69 @RequestMapping(value="/onsend",method=RequestMethod.POST)
70 public ModelAndView producer(@RequestParam("message") String message) {
71 System.out.println("------------send to jms");
72 ModelAndView mv = new ModelAndView();
73 for(int i=0;i<5;i++){
74 try {
75 Users users=new Users("10"+(i+1),"趙媛媛"+(i+1),"女","27","影視演員");
76 Gson gson=new Gson();
77 String sendMessage=gson.toJson(users);
78 System.out.println("發(fā)送的消息sendMessage:"+sendMessage.toString());
79 // producer.sendMessage(demoQueueDestination,sendMessage.toString());//以文本的形式
80 producer.sendMessageNew(demoQueueDestination, users);//以對(duì)象的方式
81
82 } catch (Exception e) {
83 e.printStackTrace();
84 }
85 }
86 mv.setViewName("index");
87 return mv;
88 }
89 /*
90 * 手動(dòng)接收消息
91 */
92 @RequestMapping(value="/receive",method=RequestMethod.GET)
93 public ModelAndView queue_receive() throws JMSException {
94 System.out.println("------------receive message");
95 ModelAndView mv = new ModelAndView();
96
97 //TextMessage tm = consumer.receive(demoQueueDestination);//接收文本消息
98
99 ObjectMessage objMsg=consumer.receiveNew(demoQueueDestination);//接收對(duì)象消息
100 Users users= (Users) objMsg.getObject();
101 //mv.addObject("textMessage", tm.getText());
102 mv.addObject("textMessage", users.getUserId()+" || "+users.getUserName());
103 mv.setViewName("receive");
104 return mv;
105 }
106
107 /*
108 * ActiveMQ Manager Test
109 */
110 @RequestMapping(value="/jms",method=RequestMethod.GET)
111 public ModelAndView jmsManager() throws IOException {
112 System.out.println("------------jms manager");
113 ModelAndView mv = new ModelAndView();
114 mv.setViewName("index");
115
116 JMXServiceURL url = new JMXServiceURL("");
117 JMXConnector connector = JMXConnectorFactory.connect(url);
118 connector.connect();
119 MBeanServerConnection connection = connector.getMBeanServerConnection();
120
121 return mv;
122 }
123
124 }
在上邊的ProducerService和ConsumerService中,不論是發(fā)送消息還是接收消息,都可以以文本TextMessage的方式和ObjectMessage的方式.如果是簡單的文本消息可以以TextMessage,但是如果需要發(fā)送的內(nèi)容比較多,結(jié)構(gòu)比較復(fù)雜,這時(shí)候就建議用對(duì)象文本ObjectMessage的方式向隊(duì)列queue中發(fā)送消息了.但是這時(shí)候就需要用到對(duì)象消息轉(zhuǎn)換器MessageConverter.
MessageConverter的作用主要有兩方面,一方面它可以把我們的非標(biāo)準(zhǔn)化Message對(duì)象轉(zhuǎn)換成我們的目標(biāo)Message對(duì)象,這主要是用在發(fā)送消息的時(shí)候;另一方面它又可以把我們的Message對(duì)象
轉(zhuǎn)換成對(duì)應(yīng)的目標(biāo)對(duì)象,這主要是用在接收消息的時(shí)候。
1 package com.svse.util;
2
3 import java.io.Serializable;
4
5 import javax.jms.JMSException;
6 import javax.jms.Message;
7 import javax.jms.ObjectMessage;
8 import javax.jms.Session;
9
10 import org.springframework.jms.support.converter.MessageConversionException;
11 import org.springframework.jms.support.converter.MessageConverter;
12
13 /**
14 *功能說明:通用的消息對(duì)象轉(zhuǎn)換類
15 *@author:zsq
16 *create date:2019年7月12日 上午9:28:31
17 *修改人 修改時(shí)間 修改描述
18 *Copyright (c)2019北京智華天成科技有限公司-版權(quán)所有
19 */
20 public class ObjectMessageConverter implements MessageConverter {
21
22
23 //把一個(gè)Java對(duì)象轉(zhuǎn)換成對(duì)應(yīng)的JMS Message (生產(chǎn)消息的時(shí)候)
24 public Message toMessage(Object object, Session session)
25 throws JMSException, MessageConversionException {
26
27 return session.createObjectMessage((Serializable) object);
28 }
29
30 //把一個(gè)JMS Message轉(zhuǎn)換成對(duì)應(yīng)的Java對(duì)象 (消費(fèi)消息的時(shí)候)
31 public Object fromMessage(Message message) throws JMSException,
32 MessageConversionException {
33 ObjectMessage objMessage = (ObjectMessage) message;
34 return objMessage.getObject();
35 }
36
37 }
注意:寫了消息轉(zhuǎn)化器之后還需要的ActiveMQ.xml中進(jìn)行配置
MessageageListe作用就是動(dòng)態(tài)的自行監(jiān)聽消息隊(duì)列的生產(chǎn)者發(fā)送的消息,不需要人工手動(dòng)接收!
1 package com.svse.util;
2 import javax.jms.JMSException;
3 import javax.jms.Message;
4 import javax.jms.MessageListener;
5 import javax.jms.ObjectMessage;
6 import javax.jms.TextMessage;
7
8 import com.svse.entity.Users;
9
10
11 public class QueueMessageListener implements MessageListener {
12
13 //添加了監(jiān)聽器,只要生產(chǎn)者發(fā)布了消息,監(jiān)聽器監(jiān)聽到有消息消費(fèi)者就會(huì)自動(dòng)消費(fèi)(獲取消息)
14 public void onMessage(Message message) {
15 //(第1種方式)沒加轉(zhuǎn)換器之前接收到的是文本消息
16 //TextMessage tm = (TextMessage) message;
17
18 //(第2種方式)加了轉(zhuǎn)換器之后接收到的ObjectMessage對(duì)象消息
19 ObjectMessage objMsg=(ObjectMessage) message;
20 Users users;
21 try {
22 users = (Users) objMsg.getObject();
23 //System.out.println("QueueMessageListener監(jiān)聽到了文本消息:\t" + tm.getText());
24 System.out.println("QueueMessageListener監(jiān)聽到了文本消息:\t" + users);
25 //do something ...
26 } catch (JMSException e1) {
27 // TODO Auto-generated catch block
28 e1.printStackTrace();
29 }
30 }
31
32 }
同樣寫好監(jiān)聽器后也是需在ActiveMQ.xml中進(jìn)行配置注冊(cè)的
(1)注冊(cè)JmsTemplate時(shí),pubSubDomain這個(gè)屬性的值要特別注意。默認(rèn)值是false,也就是說默認(rèn)只是支持queue模式,不支持topic模式。但是,如果將它改為true,則不支持queue模式。因此如果項(xiàng)目需要同時(shí)支持queue和topic模式,那么需要注冊(cè)2個(gè)JmsTemplate,同時(shí)監(jiān)聽容器也需要注冊(cè)2個(gè)
(2)使用Queue時(shí),生產(chǎn)者只要將Message發(fā)送到MQ就不能被其他消費(fèi)者再消費(fèi)了,即“一次性消費(fèi)”。就是我們熟悉的“點(diǎn)對(duì)點(diǎn)”通信了;