Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • JAVA基础

  • 集合容器

  • Netty

    • IO模型
      • BIO
      • NIO
      • AIO
    • Netty初级
    • Netty原理
  • JVM

  • JUC

  • Java
  • Netty
phan
2023-05-15
目录

IO模型

# IO模型

同步I/O是指应用程序发起一个I/O操作(如读或写),必须等待该操作完成后才能继续执行其他任务。读写操作会阻塞当前线程。

非阻塞是指客户端可以在进行I/O操作时继续执行其他任务。

# BIO

服务端阻塞点一:ServerSocket通过调用 accept( )方法,阻塞并返回一个 java.net.Socket 对象。只有当前客户端写完数据close后,服务端才会对下一个客户端进行处理。

while (true) {
    Socket socket = serverSocket.accept();
    BioServerHandler handler = new BioServerHandler(socket, Charset.forName("GBK"));
    handler.start();
}
1
2
3
4
5

服务端阻塞点二:BufferedReader#readLine方法是阻塞的,读取一行时只有碰到换行、回车、buffer溢出或者是客户端线程直接close关闭资源,最终才会返回null。

如果直接使用InputStream#read方法也会一直阻塞等待,除非客户端发来消息。

public void run() {
    try {
        BufferedReader input = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), charset));
        String str = null;
        while ((str = input.readLine()) != null) {
            channelRead(channelHandler, str);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
1
2
3
4
5
6
7
8
9
10
11

# NIO

非阻塞是指不会阻塞客户端的线程。同步是指accept内核IO操作会阻塞当前线程,由该线程自己负责将数据从内核空间拷贝到用户空间。

  • selector选择器

Selector.select()阻塞直到至少有一个通道在你注册的事件上就绪了(非阻塞的方式①可以指定阻塞时间,超过后不管是否就绪直接返回②通过wakeup方法直接唤醒selector)。

客户端单线程通过选择器Selector监测处理Channel多通道,监听四种事件:连接就绪(客户端事件)、接受就绪(服务端)、读就绪(服务端可以接收读取客户端通道的信息)、写就绪。

每次事件读取完成后,都需要把事件剔除,防止下次重复读取事件。但是通道一直存在的。

while (true) {
    try {
        selector.select(1000);
        //访问所触发的就绪事件
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectionKeys.iterator();
        SelectionKey key = null;
        while (it.hasNext()) {
            key = it.next();
            it.remove();
            handleInput(key);
        }
    } catch (IOException ignore) {
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  • 通道

服务端通道ServerSocketChannel :调用 accept( )方法获得与客户端连接的通道(非阻塞模式,accept拿不到连接则返回null)。一旦启动则不停,不断运行。

服务端通道首先绑定SelectionKey.OP_ACCEPT事件(专门用于检查多线程客户端连接),并注册Selector,当轮询select发现客户端连接后触发事件,拿到与客户端传输的通道可以进行读写,并返回消息给客户端,将该客户端通道注册为OP_READ读就绪事件。

if (superclass == ServerSocketChannel.class){
    //服务器接收就绪
    if (key.isAcceptable()) {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);

        channelHandler = new ChannelHandler(socketChannel, charset);
        channelActive(channelHandler);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

客户端连接通道SocketChannel :用于客户端与服务端之间相互读写数据。

客户端首先通过sc通道进行连接之后,注册绑定OP_CONNECT连接事件。然后通过finishConnect方法实现真正的连接,通过sc通道给服务端发数据。

Class<?> superclass = key.channel().getClass().getSuperclass();
if (superclass == SocketChannel.class){
    SocketChannel socketChannel = (SocketChannel) key.channel();
    //连接就绪
    if (key.isConnectable()) {
        if (socketChannel.finishConnect()) {
            channelHandler = new ChannelHandler(socketChannel, charset);
            channelActive(channelHandler);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else {
            System.exit(1);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# AIO

1、通过countdownlatch来让当前进程无限休眠,让AIO通道可以无限回调接受来自客户端的连接。

 CountDownLatch latch = new CountDownLatch(1);
 serverSocketChannel.accept(this, new AioServerChannelInitializer());
 latch.await();
1
2
3

2、异步文件通道和异步套接字通道。核心是实现CompletionHandler接口的回调方法,IO成功调用completed方法,失败则failed方法。

AsynchronousServerSocketChannel:异步服务端通道

AsynchronousSocketChannel:异步客户端通道。其中read方法是异步读取操作,可以设置超时时间,超时则触发失败回调。

buffer.flip:缓存读写转换。当前缓冲区写完数据后,将limit指向当前读写指针position的位置(限制下一次缓存读的数量),然后将position置为0从头读缓存。也就是说每次向缓存写完数据之后都需要flip将读写指针重置。

3、实际上嵌套了两层CompletionHandler回调:第一层是accept监听接受客户端连接,第二层是把通道数据读出来并放入缓存。也就是说这两个步骤都是异步的。

可以在accept的completed回调中再次accept,以及在read的completed回调中再次read。从而实现循环监听多个客户端连接,同时在一个连接中服务端持续读取缓冲区数据。

4、发起IO操作(accept,read)的线程和最终执行completed回调线程不是同一个线程。在内核态还没执行完IO时,用户线程是阻塞的;执行完函数并返回后才唤醒该阻塞线程执行回调函数。

  public void listen() throws Exception {
         serverSocketChannel = AsynchronousServerSocketChannel.open();
         serverSocketChannel.bind(new InetSocketAddress(9988));//监听9988端口
         //监听异步客户端通道连接
         serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
             @Override
             public void completed(AsynchronousSocketChannel client, AioServer attachment) {
                 try {
                     if (client.isOpen()) {
                         System.out.println("接收到新的客户端的连接,地址:" + client.getRemoteAddress());
                         final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                         //读取客户端发送的数据
                         client.read(byteBuffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
                             @Override
                             public void completed(Integer result, AsynchronousSocketChannel attachment) {
                                 try {
                                     //读取请求,处理客户端发送的数据
                                     byteBuffer.flip();
                                     String content = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
                                     System.out.println("服务端接受到客户端发来的数据:" + content);
                                     //向客户端发送数据
                                     ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                     writeBuffer.put("Server send".getBytes());
                                     writeBuffer.flip();
                                     attachment.write(writeBuffer).get();
                                 } catch (Exception e) {
                                     e.printStackTrace();
                                 }
                             }
                             @Override
                             public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
                                 try {
                                     exc.printStackTrace();
                                     attachment.close();
                                 } catch (IOException e) {
                                     e.printStackTrace();
                                 }
                             }
                         });
                     }
                 } catch (Exception e) {
                     e.printStackTrace();
                 } finally {
                     //当有新的客户端接入的时候,直接调用accept的方法,递归执行下去,这样可以保证多个客户端都可以阻塞
                     attachment.serverSocketChannel.accept(attachment, this);
                 }
             }
             @Override
             public void failed(Throwable exc, AioServer attachment) {
                 exc.printStackTrace();
             }
         });
     }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
编辑 (opens new window)
#Netty
上次更新: 2023/12/15, 15:49:57
ConcurrentHashMap
Netty初级

← ConcurrentHashMap Netty初级→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式