优秀是一种习惯!!!
Solo  当前访客:0 开始使用

AmethystFOB

记录精彩人生

网络编程基础

2023-11-28 13:25:14 amethystfob
0  评论    0  浏览

Netty

一、NIO基础(Non-blocking io 非阻塞IO)

1、三大组件 【管道、缓冲区、选择器】

1、1 Channel & Buffer

channel 类似于stream,它是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer中数据写入channel,而之前的stream要么是输入,要么是输出,channel比stream更为底层

常见的channel有:

  • FileChannel 【作为文件传输通道】
  • DatagramChannel 【做UDP网络传输时传输通道】
  • SocketChannel 【做TCP时传输通道】
  • ServerSocketChannel 【做TCP时传输通道 专用服务器】

buffer则用来缓冲读写数据,常见的buffer有

  • ByteBuffer 【抽象类】
    • MappedByteBuffer 【实现类】
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

1、2 Selector

2、ByteBuffer

2、1 ByteBuffer正确使用姿势

  1. 向buffer写入数据,例如调用 channel.read(buffer)
  2. 调用flip()切换至读模式
  3. 从buffer读取数据,例如调用buffer.get()
  4. 调用clear()或compact()切换至写模式 【可以通过compact() 解决黏包、半包问题(简易方式通过分隔符解决,但还有更加高效方式)】
  5. 重复 1-4

2、2 ByteBuffer 结构

ByteBuffer有以下重要属性

  • capacity
  • position
  • limit

2、3 ByteBuffer常见方法

分配空间 Netty可动态修改空间大小

ByteBuffer.allocate(10);		// class java.nio.HeapByteBuffer   - Java堆内存 读写效率较低 受到 GC 的影响
ByteBuffer.allocateDirect(10);	// class java.nio.DirectByteBuffer - 直接内存 读写效率高(少一次拷贝(零拷贝)) 使用的是系统内存,不会受到Java垃圾回收的影响,但是分配的效率低,如果使用不当会造成内存泄漏,使用后需要合理释放【Netty中对其进行了封装,效率有较大提升】



向buffer读写数据

/** 向buffer中写入数据
 *  有两种办法
 * 		* 调用 channel 的 read() 方法
 *      * 调用 buffer 自己的 put() 方法
 */
int readByte = channel.read(buf);
// 和
buf.put((byte) 127);
/** 从 buffer 读取数据
 *  同样有两种办法
 * 		* 调用 channel 的write方法
 * 		* 调用 buffer 自己的 get方法
 */
int writeBytes = channel.write(buf);
// 和
byte b = buf.get();
/**
 * get 方法会让 position 读指针向后走,如果想重复读取数据
 * 可以调用rewind方法将position重新置位0
 * 或者可以调用get(int i)方法获取索引 i 的内容,它不会移动读指针
 */

/**
 * rewind 从头开始读 
 * mark & reset
 * mark 做一个标记,记录position位置,reset是将position重置到 mark 的位置
 */

字符串与ByteBuffer互转

// 1、字符串转为ByteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());
showBuf(buffer1);

// 2、Charset
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("helloWord");
showBuf(buffer2);

// 3、wrap
ByteBuffer buffer3 = ByteBuffer.wrap("helloWord".getBytes());
showBuf(buffer3);

// 4、转为字符串
String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();
sout(str1);

buffer1.flip();
String str2 = StandardCharsets.UTF_8.decode(buffer1).toString();
sout(Str2);

2、4 Scattering Reads

分散读取【eg: onetwothree】 减少数据在buffer中的拷贝

// 以下try catch结构自动关闭文件处理
try(FileChannel channel = new RandomAccessFile(name:"words.txt", mode:"r").getChannel()){
    ByteBuffer b1 = ByteBuffer.allocate(3);
    ByteBuffer b2 = ByteBuffer.allocate(3);
    ByteBuffer b3 = ByteBuffer.allocate(5);
    channel.read(new ByteBuffer[]{b1, b2, b3});
    b1.flip();
    b2.flip();
    b3.flip();
    showBuf(b1);
    showBuf(b2);
    showBuf(b3);
}catch(IOException e){
  
}

2、5 Gathering Writes

集中写入 【eg:helloworld你好】减少数据在buffer中的拷贝

ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b1 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b1 = StandardCharsets.UTF_8.encode("你好");
try(FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()){
    channel.write(new ByteBuffer[]{b1, b2, b3});
}catch(IOException){
  
}

3、文件编程

3、1 FileChannel

注:

FileChannel 只能工作在阻塞模式下(即不能配合Selector使用)

获取

不能直接打开FileChannel,必须通过FileInputStream、FileOutputStream 或者 RandomAccessFile来获取 FileChannel,它们都有getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile时的读写模式决定

读取

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾

int readBytes = channel.read(buffer);

写入

正确的写入姿势如下:
在while 中调用 channel.write 是因为 write 方法并不能保证一次将buffer中的内容全部写入channel

ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip();	 // 切换读模式
while(buffer.hasRemaining){
    channel.write(buffer);
}

关闭

channel 必须关闭,不过调用了FileInputStream、FileOutputStream 或者 RandomAccessFile的close方法会间接地调用channel的close方法

位置

大小
size方法获取文件大小

强制写入

操作系统出于性能考虑,会将数据缓存,不是立刻写入磁盘,可以调用force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

3、2 两个Channel传输数据

// 即数据拷贝
try(
	FileChannel from = new FileInputStream("data.txt").getChannel();
    FileChannel to = new FileOutputStream("to.txt").getChannel();
){
//    from.transferTo(0, from.size(), to);
    // 只要带有transferTo采用零拷贝底层进行优化 效率高,但是传输的文件最大只能是2G数据,所以多次传输改进如下
    long size = from.size();
    // left 变量代表还剩余多少字节
    for(long left = size; left > 0; ){
        left -= from.transferTo((size-left), left, to);
    }
} catch(IOException e){
    e.printStackTrace();
}

3、3 Path

jdk7引入了Path 和 Paths类

  • Path用来表示文件路径
  • Paths是工具类,用来获取Path实例
Path source = Paths.get("1.txt");	// 相对路径 使用 user.dir 环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt");	// 绝对路径 代表了 d:\1.txt
Path source = Paths.get("d:/1.txt");	// 绝对路径 代表了 d:\1.txt
Path projects = Paths.get("d:\\data", "projects");	// 代表了 d:\data\projects

3、4 Files

检查文件是否存在

Path path = Paths.get("helloword/data.txt");
sout(Files.exists(path));

创建一级目录

Path path = paths.get("helloword/d1");
Files.createDirectory(path);
  • 如果目录已经存在,会抛异常 FileAlreadyExistsException
  • 不能一次创建多级目录,否则会抛异常 NoSuchFileException

创建多级目录

Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

拷贝文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");
Files.copy(source, target);
  • 如果文件已经存在,会抛异常 FileAlreadyExistsException

如果希望用source覆盖掉target,需要用StandardCopyOption来控制

File.copy(source, target, StandardCopyOption.REPLACE_EXISTION);

移动文件

删除文件

删除目录

通过访问者模式实现文件、目录的操作:

  1. 遍历目录下的所有文件即目录:
public boolean traverseDirectories(String directoryPath){
        try {
            AtomicInteger dirCount = new AtomicInteger();
            AtomicInteger fileCount = new AtomicInteger();
            Files.walkFileTree(Paths.get(directoryPath), new SimpleFileVisitor<Path>(){
                @Override
                public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                    System.out.println("========>"+dir);
                    dirCount.incrementAndGet();
                    return super.preVisitDirectory(dir, attrs);
                }

                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    System.out.println(file);
                    fileCount.incrementAndGet();
                    return super.visitFile(file, attrs);
                }
            });
            System.out.println("dir count is "+dirCount);
            System.out.println("file count is "+fileCount);
            return true;
        } catch (IOException e) {
            log.error("拷贝目录失败!原因是:"+e.getMessage());
            return false;
        }
    }
  1. 拷贝目录及其下面所有文件与目录
public boolean copyDirectories(String source, String target){
        try {
            Files.walk(Paths.get(source)).forEach(path -> {
                try {
                    String targetName = path.toString().replace(source, target);
                    if (Files.isDirectory(path)){
                        Files.createDirectory(Paths.get(targetName));
                    }
                    if (Files.isRegularFile(path)){
                        Files.copy(path, Paths.get(targetName));
                    }
                } catch (IOException e) {
                    log.error("子目录或文件拷贝失败,原因是:"+e.getMessage());
                    return;
                }
            });
            return true;
        } catch (IOException e) {
            log.error("拷贝目录失败,原因是:"+e.getMessage());
            return false;
        }
    }
  1. 删除目录及其下面的所有文件与目录
public boolean deleteDirectories(String directoryPath){
        try {
            Files.walkFileTree(Paths.get(directoryPath), new SimpleFileVisitor<Path>(){
                @Override
                public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                    System.out.println("========>进入"+dir);
                    return super.preVisitDirectory(dir, attrs);
                }

                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    System.out.println(file);
                    Files.delete(file);
                    return super.visitFile(file, attrs);
                }

                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                    System.out.println("<========退出"+dir);
                    Files.delete(dir);
                    return super.postVisitDirectory(dir, exc);
                }
            });
            return true;
        } catch (Exception e) {
            log.error("删除目录失败!原因是:"+e.getMessage());
            return false;
        }
    }

4、网络编程

4、1 非阻塞 与 阻塞

阻塞
  • 阻塞模式下,相关方法都会导致线程暂停

    • ServerSocketChannel.accept会在没有连接建立时让线程暂停
    • SocketChannel.read会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用CPU,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

  • 但多线程下,有新的问题,体现在以下方面:

    • 32位JVM 一个线程 320k,64位JVM 一个线程1024k,如果连接数过多,必然导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但即使使用了线程池,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长链接,只适合短连接
非阻塞
  • 非阻塞模式下,相关方法都不会让线程暂停

    • 在ServerSocketChannel.accept在没有连接建立时,会返回null,继续运行
    • SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read或是去执行ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了CPU

  • 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)

多路复用 (Selector)是**同步**的:一个线程使用selector完成多个事件监控

线程必须配合 Selector才能完成对多个 Channel可读写事件的监控,这称为多路复用

  • 多路复用仅针对网络IO、普通文件IO没法利用多路复用
  • 如果不用 Selector的非阻塞模式,那么 Channel读取到的字节很多时候都是0,而Selector能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel未必时时可写,一旦Channel可写,会触发Selector的可写事件
阻塞示例:单线程方式
/**
 * 服务端
 */
@Slf4j
public Class Server{
    public static void main(String[] args) throws IOException{
        // 使用NIO来理解阻塞模式,单线程 处理多个客户端连接
        // 0. ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 1. 创建服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 2. 绑定监听端口
        ssc.bind(new InetSocketAddress(8080));
        // 3. 连接集合
        List<SocketChannel> channels = new ArrayList<>();
        while(true){
            // 4. accept 建立与客户端连接,SocketChannel 用来与客户端之间通讯
            log.debug("conecting...");
            SocketChannel sc = ssc.accept();	// accept默认阻塞的 阻塞方法 线程停止运行 直到client连接
            log.debug("connected...{}", sc);
            channels.add(sc);
            for(SocketChannel channel: channels){
                // 5. 接收客户端发送的数据
                log.debug("before read... {}", sc);
                channel.read(buffer);		// read()也是阻塞方法 线程停止运行 直到client向server端发送数据 【clent端debug模式下右键--Evaluate Expression可执行一个表达式发送数据--code fragment下执行sc.write(Charset.defaultCharset().encode("hello"));--Evaluate】  客户端执行一次向服务端发送数据后就无法再一次发送数据,因为服务端已经又再次阻塞到accept()方法处了。 除非新建立一个客户端,可在Edit Configuration中配置勾选Allow parallel run则可允许多个客户端并行运行
                buffer.flip();
                sout(buffer);
                buffer.clear();
                log.debug("after read... {}", channel);
            }
        }
    }
}


/**
 * 客户端
 */
public class Client{
	public static void main(String[] args) throws IOException{
    	SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        sout("waiting");
	}  
}
// 总结: 所以单线程下并不能很好工作,一个方法的调用会影响另一个方法的调用,即在做一功能的时候就不能执行另一功能,accept()时就不能read(),read()时又阻塞不能执行其他功能,所以单线程阻塞不合实际。除非每建立一个连接就新建一个线程即可解决,但是现在这样一个线程处理多个连接就不合实际了,阻塞模式在该情况下的问题
非阻塞示例
/**
 * 服务端
 */
@Slf4j
public Class Server{
    public static void main(String[] args) throws IOException{
        // 使用NIO来理解阻塞模式,单线程 处理多个客户端连接
        // 0. ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 1. 创建服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false); // 默认true阻塞模式 修改为false非阻塞模式 即accept()方法为非阻塞 线程还会继续运行 如果没有accept建立会返回NULL
        // 2. 绑定监听端口
        ssc.bind(new InetSocketAddress(8080));
        // 3. 连接集合
        List<SocketChannel> channels = new ArrayList<>();
        while(true){
            // 4. accept 建立与客户端连接,SocketChannel 用来与客户端之间通讯
            // log.debug("conecting...");
            SocketChannel sc = ssc.accept();	// accept默认阻塞的 阻塞方法 线程停止运行 直到client连接
			if(sc != null){
                log.debug("connected...{}", sc);
                sc.configureBlocking(false); // 设置SocketChannel非阻塞 read()方法非阻塞
            	channels.add(sc);   
            }
            for(SocketChannel channel: channels){
                // 5. 接收客户端发送的数据
                // log.debug("before read... {}", sc);
                int read = channel.read(buffer);		// read()非阻塞 线程仍在运行 如果没有读到新数据 read返回0
                if(read > 0){
                    buffer.flip();
                    sout(buffer);
                    buffer.clear();
                    log.debug("after read... {}", channel);
                }
            }
        }
    }
}


/**
 * 客户端
 */
public class Client{
	public static void main(String[] args) throws IOException{
    	SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        sout("waiting");
	}  
}
// 总结: 非阻塞模式下 单线程依然可以正确处理多client,但是服务端一直循环增大server压力,现实不合理,现实中不可这样使用。改进需使用到Selector【Selector管理这些Channel,并且管理监测这些Channel上是否有事件发生】
NIO-Selector处理Accept与Read \用完key需remove\处理客户端断开问题
/**
 * 服务端 Selector必须配合非阻塞一起用
 */
@Slf4j
public Class Server{
    public static void main(String[] args) throws IOException{
        // 1.创建Selector来管理多个Channel
        Selector selector = Selector.open();
      
        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        // 2.调用Channel的方法建立Selector与Channel之间的联系【Channel注册到Selector上】
        // SelectionKey 是事件发生后,通过它可以知道事件和哪个Channel发生的事件 【事件有四种类型 一、accept 会在有连接请求时触发 二、connect 是客户端的,连接建立后触发 三、read 可读事件 四、可写事件】
        SelectionKey sscKey = ssc.register(selector, 0, null); // 第二个参数0表示不关注任何事件
        // 该key只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
      
        ssc.bind(new InetSocketAddress(8080));
        List<SocketChannel> channels = new ArrayList<>();
        while(true){
            // 其中看是否有事件发生 3.select()方法
            selector.select(); // 该方法解决非阻塞问题:即没有事件发生会让线程阻塞,当四种事件之一发生了那么会继续向下运行    注!:select在事件未处理时,它不会阻塞,但是如果就是不需要处理,那么可以使用cancel()方法(即select()方法事件发生后要么处理要么取消不能置之不理)
            /**
             * select()何时不阻塞:
             * + 事件发生时
             *         - 客户端发起连接请求,会触发accept事件
             *         - 客户端发送数据过来,客户端正常、异常关闭时,都会触发read事件,另外如果发送的数据大于buffer缓冲区,会触发多次读取事件
             *         - channel可写,会触发write事件
             *         - 在linux下nio bug发生时
             * + 调用selector.wakeup()
             * + 调用selector.close()
             * + selector所在线程interrupt打断
             */
            // 4.处理事件
            //selector.selectedKeys(); // 拿到所有的事件集合,包含所有发生的事件,要删除集合中元素需采用迭代器删除
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // 需要根据客户端不同的事件类型做不同的处理,所以需要区分事件类型
            while(iter.hasNext){
                SelectionKey key = iter.next(); // 这里的key为sscKey
                log.debug("key:{}".key);
                // 5.区分事件类型
                if(key.isAcceptable){ // accept事件
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);	// 如果第一次事件发生后已经处理过了,再次执行到该位置时去处理事件那么会报空指针异常,所以当处理完事件后需手动移除事件
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);
                }else if(key.isReadable){ // read事件
                    try{
                        SocketChannel channel = (SocketChannel)key.channel();// 拿到触发事件的channel
                        ByteBUffer buffer = ByteBuffer.allocate(16);
                        int read = channel.read(buffer); // 若客户端强退,会触发read事件,该处会报错,捕获异常后会循环调用,key需要处理,则cancel()反注册调。  注!:无论客户端是正常断开还是异常断开都会触发读事件,如果正常断开则不走catch块,read()会返回-1
                        if(read == -1){
                            key.cancel();
                        }else{
                            buffer.flip();
                            sout(Charset.defaultCharset().decode(buffer)); 
                        }
                    } catch(Exception e){
                        key.cancel(); // 客户端断开 需要将该key反注册
                    }

                }
                iter.remove(); // 必不可少,用完key需remove
            }
        }
    }
}


/**
 * 客户端
 */
public class Client{
	public static void main(String[] args) throws IOException{
    	SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        sout("waiting");
	}  
}
NIO-Selector处理消息边界问题

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽。
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则影响server吞吐量
  • Http1.1是TLV格式
  • Http2.0是LTV格式

处理消息边界-附件与扩容
/**
 * 服务端 Selector必须配合非阻塞一起用
 */
@Slf4j
public Class Server{
    public static void main(String[] args) throws IOException{
        Selector selector = Selector.open();
      
        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
      
        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
      
        ssc.bind(new InetSocketAddress(8080));
        List<SocketChannel> channels = new ArrayList<>();
        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext){
                SelectionKey key = iter.next(); // 这里的key为sscKey
                log.debug("key:{}".key);
                if(key.isAcceptable){
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    ByteBUffer buffer = ByteBuffer.allocate(16); // 每个SocketChannel都需要有一个独有的ByteBuffer  所以需要用到附件attachment,即在ServerSocketChannel中register第三个参数处
                    // 将一个byteBuffer作为附件关联到selectionKey上
                    SelectionKey scKey = sc.register(selector, 0, buffer);
                    scKey.interestOps(SelectionKey.OP_READ);
                }else if(key.isReadable){
                    try{
                        SocketChannel channel = (SocketChannel)key.channel();
                        // 获取selectionKey 上关联的附件
                        ByteBuffer buffer = (ByteBuffer)key.attachment();
                        int read = channel.read(buffer);
                        if(read == -1){
                            key.cancel();
                        }else{
                            //buffer.flip();
                            //sout(Charset.defaultCharset().decode(buffer)); 
                            split(buffer);
                            if(buffer.position() == buffer.limit()){ // 如果一个内容都没有被消耗读取,需要扩容
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                buffer.flip();
                                newBuffer.put(buffer);
                                key.attach(newBuffer); // 替换key的buffer
                            }
                        }
                    } catch(Exception e){
                        key.cancel();
                    }
                }
                iter.remove();
            }
        }
    }
}

private void split(ByteBuffer source){
    source.flip();
    for(int i = 0; i < source.limit(); i++){
        // 找到一条完整消息
        if(source.get(i) == '\n'){
            int length = i + 1 - source.position();
            // 把这条完整的消息存入新的ByteBuffer
            ByteBuffer target = ByteBuffer.allocate(length);
            // 从 source读, 向target写
            for(int j = 0; j < length; j++){
                target.put(source.get());
            }
            sout(target);
        }
    }
    source.compact();
}

/**
 * 客户端
 */
public class Client{
	public static void main(String[] args) throws IOException{
    	SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        sout("waiting");
	}  
}
// server只可扩容,netty实现了自适应,可扩容可缩小
ByteBuffer大小分配

服务器要关注读的事件,需要从两个方面进行考虑,第一个是处理消息的边界,只要是TCP编程都需考虑消息边界;第二个是ByteBuffer合理分配问题

  • 每个channel都需要记录可能被切分的消息,因为ByteBuffer不是线程安全的,不能被多个channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer
  • ByteBuffer不能太大,比如一个ByteBUffer 1MB的话,要支持百万连接就要1TB的内存,因此需要设计大小可变的ByteBuffer
    • 一种思路是首先分配一个较小的buffer,例如4k,如果数据不够,再分配8k的buffer,将4kbuffer内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
    • 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与掐面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
NIO-Selector 写入内容过多问题
/**
 * 服务端向客户端写入数据过多示例
 */
public class WriteServer{
    public static void main(String[] args) throws IOException{
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                  
                    // 1.向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for(int i = 0; i < 30000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    while(buffer.hasRemaining()){
                        // 2.返回值为实际写入的字节数
                        int write = sc.write(buffer);
                    }
                }
            }
        }
    }
}

/**
 * 客户端
 */
public class WriteClient{
    public static void main(String[] args){
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
      
        // 3.接收数据
        int count = 0;
        while(true){
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            sout(count);
            buffer.clear();
        }
    }
}

// 服务器无法一次将大量数据发送给客户端,while中虽然可以根据是否发送完不断循环,但是效率并不高:只要没发完就不断循环,相当于卡在了该ServerSocketChannel上了,如果其他ServerSocketChannel来了,那么会执行不了。会多次对write()循环返回多次0,表示缓冲区已满,但是依然在不断循环,在这个时间段内其实可以去执行其它事件操作比如读操作。下面优化:
/**
 * 服务端向客户端写入数据过多示例
 * 对服务端的改进
 */
public class WriteServer{
    public static void main(String[] args) throws IOException{
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);
                  
                    // 1.向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for(int i = 0; i < 30000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    // 2.返回值为实际写入的字节数
                    int write = sc.write(buffer);
                    sout(write);

                    // 3.判断是否有剩余内容
                    if(buffer.hasRemaining()){
                        // 4.关注可写事件          1                   4
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
                        // 5.把未写完的数据挂到sckey
                        scKey.attach(buffer);
                    }
                }else if(key.isWritable){
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    SocketChannel sc = (SocketChannel)key.channel();
                    int write = sc.write(buffer);
                    sout(write);
                    // 6.清理操作
                    if(!buffer.hasRemaining){
                        key.attach(null); // 需要清除buffer
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 不需关注可写事件
                    }
                }
            }
        }
    }
}
利用多线程优化

现在都是多核CPU,设计时要充分考虑别让CPU白白浪费

前面的代码只有一个选择器,没有充分利用多核CPU,如何改进?

分两组选择器:

  • 单线程配一个选择器,专门处理accept事件
  • 创建CPU核心数的线程,每个线程配一个选择器,轮流处理read事件
@Slf4j
public class MultiThreadServer{
    public static void main(String[] args) throws IOException{
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        // 1. 创建固定数量的worker 并初始化
        // Worker worker = new Worker("worker-0");
      
        // 多 worker
        Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];  // 设置为CPU的核心数
        /**
         * 如何拿到CPU的个数
         * + Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理CPU个数,而不是容器中申请时的个数
         * 这个问题在jdk10中修复,使用JVM参数UseContainerSupport配置,默认开启
         */
      
        /**
         * 如果IO比较频繁,CPU用的少,这时参考amd定律,一般大于CPU核心数
         */
        for(int i = 0; i < workers.length; i++){
            workers[i] = new Worker("worker-".concat(Integer.toString(i)));
        }
        // worker.register();
        AtomicInteger index = new AtomicInteger();
        while(true){
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while(iter.hasNext){
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}", sc.getRemoteAddress());
                    // 2.关联selectors
                    log.debug("before register...{}", sc.getRemoteAddress());
					// round robin 负载均衡 轮询算法
                    workers[index.getAndIncrement() % workers.length].register(sc);
                  
                    //worker.register(sc); // 被boss线程调用
                    // sc.register(worker.selector, SelectionKey.OP_READ, null);
                    log.debug("after register...{}", sc.getRemoteAddress());
                }
            }
        }
    }
  
    @Data
    static class Worker implements Runnable{
      
        private Thread thread;
      
        private Selector selector;
      
        private String name;
      
        private volatile boolean start = false; // 还未初始化
      
        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>(); // 只要在两个线程之间传递数据的话,可以使用队列作为数据通道
      
        public Worker(String name){
            this.name = name;
        }
      
        // 初始化线程 和 selector
        public void register(SocketChannel sc) throws IOException{
            // 只执行一遍
            if(!start){
                thread = new Thread(this, name);
                thread.start();
                selector = Selector.open();
                start = true;
            }
            // 向队列中添加了任务,但这个任务并没有立刻执行
            queue.add(() -> {
                try{
                    sc.register(selector, SelectionKey.OP_READ, null); // boss
                }catch(){
                    e.printStackTrace();
                }
            });
            // sc.register(selector, SelectionKey.OP_READ, null); // boss线程中执行
            selector.wakeup(); //唤醒select 方法
        }
      
        @Override
        public void run(){
            while(true){
                try{
                    selector.select();	// worker-0 阻塞 
                    Runnable task = queue.poll();	// 队列中取出其他线程数据执行任务
                    if(task != null){
                        task.run();	// 执行了sc.register(selector, SelectionKey.OP_READ, null);
                    }
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        if(key.isReadable()){
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel)key.channel();
                            log.debug("read...{}", channel.getRemoteAddress());
                            channel.read(buffer);
                            buffer.flip();
                            sout(buffer);
                        }
                    }
                }catch(IOException e){
                    e.printStackTrace();
                }
            }
        }
    }
} 

5、NIO 与 BIO

5.1 stream vs channel

  • stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream仅支持阻塞API,channel同时支持阻塞、非阻塞API,网络channel可配合selector实现多路复用
  • 二者均为全双工,即读写可以同时进行

5.2 IO模型

同步阻塞、同步非阻塞、多路复用、异步阻塞、异步非阻塞

当调用一次channel.read或stream.read后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)

五种网络模型:参考UNIX网络编程

  • 阻塞IO
  • 非阻塞IO 【用户线程在内核等待数据中是非阻塞的,在内核空间拷贝数据时,用户线程是阻塞的!】
  • 多路复用
  • 信号驱动

5.3 零拷贝

传统IO问题

传统的IO将一个文件通过socket写出,内部工作流程是这样:

即一个读写操作执行了4次数据拷贝,3次用户态与内核态之间的切换。效率低,以下为优化:

NIO优化

通过DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是java内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

减少了一次数据的拷贝

这里java可以使用DirectByteBuf将堆外内存映射到JVM内存中来直接访问使用

  • 这块内存不受JVM垃圾会受的影响,因此内存地址固定,有助于IO读写
  • java中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分为两步
    • DirectByteBuf对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了linux2.1后提供的sendFile方法),java中对应着两个channel调用transferTo/transferFrom方法拷贝数据


1、java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用CPU
2、数据从内核缓冲区传输到socket缓冲区,CPU会参与拷贝
3、最后使用DMA将socket缓冲区的数据写入网卡,不会使用CPU
可以看到:

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了3次
再进一步优化 (linux2.4)


1、java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用CPU
2、只会将一些offset和length信息考入socket缓冲区,几乎无消耗
3、使用DMA将内核缓冲区的数据写入网卡,不会使用CPU
整个过程中仅发生了一次用户态与内核态的切换,数据拷贝了2次,所谓的【零拷贝】,并不是真正的无拷贝,而是不会拷贝重复数据到JVM内存中,零拷贝的优点有:

  • 更少的用户态与内核态的切换
  • 不利用CPU计算,减少CPU缓存伪共享
  • 零拷贝适合小文件传输

6、AIO

AIO用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(kernel)提供支持

  • windows系统通过了IOCP实现了真正的异步IO
  • Linux系统异步IO在2.6版本引入,但其底层还是用多路复用模拟了异步IO,性能没有优势
文件AIO示例:
@Slf4j
public class AioFileChannel{
    public static void main(String[] args){
        try(AsynchronousFileChannel channel = AsyncahronousFileChannel.open(Paths.get("data.txt"), StandarOpenOption.READ)){
            // 参数1 ByteBUffer
            // 参数2 读取的起始位置
            // 参数3 附件
            // 参数4 回调对象 CompletionHandler
            ByteBuffer buffer = ByteBuffer.allocate(16);
            log.debug("read begin...");
            channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>(){
                @Override // read success
                public void completed(Integer result, ByteBuffer attachment){
                    log.debug("read completed...{}", result);
                    attachment.flip();
                    sout(attachment);
                }
                @Override // read fail
                public void failed(Throwable exc, ByteBuffer attachment){
                    exc.printStackTrace();
                }
            });
            log.debug("read end...");
            /**
             * 这里主线程结束了,用来返回数据的守护线程也会跟着结束!!!
             *
             */
        }catch(IOException e){
            e.printStackTrace();
        }
        System.in.read();// 防止主线程退出
    }
}

标题:网络编程基础
作者:amethystfob
地址:https://newmoon.top/articles/2023/11/28/1701141482324.html

欢迎各路大侠指点留痕:
, ,
TOP