IO模型
# IO模型
同步I/O是指应用程序发起一个I/O操作(如读或写),必须等待该操作完成后才能继续执行其他任务。读写操作会阻塞当前线程。
非阻塞是指客户端可以在进行I/O操作时继续执行其他任务。
# BIO
服务端阻塞点一:ServerSocket通过调用 accept( )方法,阻塞并返回一个 java.net.Socket 对象。只有当前客户端写完数据close后,服务端才会对下一个客户端进行处理。
while (true) {
Socket socket = serverSocket.accept();
BioServerHandler handler = new BioServerHandler(socket, Charset.forName("GBK"));
handler.start();
}
2
3
4
5
服务端阻塞点二:BufferedReader#readLine方法是阻塞的,读取一行时只有碰到换行、回车、buffer溢出或者是客户端线程直接close关闭资源,最终才会返回null。
如果直接使用InputStream#read方法也会一直阻塞等待,除非客户端发来消息。
public void run() {
try {
BufferedReader input = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), charset));
String str = null;
while ((str = input.readLine()) != null) {
channelRead(channelHandler, str);
}
} catch (IOException e) {
e.printStackTrace();
}
}
2
3
4
5
6
7
8
9
10
11
# NIO
非阻塞是指不会阻塞客户端的线程。同步是指accept内核IO操作会阻塞当前线程,由该线程自己负责将数据从内核空间拷贝到用户空间。
- selector选择器
Selector.select()阻塞直到至少有一个通道在你注册的事件上就绪了(非阻塞的方式①可以指定阻塞时间,超过后不管是否就绪直接返回②通过wakeup方法直接唤醒selector)。
客户端单线程通过选择器Selector监测处理Channel多通道,监听四种事件:连接就绪(客户端事件)、接受就绪(服务端)、读就绪(服务端可以接收读取客户端通道的信息)、写就绪。
每次事件读取完成后,都需要把事件剔除,防止下次重复读取事件。但是通道一直存在的。
while (true) {
try {
selector.select(1000);
//访问所触发的就绪事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleInput(key);
}
} catch (IOException ignore) {
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 通道
服务端通道ServerSocketChannel :调用 accept( )方法获得与客户端连接的通道(非阻塞模式,accept拿不到连接则返回null)。一旦启动则不停,不断运行。
服务端通道首先绑定SelectionKey.OP_ACCEPT事件(专门用于检查多线程客户端连接),并注册Selector,当轮询select发现客户端连接后触发事件,拿到与客户端传输的通道可以进行读写,并返回消息给客户端,将该客户端通道注册为OP_READ读就绪事件。
if (superclass == ServerSocketChannel.class){
//服务器接收就绪
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
channelHandler = new ChannelHandler(socketChannel, charset);
channelActive(channelHandler);
}
}
2
3
4
5
6
7
8
9
10
11
12
客户端连接通道SocketChannel :用于客户端与服务端之间相互读写数据。
客户端首先通过sc通道进行连接之后,注册绑定OP_CONNECT连接事件。然后通过finishConnect方法实现真正的连接,通过sc通道给服务端发数据。
Class<?> superclass = key.channel().getClass().getSuperclass();
if (superclass == SocketChannel.class){
SocketChannel socketChannel = (SocketChannel) key.channel();
//连接就绪
if (key.isConnectable()) {
if (socketChannel.finishConnect()) {
channelHandler = new ChannelHandler(socketChannel, charset);
channelActive(channelHandler);
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
System.exit(1);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# AIO
1、通过countdownlatch来让当前进程无限休眠,让AIO通道可以无限回调接受来自客户端的连接。
CountDownLatch latch = new CountDownLatch(1);
serverSocketChannel.accept(this, new AioServerChannelInitializer());
latch.await();
2
3
2、异步文件通道和异步套接字通道。核心是实现CompletionHandler接口的回调方法,IO成功调用completed方法,失败则failed方法。
AsynchronousServerSocketChannel:异步服务端通道
AsynchronousSocketChannel:异步客户端通道。其中read方法是异步读取操作,可以设置超时时间,超时则触发失败回调。
buffer.flip:缓存读写转换。当前缓冲区写完数据后,将limit指向当前读写指针position的位置(限制下一次缓存读的数量),然后将position置为0从头读缓存。也就是说每次向缓存写完数据之后都需要flip将读写指针重置。
3、实际上嵌套了两层CompletionHandler回调:第一层是accept监听接受客户端连接,第二层是把通道数据读出来并放入缓存。也就是说这两个步骤都是异步的。
可以在accept的completed回调中再次accept,以及在read的completed回调中再次read。从而实现循环监听多个客户端连接,同时在一个连接中服务端持续读取缓冲区数据。
4、发起IO操作(accept,read)的线程和最终执行completed回调线程不是同一个线程。在内核态还没执行完IO时,用户线程是阻塞的;执行完函数并返回后才唤醒该阻塞线程执行回调函数。
public void listen() throws Exception {
serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9988));//监听9988端口
//监听异步客户端通道连接
serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
@Override
public void completed(AsynchronousSocketChannel client, AioServer attachment) {
try {
if (client.isOpen()) {
System.out.println("接收到新的客户端的连接,地址:" + client.getRemoteAddress());
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取客户端发送的数据
client.read(byteBuffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
@Override
public void completed(Integer result, AsynchronousSocketChannel attachment) {
try {
//读取请求,处理客户端发送的数据
byteBuffer.flip();
String content = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
System.out.println("服务端接受到客户端发来的数据:" + content);
//向客户端发送数据
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put("Server send".getBytes());
writeBuffer.flip();
attachment.write(writeBuffer).get();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
try {
exc.printStackTrace();
attachment.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//当有新的客户端接入的时候,直接调用accept的方法,递归执行下去,这样可以保证多个客户端都可以阻塞
attachment.serverSocketChannel.accept(attachment, this);
}
}
@Override
public void failed(Throwable exc, AioServer attachment) {
exc.printStackTrace();
}
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53