Fork me on GitHub

Java NIO

注意:所有文章除特别说明外,转载请注明出处.

Java NIO

[TOC]

概念

java.io中最为核心的一个概念是流(stream),面向流编程。Java中,一个流要么是输入流,要么是输出流,不可能同时既是输入流同时又是输出流。

java.nio中拥有3个核心概念:Selector、Channel和Buffer。在java.nio中,我们是面向块(block)或缓冲区(buffer)编程。Buffer本身就是一块内存,底层实现上实际是一个数组。数据的读写都是通过Buffer来实现。

Java中提供7种原生数据类型都各自对应的Buffer类型。如IntBuffer、LongBuffer、ByteBuffer及CharBuffer等。但是没有BooleanBuffer类型。

Channel指的是可以向其写入数据或从中读取数据的对象,它类似于java.io中的Stream。所有数据的读写都是通过Buffer来进行的,永远不会出现直接向Channel写入数据或直接从Channel中读取数据的情况。

和Stream不同的是,Channel是双向的,一个流只可能是InputStream或OutputStream,Channel打开后则可以进行读取、写入或读写。

由于Channel是双向的,因此它能够更好的反映出底层操作系统的真实情况;在Linux系统中,底层操作系统的通道就是双向的。

提示:Channel和Buffer总是在一起use的。

NIO Buffer中3个重要状态属性的含义

1. postion 2. limit 3. capacity

    0 <= mark <= position <= limit <= capacity

通过NIO读取文件涉及到的3个步骤:

1. 从FileInputStream获取到FileChannel对象。

2. 创建Buffer。

3. 将数据从Channel读取到Buffer中。

flip()方法作用

1. 将limit值设置为当前的position

2. 将position设置为0

clear()方法

1. 将limit值设置为capacity

2. 将position值设置为0

compact()方法

1. 将所有未读的数据复制到buffer起始位置处

2. 将position设为最后一个未读元素的后面

3. 将limit设置为capacity

4. 现在buffer准备好,但是不会覆盖未读的数据

DirectBuffer | 零拷贝

DirectByteBuffer 自身是(Java)堆上面对象,它背后真正承载数据的buffer是在(Java)堆外(native memory)中的。这是malloc()方法分配出来的内存,是用户态的。

内存映射文件

Scattering | Gathering

NIO 创建客户端

Java在创建客户端时

1. 首先调用静态工厂方法SocketChannel.open()来创建一个新的 java.nio.channels.SocketChannel对象。该方法的参数是:java.net.SocketAddress对象,只是连接的主机和端口。

2. 通道以阻塞模式打开

NIO零拷贝

传统的IO操作(OIO)

内核空间 | 用户空间 | 硬件

1.读操作

Java虚拟机开始读操作,告知用户空间开始读操作,从而用户空间切换到内核空间,然后内核空间从硬件读取数据到内核空间的缓冲区,然后内核空间的数据原封不动拷贝到用户空间的缓存区中,然后在用户空间执行数据操作。

2.写操作

Java虚拟机调用write操作,将数据从用户空间拷贝到内核空间的缓存区,然后再由内核空间的缓存区拷贝到硬件。完成数据操作。

零拷贝

零拷贝,没有从内核空间与用户空间的数据相互缓存。

内核空间收到用户空间的sendfile()操作,然后内核空间向硬件要数据,然后硬件将数据传送给内核空间的缓存区。然后写数据到目标socket的缓存区。然后返回sendfile()信息。

用户空间内存映射到内核空间,只在内核空间进行数据操作。


Reactor模式(反应器模式)

类似的模式 Proactor模式

Netty整体架构是Reactor模式的完整体现。

Reactor设计模式会设立一些服务请求,处理由一个或多个客户端并发发送的的处理请求。

多线程IO致命缺陷

在最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理。

while(true){
    socket = accept();
    handle(socket);
}

这种方法的最大问题是无法并发,效率太低。如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。然后在之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理。

class BasicModel implements Runnable {
    public void run() {
        try {
            ServerSocket ss =
                    new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            //创建新线程来handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[SystemConfig.INPUT_SIZE];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] input) {
            byte[] output=null;
            /* ... */
            return output;
        }
    }
}

在上面对于每一个请求都分配一个线程,每个线程都独自处理上面的流程。

1. 多线程并发模式的优点

在一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。

2. 多线程并发模式的缺点

在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且线程的反复创建-销毁也需要代价。

改进的方案便是:采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

Dispatch | Notifier 模式

在Java的NIO模式的Selector网络通讯就是一个简单的Reactor模型,是Reactor模型的朴素原型。

客户端连接到Reactor对象,然后Reactor对象派发客户端的请求到各个处理线程。Reactor本身不做任何操作,只是做一个中间商派发处理客户端各种请求。

提示:实际上Reactor模式是基于Java NIO的,在它的基础上抽象出来两个组件:Reactor和Handler两个组件。

Reactor 负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler处理。新的事件包括连接建立就绪、读就绪、写就绪等。

Handler 负责将自身(handler)与事件绑定,负责事件的处理,完成Channel的读入,完成处理业务逻辑后负责将结果写出channel。

1. Reactor1 - Setup

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);//非阻塞
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//注册到selector上并处于等待客户端连接

        sk.attach(new Acceptor());//向其附加上任意的对象
    }
}

2. Reactor2 - Dispatch Loop

//thread
public void run(){
    try{
        while(!Thread.interrupted()){
            selector.select();
            Set selected = selector.selectedKeys();
            Iterator it = selected.iterator();
            while(it.hasNext()){
                dispatch((SelectionKey)(it.next()));//分发
                selected.clear();//处理的selector要清除掉
            }
        }
    }catch(IOException e){

    }
}

void dispatch(SelectionKey k){
    Runnable r = (Runnable) k.attachement();//获取之前attach()方法添加的对象
    if(r != null){
        r.run();
    }
}

3. Reactor3 - Acceptor

class Acceptor implements Runnable {
    public void run(){
        try{
            SocketChannel c = serverSocketChannel.accept();
            if(c != null){
                new Handler(selector, c);//自定义handler或netty提供的
            }
        }catch(Exception e){

        }
    }
}

4. Reactor4 - Handler Setup

final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);

    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c){
        socket = c;
        c.configureBlocking(false);
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }
}

5. Reactor5 - Request handing

public void run(){
    try{
        if(state == READING) read();
        else if(state == SENDING) send();
    }catch(IOException e){

    }
}

void read(){
    socket.read(input);
    if(inputIsComplete()){
        process();
        state = SENDING;
        sk.interestOps(SelectionKey.OP_WRITE);
    }
}

void send(){
    socket.write(output);
    if(outputIsComplete()){
        sk.cancel();
    }
}

Reactor编程的优缺点

1. 优点
1. 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的。

2. 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销

3. 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源

4. 可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性
2. 缺点
1. 相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。

BossGrouo | WorkerGroup 监听过程

1. 首先存在多个客户端等待连接

2. 然后BossGroup监听OP_ACCEPT事件,一旦该事件被监听到之后就会将其注册到Selector中,并返回SelectionKey(包含了Selector的属性)集合,然后SelectionKey本身包装了SocketChannel对象(是与客户端真正建立连接的对象)

3. 对于netty对象而言,又会将SocketChannel对象包装成NioSocketChannel对象

4. 接着将NioSocketChannel注册到Worker当中的Selector选择器上面,而此选择器本身却是监听了READ事件本身的。那么接下来客户端有什么请求,就会直接跟workerGroup打交道。

本文标题:Java NIO

文章作者:Bangjin-Hu

发布时间:2019年10月15日 - 09:22:26

最后更新:2020年03月30日 - 08:03:35

原始链接:http://bangjinhu.github.io/undefined/NIO/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Bangjin-Hu wechat
欢迎扫码关注微信公众号,订阅我的微信公众号.
坚持原创技术分享,您的支持是我创作的动力.