用異步輸入輸出流編寫Socket進(jìn)程通信程序
創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供儀隴網(wǎng)站建設(shè)、儀隴做網(wǎng)站、儀隴網(wǎng)站設(shè)計、儀隴網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、儀隴企業(yè)網(wǎng)站模板建站服務(wù),10余年儀隴做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。
在Merlin中加入了用于實現(xiàn)異步輸入輸出機(jī)制的應(yīng)用程序接口包:java.nio(新的輸入輸出包,定義了很多基本類型緩沖(Buffer)),java.nio.channels(通道及選擇器等,用于異步輸入輸出),java.nio.charset(字符的編碼解碼)。通道(Channel)首先在選擇器(Selector)中注冊自己感興趣的事件,當(dāng)相應(yīng)的事件發(fā)生時,選擇器便通過選擇鍵(SelectionKey)通知已注冊的通道。然后通道將需要處理的信息,通過緩沖(Buffer)打包,編碼/解碼,完成輸入輸出控制。
通道介紹:
這里主要介紹ServerSocketChannel和 SocketChannel.它們都是可選擇的(selectable)通道,分別可以工作在同步和異步兩種方式下(注意,這里的可選擇不是指可以選擇兩種工作方式,而是指可以有選擇的注冊自己感興趣的事件)。可以用channel.configureBlocking(Boolean )來設(shè)置其工作方式。與以前版本的API相比較,ServerSocketChannel就相當(dāng)于ServerSocket(ServerSocketChannel封裝了ServerSocket),而SocketChannel就相當(dāng)于Socket(SocketChannel封裝了Socket)。當(dāng)通道工作在同步方式時,編程方法與以前的基本相似,這里主要介紹異步工作方式。
所謂異步輸入輸出機(jī)制,是指在進(jìn)行輸入輸出處理時,不必等到輸入輸出處理完畢才返回。所以異步的同義語是非阻塞(None Blocking)。在服務(wù)器端,ServerSocketChannel通過靜態(tài)函數(shù)open()返回一個實例serverChl。然后該通道調(diào)用serverChl.socket().bind()綁定到服務(wù)器某端口,并調(diào)用register(Selector sel, SelectionKey.OP_ACCEPT)注冊O(shè)P_ACCEPT事件到一個選擇器中(ServerSocketChannel只可以注冊O(shè)P_ACCEPT事件)。當(dāng)有客戶請求連接時,選擇器就會通知該通道有客戶連接請求,就可以進(jìn)行相應(yīng)的輸入輸出控制了;在客戶端,clientChl實例注冊自己感興趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的組合),調(diào)用clientChl.connect(InetSocketAddress )連接服務(wù)器然后進(jìn)行相應(yīng)處理。注意,這里的連接是異步的,即會立即返回而繼續(xù)執(zhí)行后面的代碼。
選擇器和選擇鍵介紹:
選擇器(Selector)的作用是:將通道感興趣的事件放入隊列中,而不是馬上提交給應(yīng)用程序,等已注冊的通道自己來請求處理這些事件。換句話說,就是選擇器將會隨時報告已經(jīng)準(zhǔn)備好了的通道,而且是按照先進(jìn)先出的順序。那么,選擇器是通過什么來報告的呢?選擇鍵(SelectionKey)。選擇鍵的作用就是表明哪個通道已經(jīng)做好了準(zhǔn)備,準(zhǔn)備干什么。你也許馬上會想到,那一定是已注冊的通道感興趣的事件。不錯,例如對于服務(wù)器端serverChl來說,可以調(diào)用key.isAcceptable()來通知serverChl有客戶端連接請求。相應(yīng)的函數(shù)還有:SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一個循環(huán)中輪詢感興趣的事件(具體可參照下面的代碼)。如果選擇器中尚無通道已注冊事件發(fā)生,調(diào)用Selector.select()將阻塞,直到有事件發(fā)生為止。另外,可以調(diào)用selectNow()或者select(long timeout)。前者立即返回,沒有事件時返回0值;后者等待timeout時間后返回。一個選擇器最多可以同時被63個通道一起注冊使用。
應(yīng)用實例:
下面是用異步輸入輸出機(jī)制實現(xiàn)的客戶/服務(wù)器實例程序――程序清單1(限于篇幅,只給出了服務(wù)器端實現(xiàn),讀者可以參照著實現(xiàn)客戶端代碼):
程序類圖
public class NBlockingServer {
int port = 8000;
int BUFFERSIZE = 1024;
Selector selector = null;
ServerSocketChannel serverChannel = null;
HashMap clientChannelMap = null;//用來存放每一個客戶連接對應(yīng)的套接字和通道
public NBlockingServer( int port ) {
this.clientChannelMap = new HashMap();
this.port = port;
}
public void initialize() throws IOException {
//初始化,分別實例化一個選擇器,一個服務(wù)器端可選擇通道
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
InetAddress localhost = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(localhost, this.port );
this.serverChannel.socket().bind(isa);//將該套接字綁定到服務(wù)器某一可用端口
}
//結(jié)束時釋放資源
public void finalize() throws IOException {
this.serverChannel.close();
this.selector.close();
}
//將讀入字節(jié)緩沖的信息解碼
public String decode( ByteBuffer byteBuffer ) throws
CharacterCodingException {
Charset charset = Charset.forName( "ISO-8859-1" );
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode( byteBuffer );
String result = charBuffer.toString();
return result;
}
//監(jiān)聽端口,當(dāng)通道準(zhǔn)備好時進(jìn)行相應(yīng)操作
public void portListening() throws IOException, InterruptedException {
//服務(wù)器端通道注冊O(shè)P_ACCEPT事件
SelectionKey acceptKey =this.serverChannel.register( this.selector,
SelectionKey.OP_ACCEPT );
//當(dāng)有已注冊的事件發(fā)生時,select()返回值將大于0
while (acceptKey.selector().select() 0 ) {
System.out.println("event happened");
//取得所有已經(jīng)準(zhǔn)備好的所有選擇鍵
Set readyKeys = this.selector.selectedKeys();
//使用迭代器對選擇鍵進(jìn)行輪詢
Iterator i = readyKeys.iterator();
while (i
else if ( key.isReadable() ) {//如果是通道讀準(zhǔn)備好事件
System.out.println("Readable");
//取得選擇鍵對應(yīng)的通道和套接字
SelectableChannel nextReady =
(SelectableChannel) key.channel();
Socket socket = (Socket) key.attachment();
//處理該事件,處理方法已封裝在類ClientChInstance中
this.readFromChannel( socket.getChannel(),
(ClientChInstance)
this.clientChannelMap.get( socket ) );
}
else if ( key.isWritable() ) {//如果是通道寫準(zhǔn)備好事件
System.out.println("writeable");
//取得套接字后處理,方法同上
Socket socket = (Socket) key.attachment();
SocketChannel channel = (SocketChannel)
socket.getChannel();
this.writeToChannel( channel,"This is from server!");
}
}
}
}
//對通道的寫操作
public void writeToChannel( SocketChannel channel, String message )
throws IOException {
ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );
int nbytes = channel.write( buf );
}
//對通道的讀操作
public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )
throws IOException, InterruptedException {
ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );
int nbytes = channel.read( byteBuffer );
byteBuffer.flip();
String result = this.decode( byteBuffer );
//當(dāng)客戶端發(fā)出”@exit”退出命令時,關(guān)閉其通道
if ( result.indexOf( "@exit" ) = 0 ) {
channel.close();
}
else {
clientInstance.append( result.toString() );
//讀入一行完畢,執(zhí)行相應(yīng)操作
if ( result.indexOf( "\n" ) = 0 ){
System.out.println("client input"+result);
clientInstance.execute();
}
}
}
//該類封裝了怎樣對客戶端的通道進(jìn)行操作,具體實現(xiàn)可以通過重載execute()方法
public class ClientChInstance {
SocketChannel channel;
StringBuffer buffer=new StringBuffer();
public ClientChInstance( SocketChannel channel ) {
this.channel = channel;
}
public void execute() throws IOException {
String message = "This is response after reading from channel!";
writeToChannel( this.channel, message );
buffer = new StringBuffer();
}
//當(dāng)一行沒有結(jié)束時,將當(dāng)前字竄置于緩沖尾
public void append( String values ) {
buffer.append( values );
}
}
//主程序
public static void main( String[] args ) {
NBlockingServer nbServer = new NBlockingServer(8000);
try {
nbServer.initialize();
} catch ( Exception e ) {
e.printStackTrace();
System.exit( -1 );
}
try {
nbServer.portListening();
}
catch ( Exception e ) {
e.printStackTrace();
}
}
}
程序清單1
小結(jié):
從以上程序段可以看出,服務(wù)器端沒有引入多余線程就完成了多客戶的客戶/服務(wù)器模式。該程序中使用了回調(diào)模式(CALLBACK)。需要注意的是,請不要將原來的輸入輸出包與新加入的輸入輸出包混用,因為出于一些原因的考慮,這兩個包并不兼容。即使用通道時請使用緩沖完成輸入輸出控制。該程序在Windows2000,J2SE1.4下,用telnet測試成功。
Thread t=new Thread(){
public void run(){
//保存信息操作
}
}
t.start();
//同時做別的事情.
通常同步意味著一個任務(wù)的某個處理過程會對多個線程在用串行化處理,而異步則意味著某個處理過程可以允許多個線程同時處理。異步通常代表著更好的性能,因為它很大程度上依賴于緩沖,是典型的使用空間換時間的做法,例如在計算機(jī)當(dāng)中,高速緩存作為cpu和磁盤io之間的緩沖地帶協(xié)調(diào)cpu高速計算能力和磁盤的低速讀寫能力。
(1):重新啟動一個java程序就啟動了一個進(jìn)程
可以用操作系統(tǒng)命令行啟動 Runtime.getRuntime().exec("java -classpath . XXX");
(2):可不可以在接收消息的模塊中的addtolist函數(shù)中添加一個專門的處理函數(shù),函數(shù)執(zhí)行時先向list中添加消息,然后探測當(dāng)前有沒有處理線程,如果沒有,則啟動線程。
(3):想省點工作,可以用BlockingQueue來代替list,這樣線程等待和喚醒不用寫代碼實現(xiàn)了,如果非要用list,那么就做好同步
list的小例子:
Java codeclass MessageConsumer extends Thead { ? ?private ListYourMessageType list; ? ?private boolean running = true; ? ?public MessageConsumer(ListYourMessageType list) {this.list = list;} ? ?public void run() { ? ? ? ?while (running) { ? ? ? ? ? ?YourMessageType msg = null; ? ? ? ? ? ? try { ? ? ? ? ? ? ? ?synchronized(list) { ? ? ? ? ? ? ? ? ? ?while (list.size() == 0) { ? ? ? ? ? ? ? ? ? ? ? ?list.wait(); ? ? ? ? ? ? ? ? ? ?} ? ? ? ? ? ? ? ? ? ?msg = list.remove(0); ? ? ? ? ? ? ? ? ? ?list.notiryAll(); ? ? ? ? ? ? ? ?} ? ? ? ? ? ?} catch (Exception e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ?} ? ? ? ? ? ?if (msg == null) continue; ? ? ? ? ? ?//System.out.println(msg); //print message ? ? ? ?} ? ?}}//調(diào)用sampleclass ShareModule { ? ?ListYourMessageType list = new ArrayListYourMessageType(); ? ?...}public class Main { ? ?public static void main(String[] args) { ? ? ? ?ShareMudule sm; //so on ? ? ? ?... ? ? ? ?Thread t = new MessageConsumer(sm.list); ? ? ? ?t.start(); ? ? ? ?... ? ?}}
1. 使用wait和notify方法
這個方法其實是利用了鎖機(jī)制,直接貼代碼:
public class Demo1 extends BaseDemo{ private final Object lock = new Object(); @Override public void callback(long response) { System.out.println("得到結(jié)果"); System.out.println(response); System.out.println("調(diào)用結(jié)束"); synchronized (lock) { lock.notifyAll(); } } public static void main(String[] args) { Demo1 demo1 = new Demo1(); demo1.call(); synchronized (demo1.lock){ try { demo1.lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("主線程內(nèi)容"); } }
可以看到在發(fā)起調(diào)用后,主線程利用wait進(jìn)行阻塞,等待回調(diào)中調(diào)用notify或者notifyAll方法來進(jìn)行喚醒。注意,和大家認(rèn)知的一樣,這里wait和notify都是需要先獲得對象的鎖的。在主線程中最后我們打印了一個內(nèi)容,這也是用來驗證實驗結(jié)果的,如果沒有wait和notify,主線程內(nèi)容會緊隨調(diào)用內(nèi)容立刻打印;而像我們上面的代碼,主線程內(nèi)容會一直等待回調(diào)函數(shù)調(diào)用結(jié)束才會進(jìn)行打印。
沒有使用同步操作的情況下,打印結(jié)果:發(fā)起調(diào)用 調(diào)用返回 主線程內(nèi)容 得到結(jié)果 1 調(diào)用結(jié)束
而使用了同步操作后:
發(fā)起調(diào)用 調(diào)用返回 得到結(jié)果 9 調(diào)用結(jié)束 主線程內(nèi)容2. 使用條件鎖
和方法一的原理類似:
public class Demo2 extends BaseDemo { private final Lock lock = new ReentrantLock(); private final Condition con = lock.newCondition(); @Override public void callback(long response) { System.out.println("得到結(jié)果"); System.out.println(response); System.out.println("調(diào)用結(jié)束"); lock.lock(); try { con.signal(); }finally { lock.unlock(); } } public static void main(String[] args) { Demo2 demo2 = new Demo2(); demo2.call(); demo2.lock.lock(); try { demo2.con.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { demo2.lock.unlock(); } System.out.println("主線程內(nèi)容"); } }
基本上和方法一沒什么區(qū)別,只是這里使用了條件鎖,兩者的鎖機(jī)制有所不同。
在整個思路上要調(diào)整一下
1、會有很多線程給一個隊列上添加任務(wù)
2、有一個或者多個線程逐個執(zhí)行隊列的任務(wù)
考慮一下幾點:
1、沒有任務(wù)時,隊列執(zhí)行線程處于等待狀態(tài)
2、添加任務(wù)時,激活隊列執(zhí)行線程,全部run起來,首先搶到任務(wù)的執(zhí)行,其他全部wait
給個小例子吧
package?org;
import?java.util.LinkedList;
import?java.util.List;
public?class?Queues?{
public?static?ListTask?queue?=?new?LinkedListTask();
/**
?*?假如?參數(shù)o?為任務(wù)
?*?@param?o
?*/
public?static?void?add?(Task?t){
synchronized?(Queues.queue)?{
Queues.queue.add(t);?//添加任務(wù)
Queues.queue.notifyAll();//激活該隊列對應(yīng)的執(zhí)行線程,全部Run起來
}
}
static?class?Task{
public?void?test(){
System.out.println("我被執(zhí)行了");
}
}
}
package?org;
import?java.util.List;
public?class?Exec?implements?Runnable{
@Override
public?void?run()?{
while(true){
synchronized?(Queues.queue)?{
while(Queues.queue.isEmpty()){?//
try?{
Queues.queue.wait();?//隊列為空時,使線程處于等待狀態(tài)
}?catch?(InterruptedException?e)?{
e.printStackTrace();
}
System.out.println("wait...");
}
Queues.Task?t=?Queues.queue.remove(0);?//得到第一個
t.test();?//執(zhí)行該任務(wù)
System.out.println("end");
}
}
}
public?static?void?main(String[]?args)?{
Exec?e?=?new?Exec();
for?(int?i?=?0;?i??2;?i++)?{
new?Thread(e).start();?//開始執(zhí)行時,隊列為空,處于等待狀態(tài)
}
//上面開啟兩個線程執(zhí)行隊列中的任務(wù),那就是先到先得了
//添加一個任務(wù)測試
Queues.Task?t?=new?Queues.Task();
Queues.add(t);?//執(zhí)行該方法,激活所有對應(yīng)隊列,那兩個線程就會開始執(zhí)行啦
}
}
上面的就是很簡單的例子了