Scalable IO in Java 学习笔记
Scalable IO in Java
Java NIO浅析 <- 看这个就好,下面不用看=.=
网络服务的基本结构
- Read request
- Decode request
- Process service
- Encode reply
- Send reply
传统BIO模式

BIO模式下,socket的accept(),read(),write()方法都是同步阻塞的,即一个线程在等待客户端输入时,一直是处于阻塞状态的。为了能同时服务多个客户端,以及增加CPU的利用率,我们需要为每个连接单独开一个线程进行处理,由此会带来以下这些问题
- 线程创建及销毁成本很高(可以通过线程池缓解)
- 线程额外的内存开销,例如在没有显式设置
-Xss时,每个Java线程都会默认占用1MB的栈空间
- 线程切换成本,当线程数过高时,大量CPU时间会被浪费在context switch上
NIO的工作方式
java.nio提供了两个基本特性
- 非阻塞的读写(Channel)
- 例如调用read()时,有数据就读取并返回,如果没有数据直接返回,不像BIO会一直阻塞到读取到数据为止
- 根据IO事件分派任务(Selector)
- 操作系统会将Channel上的IO状态变化主动推送给我们,例如可读、可写、有新连接等
- 具体实现依赖操作系统(例如Linux 2.6之后是O(1)的epoll,之前是O(n)的select)
依据这两个特性,我们就可以用事件驱动(event-driven)的方式,用一个线程来服务多个连接了。Reactor模式就是事件驱动模式的一种实现。
NIO中的基本概念
- Channels
- Buffers
- Selectors
- SelectionKeys
- 维护selector和channel之间的绑定关系,包括注册感兴趣的事件及绑定处理器
- IO事件
- OP_ACCEPT
- OP_CONNECT
- OP_READ
- OP_WRITE
Reactor模式
单线程版本的Reactor

- Reactor
- Handler
- 执行非阻塞的操作,例如Acceptor,Reader,Sender等
Reactor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class Reactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocket;
public Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT); selectionKey.attach(new Acceptor()); }
@Override public void run() { try { while (!Thread.interrupted()) { selector.select(); selector.selectedKeys().forEach(this::dispatch); } } catch (IOException e) { e.printStackTrace(); } }
private void dispatch(SelectionKey key) { Runnable r = (Runnable) key.attachment(); if (r != null) { r.run(); } }
class Acceptor implements Runnable { @Override public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) { new Handler(selector, c); } } catch (IOException e) { e.printStackTrace(); } } }
public static void main(String[] args) throws IOException { new Reactor(12345).run(); }
}
|
Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class Handler implements Runnable {
private final SocketChannel socketChannel;
private final SelectionKey selectionKey;
private static final int MAX = 1000;
private ByteBuffer input = ByteBuffer.allocate(MAX);
private ByteBuffer output = ByteBuffer.allocate(MAX);
private static final int READING = 0, SENDING = 1;
private int state = READING;
public Handler(Selector selector, SocketChannel channel) throws IOException { socketChannel = channel; socketChannel.configureBlocking(false); selectionKey = socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); }
@Override public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException e) { e.printStackTrace(); } }
private void read() throws IOException { socketChannel.read(input); if (isInputComplete()) { process(); state = SENDING; selectionKey.interestOps(SelectionKey.OP_WRITE); } }
private void send() throws IOException { System.out.println("sending"); socketChannel.write(output); if (isOutputComplete()) { selectionKey.cancel(); } }
private boolean isInputComplete() { return input.remaining() < MAX; }
private boolean isOutputComplete() { return !output.hasRemaining(); }
private void process() { input.flip(); byte[] temp = new byte[input.remaining()]; input.get(temp); System.out.println("receive: " + new String(temp));
output.put("done\r\n".getBytes()); output.flip(); } }
|
ByteBuffer cheatsheet

Caveat
- 当并发连接数不高时,NIO并没有显著性能优势。
- 使用事件驱动模式编程难度更高,代码需要拆分为多个non-blocking actions,同时需要小心维护服务的逻辑状态。
- 推荐使用成熟的NIO框架,如Netty,MINA
TODO
多线程版本的Reactor
了解AIO及Proactor模式