這篇文章主要介紹“如何在Jboss中使用JMS”,在日常操作中,相信很多人在如何在Jboss中使用JMS問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”如何在Jboss中使用JMS”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
成都服務(wù)器托管,創(chuàng)新互聯(lián)建站提供包括服務(wù)器租用、成都聯(lián)通服務(wù)器托管、帶寬租用、云主機(jī)、機(jī)柜租用、主機(jī)租用托管、CDN網(wǎng)站加速、域名注冊(cè)等業(yè)務(wù)的一體化完整服務(wù)。電話咨詢:135182197921、介紹
為了能建立和運(yùn)行此例子。我們使用兩種方式來(lái)進(jìn)行:一是使用Ant命令,二是使用JAR和JAVA基本命令。必須有下面的環(huán)境變量:
w
JAVA_HOME 安裝JDK1.4的目錄。
w JBOSS_DIST 安裝JBoss的目錄。
你必須安裝JDK, 如果使用Ant必須安裝
Ant??梢詤⒖嘉仪懊嫖臋n的介紹。
2、概述
1) 什么是JMS
JMS是Java API,
允許應(yīng)用程序來(lái)建立、接收和讀取消息。程序依靠這些API, 在運(yùn)行時(shí)需要一個(gè)JMS實(shí)現(xiàn)接口,來(lái)提供管理和控制,這被稱為JMS provider,
現(xiàn)在有幾種不同的JMS Provider; 在JBoss中的叫做JbossMQ。
2) JMS
和J2EE
JMS是在EJB和J2EE框架開(kāi)發(fā)之前進(jìn)行開(kāi)發(fā)的,所以在JMS說(shuō)明書中沒(méi)有涉及到EJB或J2EE。
EJB
和J2EE第一代版本中也沒(méi)有涉及到JMS,一直到EJB1.1,在生成一個(gè)可用Beand的容器provider中JMS也不是必須的API。在J2EE1.2中JMS接口是必需的情況,但并不是非得要包含一個(gè)JMS
Provider;在EJB2.0和J2EE1.3中又進(jìn)行改變,應(yīng)用服務(wù)器包含了一個(gè)JMS
Provider,自從J2EE1。3需要EJB2.0,增加了以下兩個(gè)JMS特性:
w 一種新Bean類型定義, 也就是消息驅(qū)動(dòng)Beam (MDB),
這種bean做為JMS消息監(jiān)聽(tīng)者,可以異步地處理JMS消息。
w JMS處理作為資源,來(lái)自一個(gè)Bean 的JMD
發(fā)布(發(fā)送)必須能和其他bean的全局事務(wù)環(huán)境共享。這個(gè)需要把JMS認(rèn)為是一個(gè)容器管理資源,象JDBC的連接。
3) JMS
和JBoss
JBoss從2.0版本以后都支持JMS。
在2.1中增加了MDB,從2.4版本開(kāi)始JMS作為一個(gè)事務(wù)資源。
JBoss中JMS的體系結(jié)構(gòu)如下:
w JMS Provider,
叫做JbossMQ 。 是JBoss實(shí)現(xiàn)JMS 1.0.2規(guī)范的一部分,包括可選部分,象ASF(Application Service Facvility)。
JbossMQ處理和普遍JMS一樣:建立 queues (隊(duì)列)或topic(標(biāo)題),持久性等。
w MDB (Message Driven
Beans),
w 資源適配器。
3、JMS Provider
JBoss有它自己的JMS Provider 叫做JbossMQ。
適合與JMS 1.0.2 的JMS
Provider,并且能用在所有平常的JMS程序中。為了清楚在JBoss中JMS是如何工作的,首先要清楚在JMS中涉及到的概念和術(shù)語(yǔ),最好的辦法是閱讀JMS規(guī)范,下面給出了簡(jiǎn)單的JMS介紹。
1)
JMS的簡(jiǎn)單介紹
當(dāng)你發(fā)送一個(gè)消息,你不能直接發(fā)送到對(duì)此消息感興趣的接受者。而是你發(fā)送到一個(gè)目的地。對(duì)此消息感興趣的接受者必須連接到目的地,得到此消息或在目的地設(shè)置訂閱。
在JMS中有兩種域:topics
和queues 。
w 一個(gè)消息發(fā)送到一個(gè)topics
,可以有多個(gè)客戶端。用topic發(fā)布允許一對(duì)多,或多對(duì)多通訊通道。消息的產(chǎn)生者被叫做publisher, 消息接受者叫做subscriber。
w
queue
是另外一種方式,僅僅允許一個(gè)消息傳送給一個(gè)客戶。一個(gè)發(fā)送者將消息放在消息隊(duì)列中,接受者從隊(duì)列中抽取并得到消息,消息就會(huì)在隊(duì)列中消失。第一個(gè)接受者抽取并得到消息后,其他人就不能在得到它。
為了能發(fā)送和接收消息,必須得到一個(gè)JMS連接。該連接是使用JMS
Provider得到連接的,在得到連接之后,建立一個(gè)會(huì)話(Session)。然后再建立publisher/sender
來(lái)發(fā)送消息或subscriber/receiver來(lái)接收消息。
運(yùn)行時(shí),如果使用topic 那么publisher 或subscriber
通過(guò)一個(gè)topic來(lái)關(guān)聯(lián),如果使用queue ,則sender
或receiver通過(guò)queue來(lái)關(guān)聯(lián)起來(lái)。
通常,在JMS框架中運(yùn)轉(zhuǎn)的方法如下:
(1)
得到一個(gè)JNDI初始化上下文(Context);
(2) 根據(jù)上下文來(lái)查找一個(gè)連接工廠TopicConnectFactory/
QueueConnectionFactory (有兩種連接工廠,根據(jù)是topic/queue來(lái)使用相應(yīng)的類型);
(3)
從連接工廠得到一個(gè)連接(Connect 有兩種[TopicConnection/ QueueConnection]);
(4)
通過(guò)連接來(lái)建立一個(gè)會(huì)話(Session);
(5) 查找目的地(Topic/ Queue);
(6)
根據(jù)會(huì)話以及目的地來(lái)建立消息制造者(TopicPublisher/QueueSender)和消費(fèi)者(TopicSubscriber/
QueueReceiver).
為了得到一個(gè)連接和得到一個(gè)目的地(用來(lái)關(guān)聯(lián)publisher/sender
或subscriber/receiver),必須用provider-specific參數(shù)。
通過(guò)JNDI來(lái)查找相應(yīng)的連接工廠或目的地,JNDI適合于任何JMS
Provider。但是查找用的名字是provider使用的。因此,在你使用的JMS Provider(其中包括JBossMQ),必須學(xué)會(huì)如何進(jìn)行指定。JMS
Provider中的任何設(shè)置,象連接特性,用到目的地等,在用到的Provider都有明確描述。
2) 配置
當(dāng)使用一個(gè)JMS
Provider時(shí),有三個(gè)Provider-specific因素:
w 得到一個(gè)JNDI初始化上下文
w 用到的連接工廠的名字。
w
對(duì)目的地的管理和命名協(xié)定。
JBoss同它的JNDI一起執(zhí)行。為了簡(jiǎn)單的JMS client,
配置和查找初始化上下文,同實(shí)現(xiàn)其他J2EE客戶端一樣。
JMS-specific 來(lái)約束JBoss 的JMS provider
(JBossMQ)。JbossMQ是通過(guò)xml
文件jbossmq-service.xml進(jìn)行配置的,該文件放在在serverdefaultdeploy下。
在xml文件中最基本的三件事情:
w
增加一個(gè)新的目的地
w 配置用戶
w 獲得可用連接工廠的名字。
(1) 增加新的目的地
w 在目的地的xml文件在jboss
3.x中是jbossmq-destinations-service.xml(server/../deploy)。在文件中已經(jīng)存在幾個(gè)缺省的目的地,所以你比較容易明白怎樣增加到文件中。在例子中你需要一個(gè)topic目的地
spool,所以增加下面的語(yǔ)句到j(luò)bossmq-destinations-service.xml中。這種方式是長(zhǎng)久存在的,不隨著JBoss服務(wù)器關(guān)閉而消失。
w
另外一種方法是可以通過(guò)JMX HTML管理界面。通過(guò)http://localhost:8080/jmx-console 來(lái)訪問(wèn)。在jboss.mq
下查找service=DestinationManager
的連接。然后在createTopic()或createQueue()來(lái)建立,這種方法建立的目的地是臨時(shí)性的,隨著服務(wù)器開(kāi)始存在,當(dāng)當(dāng)JBoss
服務(wù)器重新啟動(dòng)時(shí),動(dòng)態(tài)建立的目的地將會(huì)不存在。在JbossMQ中所有目的地都有一個(gè)目的地類型的前綴。對(duì)于topic前綴是topic
,對(duì)于queues前綴是queue。例如查找一個(gè)testTopic目的地,需要查找名字為“topic/testTopic”。
在此種方法中有createTopic()或createQueue()分別有兩種方法:一是有兩個(gè)參數(shù),二是有一個(gè)參數(shù)的。兩個(gè)參數(shù)分別是:建立的目的地名稱和JNDI名稱。一個(gè)參數(shù)的只是目的地名稱,對(duì)于JNDI名稱默認(rèn)是:[目的地類型(topic/queue)
]/目的地名稱。
在這里我們使用的是第一種方法。直接修改jbossmq-destinations-service.xml文件。
(2)
管理用戶
在JMS中可能關(guān)聯(lián)一個(gè)連接和一個(gè)用戶,不幸的是沒(méi)有明確的方法來(lái)限制訪問(wèn)JbossMQ或訪問(wèn)特殊的目的地到一個(gè)給定的用戶。為了給大部分角色,在JbossMQ中不需要建立用戶,除非想有一個(gè)持久topic訂閱者。在這個(gè)例子中,用戶是必須的。
用戶可以直接在文件jbossmq-state.xml(server/../conf)中添加。同樣也可以使用JMX
HTML管理界面來(lái)增加(jboss.mq->service=StateManager->addUser())。
>
(3)
連接工廠
JBossMQ
包括為topic和queue幾個(gè)不同的連接工廠,每個(gè)連接工廠有自己特性。當(dāng)通過(guò)JNDI來(lái)查找一個(gè)連接工廠時(shí),需要知道此連接工廠的名稱。所有可用連接工廠和它們的屬性,名稱都會(huì)在文件jbossmq-service.xml中。
有三種類型連接工廠,依賴的通訊協(xié)議如下:
OIL
快速雙向scoket通訊協(xié)議。它是缺省的。
UIL
超過(guò)一個(gè)socket協(xié)議,可以使用在通過(guò)防火墻訪問(wèn),或者當(dāng)客戶端不能正確的查找到服務(wù)器的IP地址。
RMI
最早的協(xié)議,是穩(wěn)定的,但是比較慢。
JVM
在JBoss
2.4之后增加的一個(gè)新的連接工廠類型。不需要用socket,當(dāng)客戶端和JbossMQ使用同樣虛擬機(jī)時(shí),可以使用。
在JBoss2.4.1以后版本中,對(duì)于topic-
和 queue-目的地,連接工廠使用同樣的名字。下表列出了在JBoss中JMS連接工廠:
目的地類型 JNDI名字
連接工廠類型
Topic/Queue java:/ConnectionFactory JVM
Topic/Queue
java:/XAConnectionFactory JVM支持XA事務(wù)
Topic/Queue RMIConnectionFactory
RMI
Topic/Queue RMIXAConnectionFactory RMI支持XA事務(wù)
Topic/Queue
ConnectionFactory OIL
Topic/Queue XAConnectionFactory
OIL支持XA事務(wù)
Topic/Queue UILConnectionFactory UIL
Topic/Queue
UILXAConnectionFactory UIL支持XA事務(wù)
3)
JBoss中高級(jí)JMS配置
在上邊段落主要描述了和JbossMQ一起實(shí)行的基本配置工作。在本段來(lái)描述JMS其他配置。
(1)
JMS持久性管理
JMS持久性管理(PM)負(fù)責(zé)存儲(chǔ)消息,并且將消息標(biāo)記為持久,如果服務(wù)器發(fā)生故障時(shí),能保證消息不會(huì)丟失,并允許恢復(fù)持久性消息。持久性JMS消息可以使用不同的方法來(lái)完成。每個(gè)方法有自己優(yōu)點(diǎn)和缺陷:
PM
名字 優(yōu)點(diǎn) 缺點(diǎn)
File 比較穩(wěn)定 速度慢
Rollinglogged 速度比較快 此方法比較新,有些地方需要完善
JDBC
對(duì)于穩(wěn)定性和可量測(cè)性比較好 必須有JDBC
Logged 速度快 Log files grow without bound, memory
management problems during
recovery
在JBoss中缺省的持久性消息管理是File持久性管理??梢愿淖兯?,但必須對(duì)于一個(gè)JMS
Server有且僅有一個(gè)持久性管理配置。所以你在JBoss管理界面的jboss.mq ?
>
service=PersistenceManager
只是看到一個(gè)。
持久性管理的配置文件是jbossmq-service.xml。在server..deploy下。
為了讓大家了解持久性管理的各種方法,我下面來(lái)逐個(gè)介紹如何配置。
w
File持久性管理
File持久性管理的概念就是為每一個(gè)持久性消息建立一個(gè)文件。消息持久性方法不是全部都能使用,但它是比較穩(wěn)定的。
File持久性管理在JBoss發(fā)布時(shí),作為一個(gè)缺省的持久性管理。如果你打開(kāi)jbossmq-service.xml文件,你會(huì)看到下面的XML:
db/jbossmq/file
當(dāng)設(shè)置Mbean配置時(shí),F(xiàn)ile持久性管理允許你指定下面的屬性:
DataDircetory
是存放持久性消息的路徑,會(huì)把生成的數(shù)據(jù)文件放在此目錄下。
w
設(shè)置Rollinglogged持久性管理
Rollinglogged持久性管理是比較新的一種持久性消息管理方法,因?yàn)樗褂萌罩疚募?lái)持續(xù)多個(gè)消息,所以當(dāng)建立一個(gè)文件時(shí),不需要許多的I/O。
定義Rollinglogged持久性管理:
db/jbossmq/file
Rollinglogged持久性管理中DataDirctory
存放持久性消息的路徑,會(huì)把生成的數(shù)據(jù)文件放在此目錄下。
w
設(shè)置JDBC持久性管理
JDBC持久性管理使用數(shù)據(jù)庫(kù)表來(lái)存儲(chǔ)消息。需要一個(gè)JBoss配置的數(shù)據(jù)源來(lái)訪問(wèn)數(shù)據(jù)庫(kù)。具體內(nèi)容參考jbossmq-service.xml文件中的內(nèi)容。
w
設(shè)置Logged持久性管理
Logged持久性管理是比較早的一個(gè),在JBoss2.4.1以后版本中不在建議使用。現(xiàn)在有其他更好的辦法。
4、舉例說(shuō)明
當(dāng)我們清楚了以后內(nèi)容后,現(xiàn)在我們來(lái)用JBoss實(shí)現(xiàn)一個(gè)例子來(lái)加深對(duì)JBoss和JMS的了解。
在上面敘述中,我們知道明確使用JMS
provider有三個(gè)基本的事情要做:配置JNDI初始化上下文,連接工廠的名字和使用目的地的名字。
當(dāng)編寫產(chǎn)品的最好的事情是不受provider-specific
影響,使代碼能在不同的JMS provider之間容易移植。在此這個(gè)例子沒(méi)有聚焦在開(kāi)發(fā)產(chǎn)品上,而是解釋如何使用JbossMQ來(lái)工作。
1)
初始化上下文
w
配置JNDI的一個(gè)方法是通過(guò)屬性文件jndi.properties。在這個(gè)文件中使用正確的值,并且把它所在的路徑包含到classpath中,它比較容獲得正確初始化上下文。
jndi.properties文件的內(nèi)容如下:
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=localhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
把該文件放置的路徑成為你的classpath的一部分。如果你使用這種方法,在初始化上下文時(shí),代碼比較簡(jiǎn)單:
Context context = new IntialContext();1
w
在某些情景下,可能需要手工配置JNDI;例如當(dāng)運(yùn)行的類文件中環(huán)境已經(jīng)配置了一個(gè)初始化上下文,但不是你想用的上下文時(shí),需要手工來(lái)配置一個(gè)上下文。設(shè)置在哈希表中的幾個(gè)屬性值,并且使用此哈希表來(lái)實(shí)例化一個(gè)上下文。定義語(yǔ)法:
Hashtable
props = new
Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
props.put(Context.PROVIDER_URL,
"localhost:1099");
props.put("java.naming.rmi.security.manager",
"yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
2)
查找連接工廠
自有了上下文后,需要查找一個(gè)連接工廠。為了查找它,使用一個(gè)可用的名字。查找連接工廠的代碼如下:
對(duì)于一個(gè)topic目的地
TopicConnectionFactory
topicFactory = (TopicConnectionFactory) context.lookup
(“ConnectionFactory”)
Queue 目的地:
QueueConnectionFactory queueFactory =
(QueueConnectionFactory ) context.lookup (“ConnectionFactory”)
3)
建立連接和會(huì)話
在我們有了連接工廠后,建立一個(gè)連接,在此連接中建立一個(gè)會(huì)話。
對(duì)于topic代碼如下:
//建立一個(gè)連接
topicConnection
= topicFactory.createTopicConnection();
//建立一個(gè)會(huì)話
topicSession =
topicConnection.createTopicSession(false, //不需要事務(wù)
Session.AUTO_ACKNOLEDGE
//自動(dòng)接收消息的收條。
);
對(duì)于queue代碼如下:
//建立一個(gè)連接
queueConnection =
queueFactory.createQueueConnection();
//建立一個(gè)會(huì)話
queueSession =
queueConnection .createQueueSession(false, //不需要事務(wù)
Session.AUTO_ACKNOLEDGE
//自動(dòng)接收消息的收條。
);
一個(gè)會(huì)話建立時(shí),配置是否調(diào)用事務(wù)
在事務(wù)Session中,當(dāng)事務(wù)被提交后,自動(dòng)接收,如果事務(wù)回滾,所有的被消費(fèi)的消息將會(huì)被重新發(fā)送。
在非事務(wù)Session中,如果沒(méi)有調(diào)用事務(wù)處理,消息傳遞的方式有三種:
Session.AUTO_ACKNOWLEDGE
:當(dāng)客戶機(jī)調(diào)用的receive方法成功返回,或當(dāng)MessageListenser
成功處理了消息,session將會(huì)自動(dòng)接收消息的收條。
Session.CLIENT_ACKNOWLEDGE
:客戶機(jī)通過(guò)調(diào)用消息的acknowledge方法來(lái)接收消息。接收發(fā)生在session層。接收到一個(gè)被消費(fèi)的消息時(shí),將自動(dòng)接收該session已經(jīng)消費(fèi)的所有消息。例如:如果消息的消費(fèi)者消費(fèi)了10條消息,然后接收15個(gè)被傳遞的消息,則前面的10個(gè)消息的收據(jù)都會(huì)在這15個(gè)消息中被接收。
Session.DUPS_ACKNOWLEDGE
:指示session緩慢接收消息。
4) 查找目的地
現(xiàn)在我們來(lái)介紹建立publishes/sends
或subscribles/receives消息。
下面的代碼列出來(lái)查找一個(gè)目的地:
對(duì)于topic
查找一個(gè)testTopic目的地
Topic topic = (Topic)
context.lookup(“topic/testTopic”);
對(duì)于queue 查找一個(gè)testQueue目的地
Queue
queue= (Queue) context.lookup(“queue/testQueue”);
注意:JbossM的前綴topic/
(queue/)通常被放在topic (queue)名字前面。
在JMS中,當(dāng)客戶機(jī)扮演每種角色,象對(duì)于topic來(lái)將的publisher
,subscriber 或?qū)τ趒ueue來(lái)將的sender, receiver, 都有自己不同類繼承和不同函數(shù)。
5) 建立一個(gè)消息制造者M(jìn)essage
Producer (topic publisher/ queue
sender)
消息制造者是一個(gè)由session創(chuàng)建的對(duì)象,主要工作是發(fā)送消息到目的地。
對(duì)于一個(gè)topic,需要通過(guò)TopicSession來(lái)創(chuàng)建一個(gè)TopicPublisher。代碼如下:
TopicPublisher
topicPublisher =
TopicSession.createPublisher(topic);
對(duì)于一個(gè)queue,需要通過(guò)QueueSession來(lái)創(chuàng)建一個(gè)QueueSender。代碼如下:
QueuePublisher
queuePublisher = queueSession.createSender(queue);
6)
消息發(fā)送
建立一個(gè)TestMessage并且publish 它, 代碼:
TextMessage message =
topicSession.createTestMessage();
message.setText(msg);
topicPublishe.publish(topic,
message);
建立一個(gè)TestMessage并且send它, 代碼:
TextMessage message =
queueSession.createTestMessage();
message.setText(msg);
queueSender.send(queue,
message);
7) 下面是一個(gè)完成的topic publisher 代碼,文件名HelloPublisher.java
:
import javax.naming.Context;
import
javax.naming.InitialContext;
import javax.naming.NamingException;
import
javax.jms.TopicConnectionFactory;
import javax.jms.TopicConnection;
import
javax.jms.TopicSession;
import javax.jms.TopicPublisher;
import
javax.jms.Topic;
import javax.jms.TextMessage;
import
javax.jms.Session;
import javax.jms.JMSException;
import
java.util.Hashtable;
public class HelloPublisher {
TopicConnection
topicConnection;
TopicSession topicSession;
TopicPublisher
topicPublisher;
Topic topic;
public HelloPublisher(String factoryJNDI,
String topicJNDI)
throws JMSException, NamingException {
Hashtable
props=new
Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
props.put(Context.PROVIDER_URL,
"localhost:1099");
props.put("java.naming.rmi.security.manager",
"yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context
context = new InitialContext(props);
TopicConnectionFactory topicFactory
=
(TopicConnectionFactory)context.lookup(factoryJNDI);
topicConnection =
topicFactory.createTopicConnection();
topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic =
(Topic)context.lookup(topicJNDI);
topicPublisher =
topicSession.createPublisher(topic);
}
public void publish(String
msg) throws JMSException {
TextMessage message =
topicSession.createTextMessage();
message.setText(msg);
topicPublisher.publish(topic,
message);
}
public void close() throws JMSException
{
topicSession.close();
topicConnection.close();
}
public static
void main(String[] args) {
try {
HelloPublisher publisher = new
HelloPublisher(
"ConnectionFactory", "topic/testTopic");
for (int i = 1; i
< 11; i++) {
String msg = "Hello World no. " +
i;
System.out.println("Publishing message: " +
msg);
publisher.publish(msg);
}
publisher.close();
} catch(Exception
ex) {
System.err.println(
"An exception occurred while testing
HelloPublisher25: " +
ex);
ex.printStackTrace();
}
}
}
我們知道,使用JMS不僅能發(fā)送(send)/發(fā)布(publish)消息,也能獲得(send)/發(fā)布(publish)的消息。在時(shí)間方式有良種方法來(lái)做:
w
同步(Synchronously):需要手工的去得到消息,為了得到一個(gè)消息客戶機(jī)調(diào)用方法得到消息,直到消息到達(dá)或在規(guī)定的時(shí)間內(nèi)沒(méi)有到達(dá)而超時(shí)。我們?cè)诶又袥](méi)有說(shuō)明這部分,大家可以實(shí)驗(yàn)一下。
w
異步(Asynchronously):你需要定義一個(gè)消息監(jiān)聽(tīng)器(MessageListener),實(shí)現(xiàn)該接口。當(dāng)消息達(dá)到時(shí),JMS
provider通過(guò)調(diào)用該對(duì)象的
onMessage方法來(lái)傳遞消息。
從原則來(lái)將,topic和queue都是異步的,但是在這兩種目的地中有不同的類和方法。首先,必須定義一個(gè)MessageListener接口。
8)
建立一個(gè)MessageListener
在建立了你需要的subscriber/receiver,并且登記了監(jiān)聽(tīng)器后。就可以調(diào)用連接的start方法得到JMS
provider 發(fā)送到的消息了。如果在登記監(jiān)聽(tīng)器之前調(diào)用start方法,很可能會(huì)丟失消息。
public void onMessage(Message
m) {
try {
String msg =
((TextMessage)m).getText();
System.out.println("HelloSubscriber got message:
" + msg);
} catch(JMSException ex) {
System.err.println("Could not get
text message: " + ex);
ex.printStackTrace();
}
}
9)
建立消息消費(fèi)者
對(duì)于topic來(lái)將:
//建立一個(gè)訂閱者
topicSubscriber =
topicSession.createSubscriber(topic);
//設(shè)置消息監(jiān)聽(tīng)器,
topicSubscriber.setMessageListener(this)
//連接開(kāi)始
topicConnection.start();
對(duì)于queue來(lái)將:
//建立一個(gè)訂閱者
queueReceiver
= queueSession.createReceiver(queue);
//設(shè)置消息監(jiān)聽(tīng)器,
queueReceiver
.setMessageListener(this)
//連接開(kāi)始
queueConnection.start();
10)
完整的代碼,放在文件HelloSubscriber.java中,如下:
import javax.naming.Context;
import
javax.naming.InitialContext;
import
javax.naming.NamingException;
import
javax.jms.TopicConnectionFactory;
import javax.jms.TopicConnection;
import
javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import
javax.jms.Topic;
import javax.jms.Message;
import
javax.jms.TextMessage;
import javax.jms.Session;
import
javax.jms.MessageListener;
import javax.jms.JMSException;
public class
HelloSubscriber implements MessageListener {
TopicConnection
topicConnection;
TopicSession topicSession;
TopicSubscriber
topicSubscriber;
Topic topic;
public HelloSubscriber(String factoryJNDI,
String topicJNDI)
throws JMSException, NamingException
{
Context
context = new InitialContext();
TopicConnectionFactory topicFactory
=
(TopicConnectionFactory)context.lookup(factoryJNDI);
topicConnection =
topicFactory.createTopicConnection();
topicSession =
topicConnection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE);
topic =
(Topic)context.lookup(topicJNDI);
topicSubscriber =
topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(this);
System.out.println(
"HelloSubscriber
subscribed to topic: " + topicJNDI);
topicConnection.start();
}
public
void onMessage(Message m) {
try {
String msg =
((TextMessage)m).getText();
System.out.println("HelloSubscriber got message:
" + msg);
} catch(JMSException ex) {
System.err.println("Could not get
text message: " + ex);
ex.printStackTrace();
}
}
public void close()
throws JMSException
{
topicSession.close();
topicConnection.close();
}
public static
void main(String[] args) {
try {
HelloSubscriber subscriber = new
HelloSubscriber(
"TopicConnectionFactory",
"topic/testTopic");
}
catch(Exception ex) {
System.err.println(
"An exception occurred while
testing HelloSubscriber: " +
ex);
ex.printStackTrace();
}
}
}
11) 編輯、運(yùn)行程序
直接使用命令(java)
w 開(kāi)啟命令操作符。設(shè)置classpath :
set
classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.jar;C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientlog4j.jar;.
w
首先運(yùn)行訂閱消息端:java HelloSubscriber
w 再開(kāi)啟另外一個(gè)命令窗口設(shè)置classpath :
set
classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.jar;C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientlog4j.jar;.
w
運(yùn)行發(fā)布消息端:java
HelloPublisher
5、補(bǔ)充
在最后我們解釋JBoss-specific特性:如何用代碼來(lái)管理目的地。JBoss各個(gè)版本可能不同,但是差別不大。我使用的是jboss3.0.6。
實(shí)現(xiàn)這個(gè)目的有兩種不同的方法,依賴于是否代碼是在和JBoss同樣的虛擬機(jī)還是獨(dú)立獨(dú)立的。它們都包括調(diào)用一個(gè)通過(guò)service=DestinationManager
登記的JMX Bean。這個(gè)Mbean
有四個(gè)方法來(lái)管理目的地:createTopic(),createQueue(),destroyTopic(),destroyQueue()。
在代碼中實(shí)現(xiàn)管理目的地在影射怎樣調(diào)用MBean有不同的地方。如果程序虛擬機(jī)和Mbean服務(wù)器一樣,可以直接調(diào)用。
建立一個(gè)topic
目的地的代碼如下:
MBeanServer server =
(MBeanServer)
MBeanServerFactory.findMBeanServer(null).iterator().next();
server.invoke(new
ObjectName("JBossMQ", "service", "DestinationManager"),
method,
new
Object[] { “myTopic” },
new String[] { "java.lang.String"
});
如果程序和Mbean服務(wù)器的虛擬機(jī)不同,需要通過(guò)一個(gè)JMX adapter。一個(gè)JMX adapter是一個(gè)HTML
GUI。用程序通過(guò)URL來(lái)調(diào)用Mbean。代碼如下:
import java.io.InputStream;
import
java.net.URL;
import java.net.HttpURLConnection;
import
javax.management.MBeanServerFactory;
import
javax.management.MBeanServer;
import javax.management.ObjectName;
import
javax.jms.Topic;
import javax.jms.Queue;
public class DestinationHelper
{
static final String HOST = "localhost";
static final int PORT =
8080;
static final String BASE_URL_ARG = "/jmx-console/HtmlAdaptor?";
public static void createDestination(Class type, String name)
throws
Exception
{
String method = null;
if (type == Topic.class) { method =
"createTopic"; }
else if (type == Queue.class) { method =
"createQueue";}
invoke(method, name);
}
public static void
destroyDestination(Class type, String name)
throws Exception
{
String
method = null;
if (type == Topic.class) { method = "destroyTopic"; }
else
if (type == Queue.class) { method = "destroyQueue";}
invoke(method,
name);
}
protected static void invoke(String method, String destName)
throws Exception
{
try {
MBeanServer server = (MBeanServer)
MBeanServerFactory.findMBeanServer(null).iterator().next();
invokeViaMBean(method,
destName);
}catch(Exception ex) { invokeViaUrl(method, destName);}
}
protected static void invokeViaUrl(String method, String destName)
throws Exception
{
String action =
"action=invokeOp&methodIndex=6&name=jboss.mq%3Aservice%3DDestinationManager&arg0="
+ destName;
String arg = BASE_URL_ARG + action;
URL url = new URL("http",
HOST, PORT, arg);
HttpURLConnection con =
(HttpURLConnection)url.openConnection();
con.connect();
InputStream is
= con.getInputStream();
java.io.ByteArrayOutputStream os = new
java.io.ByteArrayOutputStream();
byte[] buff = new byte[1024];
for(;;)
{
int size = is.read( buff );
if (size == -1 ) { break;
}
os.write(buff, 0, size);
}
os.flush();
if
(con.getResponseCode() != HttpURLConnection.HTTP_OK ) {
throw new Exception
("Could not invoke url: " + con.getResponseMessage() );
} else
{
System.out.println("Invoked URL: " + method + " for destination " +
destName + "got resonse: " + os.toString());
}
}
protected static void
invokeViaMBean(String method, String destName)
throws
Exception
{
MBeanServer server =
(MBeanServer)MBeanServerFactory.findMBeanServer(null).iterator().next();
server.invoke(new
ObjectName("JBossMQ", "service", "DestinationManager"),
method,
new
Object[] { destName },
new String[] { "java.lang.String" });
}
public
static void main(String[] args) {
try {
if (args.length
>0){
destroyDestination(Topic.class,"myCreated");
}else
{
createDestination(Topic.class,"myCreated");
}
}catch(Exception ex)
{
System.out.println("Error in administering destination: " +
ex);
ex.printStackTrace();
}
}
}
編輯命令:
javac -classpath
C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx.jar;.
DestinationHelper.java
運(yùn)行命令
java -classpath
C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx.jar;.
DestinationHelper
當(dāng)運(yùn)行完后查看http://localhost:8080/jmx-console下面的jboss.mq.destination中有name=myCreated,service=Topic
表明你建立成功。當(dāng)JBoss關(guān)閉重新啟動(dòng)時(shí)。該目的地不會(huì)在存在。
到此,關(guān)于“如何在Jboss中使用JMS”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!