Netty基础
Netty
一、Netty入门
1.1 概述
Netty是一个异步的(这里异步主要指通过多线程完成方法调用和处理结果相分离(因为如果调用方法的线程和接收数据的线程是同一个,那么意味着是同步)指调用时的异步,不是异步IO)、基于事件驱动(即底层多路复用selector)的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
1.2 Hello Word
//客户端向服务器发送HW,服务器接收不返回
/**
* 服务端
*/
public class HelloServer{
public static void main(String[] args){
// 1.服务启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
// 2.NIO基础部分有用到BossEventLoop, WorkerEventLoop(selector, thread), group组 EventLoop包含了线程和选择器
.group(new NioEventLoopGroup())
// 3.选择服务器的ServerSocketChannel实现
.channel(NioServerSocketChannel.class)
// 4. boss负责处理连接 worker(child) 负责处理读写,决定了worker(child)能执行哪些操作(handler)
.childHandler(
// 5.channel代表和客户端进行数据读写的通道 Inializer初始化,负责添加别的handler
new ChannelInitializer<NioSocketChannel>(){
@Override
protected void initChannel(NioSocketChannel ch) throws Exception{
// 6.添加具体handler
ch.pipeline().addLast(new StringDecoder()); // 将ByteBuf转换为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 自定义handler
@Override // 读事件发生后
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
// 打印上一步转换好的字符串
sout(msg);
}
});
}
})
.bind(8080);
}
}
/**
* 客户端
*/
public class HelloClient{
public static void main(String[] args) throws InterruptedException {
// 1.启动类
new Bootstrap()
// 2.添加EventLoop
.group(new NioEventLoopGroup())
// 3.选择客户端channel实现
.channel(NioSocketChannel.class)
// 4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>(){
@Override // 初始化处理器在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception{
ch.pipeline().addLast(new StringEncoder());
}
})
// 5.连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync() // 同步等待channel关闭
.channel()
.writeAndFlush("hello Word");
}
}
提示:
一开始要树立正确的理解
- 把channel理解为数据的通道
- 把msg理解为流动的数据,最开始输入是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf
- 把handler理解为数据的处理工序
- 工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成……)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler分Inbound和Outbound两类 (入站 出站)
- 把eventLoop理解为处理数据的工人
- 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底(绑定)
- 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务
- 工人按照pipeline顺序,一次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人
1.3 组件
1.3.1 EventLoop
事件循环对象
EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的io事件。它的继承关系比较复杂
- 一条线是继承自j.u.c.ScheduledExecutorService因此包含了线程池中所有的方法
- 另一条线是继承自Netty自己的OrderedEventExecutor
- 提供了boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
- 提供了parent方法来看看自己属于哪个EventLoopGroup
事件循环组
EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)
- 继承自Netty自己的EventExecutorGroup
- 实现了Iterable接口提供遍历EventLoop的能力
- 另有next方法获取集合中下一个EventLoop
@Slf4j
public class TestEventLoop{
public static void main(String[] args){
// 1.创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(); // io事件、普通任务、定时任务
// EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务、定时任务
// sout(NettyRuntime.availableProcessors()); // 获取到系统的CPU核心数
// 2.获取下一个事件循环对象
sout(group.next());
// 3.执行普通任务
group.next().execute(() -> {
log.debug("ok") ;
});
log.debug("main");
// 4.执行定时任务 做keepAlive做连接的保活
group.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 0, 1, TimeUnit.SECONDS);
}
}
EventLoop IO任务
public class EventLoopServer{
public static void main(String[] args){
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>(){
@Override
protected void initChannel(NioSocketChannel ch) throws Exception{
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
测试多线程时,断点处Suspend:应选Thread即主线程停下来不会影响其他线程的运行 (All执行到断点处所有线程都会停下来)
head、h1、tail都是handler
一旦建立连接,channel就会和一个NioEventLoop绑定,后续所有的请求都会通过同一个EventLoop来处理
EventLoop 分工细化
@Slf4j
public class EventLoopServer{
public static void main(String[] args){
// 细分2:创建一个独立的EventLoopGroup,专门去处理耗时较长的操作【一个EventLoop selector管理多个channel,其中一个channel执行到了handler执行耗时较长那么会影响其他channel的操作,某个handler耗时较长,最好不要让它去占用worker的NIO线程,否则会影响NIO的读写操作】
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// boss 和 worker 【EventLoop(Selector)比作boss与worker】
// 细分1:参数1boss只负责ServerSocketChannel上accept事件 参数2worker只负责socketChannel上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>(){
@Override
protected void initChannel(NioSocketChannel ch) throws Exception{
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){ // 因为第一次参数未指定group,所以依然会采用NioEventLoopGroup去处理
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf = (ByteBuf)msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 让消息传递给下一个handler,不使用的话执行到handler1就断了不会执行下一个handler2,这是自定义handler需注意的
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
head 、h1、 h2、tail都是handler往下执行,粉色handler执行NioEvenLoopGroup中Selector负责,绿色为耗时较长的另外新创建独立的handler,该channel注册到DefaultEventLoopGroup,该组下有两个selector负责监管
源码分析
解释多个handler之间如果使用不同的EventLoop(不同的Selector),那么是怎么进行线程的切换的
1.3.2 Channel
channel的主要作用
- close() 可以用来关闭 channel 【close() 方法是异步的】
- closeFuture()用来处理channel的关闭
- sync方法作用是同步等待channel关闭 【阻塞住当前线程,直到Nio线程连接建立完毕】
- 而addListener方法是异步等待channel关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入
- writeAndFlush()方法将数据写入并刷出
Netty为什么用异步
异步多线程对响应时间并没有提升,反而降低了,异步提升的是吞吐量,单位时间内处理请求的个数
因此需要合理的拆分任务,即精细化任务
1.3.3 Future & Promise
在异步处理时,经常用到这两个接口
Netty中的Future与JDK中的Future同名,但是是两个接口,它们之间有关系:Netty中的Future继承自JDK中的Future,而Promise继承自Netty中的Future接口
- JDK Future只能同步等待任务结束(或成功、或失败)才能得到结果
- Netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- Netty Promise不仅有Netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | JDK Future | Netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | ||
isCanceled | 任务是否取消 | ||
isDone | 任务是否完成,不能区分成功失败 | ||
get | 获取任务结果,同步阻塞等待 | ||
getNow | 获取任务结果,非阻塞 | ||
await | 等待任务结束,如果任务失败,不会抛异常,而是通过isSuccess判断 | ||
sync | 等待任务结束,如果任务失败,抛出异常(不返回结果) | ||
isSuccess | 判断任务是否成功 | ||
cause | 获取失败信息,非阻塞,如果没有失败,返回null | ||
addListener | 添加回调,异步接收结果 | ||
setSuccess | 设置成功结果 | ||
setFailure | 设置失败结果 |
JDK Future
// 可以理解为Future就是线程之间传递结果数据的容器
@Slf4j
public class TestJDKFuture{
public static void main(String[] args) throws Exception{
// jdk中Future一般关联线程池一起使用
// 1. 创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 2. 提交任务 返回的是JDK中Future对象
Future<Integer> future = service.submit(new Callable<Integer>(){
@Override
public Integer call() throws Exception{
log.debug("线程池中的线程执行计算");
Thread.sleep(1000); // 任务执行时间
return 50; //返回结果
}
});
// 执行任务的是线程池中的线程,获取结果的是外面的主线程,那么主线程如何与线程池中的该线程通信获得其结果呢?此时需要用到Future对象(submit返回即是Future对象)
// 3. 主线程通过future获取结果
log.debug("主线程等待结果");
log.debug("主线程获取线程池线程结果是 {}", future.get()); // 同步阻塞等待
}
}
Netty Future
@Slf4j
public class TestNettyFuture{
public static void main(String[] args){
NioEventLoopGroup group = new NioEventLoopGroup();
// 每个EventLoop中只有一个线程
EventLoop eventLoop = group.next();
// 返回Netty中Future对象
Future<Integer> future = eventLoop.submit(new Callable<Integer>(){
@Override
public Integer call() throws Exception{
log.debug("执行计算");
Thread.sleep(1000);
return 70;
}
});
// 3. 主线程通过future获取结果
// log.debug("主线程等待结果");
// log.debug("主线程获取线程池线程结果是 {}", future.get()); // 同步方式获取结果
// 异步获取结果:
future.addListener(new GenericFutureListener<Future<? super Integer>>(){
@Override
public void operationComplete(Future<? super Integer> future) throws Exception{
log.debug("接收的结果是:{}", future.getNow()); // 因为执行了回调方法那么一定已经获取到了结果,那么可以用非阻塞的getNow()方法立即获取结果,不需要阻塞
}
});
}
}
Promise(如开发RPC框架时很有用)
// JDK Future与Netty Future有个共同的特点:即Future对象不是由我们自己创建的,而是向线程池中提交任务时返回的Future对象,Future的创建权与结果的设置权都不是我们可以控制的,通过Promise对象可以我们灵活的配置
@Slf4j
public class TestNettyPromise{
// 1. 准备EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 2. 可以主动创建promise 是一个结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
// 3. 任意一个线程执行计算,完毕后向promise中填充结果
log.debug("开始计算。。。");
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
promise.setSuccess(80);
}).start();
// 4. 接收结果的线程
log.debug("等待结果。。。");
log.debug("结果是:{}", promise.get());
// 异步方式 同Netty Future上面代码
}
ChannelFuture连接问题
public class EventLoopClient{
public static void main(String[] args) throws InterruptedException{
// 带有 Future、promise的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>(){
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception{
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080)); // connect线程是异步非阻塞,真正连接的是另一个线程(NioEventLoop中的一个线程nio线程),main线程发起了调用connect,不用等待结果可以继续向下运行
/** 解决方法一:
channelFuture.sync(); // 如果不调用sync方法,那么会无阻塞向下执行,但此时connect还未成功建立。解决方法一:使用sync方法同步处理结果
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello word");
**/
/** 解决方法二:
* 使用addListener(回调对象)方法异步处理结果,不是由main线程而是由nio线程执行
**/
channelFuture.addListener(new ChannelFutureListener(){
@Override
// 在nio线程连接建立好之后,会调用operationComplete
public void operationComplete(ChannelFuture future) throws Exception{
Channel channel = future.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello word");
}
});
}
}
ChannelFuture关闭问题
// 获取CloseFuture对象,在关闭发生以后,所做的处理,有两种处理方法:1、同步处理关闭2、异步处理关闭
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>(){
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception{
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while(true){
String line = scanner.nextLine();
if("q".equals(line)){
channel.close; // close异步操作
// log.debug("处理关闭后的操作"); // 不能在这里善后,由于close异步
break;
}
channel.writeAndFlush(line);
}
}).start();
// 获取ClosedFuture对象, 1)同步处理关闭 2)异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
/**
* closeFuture.sync(); // 同步处理
* log.debug("这里处理关闭之后的操作");
*/
closeFuture.addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception{
log.debug("处理关闭之后的操作"); // 异步处理
group.shutdownGracefully(); // 优雅停止线程
}
});
1.3.4 Handler & Pipeline
编写自己业务都是在Handler中编写,使用Netty中自带的Handler简化工作
下面研究Pipeline中Handler的执行流程
Pipeline好比流水线,Handler好比流水线上一道道的工序,中间流动的就是需要处理的数据
ChannelHandler用来处理Channel上的各种事件,分为两种:入站Handler(做数据的读取操作)、出站Handler(做数据的写出操作),所有ChannelHandler被连成一串,就是Pipeline
- 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对协会结果进行加工
Pipeline中如果有多个Handler是如何执行的?流程是怎样的?:
@Slf4j
public class TestPipeline{
public static void main(String[] args){
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>(){
@Override
protected void initChannel(NioSocketChannel ch) throws Exception{
// 1、通过channel拿到pipeline
ChannelPipeline pipeline = ch.pipeline;
// 2、添加处理器 addLast()字面意思是把handler添加到流水线的最后一个位置,但是Netty会自动多加两个handler:head 和 tail,所加的handler是tail handler之前
// 因此,下面的pipeline结构是:head -> handler1 -> handler2 -> handler3 -> handler4 -> handler5 -> handler6 -> tail;底层是一个双向链表 入站处理器 最终结果log打印结果为: 1 2 3
// 出站处理器 只有当往channel中写入数据以后才会触发 最终log打印结果为 1 2 3 6 5 4,出站为从尾往前走
pipeline.addLast("handler1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
log.debug("1");
ByteBuf buf = (ByteBuf)msg;
String name = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, name); // 调用pipeline中的下一个handler,将执行权交给下一个handler,并且会将handler的处理结果传递给下一个handler channelRead是唤醒pipeline中下一个【入站处理器】handler
// 或者使用 ctx.fireChannelRead(student);
}
});
pipeline.addLast("handler2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
// 这里msg拿到的是上一个handler处理后的name
log.debug("2");
Student student = new Student(name.toString());
super.channelRead(ctx, student);
}
});
pipeline.addLast("handler3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
log.debug("3");
// 分配了一个byteBuffer对象 然后向其中写入字节,该字节是将字符串转换为字节数组
// ch.writeAndFlush(ctx.alloc().buffer.writeBytes("server...".getBytes())); // 这里写入操作为了触发以下的出站处理器
ctx.writeAndFlush(ctx.alloc().buffer.writeBytes("server...".getBytes())); // ctx是从当前的处理器向前寻找出站处理器,因此并不会执行handler 4 5 6 而NioSocketChannel是从tail向前找
// 可更换handler位置进行验证:head -> handler1 -> handler2 -> handler4 -> handler3 -> handler5 -> handler6 -> tail
}
});
pipeline.addLast("handler4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("handler5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("handler6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
@Data
@AllArgsConstructor
static class Student{
private String name;
}
}
调试工具
如果写很多handler那么其中执行顺序各种业务比较复杂时,每次测试需要启动一个客户端服务端,所以测试代码会很多。所以使用Netty提供的EmbeddedChannel测试的channel,这样就无需启动客户端或服务端了。
@Slf4j
public class TestEmbeddedChannel{
public static void main(String[] args){
ChannelInboundHandlerAdapter h1 = channelRead(ctx, msg) -> {
log.debug("1");
super.channelRead(ctx, msg);
};
ChannelInboundHandlerAdapter h2 = channelRead(ctx, msg) -> {
log.debug("2");
super.channelRead(ctx, msg);
};
ChannelOutboundHandlerAdapter h3 = write(ctx, msg, promise) -> {
log.debug("3");
super.write(ctx, msg, promise);
};
ChannelOutboundHandlerAdapter h4 = write(ctx, msg, promise) -> {
log.debug("4");
super.write(ctx, msg, promise);
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
// 模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("word".getBytes()));
}
}
1.3.5 ByteBuf
ByteBuf是对Nio中ByteBuffer增强 是对字节数据的封装
public class TestByteBuf{
public static void main(String[] args){
// ByteBuf中的容量是可以动态扩容的,而Netty中ByteBuffer就不可
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
sout(buf);
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 300; i++){
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes());
sout(buf);
}
}
直接内存 vs 堆内存
直接内存:分配效率低,读写效率高(使用的是系统内存)
堆内存:分配效率高,读写效率低
可以使用下面的代码来创建池化基于堆的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的ByteBuf
ByteBuf buffer = ByteBufferAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起使用。
- 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放。
池化 vs 非池化
池化的最大意义在于可以重用ByteBuf,优点有
- 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力。
- 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率。
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
4.1以后,非Android平台默认启用池化类实现,Android平台启用非池化实现
4.1之前,池化功能还不成熟,默认是非池化实现
ByteBuf组成
注意:
- 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用
- 网络编程中一般采用大端方式 即不带LE 默认习惯是Big Endian
扩容
扩容规则是
- 如果写入后数据大小未超过512,则选择下一个16的正数倍,例如写入后大小为12,则扩容后capacity是16
- 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了)
- 扩容不能超过max capacity会报错
读取
buffer.readByte();
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分
如果需要重复读取,可以在读前先做个标记mark buffer.markReaderIndex()
重置到标记位置reset buffer.resetReaderIndex()
还有办法是采用get开头的一系列方法,这些方法不会改变read index
ByteBuf的内存回收 retain & release
由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。
- UnpooledHealByteBuf使用的是JVM内存,只需等GC回收内存即可
- UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存
回收内存的源码实现,请关注下面方法的不同实现
protected abstract void deallocate()
Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
- 每个ByteBuf对象的初始计数为1
- 调用release方法计数减一,如果计数为0,ByteBuf内存被回收
- 调用retain方法计数加1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收
- 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用
谁来负责release呢?一般不可以直接在finally中release,因为pipeline中handler可能会将其中的ByteBuf传递给下一个handler,如果finally release掉,那么下一个就无法使用了。
请思考,因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在finally中release了,就时区了传递性
基本规则是,谁是最后的使用者,谁负责release,详细分析如下:
可在head 和 tail部分 release
slice
以前所说的零拷贝:主要是指文件channel向socket channel传输数据的时候,可以不经过java内存,直接从文件到socket网络设备,这样减少了多次内存复制。Netty中的零拷贝主要是减少内存复制,但不是指操作系统层面的。
【零拷贝】的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始的ByteBuf内存,切片后的ByteBuf维护独立的read,write指针。逻辑上的操作,并没有进行内存复制。
一般使用slilce需要搭配retain()方法使计数加1,防止被release的ByteBuf被释放
duplicate
【零拷贝】的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的
copy
会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关
ByteBuf优势
- 池化 可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像ByteBuffer一样切换读写模式
- 可以自动扩容
- 支持链式调用
- 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf
1.4 双向通信
// 编写server
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
ByteBuf buffer = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
sout(buffer.toString(Charset.defaultCharset()));
// 思考:需要释放buffer吗?
}
});
}
})
// 怎么回应呢?只需要为channelRead添加逻辑
public void channelRead(ChannelHandlerContext ctx, Object msg){
// ...
// 建议使用ctx.alloc() 创建 ByteBuf
ByteBuf response = ctx.alloc().buffer(20);
response.writeBytes("hello".getBytes());
ctx.writeAndFlush(response);
// 思考:需要释放response吗
}
// 编写client,这次使用了 channelActive事件,它会在连接建立后触发
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
// 建议使用 ctx.alloc() 创建 ByteBuf
ByteBuf buffer = ctx.alloc().buffer(10);
// 首次建立连接,发送hello信息
buffer.writeBytes("hello".getBytes());
ctx.writeAndFlush(buffer);
// 思考:需要释放buffer 吗?
}
});
}
})
.connect("127.0.0.1", 8080);
// 客户端接收,与服务器接收代码类似,在客户端的ChannelInboundHandlerAdapter中加入channelRead事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buffer = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
sout(buffer.toString(Charset.defaultCharset()));
// 思考:需要释放buffer吗?
}
Java Socket是全双工的:在任意时刻,线路上存在A 到 B 和 B 到 A的双向信号传输,即使是阻塞IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读