Java队列相关笔记
队列
flowchart LR t1[Thread1] subgraph BlockingQueue task1 task2 task3 end t1 --put--> BlockingQueue BlockingQueue --take--> t2[Thread2]
Thread1 往阻塞队列中添加元素,而 Thread2 从阻塞队列中移除元素
阻塞队列:
- 当队列是空时,从队列中获取元素的操作将会被阻塞
- 当队列时满时,从队列里添加元素的操作将会被阻塞
常用
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
API
方法类型 | 失败抛异常 | 返回布尔值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove(e) | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
同步队列SynchronousQueue
只能生产一个消费一个
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
//先放3个数据再说
try {
System.out.println(Thread.currentThread().getName() + "推入 >>> " + "a");
queue.put("a");
System.out.println(Thread.currentThread().getName() + "推入 >>> " + "b");
queue.put("b");
System.out.println(Thread.currentThread().getName() + "推入 >>> " + "c");
queue.put("c");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t1").start();
new Thread(() -> {
//没3秒钟取一次
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "来取 >>> " + queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "来取 >>> " + queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "来取 >>> " + queue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t2").start();
}
}
t1推入 >>> a
t2来取 >>> a
t1推入 >>> b
t2来取 >>> b
t1推入 >>> c
t2来取 >>> c
传统生产/消费者
- 线程 > 操作(方法) > 资源类(资源+锁)
- 判断 > 干活 > 通知
- 防止虚假唤醒机制
代码示例
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}, "consumer").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}, "producer").start();
}
}
//资源类
class ShareData {
int number = 0; //资源
private Lock lock = new ReentrantLock(); //锁
private Condition condition = lock.newCondition();
//操作方法(生产)
public void increment() throws Exception {
lock.lock();
try {
//判断
while (number != 0) { //防止虚假唤醒
condition.await();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + "::" + number);
//通知
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
//操作方法(消费)
public void decrement() throws Exception {
lock.lock();
try {
while (number == 0) { //防止虚假唤醒
condition.await();
}
//干活
number--;
System.out.println(Thread.currentThread().getName() + "::" + number);
//通知
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
拓展: Condition的精确唤醒
A, B, C三个线程, A打印5次 >> 然后B打印10次 >> 然后C打印15次, 来10轮
- 定义一个标志位
- 定义多个条件
- 标志位代表当前身份, 用来唤醒下一个条件
- 剩余逻辑与上一条类似
public class ConditionDemo {
public static void main(String[] args) {
ShareData2 shareData2 = new ShareData2();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
shareData2.print(1);
}, "A").start();
new Thread(() -> {
shareData2.print(2);
}, "B").start();
new Thread(() -> {
shareData2.print(3);
}, "C").start();
}
}
}
class ShareData2 {
private int number = 1; //A:1 B:2 C:3
private Lock lock = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();
public void print(int current) {
lock.lock();
try {
while (number != current) {
switch (current) {
case 1:
c1.await();
break;
case 2:
c2.await();
break;
case 3:
c3.await();
break;
}
}
//干活
for (int i = 1; i <= 5 * current; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
//改变标志位
number = current == 3 ? 1 : current + 1;
//通知下一个线程
switch (current) {
case 1:
c2.signal();
break;
case 2:
c3.signal();
break;
case 3:
c1.signal();
break;
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
A::1
A::2
A::3
A::4
A::5
B::1
B::2
B::3
B::4
B::5
B::6
B::7
B::8
B::9
B::10
C::1
C::2
C::3
C::4
.......
阻塞队列线程交互
public class ProdConsumerQueueDemo {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource(new ArrayBlockingQueue<>(5));
new Thread(() -> {
try {
shareResource.consumer();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "consumer").start();
new Thread(() -> {
try {
shareResource.producer();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "producer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
shareResource.stop(); //停止任务队列
}
}
class ShareResource {
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
private BlockingQueue<String> queue;
public ShareResource(BlockingQueue<String> queue) {
this.queue = queue;
System.out.println(queue.getClass().getName());
}
public void producer() throws Exception {
String data; //队列数据
boolean offer; //是否成功
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
offer = queue.offer(data, 2L, TimeUnit.SECONDS); //等待超时2s
if (offer) {
System.out.println(Thread.currentThread().getName() + "\t插入队列::" + data + " 成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入队列::" + data + " 失败");
}
TimeUnit.SECONDS.sleep(1); //间隔1秒插入队列
}
System.out.println(Thread.currentThread().getName() + "\t停止生产");
}
public void consumer() throws Exception {
while (FLAG) {
String poll = queue.poll(2L, TimeUnit.SECONDS); //获取队列数据, 等待两秒钟
if (null != poll && !"".equals(poll)) {
System.out.println(Thread.currentThread().getName() + "\t消费队列::" + poll);
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t消费队列超时");
return; //退出循环
}
}
}
public void stop() {
FLAG = false;
}
}
java.util.concurrent.ArrayBlockingQueue
producer 插入队列::1 成功
consumer 消费队列::1
producer 插入队列::2 成功
consumer 消费队列::2
producer 插入队列::3 成功
consumer 消费队列::3
producer 插入队列::4 成功
consumer 消费队列::4
producer 插入队列::5 成功
consumer 消费队列::5
producer 停止生产
consumer 消费队列超时
Callable
a等b计算完成
public class CallableDemo {
public static void main(String[] args) {
FutureTask<Integer> task = new FutureTask<>(new MyThread());
new Thread(task, "A").start();
try {
int a = 6;
System.out.println("a已计算完成 >>> " + a);
while (!task.isDone()) {
TimeUnit.SECONDS.sleep(1);
}
Integer b = task.get();
System.out.println("b已计算完成 >>> " + b);
System.out.println("计算b+a= " +(b + a));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(3);
return 3;
}
}
线程池
主要就是控制运行的线程数量, 处理过程中将任务放入队列, 然后在线程创建后启动这些任务, 如果线程数量超过了最大数量, 则排队等候, 等其它线程执行完毕, 再从队列中取出任务来执行
- Executors.newFixedThreadPool(int)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
七大参数
参数 | 说明 | 银行 |
---|---|---|
corePoolSize | 线程池中的常驻核心线程数 | 当值窗口,假设2 |
maximumPoolSize | 线程池能够容纳同时执行的最大线程数,此值必须大于等于1 | 银行最大窗口,假设5(候客区满了,开设加班窗口,再来的人加塞处理,等候的继续等候) |
keepAliveTime | 多余的空闲线程的存活时间。 当前线程池数量超过corePoolSizel时,当空闲时间达到 keepAliveTime 值时, 多余空闲线程会被销毁直到只剩下 corePoolSize 个线程为止 |
空闲下来后,加班窗口下班 |
unit | keepAliveTime的单位 | |
workQueue | 任务队列,被提交但尚未被执行的任务 | 候客区 |
threadFactory | 表示生成线程池中工作线程的线程工厂 | 工作人员的制服/胸卡,一般用默认 |
handler | 拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时 | 人员已满 |
拒绝策略
拒绝策略 | 说明 | |
---|---|---|
AbortPolicy(默认) | 直接抛出 RejectedExecutionException 异常组织系统正常运行 | |
CallerRunsPolicy | "调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新的任务流量 | 谁调用我,就找谁执行 |
DiscardOldestPolicy | 抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务 | 丢老的 |
DiscardPolicy | 直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案 | 丢新的 |
自定义线程池Demo
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
for (int i = 0; i < 9; i++) {
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "::办理业务");
});
}
}
}
pool-1-thread-3::办理业务
pool-1-thread-2::办理业务
pool-1-thread-3::办理业务
pool-1-thread-4::办理业务
pool-1-thread-1::办理业务
pool-1-thread-5::办理业务
pool-1-thread-3::办理业务
pool-1-thread-2::办理业务
最大线程数参数参考
CPU密集型:CPU核心数+1
IO密集型:
- CPU核心数 * 2
- CPU核数 / (1- 阻塞系数) 阻塞系数一般0.8-0.9
死锁Demo
class ShareData implements Runnable{
private String lockA;
private String lockB;
public ShareData(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println("get::" + lockA);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (lockB) {
System.out.println("get::" + lockB);
}
}
}
}
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new ShareData(lockA, lockB), "t1").start();
new Thread(new ShareData(lockB, lockA), "t2").start();
}
}
console
get::lockA
get::lockB
排查死锁
jps
+ jstatck
Found one Java-level deadlock:
=============================
"t2":
waiting to lock monitor 0x000000001d1e2f98 (object 0x000000076e0bf650, a java.lang.String),
which is held by "t1"
"t1":
waiting to lock monitor 0x000000001d1e5618 (object 0x000000076e0bf688, a java.lang.String),
which is held by "t2"
Java stack information for the threads listed above:
===================================================
"t2":
at cn.itcast.interview.thread_pool.ShareData.run(DeadLockDemo.java:28)
- waiting to lock <0x000000076e0bf650> (a java.lang.String)
- locked <0x000000076e0bf688> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
"t1":
at cn.itcast.interview.thread_pool.ShareData.run(DeadLockDemo.java:28)
- waiting to lock <0x000000076e0bf688> (a java.lang.String)
- locked <0x000000076e0bf650> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.