ByteBuffer
结构
- capacity 容量
- position 写入(读取)位置
- limit 写入(读取)限制
- 初始状态, 写模式(图1-1)
- 写入数据后, position为写入位置, limit=capacity(图1-2)
- flip转换模式后, postition切换为读取位置, limit切换为读取限制(图1-3)
- 读取4个字节后(图1-4)
- clear动作, 清除数据, 重置为写模式(图1-5)
- compact动作, 将未读完的向前压缩, 再重置为写模式(图1-6)
常用方法
分配空间
ByteBuffer buffer = ByteBuffer.allocate(10); // java.nio.HeapByteBuffer - Java 堆内存(读写效率较低,受到GC的影响)
ByteBuffer buffer = ByteBuffer.allocateDirect(10); // java.nio.DirectByteBuffer - 直接内存(读写效率高,少一次拷贝, 分配效率低, 使用不当会造成内存泄漏)
写入数据
try (FileChannel channel = new FileInputStream("1.txt").getChannel()) {
// channel的读取
int readBytes = channel.read(buffer);
}
// 或者buffer自己的方法
buffer.put((byte)127)
读取数据
buffer.flip(); // 切换至读模式
byte b = buffer.get();
// 或者channel的方法
int writeByte = channel.write(buffer);
// get方法会让position读指针向后走
buffer.rewind(); // 将position重置
buffer.get(i); // 获取索引内容, 不会移动读指针
字符串转为ByteBuffer
// 1. 字符串转为 ByteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());
// 2. Charset (自动切换为读模式)
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");
Charset.defaultCharset().encode("hello\nworld");
// 3. wrap (自动切换为读模式)
ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());
// 4. 转为字符串
String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();
// 5.
黏包、分包demo
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class TestByteBufferExamDee {
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}
private static void split(ByteBuffer source) {
// 切换读模式
source.flip();
// 遍历buffer
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
// 接收者的长度 = \n 的位置 - 当前读指针位置 + 1
int l = i - source.position();
ByteBuffer buffer = ByteBuffer.allocate(l);
for (int j = 0; j < l; j++) {
buffer.put(source.get());
}
source.get(); // 此处要让position再接着走一步, 跳过\n
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
}
}
// 读取完重新压缩 buffer(此处不能使用clear)
source.compact();
}
}
网络编程
多路复用
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用
- 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用
- 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
- 有可连接事件时才去连接
- 有可读事件才去读取
- 有可写事件才去写入
- 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
处理Read事件demo
- 服务端
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
/**
* @author Dee
* @date 2023/10/19
* <p>Description: NIO demo, 使用Selector管理
*/
@Slf4j
public class DeeServer {
public static void main(String[] args) throws IOException {
// 开启服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 设置为非阻塞
ssc.configureBlocking(false);
// 绑定端口
ssc.bind(new InetSocketAddress(8080));
// 创建select管理多个channel
Selector sel = Selector.open();
// 注册ssc
SelectionKey sscKey = ssc.register(sel, 0, null);
log.info("注册ssc, {}", sscKey);
// 关注接收事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
sel.select(); // 无事件阻塞, 有事件继续执行
log.info("有事件接入");
// 遍历selKeys
Iterator<SelectionKey> ite = sel.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey sk = ite.next();
log.info("迭代sk, {}", sk);
ite.remove(); // 处理key的时候一定用删除, 否则每次循环处理相同key会npe
if (sk.isAcceptable()) { // 如果是访问事件
// 获取通道
SelectableChannel channel = sk.channel();
// 强转为 ssc
ServerSocketChannel ssch = (ServerSocketChannel) channel;
// 建立连接
SocketChannel sc = ssch.accept();
sc.configureBlocking(false); // 设置为非阻塞
// 分配空间
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 注册sc, 将buffer作为附件关联
SelectionKey scKey = sc.register(sel, 0, buffer);
// 关注读事件
scKey.interestOps(SelectionKey.OP_READ);
log.info("连接成功, {}", sc);
} else if (sk.isReadable()) { // 如果是读取事件
try {
// 获取通道并强转
SocketChannel channel = (SocketChannel) sk.channel();
// 获取附件(buffer)
ByteBuffer buffer = (ByteBuffer) sk.attachment();
int read = channel.read(buffer); // 如果是正常断开, read是-1
if (read == -1) {
sk.cancel();
} else {
// 截取\n
split(buffer);
// 如果截取后发现指针没变, 说明没接到\n, 需要扩容
if (buffer.position() == buffer.limit()) {
// 创建新的bf并替换attachment
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() << 1);
buffer.flip();
newBuffer.put(buffer);
sk.attach(newBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
sk.cancel(); // 事件取消注册, 因为客户端已经断开, 会产生一个读事件, 因此不处理会死循环
}
}
}
}
}
/**
*
* @param source
*/
private static void split(ByteBuffer source) {
source.flip();
// 循环遍历
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
int l = i - source.position();
ByteBuffer buffer = ByteBuffer.allocate(l);
for (int j = 0; j < l; j++) {
buffer.put(source.get());
}
source.get();
buffer.flip(); // 此处别忘了, 转换为读模式
System.out.println(StandardCharsets.UTF_8.decode(buffer));
}
}
source.compact();
}
}
- 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
System.out.println("waiting...");
// sc.write(Charset.defaultCharset().encode("hello\nworld"));
sc.write(StandardCharsets.UTF_8.encode("helloworldabcdefghdf\n"));
System.in.read();
// sc.close();
}
}
处理Write事件demo
- 服务端
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
* @author Dee
* @date 2023/10/23
* <p>Description: 处理write写事件
*/
@Slf4j
public class DeeWriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey sk = iter.next();
iter.remove();
if (sk.isAcceptable()) {
// 此处因为知道一定是 ssc的连接, 因此就不再从sk中取通道
SocketChannel sc = ssc.accept();
log.info("已连接");
sc.configureBlocking(false);
SelectionKey sck = sc.register(selector, SelectionKey.OP_READ);
// 测试写 三百万个长度的 字符串
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10000000; i++) {
sb.append("a");
}
ByteBuffer bf = Charset.defaultCharset().encode(sb.toString());
// 先写一次
int write = sc.write(bf);
System.out.println(write); // 打印写了多少的长度
// 防止写不完, 如果还有剩余的继续写
if (bf.hasRemaining()) {
// 未写完的会触发新的事件, 因此需要关注写事件, 同时也要拿到之前的事件
sck.interestOps(sck.interestOps() | SelectionKey.OP_WRITE);
// sk.interestOps(sk.interestOps() + SelectionKey.OP_WRITE); // 两种写法
// 把buffer存起来
sck.attach(bf);
}
} else if (sk.isWritable()) {
// 把上次未写完的buffer拿出来
ByteBuffer bf = (ByteBuffer) sk.attachment();
SocketChannel sc = (SocketChannel) sk.channel();
int write = sc.write(bf);
System.out.println(write);
// 如果没有数据可写了, 则删除附件, 并且去除关注的读事件
if (!bf.hasRemaining()) {
sk.attach(null);
sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
}
}
}
}
}
}
- 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @author Dee
* @date 2023/10/24
* <p>Description:
*/
public class DeeWriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress(8080));
int count = 0; // 定义总共读取的长度
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println("读取的长度:" + count);
}
}
}
selector不阻塞情况
- 事件发生时
- 客户端发起连接请求,会触发 accept 事件
- 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
- channel 可写,会触发 write 事件
- 在 linux 下 nio bug 发生时
- 调用 selector.wakeup()
- 调用 selector.close()
- selector 所在线程 interrupt