网络编程基础
Netty
一、NIO基础(Non-blocking io 非阻塞IO)
1、三大组件 【管道、缓冲区、选择器】
1、1 Channel & Buffer
channel 类似于stream,它是读写数据的双向通道
,可以从channel将数据读入buffer,也可以将buffer中数据写入channel,而之前的stream要么是输入,要么是输出,channel比stream更为底层
常见的channel有:
- FileChannel 【作为文件传输通道】
- DatagramChannel 【做UDP网络传输时传输通道】
- SocketChannel 【做TCP时传输通道】
- ServerSocketChannel 【做TCP时传输通道 专用服务器】
buffer则用来缓冲读写数据,常见的buffer有
- ByteBuffer 【抽象类】
-
- MappedByteBuffer 【实现类】
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
1、2 Selector
2、ByteBuffer
2、1 ByteBuffer正确使用姿势
- 向buffer写入数据,例如调用 channel.read(buffer)
- 调用flip()切换至读模式
- 从buffer读取数据,例如调用buffer.get()
- 调用clear()或compact()切换至写模式 【可以通过compact() 解决黏包、半包问题(简易方式通过分隔符解决,但还有更加高效方式)】
- 重复 1-4
2、2 ByteBuffer 结构
ByteBuffer有以下重要属性
- capacity
- position
- limit
2、3 ByteBuffer常见方法
分配空间 Netty可动态修改空间大小
ByteBuffer.allocate(10); // class java.nio.HeapByteBuffer - Java堆内存 读写效率较低 受到 GC 的影响
ByteBuffer.allocateDirect(10); // class java.nio.DirectByteBuffer - 直接内存 读写效率高(少一次拷贝(零拷贝)) 使用的是系统内存,不会受到Java垃圾回收的影响,但是分配的效率低,如果使用不当会造成内存泄漏,使用后需要合理释放【Netty中对其进行了封装,效率有较大提升】
向buffer读写数据
/** 向buffer中写入数据
* 有两种办法
* * 调用 channel 的 read() 方法
* * 调用 buffer 自己的 put() 方法
*/
int readByte = channel.read(buf);
// 和
buf.put((byte) 127);
/** 从 buffer 读取数据
* 同样有两种办法
* * 调用 channel 的write方法
* * 调用 buffer 自己的 get方法
*/
int writeBytes = channel.write(buf);
// 和
byte b = buf.get();
/**
* get 方法会让 position 读指针向后走,如果想重复读取数据
* 可以调用rewind方法将position重新置位0
* 或者可以调用get(int i)方法获取索引 i 的内容,它不会移动读指针
*/
/**
* rewind 从头开始读
* mark & reset
* mark 做一个标记,记录position位置,reset是将position重置到 mark 的位置
*/
字符串与ByteBuffer互转
// 1、字符串转为ByteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());
showBuf(buffer1);
// 2、Charset
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("helloWord");
showBuf(buffer2);
// 3、wrap
ByteBuffer buffer3 = ByteBuffer.wrap("helloWord".getBytes());
showBuf(buffer3);
// 4、转为字符串
String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();
sout(str1);
buffer1.flip();
String str2 = StandardCharsets.UTF_8.decode(buffer1).toString();
sout(Str2);
2、4 Scattering Reads
分散读取【eg: onetwothree】 减少数据在buffer中的拷贝
// 以下try catch结构自动关闭文件处理
try(FileChannel channel = new RandomAccessFile(name:"words.txt", mode:"r").getChannel()){
ByteBuffer b1 = ByteBuffer.allocate(3);
ByteBuffer b2 = ByteBuffer.allocate(3);
ByteBuffer b3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{b1, b2, b3});
b1.flip();
b2.flip();
b3.flip();
showBuf(b1);
showBuf(b2);
showBuf(b3);
}catch(IOException e){
}
2、5 Gathering Writes
集中写入 【eg:helloworld你好】减少数据在buffer中的拷贝
ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b1 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b1 = StandardCharsets.UTF_8.encode("你好");
try(FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()){
channel.write(new ByteBuffer[]{b1, b2, b3});
}catch(IOException){
}
3、文件编程
3、1 FileChannel
注:
FileChannel 只能工作在阻塞模式下(即不能配合Selector使用)
获取
不能直接打开FileChannel,必须通过FileInputStream、FileOutputStream 或者 RandomAccessFile来获取 FileChannel,它们都有getChannel 方法
- 通过 FileInputStream 获取的 channel 只能读
- 通过 FileOutputStream 获取的 channel 只能写
- 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile时的读写模式决定
读取
会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾
int readBytes = channel.read(buffer);
写入
正确的写入姿势如下:
在while 中调用 channel.write 是因为 write 方法并不能保证一次将buffer中的内容全部写入channel
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式
while(buffer.hasRemaining){
channel.write(buffer);
}
关闭
channel 必须关闭,不过调用了FileInputStream、FileOutputStream 或者 RandomAccessFile的close方法会间接地调用channel的close方法
位置
大小
size方法获取文件大小
强制写入
操作系统出于性能考虑,会将数据缓存,不是立刻写入磁盘,可以调用force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
3、2 两个Channel传输数据
// 即数据拷贝
try(
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
){
// from.transferTo(0, from.size(), to);
// 只要带有transferTo采用零拷贝底层进行优化 效率高,但是传输的文件最大只能是2G数据,所以多次传输改进如下
long size = from.size();
// left 变量代表还剩余多少字节
for(long left = size; left > 0; ){
left -= from.transferTo((size-left), left, to);
}
} catch(IOException e){
e.printStackTrace();
}
3、3 Path
jdk7引入了Path 和 Paths类
- Path用来表示文件路径
- Paths是工具类,用来获取Path实例
Path source = Paths.get("1.txt"); // 相对路径 使用 user.dir 环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt"); // 绝对路径 代表了 d:\1.txt
Path source = Paths.get("d:/1.txt"); // 绝对路径 代表了 d:\1.txt
Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
3、4 Files
检查文件是否存在
Path path = Paths.get("helloword/data.txt");
sout(Files.exists(path));
创建一级目录
Path path = paths.get("helloword/d1");
Files.createDirectory(path);
- 如果目录已经存在,会抛异常 FileAlreadyExistsException
- 不能一次创建多级目录,否则会抛异常 NoSuchFileException
创建多级目录
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);
拷贝文件
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");
Files.copy(source, target);
- 如果文件已经存在,会抛异常 FileAlreadyExistsException
如果希望用source覆盖掉target,需要用StandardCopyOption来控制
File.copy(source, target, StandardCopyOption.REPLACE_EXISTION);
移动文件
删除文件
删除目录
通过访问者模式实现文件、目录的操作:
- 遍历目录下的所有文件即目录:
public boolean traverseDirectories(String directoryPath){
try {
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(Paths.get(directoryPath), new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("========>"+dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println("dir count is "+dirCount);
System.out.println("file count is "+fileCount);
return true;
} catch (IOException e) {
log.error("拷贝目录失败!原因是:"+e.getMessage());
return false;
}
}
- 拷贝目录及其下面所有文件与目录
public boolean copyDirectories(String source, String target){
try {
Files.walk(Paths.get(source)).forEach(path -> {
try {
String targetName = path.toString().replace(source, target);
if (Files.isDirectory(path)){
Files.createDirectory(Paths.get(targetName));
}
if (Files.isRegularFile(path)){
Files.copy(path, Paths.get(targetName));
}
} catch (IOException e) {
log.error("子目录或文件拷贝失败,原因是:"+e.getMessage());
return;
}
});
return true;
} catch (IOException e) {
log.error("拷贝目录失败,原因是:"+e.getMessage());
return false;
}
}
- 删除目录及其下面的所有文件与目录
public boolean deleteDirectories(String directoryPath){
try {
Files.walkFileTree(Paths.get(directoryPath), new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("========>进入"+dir);
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
Files.delete(file);
return super.visitFile(file, attrs);
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
System.out.println("<========退出"+dir);
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
return true;
} catch (Exception e) {
log.error("删除目录失败!原因是:"+e.getMessage());
return false;
}
}
4、网络编程
4、1 非阻塞 与 阻塞
阻塞
-
阻塞模式下,相关方法都会导致线程暂停
- ServerSocketChannel.accept会在没有连接建立时让线程暂停
- SocketChannel.read会在没有数据可读时让线程暂停
- 阻塞的表现其实就是线程暂停了,暂停期间不会占用CPU,但线程相当于闲置
-
单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
-
但多线程下,有新的问题,体现在以下方面:
- 32位JVM 一个线程 320k,64位JVM 一个线程1024k,如果连接数过多,必然导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
- 可以采用线程池技术来减少线程数和线程上下文切换,但即使使用了线程池,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长链接,只适合短连接
非阻塞
-
非阻塞模式下,相关方法都不会让线程暂停
- 在ServerSocketChannel.accept在没有连接建立时,会返回null,继续运行
- SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read或是去执行ServerSocketChannel.accept
- 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
-
但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了CPU
-
数据复制过程中,线程实际还是阻塞的(AIO改进的地方)
多路复用 (Selector)是**同步**的:一个线程使用selector完成多个事件监控
线程必须配合 Selector才能完成对多个 Channel可读写事件的监控,这称为多路复用
- 多路复用仅针对网络IO、普通文件IO没法利用多路复用
- 如果不用 Selector的非阻塞模式,那么 Channel读取到的字节很多时候都是0,而Selector能够保证
- 有可连接事件时才去连接
- 有可读事件才去读取
- 有可写事件才去写入
- 限于网络传输能力,Channel未必时时可写,一旦Channel可写,会触发Selector的可写事件
阻塞示例:单线程方式
/**
* 服务端
*/
@Slf4j
public Class Server{
public static void main(String[] args) throws IOException{
// 使用NIO来理解阻塞模式,单线程 处理多个客户端连接
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while(true){
// 4. accept 建立与客户端连接,SocketChannel 用来与客户端之间通讯
log.debug("conecting...");
SocketChannel sc = ssc.accept(); // accept默认阻塞的 阻塞方法 线程停止运行 直到client连接
log.debug("connected...{}", sc);
channels.add(sc);
for(SocketChannel channel: channels){
// 5. 接收客户端发送的数据
log.debug("before read... {}", sc);
channel.read(buffer); // read()也是阻塞方法 线程停止运行 直到client向server端发送数据 【clent端debug模式下右键--Evaluate Expression可执行一个表达式发送数据--code fragment下执行sc.write(Charset.defaultCharset().encode("hello"));--Evaluate】 客户端执行一次向服务端发送数据后就无法再一次发送数据,因为服务端已经又再次阻塞到accept()方法处了。 除非新建立一个客户端,可在Edit Configuration中配置勾选Allow parallel run则可允许多个客户端并行运行
buffer.flip();
sout(buffer);
buffer.clear();
log.debug("after read... {}", channel);
}
}
}
}
/**
* 客户端
*/
public class Client{
public static void main(String[] args) throws IOException{
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sout("waiting");
}
}
// 总结: 所以单线程下并不能很好工作,一个方法的调用会影响另一个方法的调用,即在做一功能的时候就不能执行另一功能,accept()时就不能read(),read()时又阻塞不能执行其他功能,所以单线程阻塞不合实际。除非每建立一个连接就新建一个线程即可解决,但是现在这样一个线程处理多个连接就不合实际了,阻塞模式在该情况下的问题
非阻塞示例
/**
* 服务端
*/
@Slf4j
public Class Server{
public static void main(String[] args) throws IOException{
// 使用NIO来理解阻塞模式,单线程 处理多个客户端连接
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 默认true阻塞模式 修改为false非阻塞模式 即accept()方法为非阻塞 线程还会继续运行 如果没有accept建立会返回NULL
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while(true){
// 4. accept 建立与客户端连接,SocketChannel 用来与客户端之间通讯
// log.debug("conecting...");
SocketChannel sc = ssc.accept(); // accept默认阻塞的 阻塞方法 线程停止运行 直到client连接
if(sc != null){
log.debug("connected...{}", sc);
sc.configureBlocking(false); // 设置SocketChannel非阻塞 read()方法非阻塞
channels.add(sc);
}
for(SocketChannel channel: channels){
// 5. 接收客户端发送的数据
// log.debug("before read... {}", sc);
int read = channel.read(buffer); // read()非阻塞 线程仍在运行 如果没有读到新数据 read返回0
if(read > 0){
buffer.flip();
sout(buffer);
buffer.clear();
log.debug("after read... {}", channel);
}
}
}
}
}
/**
* 客户端
*/
public class Client{
public static void main(String[] args) throws IOException{
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sout("waiting");
}
}
// 总结: 非阻塞模式下 单线程依然可以正确处理多client,但是服务端一直循环增大server压力,现实不合理,现实中不可这样使用。改进需使用到Selector【Selector管理这些Channel,并且管理监测这些Channel上是否有事件发生】
NIO-Selector处理Accept与Read \用完key需remove\处理客户端断开问题
/**
* 服务端 Selector必须配合非阻塞一起用
*/
@Slf4j
public Class Server{
public static void main(String[] args) throws IOException{
// 1.创建Selector来管理多个Channel
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2.调用Channel的方法建立Selector与Channel之间的联系【Channel注册到Selector上】
// SelectionKey 是事件发生后,通过它可以知道事件和哪个Channel发生的事件 【事件有四种类型 一、accept 会在有连接请求时触发 二、connect 是客户端的,连接建立后触发 三、read 可读事件 四、可写事件】
SelectionKey sscKey = ssc.register(selector, 0, null); // 第二个参数0表示不关注任何事件
// 该key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
while(true){
// 其中看是否有事件发生 3.select()方法
selector.select(); // 该方法解决非阻塞问题:即没有事件发生会让线程阻塞,当四种事件之一发生了那么会继续向下运行 注!:select在事件未处理时,它不会阻塞,但是如果就是不需要处理,那么可以使用cancel()方法(即select()方法事件发生后要么处理要么取消不能置之不理)
/**
* select()何时不阻塞:
* + 事件发生时
* - 客户端发起连接请求,会触发accept事件
* - 客户端发送数据过来,客户端正常、异常关闭时,都会触发read事件,另外如果发送的数据大于buffer缓冲区,会触发多次读取事件
* - channel可写,会触发write事件
* - 在linux下nio bug发生时
* + 调用selector.wakeup()
* + 调用selector.close()
* + selector所在线程interrupt打断
*/
// 4.处理事件
//selector.selectedKeys(); // 拿到所有的事件集合,包含所有发生的事件,要删除集合中元素需采用迭代器删除
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // 需要根据客户端不同的事件类型做不同的处理,所以需要区分事件类型
while(iter.hasNext){
SelectionKey key = iter.next(); // 这里的key为sscKey
log.debug("key:{}".key);
// 5.区分事件类型
if(key.isAcceptable){ // accept事件
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false); // 如果第一次事件发生后已经处理过了,再次执行到该位置时去处理事件那么会报空指针异常,所以当处理完事件后需手动移除事件
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
}else if(key.isReadable){ // read事件
try{
SocketChannel channel = (SocketChannel)key.channel();// 拿到触发事件的channel
ByteBUffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer); // 若客户端强退,会触发read事件,该处会报错,捕获异常后会循环调用,key需要处理,则cancel()反注册调。 注!:无论客户端是正常断开还是异常断开都会触发读事件,如果正常断开则不走catch块,read()会返回-1
if(read == -1){
key.cancel();
}else{
buffer.flip();
sout(Charset.defaultCharset().decode(buffer));
}
} catch(Exception e){
key.cancel(); // 客户端断开 需要将该key反注册
}
}
iter.remove(); // 必不可少,用完key需remove
}
}
}
}
/**
* 客户端
*/
public class Client{
public static void main(String[] args) throws IOException{
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sout("waiting");
}
}
NIO-Selector处理消息边界问题
- 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽。
- 另一种思路是按分隔符拆分,缺点是效率低
- TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则影响server吞吐量
- Http1.1是TLV格式
- Http2.0是LTV格式
处理消息边界-附件与扩容
/**
* 服务端 Selector必须配合非阻塞一起用
*/
@Slf4j
public Class Server{
public static void main(String[] args) throws IOException{
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext){
SelectionKey key = iter.next(); // 这里的key为sscKey
log.debug("key:{}".key);
if(key.isAcceptable){
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBUffer buffer = ByteBuffer.allocate(16); // 每个SocketChannel都需要有一个独有的ByteBuffer 所以需要用到附件attachment,即在ServerSocketChannel中register第三个参数处
// 将一个byteBuffer作为附件关联到selectionKey上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
}else if(key.isReadable){
try{
SocketChannel channel = (SocketChannel)key.channel();
// 获取selectionKey 上关联的附件
ByteBuffer buffer = (ByteBuffer)key.attachment();
int read = channel.read(buffer);
if(read == -1){
key.cancel();
}else{
//buffer.flip();
//sout(Charset.defaultCharset().decode(buffer));
split(buffer);
if(buffer.position() == buffer.limit()){ // 如果一个内容都没有被消耗读取,需要扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
key.attach(newBuffer); // 替换key的buffer
}
}
} catch(Exception e){
key.cancel();
}
}
iter.remove();
}
}
}
}
private void split(ByteBuffer source){
source.flip();
for(int i = 0; i < source.limit(); i++){
// 找到一条完整消息
if(source.get(i) == '\n'){
int length = i + 1 - source.position();
// 把这条完整的消息存入新的ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source读, 向target写
for(int j = 0; j < length; j++){
target.put(source.get());
}
sout(target);
}
}
source.compact();
}
/**
* 客户端
*/
public class Client{
public static void main(String[] args) throws IOException{
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sout("waiting");
}
}
// server只可扩容,netty实现了自适应,可扩容可缩小
ByteBuffer大小分配
服务器要关注读的事件,需要从两个方面进行考虑,第一个是处理消息的边界,只要是TCP编程都需考虑消息边界;第二个是ByteBuffer合理分配问题
- 每个channel都需要记录可能被切分的消息,因为ByteBuffer不是线程安全的,不能被多个channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer
- ByteBuffer不能太大,比如一个ByteBUffer 1MB的话,要支持百万连接就要1TB的内存,因此需要设计大小可变的ByteBuffer
- 一种思路是首先分配一个较小的buffer,例如4k,如果数据不够,再分配8k的buffer,将4kbuffer内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
- 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与掐面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
NIO-Selector 写入内容过多问题
/**
* 服务端向客户端写入数据过多示例
*/
public class WriteServer{
public static void main(String[] args) throws IOException{
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 1.向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 30000000; i++){
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
while(buffer.hasRemaining()){
// 2.返回值为实际写入的字节数
int write = sc.write(buffer);
}
}
}
}
}
}
/**
* 客户端
*/
public class WriteClient{
public static void main(String[] args){
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
// 3.接收数据
int count = 0;
while(true){
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
sout(count);
buffer.clear();
}
}
}
// 服务器无法一次将大量数据发送给客户端,while中虽然可以根据是否发送完不断循环,但是效率并不高:只要没发完就不断循环,相当于卡在了该ServerSocketChannel上了,如果其他ServerSocketChannel来了,那么会执行不了。会多次对write()循环返回多次0,表示缓冲区已满,但是依然在不断循环,在这个时间段内其实可以去执行其它事件操作比如读操作。下面优化:
/**
* 服务端向客户端写入数据过多示例
* 对服务端的改进
*/
public class WriteServer{
public static void main(String[] args) throws IOException{
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
// 1.向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 30000000; i++){
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 2.返回值为实际写入的字节数
int write = sc.write(buffer);
sout(write);
// 3.判断是否有剩余内容
if(buffer.hasRemaining()){
// 4.关注可写事件 1 4
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
// 5.把未写完的数据挂到sckey
scKey.attach(buffer);
}
}else if(key.isWritable){
ByteBuffer buffer = (ByteBuffer)key.attachment();
SocketChannel sc = (SocketChannel)key.channel();
int write = sc.write(buffer);
sout(write);
// 6.清理操作
if(!buffer.hasRemaining){
key.attach(null); // 需要清除buffer
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 不需关注可写事件
}
}
}
}
}
}
利用多线程优化
现在都是多核CPU,设计时要充分考虑别让CPU白白浪费
前面的代码只有一个选择器,没有充分利用多核CPU,如何改进?
分两组选择器:
- 单线程配一个选择器,专门处理accept事件
- 创建CPU核心数的线程,每个线程配一个选择器,轮流处理read事件
@Slf4j
public class MultiThreadServer{
public static void main(String[] args) throws IOException{
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 1. 创建固定数量的worker 并初始化
// Worker worker = new Worker("worker-0");
// 多 worker
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; // 设置为CPU的核心数
/**
* 如何拿到CPU的个数
* + Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理CPU个数,而不是容器中申请时的个数
* 这个问题在jdk10中修复,使用JVM参数UseContainerSupport配置,默认开启
*/
/**
* 如果IO比较频繁,CPU用的少,这时参考amd定律,一般大于CPU核心数
*/
for(int i = 0; i < workers.length; i++){
workers[i] = new Worker("worker-".concat(Integer.toString(i)));
}
// worker.register();
AtomicInteger index = new AtomicInteger();
while(true){
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while(iter.hasNext){
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
// 2.关联selectors
log.debug("before register...{}", sc.getRemoteAddress());
// round robin 负载均衡 轮询算法
workers[index.getAndIncrement() % workers.length].register(sc);
//worker.register(sc); // 被boss线程调用
// sc.register(worker.selector, SelectionKey.OP_READ, null);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
@Data
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>(); // 只要在两个线程之间传递数据的话,可以使用队列作为数据通道
public Worker(String name){
this.name = name;
}
// 初始化线程 和 selector
public void register(SocketChannel sc) throws IOException{
// 只执行一遍
if(!start){
thread = new Thread(this, name);
thread.start();
selector = Selector.open();
start = true;
}
// 向队列中添加了任务,但这个任务并没有立刻执行
queue.add(() -> {
try{
sc.register(selector, SelectionKey.OP_READ, null); // boss
}catch(){
e.printStackTrace();
}
});
// sc.register(selector, SelectionKey.OP_READ, null); // boss线程中执行
selector.wakeup(); //唤醒select 方法
}
@Override
public void run(){
while(true){
try{
selector.select(); // worker-0 阻塞
Runnable task = queue.poll(); // 队列中取出其他线程数据执行任务
if(task != null){
task.run(); // 执行了sc.register(selector, SelectionKey.OP_READ, null);
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel)key.channel();
log.debug("read...{}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
sout(buffer);
}
}
}catch(IOException e){
e.printStackTrace();
}
}
}
}
}
5、NIO 与 BIO
5.1 stream vs channel
- stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
- stream仅支持阻塞API,channel同时支持阻塞、非阻塞API,网络channel可配合selector实现多路复用
- 二者均为全双工,即读写可以同时进行
5.2 IO模型
同步阻塞、同步非阻塞、多路复用、异步阻塞、异步非阻塞
当调用一次channel.read或stream.read后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:
- 等待数据阶段
- 复制数据阶段
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)
五种网络模型:参考UNIX网络编程
- 阻塞IO
- 非阻塞IO 【用户线程在内核等待数据中是非阻塞的,在内核空间拷贝数据时,用户线程是阻塞的!】
- 多路复用
- 信号驱动
5.3 零拷贝
传统IO问题
传统的IO将一个文件通过socket写出,内部工作流程是这样:
即一个读写操作执行了4次数据拷贝,3次用户态与内核态之间的切换。效率低,以下为优化:
NIO优化
通过DirectByteBuf
- ByteBuffer.allocate(10) HeapByteBuffer 使用的还是java内存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
减少了一次数据的拷贝
这里java可以使用DirectByteBuf将堆外内存映射到JVM内存中来直接访问使用
- 这块内存不受JVM垃圾会受的影响,因此内存地址固定,有助于IO读写
- java中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分为两步
- DirectByteBuf对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了linux2.1后提供的sendFile方法),java中对应着两个channel调用transferTo/transferFrom方法拷贝数据
1、java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用CPU
2、数据从内核缓冲区传输到socket缓冲区,CPU会参与拷贝
3、最后使用DMA将socket缓冲区的数据写入网卡,不会使用CPU
可以看到:
- 只发生了一次用户态与内核态的切换
- 数据拷贝了3次
再进一步优化 (linux2.4)
1、java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用CPU
2、只会将一些offset和length信息考入socket缓冲区,几乎无消耗
3、使用DMA将内核缓冲区的数据写入网卡,不会使用CPU
整个过程中仅发生了一次用户态与内核态的切换,数据拷贝了2次,所谓的【零拷贝】,并不是真正的无拷贝,而是不会拷贝重复数据到JVM内存中,零拷贝的优点有:
- 更少的用户态与内核态的切换
- 不利用CPU计算,减少CPU缓存伪共享
- 零拷贝适合小文件传输
6、AIO
AIO用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(kernel)提供支持
- windows系统通过了IOCP实现了真正的异步IO
- Linux系统异步IO在2.6版本引入,但其底层还是用多路复用模拟了异步IO,性能没有优势
文件AIO示例:
@Slf4j
public class AioFileChannel{
public static void main(String[] args){
try(AsynchronousFileChannel channel = AsyncahronousFileChannel.open(Paths.get("data.txt"), StandarOpenOption.READ)){
// 参数1 ByteBUffer
// 参数2 读取的起始位置
// 参数3 附件
// 参数4 回调对象 CompletionHandler
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin...");
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>(){
@Override // read success
public void completed(Integer result, ByteBuffer attachment){
log.debug("read completed...{}", result);
attachment.flip();
sout(attachment);
}
@Override // read fail
public void failed(Throwable exc, ByteBuffer attachment){
exc.printStackTrace();
}
});
log.debug("read end...");
/**
* 这里主线程结束了,用来返回数据的守护线程也会跟着结束!!!
*
*/
}catch(IOException e){
e.printStackTrace();
}
System.in.read();// 防止主线程退出
}
}