玩命加载中🤣🤣🤣

Java队列相关笔记


Java队列相关笔记

队列

flowchart LR
t1[Thread1]
subgraph BlockingQueue
task1
task2
task3
end
t1 --put--> BlockingQueue
BlockingQueue --take--> t2[Thread2]

Thread1 往阻塞队列中添加元素,而 Thread2 从阻塞队列中移除元素

阻塞队列:

  • 当队列是空时,从队列中获取元素的操作将会被阻塞
  • 当队列时满时,从队列里添加元素的操作将会被阻塞

常用

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列

API

方法类型 失败抛异常 返回布尔值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove(e) poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

同步队列SynchronousQueue

只能生产一个消费一个

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        new Thread(() -> {
            //先放3个数据再说
            try {
                System.out.println(Thread.currentThread().getName() + "推入 >>> " + "a");
                queue.put("a");

                System.out.println(Thread.currentThread().getName() + "推入 >>> " + "b");
                queue.put("b");

                System.out.println(Thread.currentThread().getName() + "推入 >>> " + "c");
                queue.put("c");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "t1").start();

        new Thread(() -> {
            //没3秒钟取一次
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "来取 >>> " + queue.take());


                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "来取 >>> " + queue.take());


                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "来取 >>> " + queue.take());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "t2").start();
    }
}
t1推入 >>> a
t2来取 >>> a
t1推入 >>> b
t2来取 >>> b
t1推入 >>> c
t2来取 >>> c

传统生产/消费者

  1. 线程 > 操作(方法) > 资源类(资源+锁)
  2. 判断 > 干活 > 通知
  3. 防止虚假唤醒机制

代码示例

public class ProdConsumerTraditionDemo {

    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }, "consumer").start();

        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }, "producer").start();
    }
}

//资源类
class ShareData {
    int number = 0; //资源
    private Lock lock = new ReentrantLock(); //锁
    private Condition condition = lock.newCondition();

    //操作方法(生产)
    public void increment() throws Exception {
        lock.lock();
        try {
            //判断
            while (number != 0) { //防止虚假唤醒
                condition.await();
            }
            //干活
            number++;
            System.out.println(Thread.currentThread().getName() + "::" + number);
            //通知
            condition.signalAll();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }

    }

    //操作方法(消费)
    public void decrement() throws Exception {
        lock.lock();
        try {
            while (number == 0) { //防止虚假唤醒
                condition.await();
            }
            //干活
            number--;
            System.out.println(Thread.currentThread().getName() + "::" + number);
            //通知
            condition.signalAll();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

拓展: Condition的精确唤醒

A, B, C三个线程, A打印5次 >> 然后B打印10次 >> 然后C打印15次, 来10轮

  1. 定义一个标志位
  2. 定义多个条件
  3. 标志位代表当前身份, 用来唤醒下一个条件
  4. 剩余逻辑与上一条类似
public class ConditionDemo {
    public static void main(String[] args) {
        ShareData2 shareData2 = new ShareData2();
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                shareData2.print(1);
            }, "A").start();
            new Thread(() -> {
                shareData2.print(2);
            }, "B").start();
            new Thread(() -> {
                shareData2.print(3);
            }, "C").start();
        }
    }
}


class ShareData2 {
    private int number = 1; //A:1 B:2 C:3
    private Lock lock = new ReentrantLock();
    Condition c1 = lock.newCondition();
    Condition c2 = lock.newCondition();
    Condition c3 = lock.newCondition();

    public void print(int current) {
        lock.lock();
        try {
            while (number != current) {
                switch (current) {
                    case 1:
                        c1.await();
                        break;
                    case 2:
                        c2.await();
                        break;
                    case 3:
                        c3.await();
                        break;
                }
            }
            //干活
            for (int i = 1; i <= 5 * current; i++) {
                System.out.println(Thread.currentThread().getName() + "::" + i);
            }
            //改变标志位
            number = current == 3 ? 1 : current + 1;
            //通知下一个线程
            switch (current) {
                case 1:
                    c2.signal();
                    break;
                case 2:
                    c3.signal();
                    break;
                case 3:
                    c1.signal();
                    break;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }

    }

}
A::1
A::2
A::3
A::4
A::5
B::1
B::2
B::3
B::4
B::5
B::6
B::7
B::8
B::9
B::10
C::1
C::2
C::3
C::4


.......

阻塞队列线程交互

public class ProdConsumerQueueDemo {
    public static void main(String[] args) {
        ShareResource shareResource = new ShareResource(new ArrayBlockingQueue<>(5));
        new Thread(() -> {
            try {
                shareResource.consumer();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "consumer").start();
        new Thread(() -> {
            try {
                shareResource.producer();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "producer").start();
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        shareResource.stop(); //停止任务队列
    }
}

class ShareResource {
    private volatile boolean FLAG = true;
    private AtomicInteger atomicInteger = new AtomicInteger();
    private BlockingQueue<String> queue;

    public ShareResource(BlockingQueue<String> queue) {
        this.queue = queue;
        System.out.println(queue.getClass().getName());
    }

    public void producer() throws Exception {
        String data; //队列数据
        boolean offer; //是否成功
        while (FLAG) {
            data = atomicInteger.incrementAndGet() + "";
            offer = queue.offer(data, 2L, TimeUnit.SECONDS); //等待超时2s
            if (offer) {
                System.out.println(Thread.currentThread().getName() + "\t插入队列::" + data + " 成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t插入队列::" + data + " 失败");
            }
            TimeUnit.SECONDS.sleep(1); //间隔1秒插入队列
        }
        System.out.println(Thread.currentThread().getName() + "\t停止生产");
    }

    public void consumer() throws Exception {
        while (FLAG) {
            String poll = queue.poll(2L, TimeUnit.SECONDS); //获取队列数据, 等待两秒钟
            if (null != poll && !"".equals(poll)) {
                System.out.println(Thread.currentThread().getName() + "\t消费队列::" + poll);
            } else {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t消费队列超时");
                return; //退出循环
            }
        }
    }
    public void stop() {
        FLAG = false;
    }
}
java.util.concurrent.ArrayBlockingQueue
producer	插入队列::1 成功
consumer	消费队列::1
producer	插入队列::2 成功
consumer	消费队列::2
producer	插入队列::3 成功
consumer	消费队列::3
producer	插入队列::4 成功
consumer	消费队列::4
producer	插入队列::5 成功
consumer	消费队列::5
producer	停止生产
consumer	消费队列超时

Callable

a等b计算完成

public class CallableDemo {
    public static void main(String[] args) {
        FutureTask<Integer> task = new FutureTask<>(new MyThread());
        new Thread(task, "A").start();
        try {
            int a = 6;
            System.out.println("a已计算完成 >>> " + a);
            while (!task.isDone()) {
                TimeUnit.SECONDS.sleep(1);
            }
            Integer b = task.get();
            System.out.println("b已计算完成 >>> " + b);
            System.out.println("计算b+a= " +(b + a));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

class MyThread implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(3);
        return 3;
    }
}

线程池

主要就是控制运行的线程数量, 处理过程中将任务放入队列, 然后在线程创建后启动这些任务, 如果线程数量超过了最大数量, 则排队等候, 等其它线程执行完毕, 再从队列中取出任务来执行

  • Executors.newFixedThreadPool(int)
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

七大参数

参数 说明 银行
corePoolSize 线程池中的常驻核心线程数 当值窗口,假设2
maximumPoolSize 线程池能够容纳同时执行的最大线程数,此值必须大于等于1 银行最大窗口,假设5(候客区满了,开设加班窗口,再来的人加塞处理,等候的继续等候)
keepAliveTime 多余的空闲线程的存活时间。
当前线程池数量超过corePoolSizel时,当空闲时间达到 keepAliveTime 值时,
多余空闲线程会被销毁直到只剩下 corePoolSize 个线程为止
空闲下来后,加班窗口下班
unit keepAliveTime的单位
workQueue 任务队列,被提交但尚未被执行的任务 候客区
threadFactory 表示生成线程池中工作线程的线程工厂 工作人员的制服/胸卡,一般用默认
handler 拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时 人员已满

拒绝策略

拒绝策略 说明
AbortPolicy(默认) 直接抛出 RejectedExecutionException 异常组织系统正常运行
CallerRunsPolicy "调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新的任务流量 谁调用我,就找谁执行
DiscardOldestPolicy 抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务 丢老的
DiscardPolicy 直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案 丢新的

自定义线程池Demo

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
                );
        for (int i = 0; i < 9; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "::办理业务");
            });
        }
    }
}
pool-1-thread-3::办理业务
pool-1-thread-2::办理业务
pool-1-thread-3::办理业务
pool-1-thread-4::办理业务
pool-1-thread-1::办理业务
pool-1-thread-5::办理业务
pool-1-thread-3::办理业务
pool-1-thread-2::办理业务

最大线程数参数参考

  • CPU密集型:CPU核心数+1

  • IO密集型:

    • CPU核心数 * 2
    • CPU核数 / (1- 阻塞系数) 阻塞系数一般0.8-0.9

死锁Demo

class ShareData implements Runnable{
    private String lockA;
    private String lockB;

    public ShareData(String lockA, String lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }
    @Override
    public void run() {
        synchronized (lockA) {
            System.out.println("get::" + lockA);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            synchronized (lockB) {
                System.out.println("get::" + lockB);
            }
        }
    }
}

public class DeadLockDemo {
    public static void main(String[] args) {
        String lockA = "lockA";
        String lockB = "lockB";
        new Thread(new ShareData(lockA, lockB), "t1").start();
        new Thread(new ShareData(lockB, lockA), "t2").start();
    }
}

console

get::lockA
get::lockB

排查死锁

jps + jstatck

Found one Java-level deadlock:
=============================
"t2":
  waiting to lock monitor 0x000000001d1e2f98 (object 0x000000076e0bf650, a java.lang.String),
  which is held by "t1"
"t1":
  waiting to lock monitor 0x000000001d1e5618 (object 0x000000076e0bf688, a java.lang.String),
  which is held by "t2"

Java stack information for the threads listed above:
===================================================
"t2":
        at cn.itcast.interview.thread_pool.ShareData.run(DeadLockDemo.java:28)
        - waiting to lock <0x000000076e0bf650> (a java.lang.String)
        - locked <0x000000076e0bf688> (a java.lang.String)
        at java.lang.Thread.run(Thread.java:748)
"t1":
        at cn.itcast.interview.thread_pool.ShareData.run(DeadLockDemo.java:28)
        - waiting to lock <0x000000076e0bf688> (a java.lang.String)
        - locked <0x000000076e0bf650> (a java.lang.String)
        at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.

文章作者: 👑Dee👑
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 👑Dee👑 !
  目录