Java IO类型
介绍
描述IO类型时经常会交替地使用非阻塞、异步等术语,但这两个词是有着很大的区别的。本文将从理论和实践两个方面来说明下Java编程里的非阻塞和异步IO。
TCP和UDP协议使用了套接字进行双端通信。Java的套接字 API则是底层操作系统具体实现的的适配器。兼容POSIX规范的操作系统(如Unix, Linux, Mac OS X, BSD, Solaris, AIX等)中使用的socket通信被称作伯克利套接字(Berkeley sockets)。Windows中的套接字叫winsock,它也是基于伯克利套接字,但增加了额外的功能用于支持windows的编程模型。
POSIX定义
本文使用的是POSIX规范中的简化版定义。
阻塞线程————等待特定条件以便能继续执行的线程。
阻塞————套接字的一种属性,所有对该套接字的调用都要等待所请求的动作执行完成后才返回。
非阻塞————套接字的一种属性,当所请求的动作无法在可预期的时间内完成,对该套接字的调用无需等待直接返回。
同步IO操作————它会阻塞所请求的线程,直到IO操作完成。
异步IO操作————它本身不会导致请求线程阻塞,也就是说线程和IO操作可以并行执行。
因此,根据POSIX规范,非阻塞和异步的区别显而易见:
非阻塞————这是套接字的一种属性,对该套接字的调用无需等待直接返回。
异步IO操作————这是IO操作的一种属性,该IO操作可以和请求线程并行执行。
IO模型
在兼容POSIX的操作系统上最常见的IO模型如下:
- 阻塞IO
- 非阻塞IO
- IO多路复用
- 信号驱动(signal-driven )IO
- 异步IO
阻塞IO
在阻塞IO中,应用发起阻塞的系统调用,直到内核接受到数据,并将数据从内核空间拷贝到用户空间。
- 优点:容易实现。
- 缺点:线程被阻塞。
非阻塞IO
非阻塞IO中,应用程序发起系统调用后,会立即返回如下其一的结果:
- 如果IO操作能立即完成,则返回数据
- 如果IO操作不能立即完成,则返回错误码,告知应用IO操作会阻塞或者设备暂时不可用。
应用程序可以不停地循环等待(重复发起系统调用),直至IO操作完成。
- 优点:应用程序不会阻塞
- 缺点: 应用在完成前要不断等待,会导致多余的用户态和内核态的上下文切换; 该模型可能会带来额外的IO延迟,因为在内核数据可用到数据被应用读取之间可能会存在时间间隔。
IO多路复用
在IO多路复用模型(也被称为带阻塞通知的非阻塞IO)中,应用会发起一次阻塞的select系统调用,来监视多个IO描述符上的活动。可以针对某个描述符的特定的IO操作(连接,读/写,发生错误等),在IO状态就绪时获得通知。当select调用返回时,至少会有一个描述符上的操作是准备就绪的,应用可以发起一次非阻塞的调用,将数据从内核空间复制到用户空间。
- 优点:可以通过一个线程同时在多个描述符上执行IO操作。
- 缺点: 应用仍然会在select系统调用时发生阻塞。 并非所有操作系统都能很好的支持该模式
信号驱动IO
在信号驱动IO模型中,应用程序会发起一次非阻塞的调用,注册一个信号通知的handler。当某个描述符的特定IO操作就绪时,会生成一个信号来通知应用。然后这个注册的handler会将数据从内核拷贝到用户态。
- 优点:应用不会阻塞。 性能比较不错。
- 缺点:并非所有操作系统都能支持该模式
异步IO
在异步IO模型(也被称为重叠IO, overlapped I/O)中,应用发起非阻塞调用,来发起一次内核的后台操作。当操作完成时(内核已经接收到数据并将其从内核空间拷贝到用户空间),会发起一次完成的回调来结束这次IO操作。
异步IO和信号驱动IO的不同之处在于信号驱动IO中,内核告诉应用IO操作可以开始了,但在异步IO中,内核告诉应用IO操作已经完成了。
- 优点:应用不会阻塞;性能最优。
- 缺点:实现最复杂;并非所有操作系统都能支持的很好。
Java IO API
Java的输入输出API是用stream(InputStream, OutputStream)来代表阻塞的、单向的数据流。
Java NIO API
Java NIO是基于Channel, Buffer, Selector来实现的,通过这些类来驱动操作系统的底层的IO操作。
Channel类代表了和一个能够进行IO操作(读或写)的实体(可以是硬件设备,文件,socket或者软件组件等)的连接。
和stream是单向的不同,channel是一个双向的通道。
Buffer类是一个定长的数据容器,有对应的方法用来读写数据。Channel中的所有数据交互都必须通过Buffer,不能直接操作:发送给Channel的数据,要先写入Buffer中,要从Channel中接收的数据也要读入到Buffer中。
stream是面向字节的,而channel是面向块数据的。面向字节的IO更简单,但对某些IO对象而言操作效率比较低。面向块的IO速度更快,但实现起来也更复杂。
Selector类可以在一次调用中订阅多个SelectableChannel对象的事件。当订阅的事件触发时,Selector类会将事件分发给对应的处理器来执行。
Java NIO2 API
Java NIO2是基于异步通道(AsynchronousServerSocketChannel, AsynchronousSocketChannel等)来实现的,可以实现异步IO操作(连接,读写,错误处理)。
异步通道提供了两套机制来进行异步IO操作。第一种是返回一个java.util.concurrent.Future对象,用来代表一个挂起的操作,可以透过它来查询和获取结果。第二种是给对应的IO操作传入一个java.nio.channels.CompletionHandler对象,当IO操作完成或者失败时,会执行对应的代码。两种方式都是等效的。
异步通道提供了一套平台无关的标准化地执行异步IO操作的方式。然而,Java API究竟能挖掘出多少底层操作系统的异步化能力的支持,这要取决于对该平台的支持好不好了。
socket实现的echo服务器
下面将会使用Java的套接字接口,来实现一个echo应用的服务端和客户端,以实现上述提到的这几种IO模型。这个应用工作原理如下:
- 有一个服务端监听着TCP的7000端口;
- 客户端应用通过一个动态的TCP端口来连接服务端的socket
- 客户端从控制台读取输入信息后,通过socket将对应的字节发送给服务端
- 服务端接收到后,将字节流再发回给客户端。
- 客户端收到返回的信息后将它打印到控制台上。
- 当客户端收到的字节流长度等于它发送的长度,它会断开和服务端的连接。
- 当服务端接收到某个特殊的字符串时,它会停止监听端口。
这里面字符串和字节流间采用的是UTF—8编码进行转换。
下面只列出了服务端的代码摘要,完成代码将在后面的链接中提供。
阻塞式IO
下面的例子实现的是阻塞式IO的版本。
ServerSocket.accept方法会一直阻塞直到边接建立。InputStream.read则会阻塞直到输入的数据可用为止,或者客户端连接断开。OutputStream.write也会一直阻塞直到输出数据写入完毕。
public class IoEchoServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(7000);
while (active) {
Socket socket = serverSocket.accept(); // blocking
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
int read;
byte[] bytes = new byte[1024];
while ((read = is.read(bytes)) != -1) { // blocking
os.write(bytes, 0, read); // blocking
}
socket.close();
}
serverSocket.close();
}
}
阻塞的NIO版本
下面会用J ava的NIO来实现一个阻塞版IO。
ServerSocketChannel和SocketChannel都默认采用阻塞模式。ServerSocketChannel.accept方法会阻塞直到连接建立并返回一个SocketChannel对象。ServerSocket.read会一直阻塞直到数据可用,或连接断开。ServerSocket.write也会阻塞直到数据写入成功。
public class NioBlockingEchoServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 7000));
while (active) {
SocketChannel socketChannel = serverSocketChannel.accept(); // blocking
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear();
int read = socketChannel.read(buffer); // blocking
if (read < 0) {
break;
}
buffer.flip();
socketChannel.write(buffer); // blocking
}
socketChannel.close();
}
serverSocketChannel.close();
}
}
非阻塞的NIO版本
public class NioNonBlockingEchoServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(7000));
while (active) {
SocketChannel socketChannel = serverSocketChannel.accept(); // non-blocking
if (socketChannel != null) {
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear();
int read = socketChannel.read(buffer); // non-blocking
if (read < 0) {
break;
}
buffer.flip();
socketChannel.write(buffer); // can be non-blocking
}
socketChannel.close();
}
}
serverSocketChannel.close();
}
}
多路复用NIO
下面是NIO实现的多路复用IO模型。
初始化过程中,多个ServerSocketChannel对象会配置成非阻塞划式,并通过SelectionKey.OP_ACCEPT注册到同一个Selector对象里,表明对连接创建成功的事件感兴趣。
在主循环里,Selector.select方法会一直阻塞直到至少有一个注册事件发生了。Selector.selectedKeys方法会返回一组SelectionKey对象,里面包含已发生的事件。通过迭代SelectionKey对象,可以知道到底发生了什么IO事件(连接,或者读/写)以及事件关联的是哪个套接字。
SelectionKey对象只是提示对应的通道已经准备好进行某个IO操作了,但并不确保结果。
public class NioMultiplexingEchoServer {
public static void main(String[] args) throws IOException {
final int ports = 8;
ServerSocketChannel[] serverSocketChannels = new ServerSocketChannel[ports];
Selector selector = Selector.open();
for (int p = 0; p < ports; p++) {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannels[p] = serverSocketChannel;
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("localhost", 7000 + p));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
while (active) {
selector.select(); // blocking
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = keysIterator.next();
if (key.isAcceptable()) {
accept(selector, key);
}
if (key.isReadable()) {
keysIterator.remove();
read(selector, key);
}
if (key.isWritable()) {
keysIterator.remove();
write(key);
}
}
}
for (ServerSocketChannel serverSocketChannel : serverSocketChannels) {
serverSocketChannel.close();
}
}
}
当SelectionKey对象告知连接建立的事件已发生后,会通过ServerSocketChannel.accept方法调用(可以是非阻塞式的)来获取连接。完成了之后,会创建出一个新的非阻塞模式下的SocketChannel对象,并使用SelectionKey.OP_READ注册到同一个Selector上,声明对该通道的读事件感兴趣。
private static void accept(Selector selector, SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept(); // can be non-blocking
if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
当SelectionKey提示发生了读事件时,会调用SocketChannel.read方法(也可以是非阻塞的)来将数据从SocketChannel中读取到新的ByteBuffer对象中。然后再用SelectionKey.OP_WRITE将该SocketChannel注册到同一个Selector对象上,表明现在对写事件感兴趣。注册写事件时也会用到这个ByteBuffer对象。
private static void read(Selector selector, SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer); // can be non-blocking
buffer.flip();
socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
}
当SelectionKeys提示写事件发生时,会调用SocketChannel.write方法(可以是非阻塞的),将SelectionKey.attachment方法返回的ByteBuffer对象中所提取出来的数据中写入到SocketChannel。最后通过SocketChannel.close关闭连接。
private static void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
socketChannel.write(buffer); // can be non-blocking
socketChannel.close();
}
每次读写完成后,都会移除SelectionKey对象以免被重复使用。但接受连接的SelectionKey不会被移除,确保还能用于后续的操作。
异步的NIO2
下面的例子会通过Java NIO2来实现一个异步IO的版本。这里用到了AsynchronousServerSocketChannel, AsynchronousSocketChannel以及完成事件handler的机制。
AsynchronousServerSocketChannel.accept方法会初始化一个异步的接受连接的操作。
public class Nio2CompletionHandlerEchoServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(7000));
AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(serverSocketChannel);
serverSocketChannel.accept(null, acceptCompletionHandler);
System.in.read();
}
}
当连接建立时,会调用AcceptCompletionHandler,然后通过AsynchronousSocketChannel.read(ByteBuffer destination, A attachment, CompletionHandler
class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
private final AsynchronousServerSocketChannel serverSocketChannel;
AcceptCompletionHandler(AsynchronousServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
serverSocketChannel.accept(null, this); // non-blocking
ByteBuffer buffer = ByteBuffer.allocate(1024);
ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(socketChannel, buffer);
socketChannel.read(buffer, null, readCompletionHandler); // non-blocking
}
@Override
public void failed(Throwable t, Void attachment) {
// exception handling
}
}
当读操作完成时,会调用ReadCompletionHandler类,再通过AsynchronousSocketChannel.write(ByteBuffer source, A attachment, CompletionHandler
class ReadCompletionHandler implements CompletionHandler<Integer, Void> {
private final AsynchronousSocketChannel socketChannel;
private final ByteBuffer buffer;
ReadCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer buffer) {
this.socketChannel = socketChannel;
this.buffer = buffer;
}
@Override
public void completed(Integer bytesRead, Void attachment) {
WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(socketChannel);
buffer.flip();
socketChannel.write(buffer, null, writeCompletionHandler); // non-blocking
}
@Override
public void failed(Throwable t, Void attachment) {
// exception handling
}
}
写操作完成时会调用WriteCompletionHandler,再通过AsynchronousSocketChannel.close方法关闭连接。
class WriteCompletionHandler implements CompletionHandler<Integer, Void> {
private final AsynchronousSocketChannel socketChannel;
WriteCompletionHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void completed(Integer bytesWritten, Void attachment) {
try {
socketChannel.close();
} catch (IOException e) {
// exception handling
}
}
@Override
public void failed(Throwable t, Void attachment) {
// exception handling
}
}
在这个例子中,并没有用到attachment对象,因为所有需要用到的对象(AsynchronousSocketChannel, ByteBuffer)都在构造方法中传递进来了。
结论
socket通信中如何选择IO模型取决于网络通信的情况。如果IO操作比较耗时但不频繁,异步IO会是一个不错的选择。然后如果都是短而快的IO操作,同步IO可能会更好一些,能减少内核调用的处理开销。
尽管Java针对不同的操作系统提供了一套标准化的Socket通信的机制,但执行性能仍然和具体的平台实现密切相关。可以看下这篇文章来了解下不同实现的区别。
这里是文中例子的完整的代码实现。