澳门新浦京8455comJavaSocket编程之Netty框架线程模型

澳门新浦京8455com 20

前言

Getty是自身为了求学 Java NIO
所写的叁个 NIO 框架,完成进度中参阅了 Netty 的规划,同期利用 Groovy
来贯彻。即使只是玩具,可是麻雀虽小,麻雀虽小,在促成进程中,不唯有熟谙了
NIO 的使用,还借鉴了众多 Netty 的布置性观念,进步了和睦的编码和规划工夫。

至于怎么用 Groovy 来写,因为本人刚学了 Groovy,适逢其会拿来练手,加上 Groovy
是合营 Java 的,所以只是语法上的反差,底层达成照旧根据 Java API的。

Getty 的基本代码行数不超越 500 行,一方面得益于 Groovy
简洁的语法,其他方面是因为本人只兑现了大旨的逻辑,最复杂的实际上是解码器完毕。脚手架轻松搭,摩天天津大学学楼哪有那么轻便盖,但用来学学
NIO 足以。

1.Netty概述

Netty是几个由JBoss提供的飞跃的Java NIO
client-server(客户端-服务器卡塔尔国开拓框架,使用Netty能够便捷支付互联网使用。Netty提供了一种新的点子来使开采网络应用程序,使其相当的轻松选择且有很强的可扩张性。Netty的此中得以完结是很复杂的,然而Netty提供了差不离易用的API从互连网拍卖代码中解耦业务逻辑。Netty是一心依赖NIO完成的,接收事件驱动机制,非拥塞实施,所以一切Netty都以异步的。Netty框架连串结构图如下:

澳门新浦京8455com 1

Netty框架种类构造图

《I/O模型之四:Java
浅析I/O模型》

线程模型

Getty 使用的是 Reactor 四线程模型

澳门新浦京8455com 2

  1. 澳门新浦京8455com ,有特别三个 NIO 线程- Acceptor 线程用于监听服务端,选拔顾客端的 TCP
    连接要求,然后将三回九转分配给工作线程,由专门的学业线程来监听读写事件。
  2. 互连网 IO
    操作-读/写等由多少个职业线程负担,由这么些干活儿线程担任消息的读取、解码、编码和出殡和下葬。
  3. 1 个专门的学问线程能够况兼处理N条链路,可是 1 个链路只对应 1
    个干活线程,幸免产生并发操作难点。

2.Java NIO缓冲区与通道

叁个Buffer对象是一直数量的多寡的容器,其效劳是叁个存款和储蓄器。通道Channel是I/O传输发生时通过的输入,而缓冲区是那么些数量传输的发源或目的。对于离开缓冲区的传输,您想传递出去的多少被放到二个缓冲区,被传送到大路。缓冲区是包在贰个对象内的为主数据成分数组。Buffer类相比较二个简易数组的帮助和益处是它将关于数据的数目内容和音讯满含在一个单纯的对象中。缓冲区家族成员如下所示:

澳门新浦京8455com 3

缓冲区Buffer具备八个属性来提供有关其所蕴藏的多少成分的新闻。
它们是:
容量(Capacity):缓冲区能够容纳的数目元素的最大数目。这一体量在缓冲区创办时被设定,且不可能被退换。
上界(Limit):缓冲区的首先个不可能被读或写的要素。或然说,缓冲区中现成元素的计数。
位置(Position):下贰个要被读或写的因素的目录。地方会活动由相应的get(
卡塔尔和put( 卡塔尔国函数更新。
标记(Mark):一个备忘地点。调用mark( 卡塔尔来设定mark =
postion。调用reset( 卡塔尔设定position =
mark。标志在设定前是未定义的(undefined卡塔尔国。
那多个属性之间接连信守以下关系
0 <= mark <= position <= limit <= capacity

澳门新浦京8455com 4

一、拥塞IO与非拥塞IO

事件驱动模型

漫天服务端的流水生产线管理,建设构造于事件机制上。在
[经受连接->读->业务管理->写 ->关闭连接
]这几个进度中,触发器将触发相应事件,由事件微处理机对相应事件分别响应,落成服务器端的业务管理。

3.Netty核心网络模型

Netty是名列前茅的Reactor模型结构,在达成上,Netty中的Boss类当做mainReactor,NioWorker类当作subReactor(默许NioWorker的个数是时下服务器的可用核数)。在拍卖新来的呼吁时,NioWorker读完已选取的数目到ChannelBuffer中,之后触发ChannelPipeline中的ChannelHandler流。Netty是事件驱动,非梗塞的,能够经过ChannelHandler链来决定施行流向。

阻塞IO:

  平时在开展同步I/O操作时,就算读取数据,代码会梗塞直至有
可供读取的数码。相同,写入调用将会堵塞直至数据可以看到写入。传统的Server/Client方式会基于TPKuga(Thread
per
Request),服务器会为各类顾客端需要创设三个线程,由该线程单独担任管理三个顾客央求。这种方式带给的一个标题正是线程数量的疯长,多量的线程会附加服务器的支付。大好些个的贯彻为了防止那个题目,都应用了线程池模型,并设置线程池线程的最大数目,那由带给了新的主题材料,固然线程池中有200个线程,而有200个顾客都在进展大文件下载,会变成第201个客商的伏乞不大概及时管理,纵然第201个顾客只想呼吁一个几KB大小的页面。守旧的
Server/Client格局如下图所示:

澳门新浦京8455com 5

事件定义

  1. onRead:当顾客端发来数量,并已被职业线程正确读取时,触发该事件
    。该事件通报各事件微处理器能够对顾客端发来的多少开展实际管理了。
  2. onWrite:当顾客端能够起来选取服务端发送数据时触发该事件,通过该事件,大家能够向客商端发送响应数据。(当前的兑现中平素不选用写事件)
  3. onClosed:当顾客端与服务器断开连接时触发该事件。

3.1 Refactor单线程模型

Reactor单线程模型,指的是享有的IO操作都在同三个NIO线程上边完毕;
Reactor单线程模型特点:
1)Java NIO服务端,选拔客商端的TCP连接;
2)Java NIO客商端,向服务端发起TCP连接;
3)Java NIO服务端/顾客端,读取通讯对端的央浼或许应答音信;
4)Java NIO服务端/客商端,向通信对端发送消息需要或许应答音讯
采纳情况:对于一些小体量应用处景,能够运用单线程模型。可是对于高负载、大并发的运用处景却不妥贴,比方二个NIO线程相同的时间管理成都百货上千的链路,质量上不能够支撑,纵然NIO线程的CPU负荷到达100%,也力不能支满足海量音讯的编码、解码、读取和出殡和安葬。Reactor单线程模型如下图所示:

澳门新浦京8455com 6

非阻塞IO(NIO):

  NIO中国和北美洲梗塞I/O选择了依靠Reactor形式的劳作措施,I/O调用不会被打断,相反是登记感兴趣的特定I/O事件,如可读数据达到,新的套接字连接等等,在发出一定事件时,系统再通报我们。NIO中得以完毕非堵塞I/O的主导目的便是Selector,Selector便是挂号种种I/O事件地方,并且当那一个事件爆发时,就是其一目标告诉大家所发出的轩然大波,如下图所示:

澳门新浦京8455com 7

从图中得以见到,当有读或写等此外注册的平地风波时有产生时,能够从Selector中收获相应的SelectionKey,同临时候从
SelectionKey中能够找到爆发的风浪和该事件所发出的切实可行的SelectableChannel,以获得客商端发送过来的数码。

非拥塞指的是IO事件本人不打断,可是获取IO事件的select(卡塔尔国方法是索要窒碍等待的.分化是梗塞的IO会梗塞在IO操作上,
NIO窒碍在事变得到上,未有事件就从未有过IO,
从高档案的次序看IO就不封堵了.也正是说只有IO已经发生那么我们才评估IO是或不是封堵,不过select(卡塔尔堵塞的时候IO还还未发生,何谈IO的封堵呢?NIO的庐山面目目是延迟IO操作到确实产生IO的时候,并非先前的只要IO流展开了就一贯等候IO操作。

事件回调机制的兑现

在这里个模型中,事件采用广播格局,也正是享有注册的平地风波微电脑都能赢得事件通报。那样能够将不一样属性的作业管理,分别用不一样的Computer达成,使每种微处理机的功力尽大概单一。

一般来讲图:整个事件模型由监听器、事件适配器、事件触发器(HandlerChain,PipeLine)、事件微电脑组成。

澳门新浦京8455com 8

  • ServerListener:那是一个事变接口,定义需监听的服务器事件

    interface ServerListener extends Serializable{
        /**
         * 可读事件回调
         * @param request
         */
        void onRead(ctx)
        /**
         * 可写事件回调
         * @param request
         * @param response
         */
        void onWrite(ctx)
        /**
         * 连接关闭回调
         * @param request
         */
        void onClosed(ctx)
    }
    
  • EventAdapter:对 Serverlistener 接口完毕一个适配器
    (EventAdapter卡塔尔国,那样的补益是末了的风浪微机能够只管理所关心的平地风波。

    class EventAdapter implements ServerListener {
        //下个处理器的引用
        protected next
        void onRead(Object ctx) {
        }
        void onWrite(Object ctx) {
        }
        void onClosed(Object ctx) {
        }
    }
    
  • Notifier:用于在适龄的时候经过接触服务器事件,布告在册的风浪微处理器对事件做出响应。

    interface Notifier extends Serializable{
        /**
         * 触发所有可读事件回调
         */
        void fireOnRead(ctx)
        /**
         * 触发所有可写事件回调
         */
        void fireOnWrite(ctx)
        /**
         * 触发所有连接关闭事件回调
         */
        void fireOnClosed(ctx)
    }
    
  • HandlerChain:实现了Notifier接口,维持不改变的事件微电脑链条,每一次从第贰个计算机开端接触。

    class HandlerChain implements Notifier{
        EventAdapter head
        EventAdapter tail
        /**
         * 添加处理器到执行链的最后
         * @param handler
         */
        void addLast(handler) {
            if (tail != null) {
                tail.next = handler
                tail = tail.next
            } else {
                head = handler
                tail = head
            }
        }
        void fireOnRead(ctx) {
            head.onRead(ctx)
        }
        void fireOnWrite(ctx) {
            head.onWrite(ctx)
        }
        void fireOnClosed(ctx) {
            head.onClosed(ctx)
        }
    }
    
  • PipeLine:实现了Notifier接口,作为事件总线,维持多个事件链的列表。

    class PipeLine implements Notifier{
        static logger = LoggerFactory.getLogger(PipeLine.name)
        //监听器队列
        def listOfChain = []
        PipeLine(){}
        /**
         * 添加监听器到监听队列中
         * @param chain
         */
        void addChain(chain) {
            synchronized (listOfChain) {
                if (!listOfChain.contains(chain)) {
                    listOfChain.add(chain)
                }
            }
        }
        /**
         * 触发所有可读事件回调
         */
        void fireOnRead(ctx) {
            logger.debug("fireOnRead")
            listOfChain.each { chain ->
                chain.fireOnRead(ctx)
            }
        }
        /**
         * 触发所有可写事件回调
         */
        void fireOnWrite(ctx) {
            listOfChain.each { chain ->
                chain.fireOnWrite(ctx)
            }
        }
        /**
         * 触发所有连接关闭事件回调
         */
        void fireOnClosed(ctx) {
            listOfChain.each { chain ->
                chain.fireOnClosed(ctx)
            }
        }
    }
    

3.2 Refactor二十四线程模型

Rector多线程模型与单线程模型最大的差距正是有一组NIO线程管理IO操作。
Reactor多线程模型的特色:
1)有多少个极度的NIO线程-Acceptor线程用于监听服务端,选取顾客端的TCP连接乞求;
2)网络IO操作-读、写等由一个NIO线程池担任,线程池能够选择专门的学业的JD上影线程池完成,它包涵贰个职务队列和N个可用的线程,由那个NIO线程担负音讯的读取、解码、编码和出殡和下葬;
3)1个NIO线程能够同不时候管理N条链路,可是1个链路只对应1个NIO线程,幸免发生并发操作难题。
Reactor八线程模型如下图所示:

澳门新浦京8455com 9

二.NIO原理及通讯模型

Java
NIO是在jdk1.4起来利用的,它既可以够说成“新I/O”,也得以说成非堵塞式I/O。上边是java
NIO的干活原理:

  1. 由三个专程的线程来管理全部的 IO 事件,并肩负分发。 
  2. 事件驱动机制:事件到的时候接触,实际不是一道的去监视事件。 
  3. 线程通信:线程之间通过 wait,notify
    等方法通信。有限支持每趟上下文切换都以有意义的。收缩无谓的线程切换。 

读书过局地质地之后,上边贴出笔者清楚的java NIO的行事规律图:

 

澳门新浦京8455com 10

 

(注:每一种线程的拍卖流程差相当少都以读取数据、解码、总结管理、编码、发送响应。)

Java NIO的服务端只需运维三个专程的线程来管理全部的 IO
事件,这种通讯模型是怎么贯彻的吗?呵呵,大家一并来研商它的奥妙吧。java
NIO接纳了双向通道(channel)举行多少传输,并非单向的流(stream),在通道上能够挂号大家感兴趣的事件。一共有以下多样事件:

 

事件名 对应值
服务端接收客户端连接事件 SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件 SelectionKey.OP_CONNECT(8)
读事件 SelectionKey.OP_READ(1)
写事件 SelectionKey.OP_WRITE(4)

 服务端和顾客端各自维护一个管理通道的靶子,大家称之为selector,该目的能检查测量检验三个或八个通道
(channel卡塔尔国上的风云。大家以服务端为例,倘若服务端的selector上注册了读事件,某时刻客商端给服务端发送了有的数目,堵塞I/O那个时候会调用read(卡塔尔国方法堵塞地读取数据,而NIO的劳务端会在selector中增加四个读事件。服务端的拍卖线程会轮询地拜望selector,要是访谈selector时意识有感兴趣的事件到达,则处理那么些事件,如果未有感兴趣的风浪到达,则管理线程会一贯不通直到感兴趣的事件达到甘休。下边是自家了解的java
NIO的通讯模型暗中提示图:

澳门新浦京8455com 11

事件管理流程

澳门新浦京8455com 12

编制程序模型

事件管理采取任务链情势,每种微型机处理完数据之后会垄断(monopolyState of Qatar是还是不是继续试行下一个电脑。假使微处理机不将职务交给线程池管理,那么全部拍卖流程都在同二个线程中拍卖。并且各种连接都有独立的PipeLine,职业线程可以在三个三番五遍上下文切换,可是七个三番五次上下文只会被三个线程管理。

3.3 Netty服务器端创制进度

Netty服务器运营时,创制四个NIO伊芙ntLoopGroup独立的Reator线程池,三个用于收纳客户端的TCP连接,叁个用来拍卖IO的连锁的读写操作。
Netty线程模型是创建在Reactor模型的根底上,线程模型而不是萧规曹随的,通过运转参数的安插,能够在区别的线程模型之间切换。Netty服务器端创造进度类别图如下:

澳门新浦京8455com 13

Netty服务器端创立进程连串图

1)首先成立三个ServerBootstrap实例,那是Netty服务端的启航扶持类;
2)设置并绑定Reactor线程池,Netty的Reactor线程池是伊芙ntLoopGroup,它实质上正是EventLoop线程的数组。伊夫ntLoop的义务是拍卖全数注册到本线程多路复用器Selector上的Channel;
3)设置并绑定服务端NIOserverSocketChannel,
Netty通过工厂类,利用反射格局开创NioServerSocketChannel对象
4)设置TCP连接参数,TCP链路白手起家时成立并开首化ChannelPipeline,它实质是一个担当管理网络事件的职分链,负担处理和实行ChannelHandler。网络事件以事件流的花样在ChannelPipeline中流转,由ChannelPipeline依据ChannelHandler的执行政策调治ChannelHandler的施行;
5)增多并设置ChannelHandler,参预到ChannelPipeline事件流;
6)服务器端绑定监听端口,并运营服务器;
7)运维NioEventLoop担任调治和推行Selector轮询操作,接收筹划妥贴的Channel集结;
8)当轮询到筹算伏贴的Channel之后,就由Reactor线程Nio伊夫ntLoop实行ChannelPipeline的附和措施;
9)实践Netty系统并调整施行ChannelHandler业务逻辑

三、Java NIO 概述

Java NIO 由以下几当中央部分构成:

  • Channels
  • Buffers
  • Selectors

虽说Java NIO 中除却还应该有好些个类和组件,但在小编眼里,Channel,Buffer 和
Selector
构成了主导的API。其余组件,如Pipe和FileLock,只可是是与八个基本器件协同使用的工具类。因而,在概述中作者将聚集在这里四个零件上。别的组件会在单独的章节中讲到。

 

核心类

3.4 Netty线程模型实例

挂号多个OutboundHandler,实行种种为注册顺序的逆序,注册五个InboundHandler,推行顺序为注册顺序,注册HelloServerInHandler用于吸收接纳客户端音讯已经向顾客端发送消息,首要代码如下:

澳门新浦京8455com 14

澳门新浦京8455com 15

本订阅号提供Java相关手艺共享,从Java编制程序功底到Java高档技能,从JavaWeb手艺底工Jsp、Servlet、>JDBC到SSH、SSM开发框架,从REST风格接口设计到分布式项目实战。解析主流开源手艺框架,用亲身
实行来谱写深度Java技能日志。

澳门新浦京8455com 16

Java技能日志

应接关怀 Java手艺日志 Wechat订阅号

Channel 和 Buffer

超级多,全数的 IO 在NIO 中都从四个Channel 开首。Channel 有一点点象流。
数据足以从Channel读到Buffer中,也足以从Buffer
写到Channel中。这里有个图示:

澳门新浦京8455com 17

Channel和Buffer有几许种等级次序。上边是JAVA NIO中的一些重要Channel的完结:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

正如你所观望的,这一个通道包涵了UDP 和 TCP 互联网IO,甚至文件IO。

与这一个类一同的有部分珠辉玉映的接口,但为简易起见,作者竭尽在概述中不涉及它们。本学科别的章节与它们相关的地点作者会实行表明。

以下是Java NIO里第一的Buffer达成:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

那一个Buffer覆盖了您能经过IO发送的中央数据类型:byte, short, int, long,
float, double 和 char。

Java NIO 还会有个 MappedByteBuffer,用于表示内存映射文件,
我也不考虑在概述中注脚。

ConnectionCtx

接连上下文ConnectionCtx

class ConnectionCtx {
    /**socket连接*/
    SocketChannel channel
    /**用于携带额外参数*/
    Object attachment
    /**处理当前连接的工作线程*/
    Worker worker
    /**连接超时时间*/
    Long timeout
    /**每个连接拥有自己的pipeline*/
    PipeLine pipeLine
}

NioServer

主线程担负监听端口,持有工作线程的援引(使用轮转法分配连接),每一趟有接连几天到来时,将延续放入职业线程的接连几天队列,并提示线程selector.wakeup()(线程大概过不去在selector上)。

class NioServer extends Thread {
    /**服务端的套接字通道*/
    ServerSocketChannel ssc
    /**选择器*/
    Selector selector
    /**事件总线*/
    PipeLine pipeLine
    /**工作线程列表*/
    def workers = []
    /**当前工作线程索引*/
    int index
}

Selector

Selector允许单线程管理几个Channel。若是您的应用张开了多少个延续(通道),但种种连接的流量都十分低,使用Selector就能很便利。比方,在一个聊天服务器中。

那是在贰个单线程中应用三个Selector管理3个Channel的图示:

澳门新浦京8455com 18

要利用Selector,得向Selector注册Channel,然后调用它的select(卡塔尔方法。那个方法会平昔不通到某些注册的大道有事件就绪。一旦这一个办法再次来到,线程就足以管理那一个事件,事件的事例宛如新连接进来,数据选择等。

Worker

干活线程,担当登记server传递过来的socket连接。首要监听读事件,管理socket,管理写操作。

class Worker extends Thread {
    /**选择器*/
    Selector selector
    /**读缓冲区*/
    ByteBuffer buffer
    /**主线程分配的连接队列*/
    def queue = []
    /**存储按超时时间从小到大的连接*/
    TreeMap<Long, ConnectionCtx> ctxTreeMap

    void run() {
        while (true) {
            selector.select()
            //注册主线程发送过来的连接
            registerCtx()
            //关闭超时的连接
            closeTimeoutCtx()
            //处理事件
            dispatchEvent()
        }
    }
}

NIO使用手续

澳门新浦京8455com 19

服务端步骤:

步骤一:张开ServerSocketChannel,用于监听顾客端的连年,它是有着顾客端连接的父管道,代码示举个例子下:

ServerSocketChannel acceptorSvr = ServerSocketChannel.open();

步骤二:绑定监听端口,设置连接为非阻塞形式,示例代码如下:

acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName(“IP”), port));
acceptorSvr.configureBlocking(false);

步骤三:创制Reactor线程,创立多路复用器并运转线程,代码如下:

Selector selector = Selector.open();
New Thread(new ReactorTask()).start();

步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件,代码如下:

SelectionKey key = acceptorSvr.register( selector, SelectionKey.OP_ACCEPT, ioHandler);

步骤五:多路复用器在线程run方法的最为循环体内轮询希图妥当的Key,代码如下:

    int num = selector.select();
    Set selectedKeys = selector.selectedKeys();
    Iterator it = selectedKeys.iterator();
    while (it.hasNext()) {
         SelectionKey key = (SelectionKey)it.next();
         // ... deal with I/O event ...
    }

步骤六:多路复用器监听到有新的客商端连着,处理新的连通央浼,落成TCP贰遍握手,创建物理链路,代码示举例下:

SocketChannel channel = svrChannel.accept();

步骤七:设置客商端链路为非窒碍格局,示例代码如下:

channel.configureBlocking(false);
channel.socket().setReuseAddress(true);

步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取顾客端发送的网络音信,代码如下:

SelectionKey key = socketChannel.register( selector, SelectionKey.OP_READ, ioHandler);

步骤九:异步读取顾客端央求音信到缓冲区,示例代码如下:

int  readNumber =  channel.read(receivedBuffer);

步骤十:对ByteBuffer进行编解码,假诺有半包音信指针reset,继续读取后续的报文,将解码成功的新闻封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下:

    Object message = null;
    while(buffer.hasRemain())
    {
           byteBuffer.mark();
           Object message = decode(byteBuffer);
           if (message == null)
           {
              byteBuffer.reset();
              break;
           }
           messageList.add(message );
    }
    if (!byteBuffer.hasRemain())
    byteBuffer.clear();
    else
        byteBuffer.compact();
    if (messageList != null & !messageList.isEmpty())
    {
    for(Object messageE : messageList)
       handlerTask(messageE);
    }

手续十五:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将音讯异步发送给客商端,示例代码如下:

socketChannel.write(buffer);

专一:借使发送区TCP缓冲区满,会促成写半包,当时,须要注册监听写操作位,循环写,直到整包新闻写入TCP缓冲区,此处不赘述,后续Netty源码深入分析章节会详细剖判Netty的拍卖政策。
当大家精晓成立NIO服务端的中央步骤之后,下边大家将近些日子的时刻服务器程序通过NIO重写贰次,让大家能够学习到一体化版的NIO服务端创立。

顾客端步骤:

澳门新浦京8455com 20

步骤一:展开SocketChannel,绑定客户端本地地址(可选,暗中认可系统会自由分配三个可用的本地地址),示例代码如下:

SocketChannel clientChannel = SocketChannel.open();

手续二:设置SocketChannel为非梗塞方式,同时安装客商端连接的TCP参数,示例代码如下:

clientChannel.configureBlocking(false);
    socket.setReuseAddress(true);
    socket.setReceiveBufferSize(BUFFER_SIZE);
    socket.setSendBufferSize(BUFFER_SIZE);

步骤三:异步连接服务端,示例代码如下:

boolean connected = clientChannel.connect(new InetSocketAddress(“ip”,port));

手续四:剖断是或不是连接成功,借使三番三回成功,则一贯登记读状态位到多路复用器中,如果当前尚无连接成功(异步连接,重回false,表明顾客端已经发送sync包,服务端未有回来ack包,物理链路还没曾创立),示例代码如下:

    if (connected)
    {
        clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);
    }
    else
    {
        clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);
    }

步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP
ACK应答,示例代码如下:

clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);

手续六:创立Reactor线程,创设多路复用器并运行线程,代码如下:

Selector selector = Selector.open();
    New Thread(new ReactorTask()).start();

步骤七:多路复用器在线程run方法的最为循环体内轮询希图妥帖的Key,代码如下:

int num = selector.select();
    Set selectedKeys = selector.selectedKeys();
    Iterator it = selectedKeys.iterator();
    while (it.hasNext()) {
    if (key.isConnectable())
      //handlerConnect();
    }

手续九:决断连接结果,要是连接成功,注册读事件到多路复用器,示例代码如下:

if (channel.finishConnect())
      registerRead();

步骤十:注册读事件到多路复用器:

clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);

手续十五:异步读顾客端诉求音信到缓冲区,示例代码如下:

int  readNumber =  channel.read(receivedBuffer);

手续十五:对ByteBuffer进行编解码,假设有半包音信选择缓冲区Reset,继续读取后续的报文,将解码成功的音讯封装成Task,投递到业务线程池中,举办专门的学问逻辑编排,示例代码如下:

Object message = null;
    while(buffer.hasRemain())
    {
           byteBuffer.mark();
           Object message = decode(byteBuffer);
           if (message == null)
           {
              byteBuffer.reset();
              break;
               }
           messageList.add(message );
    }
    if (!byteBuffer.hasRemain())
    byteBuffer.clear();
    else
        byteBuffer.compact();
    if (messageList != null & !messageList.isEmpty())
    {
    for(Object messageE : messageList)
       handlerTask(messageE);
    }    

手续十五:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将音讯异步发送给客商端,示例代码如下:

socketChannel.write(buffer);

 

完整代码:

package com.dxz.springsession.nio.demo6;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    public static void main(String[] args) throws IOException {

        new Thread(new ReactorTask()).start();


    }

    public static class ReactorTask implements Runnable {

        private Selector selector;

        public ReactorTask() {
            try {
                // 第一步:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道
                ServerSocketChannel acceptorSvr = ServerSocketChannel.open();

                // 第二步:病毒监听端口,设置连接为非阻塞模式
                acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName("localhost"), 1234));
                acceptorSvr.configureBlocking(false);

                // 第三步:创建Reactor线程,创建多路复用器并启动线程
                selector = Selector.open();

                // 第四步:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听Accept事件
                SelectionKey key = acceptorSvr.register(selector, SelectionKey.OP_ACCEPT);

            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            // 第五步:在run方法中无限循环体内轮询准备就绪的Key
            while (true) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
                        it.remove();
                        try {
                            if (key.isValid()) {
                                // 处理新接入的请求消息
                                if (key.isAcceptable()) {
                                    // 第六步:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路
                                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                                    SocketChannel sc = ssc.accept();
                                    // 第七步:设置客户端链路为非阻塞模式
                                    sc.configureBlocking(false);
                                    sc.socket().setReuseAddress(true);
                                    // 第八步:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,读取客户端发送的网络消息
                                    sc.register(selector, SelectionKey.OP_READ);
                                }
                                if (key.isReadable()) {
                                    // 第九步:异步读取客户端请求消息到缓存区
                                    SocketChannel sc = (SocketChannel) key.channel();
                                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                                    int readBytes = sc.read(readBuffer);

                                    // 第十步:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文
                                    if (readBytes > 0) {
                                        readBuffer.flip();
                                        byte[] bytes = new byte[readBuffer.remaining()];
                                        readBuffer.get(bytes);
                                        String body = new String(bytes, "UTF-8");
                                        System.out.println("The time server receive order : " + body);
                                        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                                                ? new java.util.Date(System.currentTimeMillis()).toString()
                                                : "BAD ORDER";
                                        //写应答
                                        byte[] bytes2 = currentTime.getBytes();
                                        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes2.length);
                                        writeBuffer.put(bytes2);
                                        writeBuffer.flip();
                                        sc.write(writeBuffer);
                                    } else if (readBytes < 0) {
                                        // 对端链路关闭
                                        key.cancel();
                                        sc.close();
                                    } else
                                        ; // 读到0字节,忽略
                                }
                            }
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null)
                                    key.channel().close();
                            }
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }

        }

    }

}

 

客户端:

package com.dxz.springsession.nio.demo6;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {

    private String host;
    private int port;

    private Selector selector;
    private SocketChannel socketChannel;

    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            //第一步:打开SocketChannel,用于创建客户端连接
            socketChannel = SocketChannel.open();
            //第二步:设置SocketChannel为非阻塞模式
            socketChannel.configureBlocking(false);
            //第三步:创建多路复用器(在Reactor线程中)
            selector = Selector.open();

        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            // 第四步:socketChannel发起连接
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                //第五步:如果直接连接成功,则注册到多路复用器上
                socketChannel.register(selector, SelectionKey.OP_READ);
                //第六步:发送请求消息,读应答
                byte[] req = "QUERY TIME ORDER".getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
                writeBuffer.put(req);
                writeBuffer.flip();
                socketChannel.write(writeBuffer);
                if (!writeBuffer.hasRemaining())
                    System.out.println("Send order 2 server succeed.");
            } else
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                //第七步:多路复用器在run的无限循环体内轮询准备就绪的Key
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        if (key.isValid()) {
                            //第八步:将连接成功的Channel注册到多路复用器上
                            // 判断是否连接成功
                            SocketChannel sc = (SocketChannel) key.channel();
                            if (key.isConnectable()) {
                                if (sc.finishConnect()) {
                                    sc.register(selector, SelectionKey.OP_READ);
                                    //发送请求消息,读应答
                                    byte[] req = "QUERY TIME ORDER".getBytes();
                                    ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
                                    writeBuffer.put(req);
                                    writeBuffer.flip();
                                    sc.write(writeBuffer);
                                    if (!writeBuffer.hasRemaining())
                                        System.out.println("Send order 2 server succeed.");
                                } else
                                    System.exit(1);// 连接失败,进程退出
                            }
                            //监听读操作,读取服务端写回的网络信息
                            if (key.isReadable()) {
                                //第九步:读取信息到缓冲区
                                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                                int readBytes = sc.read(readBuffer);
                                if (readBytes > 0) {
                                    readBuffer.flip();
                                    byte[] bytes = new byte[readBuffer.remaining()];
                                    readBuffer.get(bytes);
                                    String body = new String(bytes, "UTF-8");
                                    System.out.println("Now is : " + body);
                                    this.stop = true;
                                } else if (readBytes < 0) {
                                    // 对端链路关闭
                                    key.cancel();
                                    sc.close();
                                } else
                                    ; // 读到0字节,忽略
                            }
                        }
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

    }

}
package com.dxz.springsession.nio.demo6;

public class TimeClient {

    /**
     * @param args
     */
    public static void main(String[] args) {

    int port = 1234;
    if (args != null && args.length > 0) {
        try {
        port = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
        // 采用默认值
        }
    }
    new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
        .start();
    }
}

 

运营三个简短的 Web 服务器

我实现了一多级管理HTTP呼吁的微机,具体得以实现看代码。

  • LineBasedDecoder:行解码器,按行剖析数据
  • HttpRequestDecoder:HTTP需要拆解分析,前段时间只扶持GET供给
  • HttpRequestHandler:Http 央求微电脑,方今只扶植GET方法
  • HttpResponseHandler:Http响应微机

下边是写在test中的例子

class WebServerTest {
    static void main(args) {
        def pipeLine = new PipeLine()

        def readChain = new HandlerChain()
        readChain.addLast(new LineBasedDecoder())
        readChain.addLast(new HttpRequestDecoder())
        readChain.addLast(new HttpRequestHandler())
        readChain.addLast(new HttpResponseHandler())

        def closeChain = new HandlerChain()
        closeChain.addLast(new ClosedHandler())

        pipeLine.addChain(readChain)
        pipeLine.addChain(closeChain)

        NioServer nioServer = new NioServer(pipeLine)
        nioServer.start()
    }
}

其余,还足以动用布置文件getty.properties安装程序的运营参数。

#用于拼接消息时使用的二进制数组的缓存区
common_buffer_size=1024
#工作线程读取tcp数据的缓存大小
worker_rcv_buffer_size=1024
#监听的端口
port=4399
#工作线程的数量
worker_num=1
#连接超时自动断开时间
timeout=900
#根目录
root=.

总结

Getty是自己造的第1个小轮子,第三个是RedisHttpSession。都说不要再一次造轮子。那话作者是肯定的,不过精晓一门技能最佳的方法正是实施,在未有合适项目方可采取新技艺的时候,造三个简约的轮子是不移至理的实践手段。

Getty 的短处可能说还足以优化的点:

  1. 线程的接收直接用了Thread类,看起来有一些low。等将来水平晋级了再来抽象一下。
  2. 当下唯有读事件是异步的,写事件是一块的。今后将写事件也改为异步的。
You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图