玩命加载中🤣🤣🤣

NIO


ByteBuffer

结构

  • capacity 容量
  • position 写入(读取)位置
  • limit 写入(读取)限制
  1. 初始状态, 写模式(图1-1)

图1-1

  1. 写入数据后, position为写入位置, limit=capacity(图1-2)

图1-2

  1. flip转换模式后, postition切换为读取位置, limit切换为读取限制(图1-3)

图1-3

  1. 读取4个字节后(图1-4)

图1-4

  1. clear动作, 清除数据, 重置为写模式(图1-5)

图1-5

  1. compact动作, 将未读完的向前压缩, 再重置为写模式(图1-6)

图1-6

常用方法

分配空间

ByteBuffer buffer = ByteBuffer.allocate(10);  // java.nio.HeapByteBuffer  - Java 堆内存(读写效率较低,受到GC的影响)
ByteBuffer buffer = ByteBuffer.allocateDirect(10);  // java.nio.DirectByteBuffer  - 直接内存(读写效率高,少一次拷贝, 分配效率低, 使用不当会造成内存泄漏)

写入数据

try (FileChannel channel = new FileInputStream("1.txt").getChannel()) {
    // channel的读取
	int readBytes = channel.read(buffer);
}
// 或者buffer自己的方法
buffer.put((byte)127)

读取数据

buffer.flip();  // 切换至读模式
byte b = buffer.get();
// 或者channel的方法
int writeByte = channel.write(buffer);

// get方法会让position读指针向后走
buffer.rewind(); // 将position重置
buffer.get(i); // 获取索引内容, 不会移动读指针

字符串转为ByteBuffer

// 1. 字符串转为 ByteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());

// 2. Charset (自动切换为读模式)
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");
Charset.defaultCharset().encode("hello\nworld");

// 3. wrap (自动切换为读模式)
ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());

// 4. 转为字符串
String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();


// 5. 

黏包、分包demo

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TestByteBufferExamDee {
    public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.allocate(32);
        source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
        split(source);
        source.put("w are you?\n".getBytes());
        split(source);
    }

    private static void split(ByteBuffer source) {
        // 切换读模式
        source.flip();
        // 遍历buffer
        for (int i = 0; i < source.limit(); i++) {
            if (source.get(i) == '\n') {
                // 接收者的长度 = \n 的位置 - 当前读指针位置 + 1
                int l = i - source.position();
                ByteBuffer buffer = ByteBuffer.allocate(l);
                for (int j = 0; j < l; j++) {
                    buffer.put(source.get());
                }
                source.get(); // 此处要让position再接着走一步, 跳过\n
                buffer.flip();
                System.out.println(StandardCharsets.UTF_8.decode(buffer));
            }
        }
        // 读取完重新压缩 buffer(此处不能使用clear)
        source.compact();
    }
}

网络编程

多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

Selector模型

处理Read事件demo

  • 服务端
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * @author Dee
 * @date 2023/10/19
 * <p>Description: NIO demo, 使用Selector管理
 */
@Slf4j
public class DeeServer {
    public static void main(String[] args) throws IOException {
        // 开启服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 设置为非阻塞
        ssc.configureBlocking(false);
        // 绑定端口
        ssc.bind(new InetSocketAddress(8080));
        // 创建select管理多个channel
        Selector sel = Selector.open();
        // 注册ssc
        SelectionKey sscKey = ssc.register(sel, 0, null);
        log.info("注册ssc, {}", sscKey);
        // 关注接收事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        while (true) {
            sel.select(); // 无事件阻塞, 有事件继续执行
            log.info("有事件接入");
            // 遍历selKeys
            Iterator<SelectionKey> ite = sel.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey sk = ite.next();
                log.info("迭代sk, {}", sk);
                ite.remove(); // 处理key的时候一定用删除, 否则每次循环处理相同key会npe
                if (sk.isAcceptable()) { // 如果是访问事件
                    // 获取通道
                    SelectableChannel channel = sk.channel();
                    // 强转为 ssc
                    ServerSocketChannel ssch = (ServerSocketChannel) channel;
                    // 建立连接
                    SocketChannel sc = ssch.accept();
                    sc.configureBlocking(false); // 设置为非阻塞
                    // 分配空间
                    ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                    // 注册sc, 将buffer作为附件关联
                    SelectionKey scKey = sc.register(sel, 0, buffer);
                    // 关注读事件
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.info("连接成功, {}", sc);
                } else if (sk.isReadable()) { // 如果是读取事件
                    try {
                        // 获取通道并强转
                        SocketChannel channel = (SocketChannel) sk.channel();
                        // 获取附件(buffer)
                        ByteBuffer buffer = (ByteBuffer) sk.attachment();
                        int read = channel.read(buffer); // 如果是正常断开, read是-1
                        if (read == -1) {
                            sk.cancel();
                        } else {
                            // 截取\n
                            split(buffer);
                            // 如果截取后发现指针没变, 说明没接到\n, 需要扩容
                            if (buffer.position() == buffer.limit()) {
                                // 创建新的bf并替换attachment
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() << 1);
                                buffer.flip();
                                newBuffer.put(buffer);
                                sk.attach(newBuffer);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        sk.cancel(); // 事件取消注册, 因为客户端已经断开, 会产生一个读事件, 因此不处理会死循环
                    }
                }


            }
        }
    }

    /**
     *
     * @param source
     */
    private static void split(ByteBuffer source) {
        source.flip();
        // 循环遍历
        for (int i = 0; i < source.limit(); i++) {
            if (source.get(i) == '\n') {
                int l = i - source.position();
                ByteBuffer buffer = ByteBuffer.allocate(l);
                for (int j = 0; j < l; j++) {
                    buffer.put(source.get());
                }
                source.get();
                buffer.flip(); // 此处别忘了, 转换为读模式
                System.out.println(StandardCharsets.UTF_8.decode(buffer));
            }
        }
        source.compact();
    }
}
  • 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        SocketAddress address = sc.getLocalAddress();
        System.out.println("waiting...");
//        sc.write(Charset.defaultCharset().encode("hello\nworld"));
        sc.write(StandardCharsets.UTF_8.encode("helloworldabcdefghdf\n"));
        System.in.read();
//        sc.close();
    }
}

处理Write事件demo

  • 服务端
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * @author Dee
 * @date 2023/10/23
 * <p>Description: 处理write写事件
 */
@Slf4j
public class DeeWriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey sk = iter.next();
                iter.remove();
                if (sk.isAcceptable()) {
                    // 此处因为知道一定是 ssc的连接, 因此就不再从sk中取通道
                    SocketChannel sc = ssc.accept();
                    log.info("已连接");
                    sc.configureBlocking(false);
                    SelectionKey sck = sc.register(selector, SelectionKey.OP_READ);
                    // 测试写 三百万个长度的 字符串
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 10000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer bf = Charset.defaultCharset().encode(sb.toString());
                    // 先写一次
                    int write = sc.write(bf);
                    System.out.println(write); // 打印写了多少的长度
                    // 防止写不完, 如果还有剩余的继续写
                    if (bf.hasRemaining()) {
                        // 未写完的会触发新的事件, 因此需要关注写事件, 同时也要拿到之前的事件
                        sck.interestOps(sck.interestOps() | SelectionKey.OP_WRITE);
//                        sk.interestOps(sk.interestOps() + SelectionKey.OP_WRITE); // 两种写法
                        // 把buffer存起来
                        sck.attach(bf);
                    }
                } else if (sk.isWritable()) {
                    // 把上次未写完的buffer拿出来
                    ByteBuffer bf = (ByteBuffer) sk.attachment();
                    SocketChannel sc = (SocketChannel) sk.channel();
                    int write = sc.write(bf);
                    System.out.println(write);
                    // 如果没有数据可写了, 则删除附件, 并且去除关注的读事件
                    if (!bf.hasRemaining()) {
                        sk.attach(null);
                        sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }
}
  • 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @author Dee
 * @date 2023/10/24
 * <p>Description:
 */
public class DeeWriteClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress(8080));
        int count = 0; // 定义总共读取的长度
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            System.out.println("读取的长度:" + count);
        }
    }
}

selector不阻塞情况

  • 事件发生时
    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

文章作者: 👑Dee👑
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 👑Dee👑 !
  目录