這篇文章主要介紹“activemq特性是什么”,在日常操作中,相信很多人在activemq特性是什么問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”activemq特性是什么”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
目前創(chuàng)新互聯(lián)已為超過千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁(yè)空間、成都網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計(jì)、古塔網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
activemq特點(diǎn):用通配符訂閱多個(gè)destination,用組合發(fā)布多重destionation
activemq支持destination的層次結(jié)構(gòu)【topic和queen】便于歸類和管理。
通配符有三個(gè):
. 用來(lái)分隔路徑
* 用來(lái)匹配路徑中的一節(jié)
> 用來(lái)匹配任意節(jié)的路徑
opics:
。例如: football.division.leeds。 如果leeds 參加兩種運(yùn)動(dòng)--Scccer 和 Rugby,為了方便,我們希望通過一個(gè)消息消費(fèi)者而看到Leeds兩種運(yùn)動(dòng)的最新戰(zhàn)績(jī),這個(gè)時(shí)候,通配符就有用武之地了
. : used to separate elements in the destination name
* : used to match one element
> : match one or all trailing elements
所以,對(duì)于上面的例子, 你可以訂閱這樣的主題: *.*.Leeds
如果你想知道division1 這個(gè)賽區(qū)的所有分?jǐn)?shù), 你可以訂閱這個(gè): soccer.division1.*
如果你想知道Rugby的分?jǐn)?shù): 你可以訂閱這個(gè): rugby.>.
然而, 通配符中是為消費(fèi)者服務(wù)的,如果你發(fā)送了這樣的一個(gè)主題: rugby.>., 這個(gè)消息僅會(huì)發(fā)送到命名了rugby.>.的主題,并不是所有的主題都是以rugby開頭的。
這里有一種 方法,使消息生產(chǎn)者能將一條
消息發(fā)送到多個(gè)目的地。通過使用 composite destination。
將同一條消息發(fā)送到不同的目的地是很有用的。 比如一個(gè)用來(lái)存儲(chǔ)信息的應(yīng)用,會(huì)發(fā)送一條消息給隊(duì)列
同時(shí)也要將這條消息廣播給監(jiān)控的所有系統(tǒng)。通常,你會(huì)通過用兩個(gè)producer 發(fā)送兩次消息來(lái)達(dá)到這個(gè)目的。composite destination就是用來(lái)解決這種情況的
例如,如果你創(chuàng)建了名子為: store.order.backoffice,store.order.warehouse 的 Queue,這樣 就會(huì)發(fā)送同時(shí)兩個(gè)Queue。
訂閱信息 解釋
PRICE.> Any price for any product on any exchange
PRICE.STOCK.> Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.* Any stock price on NASDAQ
PRICE.STOCK.*.IBM Any IBM stock price on any exchange
從5.5 版本以后,可以自定義路徑分隔符:
.....
此時(shí)FOO.BAR.* 可以表示為 FOO/BAR/*
也可以通過pathSeparator 屬性定義其他符號(hào)位路徑分隔符。
public void subscribeToLeeds() throws JMSException {
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic allLeeds = session.createTopic("*.*.Leeds");
MessageConsumer consumer = session.createConsumer(allLeeds);
Message result = consumer.receive();
}
11.1.2發(fā)送一個(gè)message到多重destinations
發(fā)送相同的message到不同的destination上:案列發(fā)送一個(gè)[queen,opic]組合模式,默認(rèn)的組合destination用,分隔
列如store.order.backoffice,store.order.warehouse
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue ordersDestination = session.createQueue("store.orders, topic://store.orders");
MessageProducer producer = session.createProducer(ordersDestination);
Message order = session.createObjectMessage();
producer.send(order);
11.2通知消息
單的說就是實(shí)現(xiàn)了ActiveMQ的broker上各種操作的記錄跟蹤和通知。
使用這個(gè)功能,你可以實(shí)時(shí)的知道broker上
創(chuàng)建或銷毀了連接,
添加或刪除了生存者或消費(fèi)者,
添加或刪除了主題或隊(duì)列,
有消息發(fā)送和接收,
什么時(shí)候有慢消費(fèi)者,
什么時(shí)候有快生產(chǎn)者
什么時(shí)候什么消息被丟棄
什么時(shí)候broker被添加到集群(主從或是網(wǎng)絡(luò)連接)
這個(gè)機(jī)制是ActiveMQ對(duì)JMS協(xié)議的重要補(bǔ)充,也是基于JMS實(shí)現(xiàn)的ActiveMQ的可管理性的一部分。多個(gè)ActiveMQ的相互協(xié)調(diào)和互操作的基礎(chǔ)設(shè)置。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
MessageConsumer consumer = session.createConsumer(connectionAdvisory);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
ConnectionInfo connectionInfo = (ConnectionInfo) data;
System.out.println("Connection started: " + connectionInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Connection stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
大多數(shù)advisor消息都是完整的對(duì)于destiation,但是呢advisorysupport類有一些方法來(lái)決定監(jiān)聽哪個(gè)advisorytopic,你也能使用通配符-
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
// Lets first create a Consumer to listen too
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Lets first create a Consumer to listen too
Queue queue = session.createQueue("test.Queue");
MessageConsumer testConsumer = session.createConsumer(queue);
// so lets listen for the Consumer starting/stoping
Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
MessageConsumer consumer = session.createConsumer(advisoryTopic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
ActiveMQMessage message = (ActiveMQMessage) m;
try {
System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
ConsumerInfo consumerInfo = (ConsumerInfo) data;
System.out.println("Consumer started: " + consumerInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Consumer stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
testConsumer.close();
ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時(shí),每個(gè)持久訂閱者,都相當(dāng)于一個(gè)持久化的queue的客戶端,它會(huì)收取所有消息。這種情況下存在兩個(gè)問題:
1. 同一應(yīng)用內(nèi)consumer端負(fù)載均衡的問題:同一個(gè)應(yīng)用上的一個(gè)持久訂閱不能使用多個(gè)consumer來(lái)共同承擔(dān)消息處理功能。因?yàn)槊總€(gè)都會(huì)獲取所有消息。queue模式可以解決這個(gè)問題,broker
端又不能將消息發(fā)送到多個(gè)應(yīng)用端。所以,既要發(fā)布訂閱,又要讓消費(fèi)者分組,這個(gè)功能jms規(guī)范本身是沒有的。
2. 同一應(yīng)用內(nèi)consumer端failover的問題:由于只能使用單個(gè)的持久訂閱者,如果這個(gè)訂閱者出錯(cuò),則應(yīng)用就無(wú)法處理消息了,系統(tǒng)的健壯性不高,為了解決這兩個(gè)問題,ActiveMQ中實(shí)現(xiàn)了虛擬
Topic的功能。使用起來(lái)非常簡(jiǎn)單。對(duì)于消息發(fā)布者來(lái)說,就是一個(gè)正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。對(duì)于消息接收端來(lái)說,是個(gè)隊(duì)列,不同應(yīng)用里使用不同的前綴作為
隊(duì)列的名稱,即可表明自己的身份即可實(shí)現(xiàn)消費(fèi)端應(yīng)用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費(fèi)端,同理Consumer.B.VirtualTopic.TEST說明是一個(gè)名稱為B的客戶端。
可以在同一個(gè)應(yīng)用里使用多個(gè)consumer消費(fèi)此queue,則可以實(shí)現(xiàn)上面兩個(gè)功能。又因?yàn)椴煌瑧?yīng)用使用的queue名稱不同(前綴不同),所以不同的應(yīng)用中都可以接收到全部的消息。每個(gè)客戶端相當(dāng)于一個(gè)持久訂
閱者,而且這個(gè)客戶端可以使用多個(gè)消費(fèi)者共同來(lái)承擔(dān)消費(fèi)任務(wù)。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
//setup the sender
Connection senderConnection = connectionFactory.createConnection();
senderConnection.start();
Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
MessageProducer producer = senerSession.createProducer(ordersDestination);
同樣queue名稱的消費(fèi)者會(huì)平分所有消息。
從queue接收到的消息,message.getJMSDestination().toString()為topic://VirtualTopic.TEST,即原始的destination。消息的persistent屬性為true,即每個(gè)相當(dāng)于一個(gè)持久訂閱。
Virtual Topic這個(gè)功能特性在broker上有個(gè)總開關(guān),useVirtualTopics屬性,默認(rèn)為true,設(shè)置為false即可關(guān)閉此功能。
當(dāng)此功能開啟,并且使用了持久化的存儲(chǔ)時(shí),broker啟動(dòng)的時(shí)候會(huì)從持久化存儲(chǔ)里拿到所有的destinations的名稱,如果名稱模式與Virtual Topics匹配,則把它們添加到系統(tǒng)的Virtual Topics列表中去。
當(dāng)然,沒有顯式定義的Virtual Topics,也可以直接使用的,系統(tǒng)會(huì)自動(dòng)創(chuàng)建對(duì)應(yīng)的實(shí)際topic。
當(dāng)有consumer訪問此VirtualTopics時(shí),系統(tǒng)會(huì)自動(dòng)創(chuàng)建持久化的queue,并在每次Topic收到消息時(shí),分發(fā)到具體的queue。
可追溯”消費(fèi)者,只對(duì)Topic有效,如果consumer是可追溯的,那么它可以獲取實(shí)例創(chuàng)建之前的消息。通常而言,訂閱者不可能獲取實(shí)例創(chuàng)建之前的消息,因?yàn)閎roker根本不知道它的存在。對(duì)于broker而言,如果
一個(gè)Topic通道創(chuàng)建,且有發(fā)布者發(fā)布消息(Publisher),那么broker將會(huì)在內(nèi)存中(非持久化)或者磁盤中(持久化)保存已經(jīng)發(fā)布的消息,直到所有的訂閱者都消費(fèi)者,才會(huì)清除原始消息內(nèi)容。那么retroactive
類型的訂閱者,就可以獲取這些原本不屬于自己但broker上還保存的舊消息,就像我們訂閱一種Feed,可以立即獲取舊的內(nèi)容列表一樣。如果此訂閱者不是durable(耐久的),它可以獲取最近發(fā)布的一些消息;如果是durable,它可以獲取存儲(chǔ)器中尚未刪除的所有的舊消息。[下文會(huì)詳細(xì)介紹Topic的數(shù)據(jù)轉(zhuǎn)發(fā)模型]
//在destinationUrl中設(shè)置,默認(rèn)為false
feedTopic?consumer.retroactive=true
在broker端,可以配置當(dāng)前Topic默認(rèn)為“可追溯的”,不過Topic并不會(huì)在此種情況下額外的保存消息,只不過表示訂閱者默認(rèn)都是可追溯的而已。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);
Message result = consumer.receive();
redeliveryPolicy
consumer使用的重發(fā)策略,當(dāng)消息在client端處理失敗(比如onMessage方法拋出異常,事務(wù)回滾等),將會(huì)觸發(fā)消息重發(fā)。對(duì)于Broker端,需要重發(fā)的消息將會(huì)被立即發(fā)送(如果broker端使用異步發(fā)送,
且發(fā)送隊(duì)列中還有其他消息,那么重發(fā)的消息可能不會(huì)被立即到達(dá)Consumer)。我們通過此Policy配置最大重發(fā)次數(shù)、重發(fā)頻率等,如果你的Consumer客戶端處于不良網(wǎng)絡(luò)環(huán)境中,可以適當(dāng)調(diào)整相關(guān)參數(shù)。參數(shù)列表,
請(qǐng)參見(RedeliveryPolicy)
//在brokerUrl中設(shè)置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
. redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)
DLQ-死信隊(duì)列(Dead Letter Queue)用來(lái)保存處理失敗或者過期的消息。
出現(xiàn)以下情況時(shí),消息會(huì)被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
當(dāng)一個(gè)消息被redelivered超過maximumRedeliveries(缺省為6次,具體設(shè)置請(qǐng)參考后面的鏈接)次數(shù)時(shí),會(huì)給broker發(fā)送一個(gè)"Poison ack",這個(gè)消息被認(rèn)為是a poison pill,這時(shí)broker會(huì)將這
消息發(fā)送到DLQ,以便后續(xù)處理。缺省的死信隊(duì)列是ActiveMQ.DLQ,如果沒有特別指定,死信都會(huì)被發(fā)送到這個(gè)隊(duì)列。缺省持久消息過期,會(huì)被送到DLQ,非持久消息不會(huì)送到DLQ可以通過配置文件(activemq.xml)
來(lái)調(diào)整死信發(fā)送策略
' ,否則用隊(duì)列名稱 -->
...
在一個(gè)電子系統(tǒng)中可能接受來(lái)自不同供應(yīng)商的各種訂單信息,不同類型的訂單走的流程不盡相同,為了快速處理各種不同的訂單完成不同的業(yè)務(wù)。特定義不同的路由 信息。根據(jù)路由信息的不同,將消息進(jìn)行不同的處理。如果采用ActiveMQ那么最好采用apache-camel整合,使不同的消息根據(jù)不同的流程自動(dòng) 處理到不同的隊(duì)列中去。
到此,關(guān)于“activemq特性是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!