NIO服务器

从简单I/O到异步非阻塞channel的Java Socket模型演变之旅

上世纪九十年代后期,我在一家在线视频游戏工资工作,在哪里我主要的工作就是编写Unix
Unix Berkley Socket和Windows
WinSock代码。我的任务是确保视频游戏客户端和一个游戏服务器通信。很幸运有这样的机会写一些Java
Socket代码,我对Java流式网络编程和简洁明了的API着迷。这一点都不让人惊讶,Java最初就是设计促进智能设备之间的通信,这一点很好的转移到了桌面应用和服务器应用。

1996年,JavaWorld刊登了Qusay H. Mahmoud的文章”Sockets programming in
Java: A
tutorial“。文章概述了Java的Socket编程模型。从那以后的18年,这个模型少有变化。这篇文章依然是网络系统Java
socket编程的入门经典。我将在此基础之上,首先列出一个简单的客户端/服务器例子,开启Java
I/O谦卑之旅。此例展示来自java.io包和NIO——Java1.4引起的新的非阻塞I/O
API的特性,最后一个例子会涉及Java 7引入的 NIO2 某些特性。

导语

NIO的出现是为服务器端编程而设计的。它的作用就是能够让一个线程为多个连接服务。NIO中的API都是非阻塞模式的,这样可以在服务器端采用异步的方式来处理多个请求。NIO中有两个重要的东西就是通道和缓冲器,这两个必须掌握。

Java的Socket编程:TCP和UDP

Socket编程拆分为两个系统之间的相互通信,网络通信有两种方式:ransport
Control Protocol(TCP)和User Datagram
Protocol(UDP)。TCP和UDP用途不一,并且有各自独特的约束:

  •  TCP协议相对简单稳定,可以帮助客户端与一台服务器建立连接,这样两个系统就可以通信。在TCP协议中,每个实体都能保证其通信载荷(communication
    payload)会被接受。
  •  UDP是一种非连接协议,适用于那些无需保证每个包都能抵达终点的场景,比如流媒体。

如何区分这两者的差异?试想,倘若你在自己喜欢的网站上观看流媒体视频,这时掉帧会发生什么。你是倾向于客户端放缓视频接收丢失的帧,还是继续观看视频呢?典型的流媒体协议采用UDP协议,因为TCP协议保障传输,HTTP、FTP、SMTP、POP3等协议会选择TCP。

普通IO与NIO

使用普通IO进行服务器编程时,每当有一个连接来临时,我们需要为该连接服务。如果服务器只输出数据的话,那么直接向该连接中写入数据即可,一般情况下调用write()之类的方法即可写入数据。但是我们知道该方法会阻塞,这是什么原因呢?其实很简单,我们知道网络协议栈是进行分层了的。只有等下层做好了接受数据的准备时,上层才能把数据传给下层。我们知道最底层是物理层,对应的是我们的网卡和驱动。网卡中是有数据寄存器的,只有当数据寄存器未满时,它才接受来自上层的数据。还有就是数据传输时会希望拿到对方的ACK,如果没拿到会进行重传,这也导致了数据在某一层的累计。最终的效果就是调用write方法时,发现写了一些数据写不了,然后write就被阻塞,等待下层有接受数据的空间时继续写,直到所有的数据写完时write才返回。这样就会使得的我们干着急—正在服务的连接阻塞在write上但又无法接受新的连接。那么NIO有什么好处呢?首先,IO的操作可被配置为非阻塞模式,在这种模式下,它会尽力而为的去写,比如当前只能写3字节,它写完这三字节马上返回。这样的话就为我们节省了时间。那么问题又来了,我是如何知道写了多少字节呢?不用担心,在NIO中引入了缓冲器的概念,它会记录写了多少,还需要多少去写的。

以往的Socket编程

早在NIO以前,Java
TCP客户端socket代码主要由java.net.Socket类来实现。下面的代码开启了一个对服务器的连接:

Socket socket = new Socket( server, port );

一旦Socket实例与服务器相连,我们就可以获得服务器端的输入输出流。输入流用来读取服务器端的数据,输出流用来将数据写回到服务器端。可以执行以下的方法获取输入输出流:

InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();

这是基本的流——用来读取或者写入一个文件的流是相同的,所以我们能够将其转换成最好的形式服务于用例中。比如,我们可以用一个PrintStream
包装
OutputStream,这样我们就能轻易地用println()等方法对文本进行写的操作。再比如,我们用BufferedReader包装
InputStream,再通过InputStreamReader可以很容易的用readLine()等方法对文本进行读操作。

多线程IO与NIO

多线程IO好,还是NIO好,这个不能一概而论。首先从设计的复杂性上来说NIO比较复杂。但是使用多线程IO的话又很消耗内存,而且还有存在线程切换的问题。其次NIO是不是就一定比多线程IO快,这个也不一定。有人做过测试说在Linux上使用Java
6完成的实际测试中,多线程经典IO设计要胜出于NIO
30%左右。那什么情况下使用NIO呢?有种情况就是服务器需要同时支持超大量的长期连接,比如10000个连接以上,而客户端不会很频繁的发送太多的数据。

Java I/O示例第一部分:HTTP客户端

通过一个简短的例子来看如何执行HTTP
GET获取一个HTTP服务。HTTP比本例更加复杂成熟,在我们只写一个客户端代码去处理简单案例。发出一个请求,从服务器端获取一个资源,同时服务器端返回响应,并关闭流。本案例所需的步骤如下:

  1. 澳门新浦京8455com,创建端口为80的网络服务器所对应的客户端Socket。
  2. 从服务器端获取一个PrintStream,同时发送一个GET PATH
    HTTP/1.0请求,其中PATH就是服务器上的请求资源。比如,假设你想打开一个网站根目录,那么path就是
    / 。
  3. 获取服务器端的InputStream,用一个BufferedReader将其包装,然后按行读取响应。

通道

通道的作用就是将缓冲器中的数据移入或移出到各种I/O源,如文件,socket等。通道类的层次结构相当复杂,有多个接口和许多可选操作。但是对于TCP连接来说,只需要关注SocketChannle,ServerSocketChannel这两个类即可。

列表1、 SimpleSocketClientExample.java

package com.geekcap.javaworld.simplesocketclient;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

public class SimpleSocketClientExample
{
    public static void main( String[] args )
    {
        if( args.length < 2 )
        {
            System.out.println( "Usage: SimpleSocketClientExample <server> <path>" );
            System.exit( 0 );
        }
        String server = args[ 0 ];
        String path = args[ 1 ];

        System.out.println( "Loading contents of URL: " + server );

        try
        {
            // 创建与端口为80的网络服务器对应的客户端socket
            Socket socket = new Socket( server, 80 );

            //从服务器端获取一个PrintStream
            PrintStream out = new PrintStream( socket.getOutputStream() );
            //获取服务器端的InputStream,用一个BufferedReader将其包装
            BufferedReader in = new BufferedReader( new InputStreamReader( socket.getInputStream() ) );

            //发送一个GET PATH HTTP/1.0请求到服务器端
            out.println( "GET " + path + " HTTP/1.0" );
            out.println();

            //按行的读取服务器端的返回的响应数据
            String line = in.readLine();
            while( line != null )
            {
                System.out.println( line );
                line = in.readLine();
            }

            // 关闭流
            in.close();
            out.close();
            socket.close();
        }
        catch( Exception e )
        {
            e.printStackTrace();
        }
    }
}

列表1接受两个命令行参数:需要连接的服务器,需要取回的资源。创建一个Socket指向服务器端,并且显式地为其指定端口号80,接着程序会指向这个命令:

GET PATH HTTP/1.0

比如

GET / HTTP/1.0

连接

SocketChannel类没有任何公共构造函数,只能通过open()方法来创建SocketChannel对象:

public static SocketChannel open(SocketAddress remote) throws IOException
public static SocketChannle open() throws IOExecption

第一个方法会建立连接。这个方法将阻塞(也就说在连接建立或抛出异常之前,这个方法不会返回)。如果需要在连接之前对Socket进行配置,则需要调用第二种方法,例如:

SocketChannel channel = SocketChannel.open()
SocketAddress address = new InetSocketAddress("www.xdysite.cn", 80);
address.configureBlocking(false)
channel.connect(address);

这个过程中发生了什么?

当你准备从一个web服务器获取一个网页,比如 www.google.com, HTTP
client利用DNS服务器去获取服务器地址:从最高域名服务器开始查询com域名,哪里存有
www.google.com 的权威域名服务器,接着 HTTP client询问域名服务器
www.google.com
的IP地址。接下来,它会打开一个Socket通向端口80的服务器。最后, HTTP
Client执行特定的HTTP方法,比如GET、POST、PUT、DELETE、HEAD
或者OPTI/ONS。每种方法都有自己的语法,如上述的代码列表中,GET方法后面依次需要一个path、HTTP/版本号、一个空行。如果想加入 HTTP
Headers,我们必须在进入新的一行之前完成。

在列表1中,获取了一个 OutputStream,并用 PrintStream
包装了它,这样我们就能容易的执行基于文本的命令。 同样,从 InputStream
获取的代码,InputStreamReader 包装之后,流被转化成一个Reader,再用
BufferedReader
包装。这样我们就能用PrintStream执行GET方法,用BufferedReader
按行读取响应直到获取的响应为 null 时结束,最后关闭Socket。

现在我们执行这个类,传入以下的参数:

java com.geekcap.javaworld.simplesocketclient.SimpleSocketClientExample www.javaworld.com /

你应该能够看到类似下面的输出:

Loading contents of URL: www.javaworld.com
HTTP/1.1 200 OK
Date: Sun, 21 Sep 2014 22:20:13 GMT
Server: Apache
X-Gas_TTL: 10
Cache-Control: max-age=10
X-GasHost: gas2.usw
X-Cooking-With: Gasoline-Local
X-Gasoline-Age: 8
Content-Length: 168
Last-Modified: Tue, 24 Jan 2012 00:09:09 GMT
Etag: "60001b-a8-4b73af4bf3340"
Content-Type: text/html
Vary: Accept-Encoding
Connection: close

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8" />
    <title>Gasoline Test Page</title>
</head>
<body>
<br><br>
<center>Success</center>
</body>
</html>

本输出显示了JavaWorld网站测试页面,网页HTTP version 1.1,响应200 OK.

读取

为了读取SocketChannel,首先要创建一个ByteBuffer,通道可以在其中存储数据。然后将这个ByteBuffer传给read()方法:

public abstract int read(ByteBuffer dst) throws IOException

通道会用尽可能多的数据填充缓冲区,然后返回放入的字节数。如果遇到流末尾。通道会用所有剩余的字节填充缓冲区,而且在下一次调用read()时会返回-1。因为数据将存储在缓冲区的当前位置,而这个位置会随着增加更多数据而自动更新,所以可以一直向read()方法中传入同一个缓冲区,直到缓冲区填满。例如,下面的循环会一直读取数据,直到缓冲区填满或者检测到流末尾为止:

while (buffer.hasRemaining() && channel.read(buffer) != -1)

有时如果能从一个源填充多个缓冲区将会很有用,这被称为散布。下面两个方法接受一个ByteBuffer对象数组作为参数,按顺序填充数组中的各个ByteBuffer:

public final long read(ByteBuffer[] dsts) throws IOExecption
public final long read(ByteBuffer[] dsts, int offset, int length) throws IOException

下面是其使用示例:

ByteBuffer[] buffers = new ByteBuffer[2];
buffers[0] = ByteBuffer.allocate(1000);
buffers[1] = ByteBuffer.allocate(1000);
while(buffers[1].hasRemaining() && channel.read(buffers) != -1);

Java I/O示例第二部分:HTTP服务器

刚才我们说了客户端,幸运的是,服务器端的通信也是很容易。从一个简单的视角看,处理过程如下:

  1. 创建一个ServerSocket,并指定一个监听端口。
  2. 调用 ServerSocket的 accept() 方法监听来自客户端的连接。
  3. 一旦有客户端连接服务器,accept()
    方法通过服务器与客户端通信,返回一个Socket。在客户端用过同样的Socket类,那么处理过程相同,获取
    InputStream 读取客户端信息,OutputStream 写数据到客户端。
  4. 如果服务器需要扩展,你需要将Socket传给其他的线程去处理,因此服务器可以持续的监听后来的连接。
  5. 再次调用 ServerSocket的 accept() 方法监听其它连接。

正如你所看到的,NIO处理此场景略有不同。可以直接创建ServerSocket,并将一个端口号传给它用于监听(关于
ServerSocketFactory 的更多信息会在后面讨论):

ServerSocket serverSocket = new ServerSocket( port );

通过 accept() 方法接收传入的连接:

Socket socket = serverSocket.accept();
// 处理连接……

写入

Socket通道提供了读写方法,一般情况下它们是全双工的。要想写入,只需要一个ByteBuffer,将其回绕,然后传给某个写入方法,这个方法在把数据复制到输出并将缓存排空,这与读取过程正好相反。基本的write()会接受一个缓冲区作为参数:

public abstract int write(ByteBuffer src) throws IOException

与读取一样,如果通道是非阻塞的,这个方法不能保证会写入缓冲区的全部内容。当然由于缓冲区基于游标的特性,我们可以反复来调用该方法,直到缓冲区排空:

while (buffer.hasRemaining() && channel.write(buffer) != -1)

将多个缓冲区的数据写入到一个SocketChannel被称作聚集。例如:你在一个缓冲区中存储了HTTP首部,而在另一个缓冲区中存储了HTTP主体。可以使用下面的两个方法一次写完这两个缓冲区:

public final long write(ByteBuffer[] dsts) throw IOException
public final long write(ByteBuffer[] dsts, int offset, int length) throws IOException

第一个方法是排空所有的缓冲区,第二个方法是从位于offset的缓冲区开始,排空length个缓冲区

多线程Socket编程

在如下的列表2中,所有的服务器代码放在一起组成一个更加健壮的例子,本例中线程处理多个请求。服务器是一个ECHO服务器,就是说会将所有接收到的消息返回。

列表2中的例子不是很复杂,但已经提前介绍了一部分NIO的内容。在线程代码上花费一些精力,是为了构建一个处理多并发请求的服务器。

关闭

和正常的Socket一样,在用完通道后应当将其关闭,释放它可能使用的端口和其他任何资源:

public void close() throws IOException

如果通道已经关闭,再次进行关闭将没有任何效果。如果试图读写已关闭的通道。将抛出一个异常。

列表2、SimpleSocketServer.java

package com.geekcap.javaworld.simplesocketclient;

import java.io.BufferedReader;
import java.io.I/OException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SimpleSocketServer extends Thread
{
    private ServerSocket serverSocket;
    private int port;
    private boolean running = false;

    public SimpleSocketServer( int port )
    {
        this.port = port;
    }

    public void startServer()
    {
        try
        {
            serverSocket = new ServerSocket( port );
            this.start();
        }
        catch (I/OException e)
        {
            e.printStackTrace();
        }
    }

    public void stopServer()
    {
        running = false;
        this.interrupt();
    }

    @Override
    public void run()
    {
        running = true;
        while( running )
        {
            try
            {
                System.out.println( "Listening for a connection" );

                // 调用 accept() 处理下一个连接
                Socket socket = serverSocket.accept();

                // 向 RequestHandler 线程传递socket对象进行处理
                RequestHandler requestHandler = new RequestHandler( socket );
                requestHandler.start();
            }
            catch (I/OException e)
            {
                e.printStackTrace();
            }
        }
    }

    public static void main( String[] args )
    {
        if( args.length == 0 )
        {
            System.out.println( "Usage: SimpleSocketServer <port>" );
            System.exit( 0 );
        }
        int port = Integer.parseInt( args[ 0 ] );
        System.out.println( "Start server on port: " + port );

        SimpleSocketServer server = new SimpleSocketServer( port );
        server.startServer();

        // 1分钟后自动关闭
        try
        {
            Thread.sleep( 60000 );
        }
        catch( Exception e )
        {
            e.printStackTrace();
        }

        server.stopServer();
    }
}

class RequestHandler extends Thread
{
    private Socket socket;
    RequestHandler( Socket socket )
    {
        this.socket = socket;
    }

    @Override
    public void run()
    {
        try
        {
            System.out.println( "Received a connection" );

            // 获取输入和输出流
            BufferedReader in = new BufferedReader( new InputStreamReader( socket.getInputStream() ) );
            PrintWriter out = new PrintWriter( socket.getOutputStream() );

            // 向客户端写出头信息
            out.println( "Echo Server 1.0" );
            out.flush();

            // 向客户端回写信息,直到客户端关闭连接或者收到空行
            String line = in.readLine();
            while( line != null && line.length() > 0 )
            {
                out.println( "Echo: " + line );
                out.flush();
                line = in.readLine();
            }

            // 关闭自己的连接
            in.close();
            out.close();
            socket.close();

            System.out.println( "Connection closed" );
        }
        catch( Exception e )
        {
            e.printStackTrace();
        }
    }
}

在列表2中,我们创建了一个新的 SimpleSocketServer
实例,并开启了这个服务器。继承 Thread 的 SimpleSocketServer
创建一个新的线程,处理存在于 run() 方法中的阻塞方法 accept() 调用。

run()
方法中存在一个循环,用来接收客户端请求,并创建RequestHandler线程去处理这些请求。再次强调,这是一个相对简单的编程,但涉及了相当的线程编程。

RequestHandler 处理客户端通信代码与列表1相似:PrintStream 包装后的
OutputStream 更容易进行写操作。同 样,BufferedReader 包装后的InputStream
更易于读取。只要服务器在跑,RequestHandler
就会将客户端的信息按行读取,并将它们返回给客户端。如果客户端发过来的是空行,那对话就结束了,RequestHandler
关闭Socket 。

ServerSocketChannel类

ServerSocketChannel只有一个目的:接受入站连接。我们是无法读取,写入或者连接ServerSocketChannel的。我们可以通过其accept()来得到一个连接(这里的accept方法和
ServerSocket中的accept方法是一样的,都是从连接队列中取出一个连接并将该连接返回给我们)。下面我们来看一个通过ServerSocketChannel来创建一个服务器Socket的例子:

ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(80));

NIO、NIO2 Socket编程

对于多数应用而言,Java基础的Socket编程,我们已经做了充分的探讨。对于涉及到高强度的
I/O 或者异步输入输出,大家就有了熟悉Java NIO和NIO.2中非阻塞API的需要。

JDK1.4 NIO包提供了如下重要特性:

  • Channel 被设计用来支持块(bulk)转移,从一个NIO转到另一个NIO。
  • Buffer 提供了连续的内存块,由一组简单的操作提供接口。
  • 非阻塞I/O 是一组class文件,它们可以将 Channel
    开放给普通的I/O资源,比如文件和Socket。

用NIO编码时,你可以打开一个到目的地的Channel,接着从目的地读取数据到一个buffer中;写入数据到一个buffer中,接着将其发送到目的地。我会创建一个Socket,并为此获取一个Channel。但首先让我们回顾一下buffer的处理流程:

  1. 写数据到一个buffer中。
  2. 调用buffer的 flip() 方法准备读的操作。
  3. 从buffer中读取数据。
  4. 调用buffer中的 clear() 或者 compact() 方法准备读取更多的数据。

当数据写入buffer后,buffer知道写入其中的数据量。它维护了三个属性,在读模式和写模式中其含义不尽相同。

  • Position:在写模式中,初始position值为0,它存储的是写入buffer后的当前位置;一旦flip一个buffer使其进入读模式,它会将位置的值重置为0,然后存储读取buffer后的当前位置。
  • Capacity:指的是buffer的固定大小。
  • Limit:在写模式中,limit定义了写入buffer的数据大小;在读模式中,limit定义了可以从buffer中读取的数据大小。

接受连接

一旦打开并绑定ServerSocketChannel对象,accept()就可以调用了:

public abstract SocketChannel accept() throws IOException

accept()可以在阻塞或者非阻塞模式下工作,这取决于我们对Socket的配置(默认情况是阻塞的)。在阻塞情况下,如果请求连接的队列中有连接,那么它会从中取出一个并返回一个SocketChannel对象。我们可以通过该对象与客户端进行数据传输了。但是需要注意的该对象不是Socket的子类,我们不能从它上面直接调用类似getInputStream()之类的方法来使用。它只能通过缓冲器来使用。

ServerSocketChannel还可以工作在非阻塞模式(在调用bind方法之前先调用configureBlocking方法)。在这种情况下,如果没有入站连接,accept()方法会返回null。非阻塞模式更适合于需要为每个连接完成大量工作的服务器,这样可以并行处理多个请求了。非阻塞模式下一般都与Selector结合使用。

Java I/O示例第三部分:基于NIO.2的ECHO服务器

JDK 7引入的NIO.2添加了非阻塞I/O库去支持文件系统任务,比如 java.nio.file
包和 java.nio.file.Path 类,并提供了一个
新的文件系统API。记住,我们采用IO.2 AsynchronousServerSocketChannel
写一个新的ECHO服务器。

 ”NIO在提供处理性能方法大放异彩,但NIO的结果跟底层平台紧密相连。比如,或许你会发现,NIO加速应用性能不光取决于OS,还跟特定的JVM有关,主机的虚拟化上下文、大存储特性、甚至数据……”
——摘自”Five ways to maximize Java NIO and
NIO.2“

AsynchronousServerSocketChannel
提供了一个非阻塞异步Channel作为流定向监听的Socket。为了用这个Channel,首先需要执行它的
open() 静态方法。然后调用 bind()
为其绑定一个端口号。接着,将一个实现CompletionHandler接口的类传给
accept() 并执行。多数时候,你会发现 handler作为匿名内部类被创建。

列表3显示新的异步ECHO服务器源码。

Channels类

ChannelS是一个简单的工具,可以将传统的基于I/O流,阅读器(Reader)和(Writer)包装在通道中,也可以从通道转化为基于I/O流,阅读器和书写器。处于性能的考虑,有可能在程序中的一部分中使用新I/O模型,但是同时仍要与处理流的传统API交互,这个类将很有用。下面是其的一些转换方法:

public static InputStream newInputStream(ReadableByteChannel ch)
public static OutputStream newOutPutStream(WritableByteChannel ch)
public static ReadableByteChannel newChannel(InputStream in)
public static WritableByteChannel newChannel(OutputStream out)
public static Reader newReader (ReadableByteChannel channel, CharsetDecoder decoder, int minimumBufferCapacity)
public static Reader newReader (ReadableByteChannel ch, String encoding)
public static Writer newWriter (WritableByteChannel ch, String encoding)

SocketChannel类实现了这些方法签名中出现的ReadableByteChannelWritableByteChannel接口。ServerSocketChannel则都没实现,所以无法对
ServerSocketChannel进行读写。

列表3、SimpleSocketServer.java

package com.geekcap.javaworld.nio2;

import java.io.I/OException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NioSocketServer
{
    public NioSocketServer()
    {
        try
        {
            // 创建一个 AsynchronousServerSocketChannel 侦听 5000 端口
            final AsynchronousServerSocketChannel listener =
                    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));

            // 侦听新的请求
            listener.accept( null, new CompletionHandler<AsynchronousSocketChannel,Void>() {

                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    // 接受下一个连接
                    listener.accept( null, this );

                    // 向客户端发送问候信息
                    ch.write( ByteBuffer.wrap( "Hello, I am Echo Server 2020, let's have an engaging conversation!n".getBytes() ) );

                    // 分配(4K)字节缓冲用于从客户端读取信息
                    ByteBuffer byteBuffer = ByteBuffer.allocate( 4096 );
                    try
                    {
                        // Read the first line
                        int bytesRead = ch.read( byteBuffer ).get( 20, TimeUnit.SECONDS );

                        boolean running = true;
                        while( bytesRead != -1 && running )
                        {
                            System.out.println( "bytes read: " + bytesRead );

                            // 确保有读取到数据
                            if( byteBuffer.position() > 2 )
                            {
                                // 准备缓存进行读取
                                byteBuffer.flip();

                                // 把缓存转换成字符串
                                byte[] lineBytes = new byte[ bytesRead ];
                                byteBuffer.get( lineBytes, 0, bytesRead );
                                String line = new String( lineBytes );

                                // Debug
                                System.out.println( "Message: " + line );

                                // 向调用者回写
                                ch.write( ByteBuffer.wrap( line.getBytes() ) );

                                // 准备缓冲进行写操作
                                byteBuffer.clear();

                                // 读取下一行
                                bytesRead = ch.read( byteBuffer ).get( 20, TimeUnit.SECONDS );
                            }
                            else
                            {
                                // 在我们的协议中,空行表示会话结束
                                running = false;
                            }
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    catch (ExecutionException e)
                    {
                        e.printStackTrace();
                    }
                    catch (TimeoutException e)
                    {
                        // 用户达到20秒超时,关闭连接
                        ch.write( ByteBuffer.wrap( "Good Byen".getBytes() ) );
                        System.out.println( "Connection timed out, closing connection" );
                    }

                    System.out.println( "End of conversation" );
                    try
                    {
                        // 如果需要,关闭连接
                        if( ch.isOpen() )
                        {
                            ch.close();
                        }
                    }
                    catch (I/OException e1)
                    {
                        e1.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void att) {
                    ///...
                }
            });
        }
        catch (I/OException e)
        {
            e.printStackTrace();
        }
    }

    public static void main( String[] args )
    {
        NioSocketServer server = new NioSocketServer();
        try
        {
            Thread.sleep( 60000 );
        }
        catch( Exception e )
        {
            e.printStackTrace();
        }
    }
}

在列表3中,我们首先创建了一个新的AsynchronousServerSocketChannel,然后为其绑定端口号5000:

final AsynchronousServerSocketChannel listener =
    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));

调用 AsynchronousServerSocketChannel 的
accept(),通知其监听一个连接,并将一个典型的CompletionHandler传给它。一旦调用
accept(),结果会立即返回。注意,本例不同于列表2中的ServerSocket类;除非一个客户端与ServerSocket相连,否则accept()会被阻塞。AsynchronousChannelGroup
的 accept() 会为我们解决这个问题。

Selector类

Selector类的作用就是对通道进行检查。我们向其注册多个通道,只要有一个通道准备就绪(可接受,可读,可写)时,Selector就会通知我们。

Selector唯一的构造函数是一个保护类型方法,一般情况下,要调用静态工程方法Selector.open()来创建新的选择器:

public static Selector open() throws IOException

下一步是为具有通道特性的server注册该选择器,这里需要调用ServerSocketChannel对象的
register()方法。

public fianl SelectionKey register(Selector sel, int pos)
public final SelectionKey register(Selector sel, int pos, Object att)

第一个参数是选择器,第二个参数是SelectionKey类中的一个常量,标识选择器所要观察的操作。SelectionKey中定义的4个常量:

  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

不同的通道注册到选择器后,就可以随时查看选择器来找出哪些通道已经准备好可以进行处理了。下面的两个方法就是用来查看是否有通道准备好:

public abstract int select() throws IOException
public abstract int select(long timeout) throws IOException

第一个方法会阻塞,直到至少有一个注册的通道准备好可以进行处理了。第二个在返回前会等到不超过timeout毫秒。当知道有通道准备好处理时(select方法返回了),我们就可以使用SelctionKyes()方法获取就绪的通道了。

public abstract Set<SelectionKey> selectedKeys()

迭代处理返回的集合时,要依次处理每个SelectionKey。注意:在我们没处理一个key时,我们需要将其从集合中删除,因为Selector只会往集合中添加已就绪的key,但是不会清理里面已被处理过的key。

完整的Handler处理

接 下来的主要任务就是创建一个 CompletionHandler 类,并实现 completed()
和 failed() 方法。当 AsynchronousServerSocketChannel
接收一个客户端连接,这个连接包含一个连接客户端的
AsynchronousSocketChannel,completed()方法就会被调用。completed()方法第一次被调用从AsynchronousServerSocketChannel
处接收连接,开始与客户端进行通信。首先它做的事情向客户端写入一个“hello”消息:建立一个字符串,并将其转换成字节数组并将其传给
ByteBuffer.wrap(),完了构造一个ByteBuffer。接着ByteBuffer传给
AsynchronousSocketChannel的 write() 方法。

为了更够从客户端那里读取数据,我们创建了一个新的ByteBuffer,并调用它的allocate(4096)。接
着我们调用了AsynchronousSocketChannel的 read() 方法,此方法会返回一个
Future<Integer>,调用后者的 get()
方法可以获取读自客户端的字节数。在本例中,我们传递了20秒的timeout参数给
get();如果20分钟没有得到响应,那 get()
就会抛出一个TimeoutException。本回响服务器的应对策略是,如果20秒没有响应,就终止这个对话。

异步计算中的Future
“The
Future<V>接口显示一个异步计算的结果,此结果作为一个Future,因为它直到未来的某个时刻才存在。你可以调用它的方法去取消一个任务,返回任务的结果——如果任务没有完成,无限等待或者超时退出——并且决定任务是否已取消或者完成……”。
——摘自”Java concurrency without the pain, Part
1“

接下来我们会检测buffer的position,它会定位到最后一个来自客户端的byte。倘若客户端发来的是一个空行,接收两个字节:一个回车和一个换行。检测确保客户端发出一个空白行,我们以此作为客户端对话结束的信号。如果我们拥有有意义的数据,那我们就调用ByteBuffer的
flip()
方法去进入读的状态。我们可以创建一个临时byte数组去存储读自客户端的数据,然后调用ByteBuffer的
get()
加载数据到byte数组中。最后,我们通过创建一个新的String对象将数组转换成一行字符串。我们将这行字符串返回给客户端:将字符串line转换成一个byte数组,作为参数传递给
ByteBuffer.wrap(),然后调用 AsynchronousSocketChannel的write()
方法。接着调用ByteBuffer的clear(),这样position被重置为0并将ByteBuffer置于写的模式,接着我们读取客户端下一行。

需要注意的是 main() 方法。它
创建了服务器,同时创建了一个让应用跑60秒的计时器。这是因为AsynchronousSocketChannel的
accept() 会理解返回,如果线程 Thread.sleep()
不执行,应用将会立即停止。为了进行测试,启动服务器后用telnet客户端进行连接:

telnet localhost 5000

发送少量的字符串给服务器,观察它们向你返回结果,然后发送一个空行结束对话。

SelectionKey类

一个selector对象可以被多个通道注册,当注册完毕后会返回SelectionKey对象,该对象是与具体的通道绑定的,是通道的风向标。返回值是不需要保存的,我们可以调用Selector中的selectedKeys()方法来获取所有的SelectionKey对象(返回的是一个SelectionKey集合)。当从所选择的键集合中获取一个SelectionKey时,通常首先要测试这些键能进行哪些操作。有以下4种可能:

public final boolean isAcceptable()
public final booolean isConnectable()
public final boolean isReadable()
public final boolean isWritable()

通过根据键的测试可以获得通道的状态。比如我们调用isWritable()返回true则表示这个键对应的通道处于可写状态,这样我们就可以在通道上执行写操作了。一旦了解到与键关联的通道准备好完成何种操作,就可以用channel()方法来获取这个通道了:

public abstract SelectableChannel channel()

如果在保存状态信息的SelectionKey存储了一个对象,就可以用attachment()方法来获取该对象:

public final Object attachment()

最后,如果结束使用连接,就要撤销其SelectionKey对象的注册,这样选择器就不会浪费资源去查询它是否准备就绪。可以调用这个键的cannel()方法来撤销注册:

public abstract void cancel()

结语

本文展示了两种Socket Java编程方式:传统的Java 1.0引入的编写方式,Java
1.4和Java 7中分别引入的非阻塞 NIO 和 NIO.2
方式。采用客户端服务器几次迭代的例子,展示了基本 Java
I/O的使用,以及一些场景下非阻塞I/O对Java
socket编程模型的改进和简化。利用非阻塞I/O,你可以编写网络应用来处理多并发连接,而无需管理多线程集合。同样,你也可以利用构建在NIO和
NIO.2上新的服务器扩展特性。

一个示例的客户端

虽然新的I/O
API并非专门为客户端而设计,但的确可以用于客户端。我们可以从一个简答的客户端开始来了解NIO。我们使用RFC864中定义的字符生成器协议来实现一个客户端。服务器在端口19监听连接。当客户端连接时,客户端的所有输入都被忽略。而服务器将发送连续的字符序列,直到客户端断开连接为止。

package com.dy.xidian;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;

public class ChargenClient {

    public static int DEFAULT_PORT = 19;

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("Usage: java ChargenClient host [port]");
            return;
        }

        int port;
        try {
            port = Integer.parseInt(args[1]);
        } catch (RuntimeException e) {
            port = DEFAULT_PORT;
        }

        try {
            SocketAddress address = new InetSocketAddress(args[0], port);
            SocketChannel client = SocketChannel.open(address);

            ByteBuffer buffer = ByteBuffer.allocate(74);
            WritableByteChannel out = Channels.newChannel(System.out);

            while (client.read(buffer) != -1) {
                buffer.flip();
                out.write(buffer);
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在实现这个新I/O
API
的客户端时,首先要调用静态工厂方法SocketChannel.open()来创建一个新的
java.nio.channel.SocketChannel对象。这个方法的参数是一个是java.net.SocketAddress对象,指示要连接的主机和端口。例如,下面的代码连接指向
www.xdysite.cn端口8000的通道:

SocketAddress addr = new InetSocketAddress("www.xdysite.cn", 8000);
SocketChannel client = SocketChannel.open(addr);

通道会以阻塞的模式打开,但是不会立马去连接服务器。如果这是传统的客户端,你可能会获取该Socket的输入和输出流,但这不是传统的客户端。利用通道可以将内容写入到缓冲区中。下面使用静态方法allocate()来创建一个容量为74字节的缓冲区ByteBuffer

ByteBuffer buffer = ByteBuffer.allocate(74);

这个ByteBuffer对象传递给通道read()方法。通道会将从Socket读取的数据填充这个缓冲区。它的返回成功读取并存储到缓冲区的字节数:

int byteRead = client.read(buffer);

默认情况下,会至少读取一个字节,或者返回-1指示数据结束。假定缓冲区中有一些数据,我们可以将这些数据复制到System.out中去。有几个方法可以中ByteBuffer中提取一个字节数组,然后再写到传统的OutputStream中去。不过,采用一种完全基于通道的解决方案更为合适。这需要利用到Channels工具类,将System.out封装到一个通道中:

WritableByteChannel output = Channels.newChannel(System.out)

然后将读取的数据写入到与System.out连接的这个输出管道中。不过,在写入数据之前需要回绕(flip)缓冲区,这个后面有讲。

buffer.flip()
output.write(buffer)
buffer.clear()

一个示例服务器

客户端使用通道和缓冲区就可以了,但是实际上通道和缓冲区主要用于处理多并发连接的服务器系统。下面是通过NIO设计的一个服务器。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ChargenServer {
    public static int DEFAULT_PORT=19;

    public static void main(String[] args) {
        int port;
        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException e) {
            port = DEFAULT_PORT;
        }

        System.out.println("Listening for connections on port " + port);
        byte[] rotation = new byte[95*2];

        for(byte i=' '; i<= '~'; i++) {
            rotation[i-' '] = i;
            rotation[i+95-' '] = i;
        }

        ServerSocketChannel serverChannel;
        Selector selector;

        try {
            //获取通道
            serverChannel = ServerSocketChannel.open();
            //绑定端口
            serverChannel.bind(new InetSocketAddress(19));
            //配置为非阻塞模式,这样accept()方法将不会阻塞
            serverChannel.configureBlocking(false);
            //创建Selector,迭代处理所有准备好的连接
            selector = Selector.open();
            //向通道注册选择器,第二个参数指明选择器所要关注的操作
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        while (true) {
            try {
                //当有可以进行操作的通道时,该方法将会返回,否则阻塞
                selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                break;
            }

            //获取已经就绪了的通道
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            //通过迭代的方式来处理就绪通道
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //从集合中删除这个键
                iterator.remove();

                try {
                    if(key.isAcceptable()) { //发现这个key关联到ServerSocket并且有可接受的连接
                        //通过key获取serverSocket
                        ServerSocketChannel server = (ServerSocketChannel)key.channel();
                        //一定能够取出一个连接
                        SocketChannel client = server.accept();
                        System.out.println("Accepted connection from " + client);
                        //将来自客户端的连接设为非阻塞模式
                        client.configureBlocking(false);
                        //将该连接注册到Selector
                        SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
                        //为每个客户端(channel)创建一个缓冲器并填充数据
                        ByteBuffer buffer = ByteBuffer.allocate(74);
                        buffer.put(rotation, 0, 72);
                        buffer.put((byte)'r');
                        buffer.put((byte)'n');
                        buffer.flip();
                        //将该缓冲器绑定到key上以方便将来使用
                        key2.attach(buffer);
                    } else if (key.isWritable()) { //发现客户端连接可以写了
                        //通过KEY获取客户端连接
                        SocketChannel client = (SocketChannel) key.channel();
                        //获取对应的缓冲器
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        //缓冲器调整
                        if (!buffer.hasRemaining()) {
                            //用下一行重新填充缓冲区
                            buffer.rewind();
                            //得到上一次的首字符
                            int first = buffer.get();
                            //准备改变缓冲区中的数据
                            buffer.rewind();
                            //寻找rotation中新的首字符位置
                            int position = first - ' '+ 1;
                            //将数据从rotation中复制到缓冲区
                            buffer.put(rotation, position, 72);
                            //在缓冲区末尾存储一个行分隔符
                            buffer.put((byte)'r');
                            buffer.put((byte)'n');
                            //准备缓冲区写入
                            buffer.flip();
                        }
                        //将缓冲器中的数据写入到Socket中,并不保证全部写入
                        client.write(buffer);
                    }
                } catch (IOException e) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }
}
You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图