IO/NIO/AIO

目录

本文会说明各种IO的特点、分别解决了什么样的问题做一个分析阐述,并结合Java代码例子来辅助理解,像这些的历史演进和详细的底层原理网上很多,所以我们只站在应用层,使用者的角度去分析

(所有例子均可直接运行)

BIO——同步阻塞IO

看这个名称大家可能会有点陌生,我们直接上例子:

服务端

public static void main(String[] args) throws IOException {
        //1.创建服务端Socket 并绑定端口
        ServerSocket serverSocket = new ServerSocket(8080);
        //2.等待客户端连接 阻塞的
        Socket accept = serverSocket.accept();
        System.out.println(accept.getRemoteSocketAddress() + " 客户端已连接");
        //3.获取输入、输出流
        InputStream inputStream = accept.getInputStream();
        OutputStream outputStream = accept.getOutputStream();
        //4.接收客户端信息
        byte[] bytes = new byte[1024];
        inputStream.read(bytes);
        String data = new String(bytes);
        System.out.println("来自" + accept.getRemoteSocketAddress() + "的信息:" + data);
        //5.返回信息
        outputStream.write(data.getBytes());
        accept.shutdownOutput();
        //6.关闭资源
        inputStream.close();
        outputStream.close();
        accept.close();
        serverSocket.close();
    }

客户端:

public static void main(String[] args) throws IOException {
        //1.创建客户端Socket
        Socket socket = new Socket("127.0.0.1",8080);
        //2.获取输入、输出流
        InputStream inputStream = socket.getInputStream();
        OutputStream outputStream = socket.getOutputStream();
        //3.给服务端发送信息
        outputStream.write("你好".getBytes());
        socket.shutdownOutput();
        //4.获取服务端返回信息
        byte[] data = new byte[1024];
        inputStream.read(data);
        System.out.println("来自服务端的信息:" + new String(data));
        //6.关闭资源
        inputStream.close();
        outputStream.close();
        socket.close();
    }

这就是我们熟知的Socket连接,也是Java最早的网络通信IO,为什么这种叫同步阻塞IO:

因为在做read操作、accept操作的时候会阻塞没法往下执行,说白了就是串行的,就因为这个服务端和客户端只能1对1通信,这合理嘛?肯定不合理啊,所以进阶的有了伪异步IO

伪异步阻塞IO

看完上面的,很多人就有想法了,你说同步的只能1对1通信,那我直接把服务端改成多线程版本不就好了嘛,不就可以1对多通信了嘛,没错这版本确实是这样,如下:

服务端:

public static void main(String[] args) throws IOException {
        //1.创建服务端Socket 并绑定端口
        ServerSocket serverSocket = new ServerSocket(8080);
        //2.等待客户端连接 多线程模式 (开线程异步等待)
        new Thread(()->{
            while (true){
                try {
                    Socket accept = serverSocket.accept();
                    System.out.println(accept.getRemoteSocketAddress() + " 客户端已连接");
                    // 开线程异步处理客户端连接任务
                    new Thread(new AcceptHandler(accept)).start();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }).start();
        // 阻塞防止程序退出
        while (true){}
    }

    private static class AcceptHandler implements Runnable{
        private Socket accept;
        private InputStream inputStream = null;
        private OutputStream outputStream =null;

        public AcceptHandler(Socket accept){
            this.accept=accept;
        }

        @Override
        public void run() {
            try {
                //3.获取输入、输出流
                inputStream = accept.getInputStream();
                outputStream = accept.getOutputStream();
                //4.接收客户端信息
                byte[] bytes = new byte[1024];
                inputStream.read(bytes);
                String data = new String(bytes);
                if(data!=null){
                    System.out.println("来自" + accept.getRemoteSocketAddress() + "的信息:" + data);
                    //5.返回信息
                    outputStream.write(data.getBytes());
                    accept.shutdownOutput();
                }
            } catch (IOException e) {
                System.out.println(accept.getRemoteSocketAddress() + "发送异常断开连接");
                closeSource();
            }finally {
                System.out.println(accept.getRemoteSocketAddress() + "断开连接");
                closeSource();
            }
        }

        private void closeSource(){
            //6.关闭资源
            try {
                if(inputStream!=null){inputStream.close();}
                if(outputStream!=null){outputStream.close();}
                accept.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

客户端不变,服务端我们做了三个改动:

  • 一:在等待客户端连接的时候我们开启一个线程,并死循环等待连接,这样可以保证不阻塞主线程的运行,同时可以不断的和客户端建立连接
  • 二:和客户端建立连接后又开启一个线程来单独处理与客户端的通信
  • 三:最后加了个死循环防止程序退出,因为现在是异步的了

这样处理不就是异步的了吗?为什么叫伪异步阻塞IO呢?

虽然现在不会阻塞主线程了,但是阻塞并没有解决,该阻塞的地方依旧还是会阻塞,所以本质上来说只是解决了1对1连接通信的问题

但是新的问题又来了,现在虽然是1对多通信,但是有一个客户端连接就新建一个线程,1万个客户端就1万个线程,这合理吗?这明显不合理啊,用线程池管理?那也不行啊,这连接一多还要排队吗?极端情况下,队列不一样会爆?

那怎么办?有没有可能一个线程监听多个连接呢?于是有了NIO

NIO——同步非阻塞IO

NIO的引入同时引入了三个概念ByteBuffer缓冲区Channel通道和Selector多路复用器

  • Channel的作用:就是一个通道,数据读取和写入的通道,根据功能可以分为不同的通道如:网络通道ServerSocketChannel和SocketChannel、文件操作通道FileChannel等等
  • Selector的作用:是轮询Channel上面的事件,如读事件、写事件、连接事件、接受连接事件
  • ByteBuffer缓冲区:就是向Channel读取或写入数据的对象,本质就是个字节数组

怎么理解这三个呢?说白了以传统IO为例:服务端accept就是接受连接事件、客户端connect就是连接事件、发送消息就是写事件、读取消息就是读事件 Selector就是监听这些事件的工具 ServerSocketChannel是服务端接受连接的通道,所以只能注册监听连接事件 SocketChannel是服务端与客户端连接建立后的通道,所以可以注册读写事件、连接事件 ByteBuffer就是Channel读取或写入数据的单位对象

下面搞个例子看看,注释全有:

服务端:

public static void main(String[] args) throws IOException {
        // 开启服务端Socket通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        // 打开多路复用器 并将其注册到通道上 监听连接请求事件
        Selector selector = Selector.open();
        // 为服务端Socket通道 注册一个接受连接的事件 
        // 假设有客户端要连接 下面轮询的时候就会触发这个事件 我们就可以去与客户端建立连接了
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // 这段时间没获取到任何事件,则跳过下面操作
            // 不同于IO和BIO的阻塞 多路复用器会一直轮询 如果长时间无事件 这里会一直空循环
            // 所以这里在查询事件的时候加了个时间 这样无事件的情况下 1s才会循环一次
            if (selector.select(1000) == 0) {
                continue;
            }
            // 获取到本次轮询所获取到的全部事件
            Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
            // 轮询获取到的事件,并处理
            while (selectorKeys.hasNext()) {
                SelectionKey selectorKey = selectorKeys.next();
                //这个已经处理的事件Key一定要移除。如果不移除,就会一直存在在selector.selectedKeys集合中
                //待到下一次selector.select() > 0时,这个Key又会被处理一次
                selectorKeys.remove();
                try {
                    // 事件key处理 也就是事件处理
                    selectorKeyHandler(selectorKey, selector);
                } catch (Exception e) {
                    SocketChannel channel = (SocketChannel) selectorKey.channel();
                    System.out.println(channel.getRemoteAddress() + "客户端已断开连接");
                    if (selectorKey != null) {
                        selectorKey.cancel();
                        if (selectorKey.channel() != null) {
                            selectorKey.channel().close();
                        }
                    }
                }
            }
        }
    }

    // 事件处理方法 按照事件类型处理不同的事件
    public static void selectorKeyHandler(SelectionKey selectorKey, Selector selector) throws IOException {
        // 连接事件 代表有客户端连接 所以需要去处理这个连接请求
        if (selectorKey.isAcceptable()) {
            acceptHandler(selectorKey, selector);
        }

        // 读事件 可以去读取信息
        if (selectorKey.isReadable()) {
            readHandler(selectorKey, selector);
        }

        // 写事件 可以向客户端发送信息
        if (selectorKey.isWritable()) {
            SocketChannel socketChannel = (SocketChannel) selectorKey.channel();
            writeHandler(socketChannel);
            // 写事件完成后要取消写事件不然会一直写  我这里就干脆注册了个读事件
            socketChannel.register(selector,SelectionKey.OP_READ);
        }
    }

    // 连接事件处理 这个有客户端要建立连接了  所以accept与客户端建立连接
    public static void acceptHandler(SelectionKey selectorKey, Selector selector) throws IOException {
        ServerSocketChannel channel = (ServerSocketChannel) selectorKey.channel();
        SocketChannel accept = channel.accept();
        // 建立连接后 客户端和服务端就等于形成了一个数据交互的通道 SocketChannel
        // 这个通道也要设置为非阻塞
        accept.configureBlocking(false);
        // 为这个通道注册一个读事件 表示我先读取客户端信息
        accept.register(selector, SelectionKey.OP_READ);
        System.out.println(accept.getRemoteAddress() + "客户端已连接");
    }

    // 读事件处理  读取客户端的信息
    public static void readHandler(SelectionKey selectorKey, Selector selector) throws IOException {
        SocketChannel channel = (SocketChannel) selectorKey.channel();
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        int read = channel.read(allocate);
        if (read > 0) {
            allocate.flip();
            byte[] bytes = new byte[allocate.remaining()];
            allocate.get(bytes);
            System.out.println(channel.getRemoteAddress() + "发来消息:" + new String(bytes));
        }
        if(read<0){
            System.out.println(channel.getRemoteAddress() + "断开连接");
        }
        // 读完信息后要给客户端发送信息 所以这个再注册一个写的事件
        channel.register(selector, SelectionKey.OP_WRITE);
    }

    // 写事件处理
    public static void writeHandler(SocketChannel socketChannel) throws IOException {
        byte[] bytes = "你好".getBytes();
        ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
        allocate.put(bytes);
        allocate.flip();
        socketChannel.write(allocate);
    }

客户端:

public static void main(String[] args) throws IOException {
        // 开启一个Socket通道
        SocketChannel clientChannel = SocketChannel.open();
        // 设置非阻塞
        clientChannel.configureBlocking(false);
        // 允许端口复用
        clientChannel.socket().setReuseAddress(true);
        // 连接地址
        clientChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        // 开启多路复用器
        Selector selector = Selector.open();
        // 为这个通道注册一个连接事件
        clientChannel.register(selector, SelectionKey.OP_CONNECT);
        while (true) {
            // 这段时间没获取到任何事件,则跳过下面操作
            // 不同于IO和BIO的阻塞 多路复用器会一直轮询 如果长时间无事件 这里会一直空循环
            // 所以这里在查询事件的时候加了个时间 这样无事件的情况下 1s才会循环一次
            if (selector.select(1000) == 0) {
                continue;
            }
            // 获取到本次轮询所获取到的全部事件
            Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
            // 轮询获取到的事件,并处理
            while (selectorKeys.hasNext()) {
                SelectionKey selectorKey = selectorKeys.next();
                //这个已经处理的事件Key一定要移除。如果不移除,就会一直存在在selector.selectedKeys集合中
                //待到下一次selector.select() > 0时,这个Key又会被处理一次
                selectorKeys.remove();
                try {
                    // 事件key处理
                    selectorKeyHandler(selectorKey, selector);
                } catch (Exception e) {
                    if (selectorKey != null) {
                        selectorKey.cancel();
                        if (selectorKey.channel() != null) {
                            selectorKey.channel().close();
                        }
                    }
                }
            }
        }
    }

    // 事件处理方法
    public static void selectorKeyHandler(SelectionKey selectorKey, Selector selector) throws IOException {
        // 连接事件 判断是否连接成功
        if (selectorKey.isValid()) {
            SocketChannel channel = (SocketChannel) selectorKey.channel();
            if (selectorKey.isConnectable() && channel.finishConnect()) {
                System.out.println("连接成功........");
                // 连接成功注册写事件 向服务端发送信息
                channel.register(selector,SelectionKey.OP_WRITE);
            }
        }
        // 读事件 可以去读取信息
        if (selectorKey.isReadable()) {
            readHandler(selectorKey, selector);
        }
        // 写事件 可以向客户端发送信息
        if (selectorKey.isWritable()) {
            SocketChannel channel = (SocketChannel) selectorKey.channel();
            writeHandler(channel);
            // 写事件完成后要取消写事件不然会一直写  我这里就干脆注册了个读事件
            channel.register(selector,SelectionKey.OP_READ);
        }
    }

    // 读事件处理  就是处理服务端发来的消息
    public static void readHandler(SelectionKey selectorKey, Selector selector) throws IOException {
        SocketChannel channel = (SocketChannel) selectorKey.channel();
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        int read = channel.read(allocate);
        if (read > 0) {
            allocate.flip();
            byte[] bytes = new byte[allocate.remaining()];
            allocate.get(bytes);
            System.out.println("服务端发来消息:" + new String(bytes));
        }
        if(read<0){
            System.out.println("与服务端断开连接");
        }
    }

    // 写事件处理 就是像服务端发送消息
    public static void writeHandler(SocketChannel socketChannel) throws IOException {
        byte[] bytes = "你好".getBytes();
        ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
        allocate.put(bytes);
        allocate.flip();
        socketChannel.write(allocate);
    }

可以看到写法和传统的IO完全不一样了,操作的对象都是Channel,读写对象都是ByteBuffer,那到底是什么引起了这种改变呢?因为系统内核的优化,说白了这种操作都是API,底层都是需要系统支持的,系统在这块也有一个模型优化,简单介绍三种模型区别:

  • select: 每有一个连接的产生会打开一个Socket描述符(下面简称FD),select会把这些FD保存在一个数组中,因为是数组所以就代表有了容量的上限意味了连接数量的上限,每次调用,都会遍历这个数组,1w个连接就算只有一个事件,也会遍历这1w个连接,效率极低
  • poll: 和select不同,这个底层结构是链表,所有没了连接数量的上限,但是每次调用依旧会遍历所有的
  • epoll: 底层结构是红黑树,同样没有连接数量的上限,而且有一个就绪的事件列表,这意味着不再需要遍历所有的连接了

JDK中采用的就是epoll模型,但尽管这样也依旧是同步的,因为还是需要主动去获取结果,只是从方式阻塞等待变成了轮询,有没有什么方式在结果产生的时候异步的回调呢?于是有了AIO

AIO——异步IO

这种方式同样需要系统的支持,目前主流还是NIO,这块就不多介绍了,提供个例子:

服务端:

    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // 接收连接的时候 提供连接处理类
        serverSocketChannel.accept(serverSocketChannel, new ServerSocketHandler());
        // 异步的  防止程序退出
        while (true) {
        }
    }
    // 连接处理
    public static class ServerSocketHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

        @Override
        public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
            // 继续接受连接
            attachment.accept(attachment, this);
            try {
                System.out.println(result.getRemoteAddress() + " 已连接");
            } catch (IOException e) {
                e.printStackTrace();
            }

            new Thread(() -> {
                // 异步读
                readHandler(result);
            }).start();

            // 写数据处理
            writeHandler(result, "你好");
        }

        @Override
        public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
            System.out.println("发生异常");
        }

        public void readHandler(AsynchronousSocketChannel socketChannel) {
            ByteBuffer allocate = ByteBuffer.allocate(1024);
            socketChannel.read(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    try {
                        if (result > 0) {
                            attachment.flip();
                            byte[] bytes = new byte[attachment.remaining()];
                            attachment.get(bytes);
                            System.out.println(socketChannel.getRemoteAddress() + " 客户端消息: " + new String(bytes));
                            readHandler(socketChannel);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println();
                    try {
                        System.out.println(socketChannel.getRemoteAddress() + " 已下线");
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        public void writeHandler(AsynchronousSocketChannel socketChannel, String data) {
            byte[] bytes = data.getBytes();
            ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
            allocate.put(bytes);
            allocate.flip();
            socketChannel.write(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    if (attachment.hasRemaining()) {
                        socketChannel.write(attachment, attachment, this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

客户端:

 public static void main(String[] args) throws IOException {
        AsynchronousSocketChannel socketChannel=AsynchronousSocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080), null, new AsyncClientHandler(socketChannel));
        while (true){}
    }

    public static class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>{

        private AsynchronousSocketChannel socketChannel;

        public AsyncClientHandler(AsynchronousSocketChannel socketChannel){
            this.socketChannel=socketChannel;
        }

        @Override
        public void completed(Void result, AsyncClientHandler attachment) {

            new Thread(()->{
                // 异步 一秒发送一次消息
                while (true){
                    writeHandler("你好");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            // 读处理
            readHandler();
        }

        @Override
        public void failed(Throwable exc, AsyncClientHandler attachment) {

        }

        public void readHandler() {
            ByteBuffer allocate = ByteBuffer.allocate(1024);
            socketChannel.read(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    attachment.flip();
                    byte[] bytes = new byte[attachment.remaining()];
                    attachment.get(bytes);
                    System.out.println(" 服务端消息: " + new String(bytes));
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        public void writeHandler( String data) {
            byte[] bytes = data.getBytes();
            ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
            allocate.put(bytes);
            allocate.flip();
            socketChannel.write(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    if (attachment.hasRemaining()) {
                        socketChannel.write(attachment, attachment, this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

总结

BIO伪异步IONIOAIO
线程:客户端1:1N:M (M可以大于N)1:N (一个线程处理多个)0:M (无需额外线程,异步回调)
I/O类型同步阻塞伪异步阻塞同步非阻塞异步非阻塞
可靠性非常差
难度简单简单复杂复杂
性能
Last Updated: