Fork me on GitHub

Netty

注意:所有文章除特别说明外,转载请注明出处.

Netty

[TOC]

概念

官方定义,它是一个异步的,基于时间Client/Server的网络框架,目标是提供一种简单、快速构建网络应用的方式,同时保证高吞吐量、低延时、高可靠性。

提示:Netty其实就是一个NIO框架,它适用于服务器通讯相关的多种应用场景,主要还是针对于TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。

NIO

NIO,非阻塞IO,在JAVA中NIO的核心就是Selector机制。简单而言,创建一个Socket Channel,并将其注册到一个Selector上(多路复用器),这个Selector将会“关注”Channel上发生的IO读写事件,并在事件发生(数据就绪)后执行相关的处理逻辑。对于阻塞IO,它需要在read()、write()操作上阻塞而直到数据操作完毕,但是NIO则不需要,只有当Selector检测到此Channel上有事件时才会触发调用read、write操作。

提示:NIO本身不是严格意义上的异步IO,最大的原因是因为Selector本身是阻塞的(selector需要通过线程阻塞的方式(其select方法)获取底层通道的事件变更,然后获取SelectionKey列表)。

总结:我觉得最好的学习方式就是学好netty包里面的所有example代码。

Channel接口

通道,一个逻辑通道,维护Socket上IO的write、read、connect等相关操作。我们可以看到Bootstrap、ServerBootstrap初始化之后都需要在channel(Channel channel)方法中指定内部需要创建的channel类型。

提示:Channel中所有的操作均是异步的,IO操作都会返回一个ChannelFuture实例

Channel接口中还包含了一个子接口:Unsafe;这个Unsafe和JDK自带的Unsafe没有关系,Netty中的Unsafe只是框架内部使用,主要用来操作底层的Socket,比如connect、close、read、write;即Channel接口中有关底层Socket操作的,将会有Unsafe来操作javaChannel()即可。

ChannelFuture

ChannelFuture表示Channel IO异步操作的结果。IO调用方法立即返回,但此时并不表示IO实际操作已经实际执行结束,只是返回一个ChannelFuture实例,具体执行的结果状态可以通过检测Future才能得到。

boolean isDone():操作是否完成,completed 还是 uncompleted

boolean isCanclled():如果Future已经完成,则判断操作是否被取消。

boolean isSuccess():同上。

Throwable cause():如果执行失败,此处可以获取导致失败的exception信息。

ChannelFuture await():等待,直到异步操作执行完毕,内部基于wait实现。

ChannelFuture sync():等待,直到异步操作执行完毕,核心思想同await。我们得到Future实例后,可以使用sync()方法来阻塞当前线程,直到异步操作执行完毕。和await的区别为,如果异步操作失败,那么将会重新抛出异常(将上述cause()方法中的异常抛出)。await和sync一样,当异步操作执行完毕后,通过notifyAll()唤醒。

ChannelFuture addListener(GenericFutureListener listener):向Future添加一个listener,当异步操作执行完毕后(无论成败),会依次调用listener的operationCompleted方法。

EventLoopGroup

我们可以简单的认为EventLoopGroup是一个线程池调度服务,这和我们常用的FixedThreadPool在设计思想上没有太大区别;在Netty中比较常用的子类就是NioEventLoopGroup,它继承了ScheduledExecutorService。我们在创建NioEventLoopGroup时,可以指定线程池的容量,默认为:CPU处理器个数 * 2。

在NioEventLoop创建实例时会同时创建一个Selector,即每个NioEventLoop都持有一个Selector实例。由此可见,NioEventLoopGroup的线程池容量,就是线程的个数,也是Bootstrap中持有的Selector的数量。

每个NioEventLoop实例内部Thread负责select多个Channel的IO事件(NIO Selector.select),如果某个Channel有事件发生,则在内部线程中直接使用此Channel的Unsafe实例进行底层实际的IO操作。简单而言,就是让每个NioEventLoop管理一组Channel。

在ServerBootstrap中,创建两个NioEventLoopGroup,其中“bossGroup”为Acceptor 线程池,这个线程池只需要一个线程(大于1,事实上没有意义),它主要是负责accept客户端链接,并创建SocketChannel,此后从“workerGroup”线程池(reactor)中以轮询的方式(next)取出一个NioEventLoop实例,并将此Channel注册到NioEventLoop的selector上,此后将由此selector负责监测Channel上的读写事件(并由此NioEventLoop线程负责执行)。

由此可见,对于ServerBootstrap而言,bossGroup中的一个线程的selector只关注SelectionKey.OPT_ACCEPT事件。

EventLoop | 线程模型

ChannelHandler | Pipeline

@Sharable 表示一个ChannelHandler可以被多个Channel安全的共享

在Netty中,从Socket通道中read数据进入Netty Handler的方向为inbound,从Netty handler向Socket通道write数据的方向为outbound。

ChannelInboundHandler

void channelRegistered(ChannelHandlerContext ctx)

当channel注册成功后执行,即channel绑定到NioEventLoop上,且将Channel注册到selector之后执行。

void channelActive(ChannelHandlerContext ctx)

channel注册首次成功后执行,即channelRegistered方法执行后调用。

void channelRead(ChannelHandlerContext ctx,Object msg)

socket将已经读取到数据传递给handler,此方法在NIO中READ事件触发后并读取到任意字节数据后被fire

void channelReadComplete(ChannelHandlerContext ctx)

read操作结束后调用;read数据首先被添加到Bytebuf中,如果这个Bytebuf已满,则会终止read操作,调用此方法。

ChannelOutboundHandler

void read(ChannelHandlerContext ctx)

总结:Channel创建后,初始化其pipeline,当Channel注册到Selector后(包括绑定到NioEventLoop线程),执行ChannelInitializer这个特殊的handler并将开发者指定的其他多个handler添加到pipeline中;当NioEventLoop线程中,selector检测到NIO事件后,将会依次执行此Channel pipeline中的相应的fire方法(比如fireChannelRead()),那么fire方法将会从head或者tail这两个特殊的handler开始,依次调用pipeline中其他handler的实际方法(比如channelRead())。

注意:pipeline中的某个handler认为操作需要继续传递下去,那么在此handler的实际业务执行完毕之后要通过调用ChannelHandlerContext相应的fire方法,将事件继续传递给下一个handler;如果没有调用fire方法,那么此事件将会被终止。


WebSocket

该协议是完全重新设计的协议,旨在为了Web上的双向数据传输问题提供一个解决方案,可以让客户端与服务器之间在任意时刻传输消息。此要求它们能够异步处理消息回执。

添加WebSocket支持

在从标准的HTTP或HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此在使用WebSocket应用程序将始终以HTTP/S作为开始,然后再执行升级。

我们应用程序将采用下面的约定:如果被请求的URL以/ws结尾,那我们将会把该协议升级为WebSocket。否则服务器将使用基本的HTTP/S。

在连接完成升级之后,所有数据传输都将使用WebSocket进行。

WEBSOCKET帧 WebSocket以帧的方式进行传输数据,每一帧代表消息的一部分。一个完整的消息可能包含许多帧。

Protobuf

概念

RMI:remote method invocation 只针对Java

client 

server

序列化与反序列化:或叫做编码与解码。

RPC Remote Procedure Call 远程过程调用 很多RPC框架是跨语言的。

1. 定义一个接口说明文件:描述了对象(结构体)、对象成员、接口方法等一系列信息。

2. 通过RPC框架所提供的编译器,将接口说明文件编译成具体的语言文件。

3. 在客户端与服务器端分别引入RPC编译器所生成的文件,即可像调用本地方法一样调用远程方法。

提示:在idea引入protobuf的时候注意pc上安装的protobuf版本号与gradle引入的版本号的一致性。

protobuf工程中使用git作为版本控制系统:
1. git submodule git仓库里面的一个仓库

    这一命令可以关联到一个新的git仓库,是专门用来存放protobuf项目的。

    在原先的工程里面生成了一个protobuf工程,在新生成的protoc编译之后的文件放在对一个的仓库下面

2. git subtree 类似于git submodule

    这一命令是将这一仓库pull到该项目,但是是在合并而不是分成两个

Thrift

Thrift 传输格式

1. TBinaryProtocol 二进制格式

2. TCompactProtocol 压缩格式

3. TJSONProtocol JSON格式

4. TSimpleJSONProtocol 提供JSON只写协议,生成的文件很容易通过脚本语言解析

5. TDebugProtocol 使用易懂的可读文本格式,以便于debug

Thrift 数据传输方式

1. TSocket 阻塞式socket

2. TFramedTransport 以frame为单位进行传输,非阻塞式服务中使用

3. TFileTransport 以文本形式进行传输

4. TMemoryTransport 将内存用于IO,Java实现时内部实际使用了简单的ByteArrayOutputStream。

5. TZlibTransport 使用zlib进行压缩,与其它传输方式联合使用。当前无Java实现。

Thrift 支持的服务类型

1. TSimpleServer 简单的单线程服务模型,常用于测试

2. TThreadPoolServer 多线程服务模型,使用标准的阻塞IO

3. TNonblockingServer 多线程服务模型,使用非阻塞IO(需要使用TFramedTransport数据传输方式)

4. THsHaServer THsHa引入了线程池去处理,其模型将读写任务放到线程池去处理。Half-sync/Half-async的处理模式:Half-aysnc是在处理IO事件上(accept/read/write io) 异步处理。 | Half-sync用于handler对rpc的同步处理。

gRPC

gRPC官方 gRPC仓库 Protobuf官网

RPC(远程过程调用)框架实际上是提供了一套机制,使得应用程序之间可以进行通信,同时遵循Server/Client模型。在我们使用的时候,客户端调用Server端的接口就像是调用本地的函数一样。

RPC通信

gRPC | Restful

gRPC和Restful API都提供了一套通信机制,用于server/client模型通信,而且它们都使用http作为底层的传输协议(严格地说, gRPC使用的http2.0,而Restful api则不一定)。不过gRPC还是有些特有的优势。

1. gRPC通过Protobuf定义接口,更加严格的接口约束。

2. Protobuf可以将数据序列化为二进制编码,从而大幅度减少需要传输的数据量,大幅度的提高性能。

3. gRPC可以方便地支持流式通信(理论上通过http2.0就可以使用streaming模式, 但是通常web服务的restful api似乎很少这么用,通常的流式数据应用如视频流,一般都会使用专门的协议如HLS,RTMP等,这些就不是我们通常web服务了,而是有专门的服务器应用。)

4. 对于性能有更高的要求时。有时我们的服务需要传递大量的数据,而又希望不影响我们的性能,这个时候也可以考虑gRPC服务,因为通过protobuf我们可以将数据压缩编码转化为二进制格式,通常传递的数据量要小得多,而且通过http2我们可以实现异步的请求,从而大大提高了通信效率。

提示:我们不会单独的使用gRPC,而是将gRPC作为一个部件进行使用,这是因为在生产环境中,我们面对高并发的情况下,需要使用分布式系统去处理,而gRPC并没有提供分布式系统相关的一些组件,而且真正的线上服务还需要提供包括负载均衡,限流熔断,监控报警,服务注册和发现等必要的组件。

gRPC 开发的通用步骤

1. 编写Protobuf来定义接口和数据类型

2. 编写gRPC Server端程序

3. 编写gRPC Client端程序

proto3

回调钩子

注册一个关闭Java虚拟机的钩子。Java虚拟机关闭响应两种类型的事件。

1. 程序执行完之后正常的退出。

2. 用户主动中断或者系统事件导致的退出。

更多的看源码。

Runtime.getRuntime().addShutdownHook(new Thread(
    ()->{
        System.out.println("close jvm");
        GrpcServer.this.stop();
    }
));

提示:回调钩子的作用在于在JVM关闭之前先关闭 GrpcServer.this.stop(); 。

gRPC 流式调用处理

提示:gRPC的四种通信方式(1. 一般的RPC调用,一个请求对象,一个返回对象。2. 服务端流式RPC,一个请求对象,服务端可以传回多个结果对象。3. 客户端流式RPC,客户端传入多个请求,服务器返回一个响应结果。4. 双向流式RPC,结合客户端流式rpc和服务端流式rpc,可以传入多个对象,返回多个响应对象。)。

syntax = "proto3";

package com.proto.xidian.edu.cn;

option java_package = "com.proto.xidian.edu.cn";
option java_outer_classname = "StudentProto";
option java_multiple_files = true;

service StudentService {
    rpc GetRealNameByUsername (MyRequest) returns (MyResponse) {}

//    rpc GetStudentsByAge(int32) returns (stream StudentResponse) {}

//    rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

    rpc GetStudentsWrapperByAges(stream StudentRequest ) returns (StudentResponseList) {}

}

message MyRequest {
    string username = 1;
}

message MyResponse {
    string realname = 2;
}

message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

message StudentRequest {
    int32 age = 1;
}

message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}

在处理过程中主要在返回stream类型的结果类型。在客户端发出一个请求之后,服务器返回的类型是tream(实际上就是一个迭代器)。

提示:在编写.proto文件之后利用gradle构建工具对项目进行build操作,得到新生成的文件。

错误 error

What went wrong:
Some problems were found with the configuration of task ‘:generateProto’.

Value ‘directory ‘D:\projects\SSM_practice\netty_lecture\build/extracted-protos/main’’ specified for property ‘$2’ cannot be converted to a file.

Value ‘main Proto source’ specified for property ‘$1’ cannot be converted to a file.

解决:将Protobuf-Gradle-plugin版本更新到0.8.4或更高版本。

Gradle

gradlew

gradlew 等价于gradle wrapper。使用者在本地pc上没有安装gradle工具的前提下,会自动构建项目。

EventLoopGroup

1. 在一个EventLoopGroup中可能会包含一个或多个EventGroup。

2. 一个EventLoop在它的整个生命周期当中都只会与唯一一个Thread进行绑定。

3. 所有由EventLoop所处理的各种IO事件都将在它所关联的那个Thread上进行处理。

4. 一个Channel在它的整个生命周期中只会注册在一个EventLoop上。

5. 一个EventLoop在运行过程中,会被分配给一个或多个Channel。

在Nettt中,Channel的实现一定是线程安全的。基于此,我们可以存储一个Channel的引用,并且在需要向远程端点发送数据时,通过这个引用来调用Channel相应的方法,即便当时有很多线程都在使用它也不会出现多线程问题,而且消息一定会按照顺序发送出去。

重要结论:我们在业务开发过程中,不要将长时间执行的耗时任务放入到EventLoop的执行队列中,因为它将会一直阻塞该线程所对应的所有Channel上的其它执行任务,如果我们需要进行阻塞调用或耗时的操作(通常在开发中很常见),那么我们需要使用一个专门的EventExecutor(业务线程池)。

业务线程池(EventExecutor)通常的两种实现方式:

    1. 在ChannelHandler的回调方法中,使用自己定义的业务线程池,这样就可以实现异步调用。

    2. 借助Netty提供的向ChannelPipeline添加ChannelHandler时调用的addLast()方法来传递EventExecutor。

说明:默认情况下(调用addLast(handler)),ChannelHandler中回调方法都是由IO线程所执行,如果调用了 ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler… handlers);方法,那么ChannelHandler中的回调方法就是由参数中的group线程组所执行的。

Future

JDK所提供的Future只能通过手工方式检查执行结果,而这个操作是会阻塞的。Netty则对ChannelFuture进行了增强,通过ChannelFutureListener以回调的方式来获取执行结果,去除了手工检查阻塞的操作。值得注意的是:ChannelFutureListener的operationComplete()方法是由IO线程执行的,因此要注意的是不要在这里执行耗时操作,否则需要通过另外的线程或线程池来执行。

Channel | ChannelHandler | ChannelHandlerContext

在Netty中有两种发送消息的方式,可以直接写到Channel中,也可以写到与ChannelHandler所关联的那个ChannelHandlerContext中。对于前一种方式来说,消息会从ChannelPipeline的末尾开始流动,而对于后一种方式来说,消息将从ChannelHandlerContext的下一个ChannelHandler开始流动。

结论:

1. ChannelHandlerContext与ChannelHandler之间的关联绑定关系是永远都不会发生改变的,因此对其进行缓冲是没有任何问题的。

2. 对于与Channel的同名方法来说,ChannelHandlerContext的方法将会产生更短的事件流,所以我们应该在可能的情况下利用这个特性来提升应用的性能。

Netty 提供的3种缓冲区类型

1. Heap Buffer (堆缓冲区)

    这是最常用的类型,ByteBuf将数据存储到JVM的堆空间中,并且将实际的数据存放到byte array中来实现。

    优点:由于数据存放在JVM的堆中,因此可以快速的创建与释放,并且他提供了直接访问内部字节数组的方法。

    缺点:每次读写数据时,都需要先将数据复制到直接缓冲区中再进行网络传输。

2. Direct Buffer (直接缓冲区)

    在堆之外直接分配内存空间,直接缓冲区并不会占用堆的容量空间,因为它是有操作系统在本地内存中进行的数据分配。

    优点:在使用Socket进行数据传递时,性能非常好,因为数据直接位于操作系统的本地内存中,所以不需要从JVM将数据复制到直接缓冲区中,性能很好。

    缺点:因为Direct Buffer是直接在操作系统内存中的,所以内存空间的分配与释放要比堆空间更加复杂,而且速度要慢一些。

    这里Netty提供内存池来解决这个问题。DirectBuf可以放在内存池里面。直接缓冲区并不支持通过字节数组的方式来访问数据。

提示:对于后端的业务消息的编解码来说,推荐使用HeapByteBuf。对于IO通信线程在读写缓冲区时,推荐使用DirectByteBuf。

3. Composite Buffer (复合缓冲区)

JDK的ByteBuffer与Netty的ByteBuf之间的差异

1. Netty的ByteBuf采用了读写分离的策略(readerIndex | writerIndex),一个初始化(里面尚未有任何数据)的ByteBuf的readerIndex | writerIndex值都为0。

2. 当读索引与写索引处于同一个位置时,如果我们继续读取,就会抛出异常IndexOutBoundsException。

3. 对于ByteBuf的任何读写操作都会分别单独维护读索引和写索引。maxCapacity最大容量默认的限制就是Integer.MAX_VALUE。

JDK的ByteBuffer的缺点

1. final byte[] hb;这是JDK的ByteBuffer对象中用于存储数据的对象声明。可以看到,字节数组被声明为final的,也就是长度是固定不变的。一旦分配好之后就不能动态扩容和收缩。而且当待存储的数据字节很大时就很有可能抛出IndexOutBoundsException异常。如果要预防这个异常的抛出,那就需要在存储之前完全确定好待存储的字节大小。如果ByteBuffer的空间不足,我们只有一种解决方案:创建一个全新的ByteBuffer对象,然后再将之前的ByteBuffer中的数据复制过去,这些操作都需要我们自发完成。

2. ByteBuffer只使用一个position指针来表示位置信息,在进行读写切换的时候需要调用flip()方法或rewind()方法,使用起来不方便。

Netty的ByteBuf的优点

1. 存储字节的数组是动态的,其最大值默认是Integer.MAX_VALUE。这里的动态性是体现在write()方法中的,write()方法在执行时会判断

提示:自旋锁,不断占用CPU进行自旋,直到条件满足跳出死循环为止。

//自旋锁的应用
for (;;) {
    int refCnt = this.refCnt;
    final int nextCnt = refCnt + increment;

    if (nextCnt <= increment) {
        throw new IllegalReferenceCountException(refCnt, increment);
    }
    //判断或跳出
    if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
        break;
    }
}

AtomicIntegerFieldUpdater 要点总结

1. 更新器更新的必须是int类型变量,不能是其包装类型。

2. 更新器更新的必须是volatile类型变量,确保线程之间共享变量时的立即可见性。

3. 待更新的变量不能是static的,必须要是实例变量。因为Unsafe.objectFieldOffset()方法不支持静态变量(CAS操作本质上是通过对象实例的偏移量来直接进行赋值)。

4. 更新器只能修改它可见范围内的变量,因为更新器是通过反射来得到这变量,如果变量不可见就会报错。

提示:如果要更新的变量是包装类型的话,可以使用AtomicReferenceFieldUpdater来进行更新。

Netty 处理器概念

1. Netty处理分类:入站处理器 | 出站处理器。

2. 入站处理器的顶层是ChannelInBoundHandler,出站处理器的顶层是ChannelOutBoundHandler。

3. 数据处理时常用的各种编解码器本质上都是处理器。

4. 编解码器:无论我们向网络中写入的数据是什么类型(int | char | String | 二进制等),数据在网络中传递时,其都是以字节流形式呈现的。

    编码:将数据由原本形式转换为字节流的操作称为编码(encode)。

    解码:将字节流转换成它原本的数据形式或其它格式称为解码(decode)。

    编解码统一称为codec。

5. 编码(从程序到网络 - 本质上是一种出站处理器(ChannelOutBoundHandler))

6. 解码(从网络到程序 - 本质上是一种入站处理器(ChannelInBoundHandler))

7. 在Netty中,编码器通常以XXXEncoder命名。解码器通常以XXXDecoder命名。

TCP 粘包与拆包

粘包表示将多条数据粘在一起。拆包的概念与粘包相反,是将多条数据拆分开来。

关于Netty编解码器的结论

1. 无论是编码器还是解码器,其所接受的消息类型必须要与待处理的参数类型一致,否则该编码器或解码器并不会执行。

2. 

本文标题:Netty

文章作者:Bangjin-Hu

发布时间:2019年10月15日 - 09:22:26

最后更新:2020年03月30日 - 08:04:51

原始链接:http://bangjinhu.github.io/undefined/Netty/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Bangjin-Hu wechat
欢迎扫码关注微信公众号,订阅我的微信公众号.
坚持原创技术分享,您的支持是我创作的动力.