玩命加载中🤣🤣🤣

自定义线程池


自定义线程池demo

流程图

graph LR
	subgraph Thread Pool
        t1[t1]
        t2[t2]
        t3[t3]
	end
	subgraph Blocking Queue
		task1(task1)
		task2(task2)
		task3(task3)
	end
	m[main]
	m  --put--> task3
	task3 --> task2
	task2 --> task1
	worker[[worker]]
	task1 --poll--> t1 -.- worker
	task1 -.poll.-> t2 -.- worker
	task1 -.poll.-> t3 -.- worker

角色说明:

Blocking Queue: 阻塞队列, main线程负责将任务提交到阻塞队列中, 用于存储任务

task: 待执行任务

Thread Pool: 线程池, 用于存储线程对象

t: 线程对象, 用来执行每个任务

worker: 此处将每个线程比作工人

类图说明

classDiagram
class BlockingQueue~T~ {
	-Deque~T~ queue // 阻塞队列
	-ReentrantLock lock // 锁
	-Condition fullWaitCond
	-Condition emptyWaitCond
	-int capcity // 队列大小
	+poll(long timeout, TimeUnit unit) T
	+put(T t) void
}
class ThreadPool {
	-BlockingQueue~Runnable~ taskQueue // 任务队列
	-HashSet~Worker~ works // 线程集合
	-int coreSize // 核心线程数
	-long timeout // 超时时间
	-TimeUnit timeUnit // 时间单位
	+execute(Runnable task) void
}
class Worker{
	-Runnable task // 线程任务
	+run() void
}
Thread <|-- Worker: 继承
ThreadPool *-- BlockingQueue~T~: 组合
ThreadPool o-- Worker: 聚合

代码部分

阻塞队列

阻塞队列核心功能是阻塞获取(线程池使用)和推入待执行任务(main线程使用)

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>Description:
 * 阻塞队列, 采用生产者消费者模式
 */
public class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();
    // 3. 生产者条件
    private Condition fullWaitCond = lock.newCondition();
    // 4. 消费者条件
    private Condition emptyWaitCond = lock.newCondition();
    // 5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }
    // 阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        // 尝试获取锁
        lock.lock();
        try {
            // 将时间转为纳秒
            long nanos = unit.toNanos(timeout);
            // 如果任务队列为空, 则进循环尝试
            while (queue.isEmpty()) {
                // 如果等待时间为0, 则返回空
                if (nanos <= 0) {
                    return null;
                }
                // 进入空闲等待, 如果在此期间被唤醒, 则重置等待时间(用于防止虚假唤醒)
                try {
                    nanos = emptyWaitCond.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            // 提取队列首个任务
            T t = queue.removeFirst();
            // 唤醒消费者消费
            fullWaitCond.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    public void put(T t) {
        // 尝试获取锁
        lock.lock();
        try {
            while (queue.size() == capcity) {
                // 如果容量满, 则循环等待
                try {
                    // 进入饱和等待
                    fullWaitCond.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            // 添加至队列
            queue.addLast(t);
            // 加完队列后, 通知消费者消费
            emptyWaitCond.signal();
        } finally {
            lock.unlock();
        }
    }
}

线程池

import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

@Slf4j(topic = "ThreadPool")
public class ThreadPool {
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合: 此处将线程做一个包装
    private HashSet<Worker> works = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 超时时间, 一旦超过超时时间, 则关闭线程
    private long timeout;
    private TimeUnit timeUnit;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
    }

    /**
     * 执行
     *
     * @param task
     */
    public void execute(Runnable task) {
        try {
            // 如果当前线程 < 核心线程数时
            if (works.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增 worker {} task={}", worker, task);
                works.add(worker);
                worker.start();
            } else {
                log.debug("加入任务队列 {}", task);
                taskQueue.put(task);
            }
        } catch (Exception e) {

        } finally {

        }
    }

    /**
     * 线程对象
     */
    public class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当有任务时
            // 2) 当任务队列不为空时
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("正在执行...{} ", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (works) {
                log.debug("worker 被移除 {}", this);
                works.remove(this);
            }
        }
    }
}

测试类

@Slf4j
class TestPool{
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
        // 由主线程给线程池提交任务
        for (int i = 0; i < 5; i++) {
            int j = i;
            threadPool.execute(() ->
                log.debug("{}", j)
            );
        }
    }
}

线程池工作流程

JDK自带

graph LR;
	c([添加任务])
	q2(线程数 < coreSize)
	q3(阻塞队列是否已满)
	q4(线程数 < maxSize)
	a1[添加工作线程并执行]
	a2[添加至阻塞队列并等待工作线程来提取]
	a3[添加救急线程并执行]
	r[拒绝任务]
	q4 --否--> r
	c --> q2 --否--> q3 --是--> q4
	q2 --是--> a1
	q3 --否--> a2
	q4 --是--> a3
graph LR;
	c(添加任务)
	q2false
	q3
	q4false
	a1[添加工作线程并执行]
	a2[添加至阻塞队列并等待工作线程来提取]
	a3[添加救急线程并执行]
	r[拒绝任务]
	q4 --否--> r
	c --> q2 --否--> q3 --是--> q4
	q2 --是--> a1
	q3 --否--> a2
	q4 --是--> a3

Tomcat NIO EndPoint

全流程

graph LR;
	ll(LimitLatch) --> a(acceptor)
	sc1(SocketChannel 1)
	sc2(SocketChannel 2)
	a --> sc1
	a --> sc2
	p(Poller)
	sc1 --有读--> p
	sc2 --有读--> p
	subgraph Executor
	w1(worker 1)
	w2(worker 2)
	end
	p --socketProcessor--> w1
	p --socketProcessor--> w2
  • LimitLatch 用来限流, 可以控制最大连接个数, 类似 JUC 中的 Semphore
  • Acceptor只负责[接收新的 socket 连接]
  • Poller 只负责监听 socket channel 是否有 [可读的 I/O 事件]
  • 一旦可读, 封装一个任务对象(socketProcessor), 提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责 [处理请求]

网络层流程

graph LR;
	c([添加任务])
	q1(任务数 < coreSize)
	q2(任务数 < maxSize)
	a1[加入队列]
	a2[创建救急线程]
	c --> q1 --是--> a1
	q1 --否--> q2 --否--> a1
	q2 --是--> a2
graph LR;
	c(添加任务)
	q1false
	q2false
	a1[加入队列]
	a2[创建救急线程]
	c --> q1 --是--> a1
	q1 --否--> q2 --否--> a1
	q2 --是--> a2

文章作者: 👑Dee👑
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC 4.0 许可协议。转载请注明来源 👑Dee👑 !
  目录