自定义线程池demo
流程图
graph LR subgraph Thread Pool t1[t1] t2[t2] t3[t3] end subgraph Blocking Queue task1(task1) task2(task2) task3(task3) end m[main] m --put--> task3 task3 --> task2 task2 --> task1 worker[[worker]] task1 --poll--> t1 -.- worker task1 -.poll.-> t2 -.- worker task1 -.poll.-> t3 -.- worker
角色说明:
Blocking Queue: 阻塞队列, main线程负责将任务提交到阻塞队列中, 用于存储任务
task: 待执行任务
Thread Pool: 线程池, 用于存储线程对象
t: 线程对象, 用来执行每个任务
worker: 此处将每个线程比作工人
类图说明
classDiagram class BlockingQueue~T~ { -Deque~T~ queue // 阻塞队列 -ReentrantLock lock // 锁 -Condition fullWaitCond -Condition emptyWaitCond -int capcity // 队列大小 +poll(long timeout, TimeUnit unit) T +put(T t) void } class ThreadPool { -BlockingQueue~Runnable~ taskQueue // 任务队列 -HashSet~Worker~ works // 线程集合 -int coreSize // 核心线程数 -long timeout // 超时时间 -TimeUnit timeUnit // 时间单位 +execute(Runnable task) void } class Worker{ -Runnable task // 线程任务 +run() void } Thread <|-- Worker: 继承 ThreadPool *-- BlockingQueue~T~: 组合 ThreadPool o-- Worker: 聚合
代码部分
阻塞队列
阻塞队列核心功能是阻塞获取(线程池使用)和推入待执行任务(main线程使用)
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* <p>Description:
* 阻塞队列, 采用生产者消费者模式
*/
public class BlockingQueue<T> {
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件
private Condition fullWaitCond = lock.newCondition();
// 4. 消费者条件
private Condition emptyWaitCond = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 阻塞获取
public T poll(long timeout, TimeUnit unit) {
// 尝试获取锁
lock.lock();
try {
// 将时间转为纳秒
long nanos = unit.toNanos(timeout);
// 如果任务队列为空, 则进循环尝试
while (queue.isEmpty()) {
// 如果等待时间为0, 则返回空
if (nanos <= 0) {
return null;
}
// 进入空闲等待, 如果在此期间被唤醒, 则重置等待时间(用于防止虚假唤醒)
try {
nanos = emptyWaitCond.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 提取队列首个任务
T t = queue.removeFirst();
// 唤醒消费者消费
fullWaitCond.signal();
return t;
} finally {
lock.unlock();
}
}
public void put(T t) {
// 尝试获取锁
lock.lock();
try {
while (queue.size() == capcity) {
// 如果容量满, 则循环等待
try {
// 进入饱和等待
fullWaitCond.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 添加至队列
queue.addLast(t);
// 加完队列后, 通知消费者消费
emptyWaitCond.signal();
} finally {
lock.unlock();
}
}
}
线程池
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "ThreadPool")
public class ThreadPool {
/**
* 任务队列
*/
private BlockingQueue<Runnable> taskQueue;
// 线程集合: 此处将线程做一个包装
private HashSet<Worker> works = new HashSet<>();
// 核心线程数
private int coreSize;
// 超时时间, 一旦超过超时时间, 则关闭线程
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}
/**
* 执行
*
* @param task
*/
public void execute(Runnable task) {
try {
// 如果当前线程 < 核心线程数时
if (works.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker {} task={}", worker, task);
works.add(worker);
worker.start();
} else {
log.debug("加入任务队列 {}", task);
taskQueue.put(task);
}
} catch (Exception e) {
} finally {
}
}
/**
* 线程对象
*/
public class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当有任务时
// 2) 当任务队列不为空时
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行...{} ", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (works) {
log.debug("worker 被移除 {}", this);
works.remove(this);
}
}
}
}
测试类
@Slf4j
class TestPool{
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
// 由主线程给线程池提交任务
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() ->
log.debug("{}", j)
);
}
}
}
线程池工作流程
JDK自带
graph LR; c([添加任务]) q2(线程数 < coreSize) q3(阻塞队列是否已满) q4(线程数 < maxSize) a1[添加工作线程并执行] a2[添加至阻塞队列并等待工作线程来提取] a3[添加救急线程并执行] r[拒绝任务] q4 --否--> r c --> q2 --否--> q3 --是--> q4 q2 --是--> a1 q3 --否--> a2 q4 --是--> a3
graph LR; c(添加任务) q2false q3 q4false a1[添加工作线程并执行] a2[添加至阻塞队列并等待工作线程来提取] a3[添加救急线程并执行] r[拒绝任务] q4 --否--> r c --> q2 --否--> q3 --是--> q4 q2 --是--> a1 q3 --否--> a2 q4 --是--> a3
Tomcat NIO EndPoint
全流程
graph LR; ll(LimitLatch) --> a(acceptor) sc1(SocketChannel 1) sc2(SocketChannel 2) a --> sc1 a --> sc2 p(Poller) sc1 --有读--> p sc2 --有读--> p subgraph Executor w1(worker 1) w2(worker 2) end p --socketProcessor--> w1 p --socketProcessor--> w2
- LimitLatch 用来限流, 可以控制最大连接个数, 类似 JUC 中的 Semphore
- Acceptor只负责[接收新的 socket 连接]
- Poller 只负责监听 socket channel 是否有 [可读的 I/O 事件]
- 一旦可读, 封装一个任务对象(socketProcessor), 提交给 Executor 线程池处理
- Executor 线程池中的工作线程最终负责 [处理请求]
网络层流程
graph LR; c([添加任务]) q1(任务数 < coreSize) q2(任务数 < maxSize) a1[加入队列] a2[创建救急线程] c --> q1 --是--> a1 q1 --否--> q2 --否--> a1 q2 --是--> a2
graph LR; c(添加任务) q1false q2false a1[加入队列] a2[创建救急线程] c --> q1 --是--> a1 q1 --否--> q2 --否--> a1 q2 --是--> a2