• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Netty(二、深入理解)

开发技术 开发技术 3周前 (05-12) 19次浏览

reactor模式

在深入了解Netty之前,我们需要先知道reactor(反应器模式),是高性能网络编程必须知道的模式。

BIO

我们先了解下原始socket编程:

//这里可以是个多线程,每个线程对应一个socket,循环处理业务,此处代码就略了,主要讲逻辑
while (true){
       //new Thread()...//Server监听指定端口 ServerSocket server = new ServerSocket(8080);//socket阻塞,一直等待着连接到来 Socket socket =server.accept();//从socket获取输入流 InputStream inputStream =socket.getInputStream();//建立缓冲区进行读取 byte[] bytes = new byte[1024];intlen; StringBuilder sb= newStringBuilder();while ((len = inputStream.read(bytes)) != -1) {//指定编码格式 sb.append(new String(bytes, 0, len,"UTF-8")); } System.out.println(sb); inputStream.close(); socket.close(); server.close(); }

  以服务端为例,Socket建立好后不断循环监听是否有套接字连接,获取到连接后,从socket获取输入流。在发送/接收数据时,并不是直接从网络中读取或发送,而是要通过缓冲区,例如:发送数据时,现将数据写入缓冲区,然后再由TCP/IP协议将数据由缓冲区发送目标的缓冲区,目标从缓冲区中读取。

  这种多线程的socket虽然通过一个线程一个socket的方式,提高了服务器的吞吐,但每个线程内部还是阻塞的,当并发量大时,线程的反复创建和销毁会对系统造成巨大的负担。针对这种情况,我们就需要用到reactor模式。

单线程reactor模式

reactor模式,基于java NIO之上,抽象出了两个组件:Reactor和Handler。

Reactor:负责响应IO事件,如新事件的连接、读写,将事件交给Handler处理。

Handler:负责事件的处理,完成channel的读取,事件逻辑处理,channel的写出。

Reactor

packagecom.wk.test.nettyTest;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.channels.SelectionKey;importjava.nio.channels.Selector;importjava.nio.channels.ServerSocketChannel;importjava.nio.channels.SocketChannel;importjava.util.Iterator;importjava.util.Set;class Reactor implementsRunnable {//选择器 finalSelector selector;//服务端通道 finalServerSocketChannel serverChannel;//构造函数初始化 Reactor(int port) throwsIOException { selector=Selector.open(); serverChannel=ServerSocketChannel.open();//绑定连接 serverChannel.socket().bind(newInetSocketAddress(port));//非阻塞 serverChannel.configureBlocking(false);//将服务端的通道绑定到选择器上面,并定义事件为接收连接时间//OP_ACCEPT:接收连接就绪事件,服务端监听到客户端,可接收连接 1<<4//OP_CONNECT:连接就绪事件,表示客户端与服务端建立连接成功 1<<3//OP_READ:读就绪事件,表示通道中有可读数据,可执行读操作 1<<0//OP_WRITE:写就绪事件,表示可以向通道写数据 1<<2 SelectionKey selectionKey =serverChannel.register(selector, SelectionKey.OP_ACCEPT);//选择键通过attach方法附加一个对象 selectionKey.attach(newAcceptor()); } @Overridepublic voidrun() {//不中断的线程则循环,interrupted方法,判断线程是否中断,并能释放已经中断的线程 while (!Thread.interrupted()){try{//这里每一个request封装一个channel,所有的channel注册在一个选择器上,selector选择器不断轮询查看可读状态 selector.select();//获取选择器的选择键集合 Set<SelectionKey> selectedKeys =selector.selectedKeys(); Iterator<SelectionKey> iterator =selectedKeys.iterator();while(iterator.hasNext()){ SelectionKey selectedKey=iterator.next();//attachement方法可以获取attach方法附加的对象,这里就是前面附加进来的Handler对象,也就是事件处理类 Runnable r =(Runnable) selectedKey.attachment();if(r!=null){ r.run(); } } }catch(IOException e) { e.printStackTrace(); } } }class Acceptor implementsRunnable{ @Overridepublic voidrun() {try{//获取已连接上的channel通道 SocketChannel channel =serverChannel.accept();if(channel!=null){//自定义Handler,事件处理类,将通道绑定到选择器上面 newHandler(selector,channel); } }catch(IOException e) { e.printStackTrace(); } } } }

Handler

packagecom.wk.test.nettyTest;importjava.io.IOException;importjava.nio.ByteBuffer;importjava.nio.channels.SelectionKey;importjava.nio.channels.Selector;importjava.nio.channels.SocketChannel;public class Handler implementsRunnable {//通道 finalSocketChannel channel;//绑定到选择器的选择键 finalSelectionKey selectionKey;//定义输入输出缓冲区 ByteBuffer inputBuffer = ByteBuffer.allocate(102400); ByteBuffer outputBuffer= ByteBuffer.allocate(102400);static final boolean READING = true, WRITING = false;//初始化定义可读就绪 boolean status =READING; Handler(Selector selector, SocketChannel c)throwsIOException { channel=c;//非阻塞 c.configureBlocking(false);//这里将通道注册到选择器上,本应后面的int是 1(读),4(写),8(连接),16(可连接)的//这种操作貌似是判断JDK的selector有没有立即返回或报错,并不引起任何实质操作。//https://github.com/netty/netty/issues/1836这个讨论问题的地址,外国友人貌似也搞不懂,似乎是个JDK NIO的BUG selectionKey = channel.register(selector, 0);//选择键将本身也就是Handler附加 selectionKey.attach(this);//定义当前选择键是读就绪状态 selectionKey.interestOps(SelectionKey.OP_READ);//唤醒选择器 selector.wakeup(); } @Overridepublic voidrun() {try{if(status) { read(); }else{ write(); } }catch(IOException e) { e.printStackTrace(); } }public void read() throwsIOException { channel.read(inputBuffer);//一系列逻辑判定和处理 status =WRITING; selectionKey.interestOps(SelectionKey.OP_WRITE); }public void write() throwsIOException { channel.write(outputBuffer);//判定写操作执行完毕后,关闭selectKey selectionKey.cancel(); } }

Netty(二、深入理解)

 

  客户端每个请求都封装成一个channel通道连接到selector上面,并有一个selectionKey选择键,选择器的附加对象是Handler处理器,将请求分派到handler中。

单线程的缺点是当Handler阻塞时,会导致其他client的请求也阻塞,这种实际使用不多,一般使用多线程的reactor模式。

多线程reactor模式

多线程是将handler放入一个线程池,多线程的进行业务处理

Netty(二、深入理解)

 

 具体代码就不展示了,也就是在Handler中建立一个线程池来进行读写操作。

ps:以上就是基于java NIO的reactor模式,虽然逻辑有些复杂且不易理解。但是在理解Netty之前一定要先理解它。

Netty DEMO

我们先将demo代码贴上来,通过代码来对Netty进行理解。

服务端

NettyServerTest

packagecom.wk.test.nettyTest;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importio.netty.util.CharsetUtil;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;public classNettyServerTest {private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);public static voidmain(String[] args) {//实例化两个线程组//处理服务器与客户端的连接 EventLoopGroup pGroup = new NioEventLoopGroup(1);//进行网络通信(读写) EventLoopGroup cGroup = new NioEventLoopGroup(10);//配置容器,配置相关信息 ServerBootstrap bootstrap = newServerBootstrap() .group(pGroup,cGroup)//绑定两个线程组 .channel(NioServerSocketChannel.class)                  //指定NIO的模式 .childHandler(new ChannelInitializer<SocketChannel>() { //配置业务处理类 @Overrideprotected void initChannel(SocketChannel socketChannel) throwsException {//解码器 socketChannel.pipeline().addLast("decoder", newStringDecoder(CharsetUtil.UTF_8));//编码器 socketChannel.pipeline().addLast("encoder", newStringEncoder(CharsetUtil.UTF_8));//自定义事件处理器 socketChannel.pipeline().addLast(newNettyServerHandler()); } }) .option(ChannelOption.SO_BACKLOG,1024)         //设置TCP缓冲区 .childOption(ChannelOption.SO_KEEPALIVE, true); //保持连接 try{//绑定端口启动 ChannelFuture channelFuture = bootstrap.bind(8090).sync(); logger.info("服务器启动开启监听端口:{}",8090);//等待关闭 channelFuture.channel().closeFuture().sync(); }catch(InterruptedException e) { e.printStackTrace(); }finally{//Netty优雅退出 pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } }

NettyServerHandler

packagecom.wk.test.nettyTest;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;/*** 服务端事件处理器,基础入站处理器类*/ public class NettyServerHandler extendsChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);/*** 客户端连接时触发 *@paramctx *@throwsException*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throwsException { logger.info("Channel active"); }/*** 客户端发送消息时触发 *@paramctx *@parammsg *@throwsException*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException { logger.info("服务端接收的消息:{}", msg.toString()); ctx.write("服务器返回"); ctx.flush(); }/*** 发生异常时触发 *@paramctx *@paramcause *@throwsException*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException { cause.printStackTrace(); ctx.close(); } }

客户端

NettyClientTest

packagecom.wk.test.nettyTest;importcn.jiguang.common.connection.NettyClientInitializer;importio.netty.bootstrap.Bootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importio.netty.util.CharsetUtil;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;public classNettyClientTest {private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);public static voidmain(String[] args) {//客户端只需要定义一个读写的线程组 EventLoopGroup group = newNioEventLoopGroup();//客户端是bootstrap,其他和服务端配置大同小异 Bootstrap bootstrap = newBootstrap() .group(group) .option(ChannelOption.TCP_NODELAY,true) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Overrideprotected void initChannel(SocketChannel socketChannel) throwsException { socketChannel.pipeline().addLast("decoder", newStringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast("encoder", newStringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(newNettyClientHandler()); } });try{//连接服务器地址 ChannelFuture future = bootstrap.connect("127.0.0.1",8090).sync(); logger.info("客户端启动成功");//发送信息 future.channel().writeAndFlush("你好啊").sync(); future.channel().closeFuture().sync(); }catch(InterruptedException e) { e.printStackTrace(); }finally{//优雅关闭 group.shutdownGracefully(); } } }

NettyClientHandler

packagecom.wk.test.nettyTest;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;/*** 客户端处理类,继承入站处理适配器*/ public class NettyClientHandler extendsChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); @Overridepublic void channelActive(ChannelHandlerContext ctx) throwsException { logger.info("客户端Active ....."); }/*** *@paramctx *@parammsg *@throwsException*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException { logger.info("客户端接收的消息:{}", msg.toString()); }/*** 发生异常时触发 *@paramctx *@paramcause *@throwsException*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException { cause.printStackTrace(); ctx.close(); } }

 EventLoop

//实例化两个线程组//处理服务器与客户端的连接 EventLoopGroup bossGroup = new NioEventLoopGroup(1);//进行网络通信(读写) EventLoopGroup workerGroup = new NioEventLoopGroup(10);

以服务端为例,设置两个线程组。

一个线程组负责监听连接的parentChannel,定义为BossLoopGroup

另一个线程组负责客户端连接读写的childChannel,定义为WorkerLoopGroup

一个线程封装到一个EventLoop,多个EventLoop就组成了线程组。而每一个channel绑定一个EventLoop,一个EventLoop可以有多个channel。

Netty(二、深入理解)

 

 Bootstrap

Bootstrap是Netty提供的一个工厂类,我们可以通过它来完成对Netty服务端或客户端的初始化配置,这样我们就省去了用JDK NIO繁琐的创建channel、设置、启动等步骤,将重心放在事件业务处理上面。

Bootstrap分为服务端的ServerBootstrap和客户端的Bootstrap

Bootstrap执行分为8个步骤:

ServerBootstrap bootstrap = newServerBootstrap()//1.设置reactor线程 .group(bossGroup,workerGroup)//2.设置channel通道的类型,这里是NIO .channel(NioServerSocketChannel.class)//3.设置监听端口 .localAddress(new InetSocketAddress(8090))//4.设置通道的选项 .option(ChannelOption.SO_BACKLOG, 1024)         //设置TCP缓冲区 .childOption(ChannelOption.SO_KEEPALIVE, true) //心跳检测保持连接//5.配发事件处理器流水线 .childHandler(new ChannelInitializer<SocketChannel>() { @Overrideprotected void initChannel(SocketChannel socketChannel) throwsException {//解码器 socketChannel.pipeline().addLast("decoder", newStringDecoder(CharsetUtil.UTF_8));//编码器 socketChannel.pipeline().addLast("encoder", newStringEncoder(CharsetUtil.UTF_8));//自定义事件处理器 socketChannel.pipeline().addLast(newNettyServerHandler()); } });try{//6.绑定servr,这里使用了sync方法,直到绑定成功为止 ChannelFuture channelFuture =bootstrap.bind().sync(); logger.info("服务器启动开启监听端口:{}",8090);//7.等待关闭,直到channel关闭为止 channelFuture.channel().closeFuture().sync(); }catch(InterruptedException e) { e.printStackTrace(); }finally{//8.Netty优雅退出 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }

这8个步骤和我上面写的DEMO顺序方面可能稍有不同,不过不影响。

Channel

核心概念以及流程

Channel是服务端与客户端的通信通道,每一个request都可以封装成一channel。

ChannelPipeline是用于存放Handler的容器,里面存放这事件处理器流水线。

ChannelHandler是处理器,分为入站处理器和出站处理器,以客户端的角度来看,客户端到服务端是出站,服务端到客户端就是入站。

ChannelContext是通信管道的上下文,当一个入站或出站处理器处理完后,将上下文传给下一个入站或出站处理器。

Netty(二、深入理解)

 

 ChannelHandler

//5.配发事件处理器流水线 .childHandler(new ChannelInitializer<SocketChannel>() { @Overrideprotected void initChannel(SocketChannel socketChannel) throwsException {//解码器 socketChannel.pipeline().addLast("decoder", newStringDecoder(CharsetUtil.UTF_8));//编码器 socketChannel.pipeline().addLast("encoder", newStringEncoder(CharsetUtil.UTF_8));//自定义事件处理器 socketChannel.pipeline().addLast(newNettyServerHandler()); } });

这里就是配发事件处理器流水线,编码器实质上也是个处理器,出站编码,入站解码。

自定义处理器也要继承出站或入站的事件处理配置类

public class NettyServerHandler extends ChannelInboundHandlerAdapter 

ByteBuf

数据在网络中传输并不是直接传输的,而是要通过缓冲区。写出时,先将数据写到缓冲区,再由TCP协议将数据从缓冲区。读取时也是一样,从缓冲区读取。

Netty(二、深入理解)

 

 JAVA NIO中的缓冲区是ByteBuff,长度固定且只有一个索引,在读写操作的时候还需要切换读写状态。而Netty的ByteBuf则改良了这些问题。

在ByteBuf中,提供了三个索引,读索引(readIndex)、写索引(writeIndex)、最大容量(maxCapacity)

缓冲区的释放

Netty(二、深入理解)

我们再看看这张图,入站时,当走到tailHandler(最后一个Handler)的时候,会释放掉缓冲区。出站则是在headHandler释放。

Netty(二、深入理解)

原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12658874.html


喜欢 (0)