本篇內(nèi)容主要講解“netty的怎么實現(xiàn)及運用到gmq中”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“netty的怎么實現(xiàn)及運用到gmq中”吧!
成都創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站建設(shè)、做網(wǎng)站與策劃設(shè)計,周至網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)10多年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:周至等地區(qū)。周至做網(wǎng)站價格咨詢:18980820575
書接上文手寫MQ框架(三)-客戶端實現(xiàn) ,前面通過web的形式實現(xiàn)了mq的服務(wù)端和客戶端,現(xiàn)在計劃使用netty來改造一下。前段時間學(xué)習(xí)了一下netty的使用。大概有一些想法。
netty封裝了socket的使用,我們通過簡單的調(diào)用即可構(gòu)建高性能的網(wǎng)絡(luò)應(yīng)用。我計劃采用以下例子來對gmq進(jìn)行改造。
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
netty是一個java框架,是網(wǎng)絡(luò)編程框架,支持異步、事件驅(qū)動的特性,所以性能表現(xiàn)很好。
Handler是處理器,handler 是由 Netty 生成用來處理 I/O 事件的。
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class SimpleServerHandler extends SimpleChannelInboundHandler{ public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("[SERVER] - " + incoming.remoteAddress() + " 加入\n"); channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("[SERVER] - " + incoming.remoteAddress() + " 離開\n"); channels.remove(ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel incoming = ctx.channel(); System.out.println("[" + incoming.remoteAddress() + "]" + s); if(s == null || s.length() == 0) { incoming.writeAndFlush("消息是空的呀!\n"); } else { // MqRouter> mqRouter = JSONObject.parseObject(s, MqRouter.class); // System.out.println(mqRouter.getUri()); String responseMsg = "收到了," + s + "\n"; incoming.writeAndFlush(responseMsg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常"); cause.printStackTrace(); ctx.close(); } }
SimpleServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleServerHandler 等。
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleServerInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleServerHandler()); System.out.println("SimpleChatClient:" + ch.remoteAddress() + "連接上"); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class SimpleServer { private int port; public SimpleServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new SimpleServerInitializer()).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); System.out.println("SimpleChatServer 啟動了"); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 關(guān)閉了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new SimpleServer(port).run(); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class SimpleClientHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println("收到的信息:" + s); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleClientInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleClientHandler()); } }
package me.lovegao.netty.learnw3c.mqdemo; import java.io.BufferedReader; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class SimpleClient { private final String host; private final int port; public SimpleClient(String host, int port) { this.host = host; this.port = port; } public static void main(String[] args) throws Exception { new SimpleClient("localhost", 8080).run(); } public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new SimpleClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while(true) { String line = in.readLine(); if(line.equals("exit!")) { break; } channel.writeAndFlush(line + "\r\n"); } } catch(Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
在我把教程上的代碼略微改了一下,測試時發(fā)現(xiàn)客戶端能夠發(fā)出消息,服務(wù)端能夠收到消息,服務(wù)端也走到了回復(fù)客戶端的流程,但是客戶端卻收不到消息。還原代碼后是正常的,想了半天,最后才發(fā)現(xiàn)是改代碼的的時候漏掉了“\n”這個標(biāo)識,以此導(dǎo)致客戶端始終不打印消息。
netty只封裝了網(wǎng)絡(luò)交互,gmq整體使用了gmvc框架,而gmvc框架目前還無法脫離servlet。而我又不太想把之前寫的代碼全部改為自己new的方式。
1)改造gmvc框架
對gmvc框架進(jìn)行重構(gòu),使得能夠脫離servlet使用。也就是將IOC功能剝離開。
優(yōu)點:一步到位,符合整體的規(guī)劃。
缺點:gmq的迭代會延遲一段時間。
2)暫時拋棄gmvc框架
暫時將目前依賴的gmvc框架給去除掉,優(yōu)先完成gmq的迭代。待后期gmvc框架改造完成后再進(jìn)行改造。
優(yōu)點:能夠盡早的完成gmq的功能。
缺點:先移除框架,后期再套上框架,相當(dāng)于做了兩次多余的功。費時費力。
到此,相信大家對“netty的怎么實現(xiàn)及運用到gmq中”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!