Java IO类型

Published: 24 Jan 2022 Category: IO

介绍

描述IO类型时经常会交替地使用非阻塞、异步等术语,但这两个词是有着很大的区别的。本文将从理论和实践两个方面来说明下Java编程里的非阻塞和异步IO。

TCP和UDP协议使用了套接字进行双端通信。Java的套接字 API则是底层操作系统具体实现的的适配器。兼容POSIX规范的操作系统(如Unix, Linux, Mac OS X, BSD, Solaris, AIX等)中使用的socket通信被称作伯克利套接字(Berkeley sockets)。Windows中的套接字叫winsock,它也是基于伯克利套接字,但增加了额外的功能用于支持windows的编程模型。

POSIX定义

本文使用的是POSIX规范中的简化版定义。

阻塞线程————等待特定条件以便能继续执行的线程。

阻塞————套接字的一种属性,所有对该套接字的调用都要等待所请求的动作执行完成后才返回。

非阻塞————套接字的一种属性,当所请求的动作无法在可预期的时间内完成,对该套接字的调用无需等待直接返回。

同步IO操作————它会阻塞所请求的线程,直到IO操作完成。

异步IO操作————它本身不会导致请求线程阻塞,也就是说线程和IO操作可以并行执行。

因此,根据POSIX规范,非阻塞和异步的区别显而易见:

非阻塞————这是套接字的一种属性,对该套接字的调用无需等待直接返回。

异步IO操作————这是IO操作的一种属性,该IO操作可以和请求线程并行执行。

IO模型

在兼容POSIX的操作系统上最常见的IO模型如下:

  • 阻塞IO
  • 非阻塞IO
  • IO多路复用
  • 信号驱动(signal-driven )IO
  • 异步IO

阻塞IO

在阻塞IO中,应用发起阻塞的系统调用,直到内核接受到数据,并将数据从内核空间拷贝到用户空间。

图片

  • 优点:容易实现。
  • 缺点:线程被阻塞。

非阻塞IO

非阻塞IO中,应用程序发起系统调用后,会立即返回如下其一的结果:

  • 如果IO操作能立即完成,则返回数据
  • 如果IO操作不能立即完成,则返回错误码,告知应用IO操作会阻塞或者设备暂时不可用。

应用程序可以不停地循环等待(重复发起系统调用),直至IO操作完成。

图片

  • 优点:应用程序不会阻塞
  • 缺点: 应用在完成前要不断等待,会导致多余的用户态和内核态的上下文切换; 该模型可能会带来额外的IO延迟,因为在内核数据可用到数据被应用读取之间可能会存在时间间隔。

IO多路复用

在IO多路复用模型(也被称为带阻塞通知的非阻塞IO)中,应用会发起一次阻塞的select系统调用,来监视多个IO描述符上的活动。可以针对某个描述符的特定的IO操作(连接,读/写,发生错误等),在IO状态就绪时获得通知。当select调用返回时,至少会有一个描述符上的操作是准备就绪的,应用可以发起一次非阻塞的调用,将数据从内核空间复制到用户空间。

图片

  • 优点:可以通过一个线程同时在多个描述符上执行IO操作。
  • 缺点: 应用仍然会在select系统调用时发生阻塞。 并非所有操作系统都能很好的支持该模式

信号驱动IO

在信号驱动IO模型中,应用程序会发起一次非阻塞的调用,注册一个信号通知的handler。当某个描述符的特定IO操作就绪时,会生成一个信号来通知应用。然后这个注册的handler会将数据从内核拷贝到用户态。

图片

  • 优点:应用不会阻塞。 性能比较不错。
  • 缺点:并非所有操作系统都能支持该模式

异步IO

在异步IO模型(也被称为重叠IO, overlapped I/O)中,应用发起非阻塞调用,来发起一次内核的后台操作。当操作完成时(内核已经接收到数据并将其从内核空间拷贝到用户空间),会发起一次完成的回调来结束这次IO操作。

异步IO和信号驱动IO的不同之处在于信号驱动IO中,内核告诉应用IO操作可以开始了,但在异步IO中,内核告诉应用IO操作已经完成了。

图片

  • 优点:应用不会阻塞;性能最优。
  • 缺点:实现最复杂;并非所有操作系统都能支持的很好。

Java IO API

Java的输入输出API是用stream(InputStream, OutputStream)来代表阻塞的、单向的数据流。

Java NIO API

Java NIO是基于Channel, Buffer, Selector来实现的,通过这些类来驱动操作系统的底层的IO操作。

Channel类代表了和一个能够进行IO操作(读或写)的实体(可以是硬件设备,文件,socket或者软件组件等)的连接。

和stream是单向的不同,channel是一个双向的通道。

Buffer类是一个定长的数据容器,有对应的方法用来读写数据。Channel中的所有数据交互都必须通过Buffer,不能直接操作:发送给Channel的数据,要先写入Buffer中,要从Channel中接收的数据也要读入到Buffer中。

stream是面向字节的,而channel是面向块数据的。面向字节的IO更简单,但对某些IO对象而言操作效率比较低。面向块的IO速度更快,但实现起来也更复杂。

Selector类可以在一次调用中订阅多个SelectableChannel对象的事件。当订阅的事件触发时,Selector类会将事件分发给对应的处理器来执行。

Java NIO2 API

Java NIO2是基于异步通道(AsynchronousServerSocketChannel, AsynchronousSocketChannel等)来实现的,可以实现异步IO操作(连接,读写,错误处理)。

异步通道提供了两套机制来进行异步IO操作。第一种是返回一个java.util.concurrent.Future对象,用来代表一个挂起的操作,可以透过它来查询和获取结果。第二种是给对应的IO操作传入一个java.nio.channels.CompletionHandler对象,当IO操作完成或者失败时,会执行对应的代码。两种方式都是等效的。

异步通道提供了一套平台无关的标准化地执行异步IO操作的方式。然而,Java API究竟能挖掘出多少底层操作系统的异步化能力的支持,这要取决于对该平台的支持好不好了。

socket实现的echo服务器

下面将会使用Java的套接字接口,来实现一个echo应用的服务端和客户端,以实现上述提到的这几种IO模型。这个应用工作原理如下:

  1. 有一个服务端监听着TCP的7000端口;
  2. 客户端应用通过一个动态的TCP端口来连接服务端的socket
  3. 客户端从控制台读取输入信息后,通过socket将对应的字节发送给服务端
  4. 服务端接收到后,将字节流再发回给客户端。
  5. 客户端收到返回的信息后将它打印到控制台上。
  6. 当客户端收到的字节流长度等于它发送的长度,它会断开和服务端的连接。
  7. 当服务端接收到某个特殊的字符串时,它会停止监听端口。

这里面字符串和字节流间采用的是UTF—8编码进行转换。

下面只列出了服务端的代码摘要,完成代码将在后面的链接中提供。

阻塞式IO

下面的例子实现的是阻塞式IO的版本。

ServerSocket.accept方法会一直阻塞直到边接建立。InputStream.read则会阻塞直到输入的数据可用为止,或者客户端连接断开。OutputStream.write也会一直阻塞直到输出数据写入完毕。

public class IoEchoServer {
   public static void main(String[] args) throws IOException {
       ServerSocket serverSocket = new ServerSocket(7000);
       while (active) {
           Socket socket = serverSocket.accept(); // blocking
           InputStream is = socket.getInputStream();
           OutputStream os = socket.getOutputStream();
           int read;
           byte[] bytes = new byte[1024];
           while ((read = is.read(bytes)) != -1) { // blocking
               os.write(bytes, 0, read); // blocking
           }
           socket.close();
       }
       serverSocket.close();
   }
}

阻塞的NIO版本

下面会用J ava的NIO来实现一个阻塞版IO。

ServerSocketChannel和SocketChannel都默认采用阻塞模式。ServerSocketChannel.accept方法会阻塞直到连接建立并返回一个SocketChannel对象。ServerSocket.read会一直阻塞直到数据可用,或连接断开。ServerSocket.write也会阻塞直到数据写入成功。

public class NioBlockingEchoServer {
   public static void main(String[] args) throws IOException {
       ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
       serverSocketChannel.bind(new InetSocketAddress("localhost", 7000));
       while (active) {
           SocketChannel socketChannel = serverSocketChannel.accept(); // blocking
           ByteBuffer buffer = ByteBuffer.allocate(1024);
           while (true) {
               buffer.clear();
               int read = socketChannel.read(buffer); // blocking
               if (read < 0) {
                   break;
               }
               buffer.flip();
               socketChannel.write(buffer); // blocking
           }
           socketChannel.close();
       }
       serverSocketChannel.close();
   }
}

非阻塞的NIO版本

public class NioNonBlockingEchoServer {
   public static void main(String[] args) throws IOException {
       ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
       serverSocketChannel.configureBlocking(false);
       serverSocketChannel.bind(new InetSocketAddress(7000));
       while (active) {
           SocketChannel socketChannel = serverSocketChannel.accept(); // non-blocking
           if (socketChannel != null) {
               socketChannel.configureBlocking(false);
               ByteBuffer buffer = ByteBuffer.allocate(1024);
               while (true) {
                   buffer.clear();
                   int read = socketChannel.read(buffer); // non-blocking
                   if (read < 0) {
                       break;
                   }
                   buffer.flip();
                   socketChannel.write(buffer); // can be non-blocking
               }
               socketChannel.close();
           }
       }
       serverSocketChannel.close();
   }
}

多路复用NIO

下面是NIO实现的多路复用IO模型。

初始化过程中,多个ServerSocketChannel对象会配置成非阻塞划式,并通过SelectionKey.OP_ACCEPT注册到同一个Selector对象里,表明对连接创建成功的事件感兴趣。

在主循环里,Selector.select方法会一直阻塞直到至少有一个注册事件发生了。Selector.selectedKeys方法会返回一组SelectionKey对象,里面包含已发生的事件。通过迭代SelectionKey对象,可以知道到底发生了什么IO事件(连接,或者读/写)以及事件关联的是哪个套接字。

SelectionKey对象只是提示对应的通道已经准备好进行某个IO操作了,但并不确保结果。

public class NioMultiplexingEchoServer {
   public static void main(String[] args) throws IOException {
       final int ports = 8;
       ServerSocketChannel[] serverSocketChannels = new ServerSocketChannel[ports];
       Selector selector = Selector.open();
       for (int p = 0; p < ports; p++) {
           ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
           serverSocketChannels[p] = serverSocketChannel;
           serverSocketChannel.configureBlocking(false);
           serverSocketChannel.bind(new InetSocketAddress("localhost", 7000 + p));
           serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
       }
       while (active) {
           selector.select(); // blocking
           Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
           while (keysIterator.hasNext()) {
               SelectionKey key = keysIterator.next();
               if (key.isAcceptable()) {
                   accept(selector, key);
               }
               if (key.isReadable()) {
                   keysIterator.remove();
                   read(selector, key);
               }
               if (key.isWritable()) {
                   keysIterator.remove();
                   write(key);
               }
           }
       }
       for (ServerSocketChannel serverSocketChannel : serverSocketChannels) {
           serverSocketChannel.close();
       }
   }
}

当SelectionKey对象告知连接建立的事件已发生后,会通过ServerSocketChannel.accept方法调用(可以是非阻塞式的)来获取连接。完成了之后,会创建出一个新的非阻塞模式下的SocketChannel对象,并使用SelectionKey.OP_READ注册到同一个Selector上,声明对该通道的读事件感兴趣。

private static void accept(Selector selector, SelectionKey key) throws IOException {
       ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
       SocketChannel socketChannel = serverSocketChannel.accept(); // can be non-blocking
       if (socketChannel != null) {
           socketChannel.configureBlocking(false);
           socketChannel.register(selector, SelectionKey.OP_READ);
       }
}

当SelectionKey提示发生了读事件时,会调用SocketChannel.read方法(也可以是非阻塞的)来将数据从SocketChannel中读取到新的ByteBuffer对象中。然后再用SelectionKey.OP_WRITE将该SocketChannel注册到同一个Selector对象上,表明现在对写事件感兴趣。注册写事件时也会用到这个ByteBuffer对象。

private static void read(Selector selector, SelectionKey key) throws IOException {
       SocketChannel socketChannel = (SocketChannel) key.channel();
       ByteBuffer buffer = ByteBuffer.allocate(1024);
       socketChannel.read(buffer); // can be non-blocking
       buffer.flip();
       socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
}

当SelectionKeys提示写事件发生时,会调用SocketChannel.write方法(可以是非阻塞的),将SelectionKey.attachment方法返回的ByteBuffer对象中所提取出来的数据中写入到SocketChannel。最后通过SocketChannel.close关闭连接。

private static void write(SelectionKey key) throws IOException {
       SocketChannel socketChannel = (SocketChannel) key.channel();
       ByteBuffer buffer = (ByteBuffer) key.attachment();
       socketChannel.write(buffer); // can be non-blocking
       socketChannel.close();
   }

每次读写完成后,都会移除SelectionKey对象以免被重复使用。但接受连接的SelectionKey不会被移除,确保还能用于后续的操作。

异步的NIO2

下面的例子会通过Java NIO2来实现一个异步IO的版本。这里用到了AsynchronousServerSocketChannel, AsynchronousSocketChannel以及完成事件handler的机制。

AsynchronousServerSocketChannel.accept方法会初始化一个异步的接受连接的操作。

public class Nio2CompletionHandlerEchoServer {
   public static void main(String[] args) throws IOException {
       AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
       serverSocketChannel.bind(new InetSocketAddress(7000));
       AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(serverSocketChannel);
       serverSocketChannel.accept(null, acceptCompletionHandler);
       System.in.read();
   }
}

当连接建立时,会调用AcceptCompletionHandler,然后通过AsynchronousSocketChannel.read(ByteBuffer destination, A attachment, CompletionHandler handler)方法初始化一个异步的读操作,数据将从向AsynchronousSocketChannel读取到一个新的ByteBuffer对象中。

class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
   private final AsynchronousServerSocketChannel serverSocketChannel;
   AcceptCompletionHandler(AsynchronousServerSocketChannel serverSocketChannel) {
       this.serverSocketChannel = serverSocketChannel;
   }
   @Override
   public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
       serverSocketChannel.accept(null, this); // non-blocking
       ByteBuffer buffer = ByteBuffer.allocate(1024);
       ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(socketChannel, buffer);
       socketChannel.read(buffer, null, readCompletionHandler); // non-blocking
   }
   @Override
   public void failed(Throwable t, Void attachment) {
       // exception handling
   }
}

当读操作完成时,会调用ReadCompletionHandler类,再通过AsynchronousSocketChannel.write(ByteBuffer source, A attachment, CompletionHandler handler)方法初始化一个异步写操作,将数据从ByteBuffer中写入到AsynchronousSocketChannel。

class ReadCompletionHandler implements CompletionHandler<Integer, Void> {
   private final AsynchronousSocketChannel socketChannel;
   private final ByteBuffer buffer;
   ReadCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer buffer) {
       this.socketChannel = socketChannel;
       this.buffer = buffer;
   }
   @Override
   public void completed(Integer bytesRead, Void attachment) {
       WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(socketChannel);
       buffer.flip();
       socketChannel.write(buffer, null, writeCompletionHandler); // non-blocking
   }
   @Override
   public void failed(Throwable t, Void attachment) {
       // exception handling
   }
}

写操作完成时会调用WriteCompletionHandler,再通过AsynchronousSocketChannel.close方法关闭连接。

class WriteCompletionHandler implements CompletionHandler<Integer, Void> {
   private final AsynchronousSocketChannel socketChannel;
   WriteCompletionHandler(AsynchronousSocketChannel socketChannel) {
       this.socketChannel = socketChannel;
   }
   @Override
   public void completed(Integer bytesWritten, Void attachment) {
       try {
           socketChannel.close();
       } catch (IOException e) {
           // exception handling
       }
   }
   @Override
   public void failed(Throwable t, Void attachment) {
       // exception handling
   }
}

在这个例子中,并没有用到attachment对象,因为所有需要用到的对象(AsynchronousSocketChannel, ByteBuffer)都在构造方法中传递进来了。

结论

socket通信中如何选择IO模型取决于网络通信的情况。如果IO操作比较耗时但不频繁,异步IO会是一个不错的选择。然后如果都是短而快的IO操作,同步IO可能会更好一些,能减少内核调用的处理开销。

尽管Java针对不同的操作系统提供了一套标准化的Socket通信的机制,但执行性能仍然和具体的平台实现密切相关。可以看下这篇文章来了解下不同实现的区别。

这里是文中例子的完整的代码实现。

英文原文链接