El Psy Congroo

Scalable IO in Java 学习笔记

Scalable IO in Java 学习笔记

Scalable IO in Java
Java NIO浅析 <- 看这个就好,下面不用看=.=

网络服务的基本结构

  • Read request
  • Decode request
  • Process service
  • Encode reply
  • Send reply

传统BIO模式

BIO
BIO模式下,socket的accept(),read(),write()方法都是同步阻塞的,即一个线程在等待客户端输入时,一直是处于阻塞状态的。为了能同时服务多个客户端,以及增加CPU的利用率,我们需要为每个连接单独开一个线程进行处理,由此会带来以下这些问题

  • 线程创建及销毁成本很高(可以通过线程池缓解)
  • 线程额外的内存开销,例如在没有显式设置-Xss时,每个Java线程都会默认占用1MB的栈空间
  • 线程切换成本,当线程数过高时,大量CPU时间会被浪费在context switch上

NIO的工作方式

java.nio提供了两个基本特性

  • 非阻塞的读写(Channel)
    • 例如调用read()时,有数据就读取并返回,如果没有数据直接返回,不像BIO会一直阻塞到读取到数据为止
  • 根据IO事件分派任务(Selector)
    • 操作系统会将Channel上的IO状态变化主动推送给我们,例如可读、可写、有新连接等
    • 具体实现依赖操作系统(例如Linux 2.6之后是O(1)的epoll,之前是O(n)的select)

依据这两个特性,我们就可以用事件驱动(event-driven)的方式,用一个线程来服务多个连接了。Reactor模式就是事件驱动模式的一种实现。

NIO中的基本概念

  • Channels
    • 支持非阻塞读写的连接,可以是文件,网络等等
  • Buffers
    • Channels读写时用到的对象,底层是数组
  • Selectors
    • 获取IO事件
  • SelectionKeys
    • 维护selector和channel之间的绑定关系,包括注册感兴趣的事件及绑定处理器
  • IO事件
    • OP_ACCEPT
      • 服务端收到一个连接请求
    • OP_CONNECT
      • 客户端发起连接
    • OP_READ
      • 当OS的读缓冲区中有数据可读
    • OP_WRITE
      • 当OS的写缓冲区中有空闲的空间

Reactor模式

单线程版本的Reactor
single-threaded-reactor

  • Reactor
    • 响应IO事件,分派任务给合适的handler
  • Handler
    • 执行非阻塞的操作,例如Acceptor,Reader,Sender等

Reactor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class Reactor implements Runnable {
private final Selector selector;
// 接收客户端连接,只支持OP_ACCEPT
private final ServerSocketChannel serverSocket;
public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
// 管理channel和selector的绑定状态,interestOps(int)修改绑定,cancel()取消绑定
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
selector.selectedKeys().forEach(this::dispatch);
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) key.attachment();
if (r != null) {
r.run();
}
}
class Acceptor implements Runnable {
@Override
public void run() {
try {
// accept返回用于和客户端交互的SocketChannel,支持OP_READ, OP_WRITE
SocketChannel c = serverSocket.accept();
if (c != null) {
new Handler(selector, c);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Reactor(12345).run();
}
}

Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class Handler implements Runnable {
private final SocketChannel socketChannel;
private final SelectionKey selectionKey;
private static final int MAX = 1000;
private ByteBuffer input = ByteBuffer.allocate(MAX);
private ByteBuffer output = ByteBuffer.allocate(MAX);
private static final int READING = 0, SENDING = 1;
private int state = READING;
public Handler(Selector selector, SocketChannel channel) throws IOException {
socketChannel = channel;
socketChannel.configureBlocking(false);
selectionKey = socketChannel.register(selector, 0);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ); //绑定后再注册事件
selector.wakeup();
}
@Override
public void run() {
try {
if (state == READING) {
read();
}
else if (state == SENDING) {
send();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void read() throws IOException {
socketChannel.read(input);
if (isInputComplete()) {
process();
state = SENDING;
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
}
private void send() throws IOException {
System.out.println("sending");
socketChannel.write(output);
if (isOutputComplete()) {
selectionKey.cancel();
}
}
private boolean isInputComplete() {
return input.remaining() < MAX;
}
private boolean isOutputComplete() {
return !output.hasRemaining();
}
private void process() {
input.flip();
byte[] temp = new byte[input.remaining()];
input.get(temp);
System.out.println("receive: " + new String(temp));
output.put("done\r\n".getBytes());
output.flip();
}
}

ByteBuffer cheatsheet

ByteBuffer cheatsheet

Caveat

  • 当并发连接数不高时,NIO并没有显著性能优势。
  • 使用事件驱动模式编程难度更高,代码需要拆分为多个non-blocking actions,同时需要小心维护服务的逻辑状态。
  • 推荐使用成熟的NIO框架,如Netty,MINA

TODO

多线程版本的Reactor
了解AIO及Proactor模式