本文介紹了spring集成mina實(shí)現(xiàn)服務(wù)端主動(dòng)推送(包含心跳檢測(cè)),分享給大家,具體如下:
成都創(chuàng)新互聯(lián)一直通過網(wǎng)站建設(shè)和網(wǎng)站營銷幫助企業(yè)獲得更多客戶資源。 以"深度挖掘,量身打造,注重實(shí)效"的一站式服務(wù),以成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、移動(dòng)互聯(lián)產(chǎn)品、成都全網(wǎng)營銷推廣服務(wù)為核心業(yè)務(wù)。10余年網(wǎng)站制作的經(jīng)驗(yàn),使用新網(wǎng)站建設(shè)技術(shù),全新開發(fā)出的標(biāo)準(zhǔn)網(wǎng)站,不但價(jià)格便宜而且實(shí)用、靈活,特別適合中小公司網(wǎng)站制作。網(wǎng)站管理系統(tǒng)簡(jiǎn)單易用,維護(hù)方便,您可以完全操作網(wǎng)站資料,是中小公司快速網(wǎng)站建設(shè)的選擇。
服務(wù)端
1.常規(guī)的spring工程集成mina時(shí),pom.xml中需要加入如下配置:
org.slf4j slf4j-jdk14 1.7.7 org.apache.mina mina-integration-beans 2.0.13 org.apache.mina mina-core 2.0.13 bundle compile org.apache.mina mina-integration-spring 1.1.7
注意此處mina-core的配置寫法。如果工程中引入上述依賴之后報(bào)錯(cuò):Missing artifact xxx bundle,則需要在pom.xml的plugins之間加入如下插件配置:
org.apache.felix maven-bundle-plugin true
2.Filter1:編解碼器,實(shí)現(xiàn)ProtocolCodecFactory解碼工廠
package com.he.server; import java.nio.charset.Charset; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineDecoder; import org.apache.mina.filter.codec.textline.TextLineEncoder; public class MyCodeFactory implements ProtocolCodecFactory { private final TextLineEncoder encoder; private final TextLineDecoder decoder; public MyCodeFactory() { this(Charset.forName("utf-8")); } public MyCodeFactory(Charset charset) { encoder = new TextLineEncoder(charset, LineDelimiter.UNIX); decoder = new TextLineDecoder(charset, LineDelimiter.AUTO); } public ProtocolDecoder getDecoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return decoder; } public ProtocolEncoder getEncoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return encoder; } public int getEncoderMaxLineLength() { return encoder.getMaxLineLength(); } public void setEncoderMaxLineLength(int maxLineLength) { encoder.setMaxLineLength(maxLineLength); } public int getDecoderMaxLineLength() { return decoder.getMaxLineLength(); } public void setDecoderMaxLineLength(int maxLineLength) { decoder.setMaxLineLength(maxLineLength); } }
此處使用了mina自帶的TextLineEncoder編解碼器,此解碼器支持使用固定長(zhǎng)度或者固定分隔符來區(qū)分上下兩條消息。如果要使用自定義協(xié)議,則需要自己編寫解碼器。要使用websocket,也需要重新編寫解碼器,關(guān)于mina結(jié)合websocket,jira上有一個(gè)開源項(xiàng)目https://issues.apache.org/jira/browse/DIRMINA-907,專門為mina編寫了支持websocket的編解碼器,親測(cè)可用。。。此部分不是本文重點(diǎn),略。
3.Filter2:心跳工廠,加入心跳檢測(cè)功能需要實(shí)現(xiàn)KeepAliveMessageFactory:
package com.he.server; import org.apache.log4j.Logger; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.keepalive.KeepAliveMessageFactory; public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory{ private final Logger LOG = Logger.getLogger(MyKeepAliveMessageFactory.class); /** 心跳包內(nèi)容 */ private static final String HEARTBEATREQUEST = "1111"; private static final String HEARTBEATRESPONSE = "1112"; public Object getRequest(IoSession session) { LOG.warn("請(qǐng)求預(yù)設(shè)信息: " + HEARTBEATREQUEST); return HEARTBEATREQUEST; } public Object getResponse(IoSession session, Object request) { LOG.warn("響應(yīng)預(yù)設(shè)信息: " + HEARTBEATRESPONSE); /** 返回預(yù)設(shè)語句 */ return HEARTBEATRESPONSE; } public boolean isRequest(IoSession session, Object message) { LOG.warn("請(qǐng)求心跳包信息: " + message); if (message.equals(HEARTBEATREQUEST)) return true; return false; } public boolean isResponse(IoSession session, Object message) { LOG.warn("響應(yīng)心跳包信息: " + message); if(message.equals(HEARTBEATRESPONSE)) return true; return false; } }
此處我設(shè)置服務(wù)端發(fā)送的心跳包是1111,客戶端應(yīng)該返回1112.
4.實(shí)現(xiàn)必不可少的IoHandlerAdapter,得到監(jiān)聽事件處理權(quán):
package com.he.server; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class MyHandler extends IoHandlerAdapter { //private final int IDLE = 3000;//(單位s) private final Logger LOG = Logger.getLogger(MyHandler.class); // public static Setsessions = Collections.synchronizedSet(new HashSet ()); public static ConcurrentHashMap sessionsConcurrentHashMap = new ConcurrentHashMap (); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { session.closeOnFlush(); LOG.warn("session occured exception, so close it." + cause.getMessage()); } @Override public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); LOG.warn("客戶端" + ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress() + "連接成功!"); session.setAttribute("type", message); String remoteAddress = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress(); session.setAttribute("ip", remoteAddress); LOG.warn("
此處有幾點(diǎn)說明:
第一點(diǎn):網(wǎng)上教程會(huì)在此處(sessionOpened()方法中)設(shè)置IDLE,IDLE表示session經(jīng)過多久判定為空閑的時(shí)間,單位s,上述代碼中已經(jīng)注釋掉了,因?yàn)楹竺嬖趕pring配置中加入心跳檢測(cè)部分時(shí)會(huì)進(jìn)行IDLE的配置,已經(jīng)不需要在此處進(jìn)行配置了,而且如果在心跳配置部分和此處都對(duì)BOTH_IDLE模式設(shè)置了空閑時(shí)間,親測(cè)發(fā)現(xiàn)此處配置不生效。
第二點(diǎn):關(guān)于存放session的容器,建議使用
而不是用已經(jīng)注釋掉的Collections.synchronizedSet類型的set或者map,至于原因,java5中新增了ConcurrentMap接口和它的一個(gè)實(shí)現(xiàn)類ConcurrentHashMap,可以保證線程的足夠安全。詳細(xì)的知識(shí)你應(yīng)該搜索SynchronizedMap和ConcurrentHashMap的區(qū)別,學(xué)習(xí)更加多的并發(fā)安全知識(shí)。
上述代碼中,每次在收到客戶端的消息時(shí),我會(huì)返回一段文本:welcome by he。
有了map,主動(dòng)推送就不是問題了,想推給誰,在map中找到誰就可以了。
5.完成spring的配置工作
<?xml version="1.0" encoding="UTF-8"?>
好了,xml中已經(jīng)寫了足夠多的注釋了。說明一下關(guān)于心跳檢測(cè)中的最后一個(gè)屬性:forwardEvent,默認(rèn)false,比如在心跳頻率為5s時(shí),實(shí)際上每5s會(huì)觸發(fā)一次KeepAliveFilter中的session_idle事件,該事件中開始發(fā)送心跳包。當(dāng)此參數(shù)設(shè)置為false時(shí),對(duì)于session_idle事件不再傳遞給其他filter,如果設(shè)置為true,則會(huì)傳遞給其他filter,例如handler中的session_idle事件,此時(shí)也會(huì)被觸發(fā)。另外IdleStatus一共有三個(gè)值,點(diǎn)擊進(jìn)源碼就能看到。
6.寫main方法啟動(dòng)服務(wù)端
package com.he.server; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class MainTest { public static void main(String[] args) { ClassPathXmlApplicationContext ct = new ClassPathXmlApplicationContext("applicationContext.xml"); } }
run之后,端口就已經(jīng)開始監(jiān)聽了。此處,如果是web工程,使用tomcat之類的容器,只要在web.xml中配置了
contextConfigLocation /WEB-INF/classes/applicationContext.xml org.springframework.web.context.ContextLoaderListener
則容器在啟動(dòng)時(shí)就會(huì)加載spring的配置文件,端口的監(jiān)聽就開始了,這樣就不需要main方法來啟動(dòng)。
客戶端,本文采用兩種方式來實(shí)現(xiàn)客戶端
方式一:用mina結(jié)構(gòu)來實(shí)現(xiàn)客戶端,引入mina相關(guān)jar包即可,Android也可以使用
1.先實(shí)現(xiàn)IoHandlerAdater得到監(jiān)聽事件,類似于服務(wù)端:
package com.he.client.minaclient; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class ClientHandler extends IoHandlerAdapter{ private final Logger LOG = Logger.getLogger(ClientHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub LOG.warn("客戶端收到消息:" + message); if (message.toString().equals("1111")) { //收到心跳包 LOG.warn("收到心跳包"); session.write("1112"); } } @Override public void messageSent(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub super.messageSent(session, message); } @Override public void sessionClosed(IoSession session) throws Exception { // TODO Auto-generated method stub super.sessionClosed(session); } @Override public void sessionCreated(IoSession session) throws Exception { // TODO Auto-generated method stub super.sessionCreated(session); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { // TODO Auto-generated method stub super.sessionIdle(session, status); } @Override public void sessionOpened(IoSession session) throws Exception { // TODO Auto-generated method stub super.sessionOpened(session); } }
2.寫main方法啟動(dòng)客戶端,連接服務(wù)端:
package com.he.client.minaclient; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class ClientTest { public static void main(String[] args) throws InterruptedException { //創(chuàng)建客戶端連接器. NioSocketConnector connector = new NioSocketConnector(); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8")))); //設(shè)置編碼過濾器 connector.setHandler(new ClientHandler());//設(shè)置事件處理器 ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 8888));//建立連接 cf.awaitUninterruptibly();//等待連接創(chuàng)建完成 cf.getSession().write("hello,測(cè)試!");//發(fā)送消息,中英文都有 //cf.getSession().closeOnFlush(); //cf.getSession().getCloseFuture().awaitUninterruptibly();//等待連接斷開 //connector.dispose(); } }
過程也是一樣的,加各種filter,綁定handler。上述代碼運(yùn)行之后向服務(wù)器發(fā)送了:“hello,測(cè)試”,并且收到返回值:welcome by he。然后每隔5s,就會(huì)收到服務(wù)端的心跳包:1111。在handler的messageReceived中,確認(rèn)收到心跳包之后返回1112,實(shí)現(xiàn)心跳應(yīng)答。以上過程,每5s重復(fù)一次。
main方法中最后三行注釋掉的代碼如果打開,客戶端在發(fā)送完消息之后會(huì)主動(dòng)斷開。
方式二:客戶端不借助于mina,換用java的普通socket來實(shí)現(xiàn),這樣就可以換成其他任何語言:
package com.he.client; import java.io.DataInputStream; import java.io.IOException; import java.io.PrintWriter; import java.net.Socket; /** *@function:java的簡(jiǎn)單socket連接,長(zhǎng)連接,嘗試連續(xù)從服務(wù)器獲取消息 *@parameter: *@return: *@date:2016-6-22 下午03:43:18 *@author:he *@notice: */ public class SocketTestTwo { public static final String IP_ADDR = "127.0.0.1";// 服務(wù)器地址 public static final int PORT = 8888;// 服務(wù)器端口號(hào) static String text = null; public static void main(String[] args) throws IOException { System.out.println("客戶端啟動(dòng)..."); Socket socket = null; socket = new Socket(IP_ADDR, PORT); PrintWriter os = new PrintWriter(socket.getOutputStream()); os.println("al"); os.println("two"); os.flush(); while (true) { try { // 創(chuàng)建一個(gè)流套接字并將其連接到指定主機(jī)上的指定端口號(hào) DataInputStream input = new DataInputStream(socket.getInputStream()); // 讀取服務(wù)器端數(shù)據(jù) byte[] buffer; buffer = new byte[input.available()]; if (buffer.length != 0) { System.out.println("length=" + buffer.length); // 讀取緩沖區(qū) input.read(buffer); // 轉(zhuǎn)換字符串 String three = new String(buffer); System.out.println("內(nèi)容=" + three); if (three.equals("1111\n")) { System.out.println("發(fā)送返回心跳包"); os = new PrintWriter(socket.getOutputStream()); os.println("1112"); os.flush(); } } } catch (Exception e) { System.out.println("客戶端異常:" + e.getMessage()); os.close(); } } } }
以上代碼運(yùn)行效果和前一種方式完全一樣。
但是注意此種方法和使用mina結(jié)構(gòu)的客戶端中有一處不同:對(duì)于心跳包的判斷。本教程中服務(wù)端選用了mina自帶的編解碼器,通過換行符來區(qū)分上下兩條消息,也就是每一條消息后面會(huì)帶上一個(gè)換行符,所以在使用java普通的socket來連接時(shí),判斷心跳包不再是判斷是否為“1111”,而是“1111\n”。對(duì)比mina結(jié)構(gòu)的客戶端中并不需要加上換行符是因?yàn)榭蛻舳酥薪壎讼嗤木幗獯a器。
程序運(yùn)行結(jié)果截圖:
服務(wù)端:
客戶端:
紅色的打印是mina自帶的打印信息,黑色的是本工程中使用的log4j打印,所以你們的工程應(yīng)該配置有如下log4j的配置文件才能看到一樣的打印結(jié)果:
log4j.rootLogger=WARN,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.Threshold=WARN log4j.appender.stdout.layout.ConversionPattern = [%-5p] [%d{yyyy-MM-dd HH\:mm\:ss,SSS}] [%x] %c %l - %m%n
應(yīng)大家需求,工程代碼終于抽空放到github了! https://github.com/smile326/minaSpring
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。