JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

Join 方法可以使当前线程等待子线程,如果子线程未结束,则会一致处在wait状态。

因为其内部是通过wait 方法实现的,当执行完毕后会调用notifyAll 释放锁。

CountDownLatch 允许一个或多个线程等待其他线程完成操作,相比join ,能做的事情更多。

private static CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args){
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("begin");
                countDownLatch.countDown();
                System.out.println("middle");
                countDownLatch.countDown();
            }
        });
        t.start();
        try {
            countDownLatch.await();
            System.out.println("end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

输出:

JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

由结果看出,实现的效果与join相同。

对比join ,其构造函数有一个int参数,表示计数器,可以手动控制其需要等待的次数。每次调用countDown 会减去1,其wait方法会一直等待计数器变成0。

用法更加灵活,计数参数N 可以代表N个线程、N个步骤,总之可以*的控制。

如果调用await(long time,TimeUnit unit)方法等待,那么当前线程在等待一定时间后就不会再做等待,而是继续执行当前线程(需要注意的地方是,此时子线程还是在执行中的)

Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("begin");
                countDownLatch.countDown();
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("middle");
                countDownLatch.countDown();
            }
        });
        t.start();
        try {
            countDownLatch.await(2, TimeUnit.SECONDS);
            System.out.println("end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

输出:

JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

 结果看出,主线程main在等待2秒后,发现子线程还是没有执行完毕,则继续执行,此时子线程并没有关闭,所以在等待到达5秒后,继续执行子线程输出middle。

 CyclicBarrier 同步屏障

说明:中文意思为循环的屏障,要求指定线程到达屏障点,会可继续执行,如:

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
    public static void main(String[] args){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"begin");
                try {
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread t = new Thread(runnable,"A");
        Thread t2 = new Thread(runnable,"B");
        t.start();
        t2.start();

        try {
            cyclicBarrier.await();
            System.out.println("main end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

输出:

JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

 由结果看出,因为屏障点设置的数量为4,实际上执行了await方法(调用一次则减一)只有3个,2个子线程和一个main线程,所以大家都没得玩了,如果把计算点改为3,就正常了(当计数器为0时,会通知所有阻塞的线程可以继续执行了)

如果计算器过小,比如有3个线程调用了wait,而计数器设置为2,那么前面2个先执行wait的线程会停止阻塞,计算器又会从2开始计算,也就是说,还需要一个线程调用await来释放它,如下:

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    public static void main(String[] args){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"begin");
                try {
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread t = new Thread(runnable,"A");
        Thread t2 = new Thread(runnable,"B");
        Thread t3 = new Thread(runnable,"C");
        t.start();
        t2.start();
        t3.start();
        try {
            cyclicBarrier.await();
            System.out.println("main end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

输出

JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

关于CyclicBarrier 的构造函数还有一种用法,就是优先执行,也就是说如果我们设置计算器为2,在线程A与线程B调用await后,在停止阻塞AB之前会优先执行默认的线程,如下:

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
        @Override
        public void run() {
            System.out.println("先执行我");
        }
    });
    public static void main(String[] args){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"begin");
                try {
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread t = new Thread(runnable,"A");
        t.start();
        try {
            cyclicBarrier.await();
            System.out.println("main end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

输出:

JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

A线程先执行了输出了begin,然后到达屏障开始等待,此时主线程也同时调用了await,计算器的值2 - 1 - 1 = 0 ,准备释放A 和main,但是要先执行第二个参数线程,所以会先输出“先执行我”,然后A和main线程从block状态转换为runable 状态。

场景:计算多个线程的结果,利用第二个参数(线程)。如:

private CyclicBarrier barrier = new CyclicBarrier(4,this);

    private Executor executor = Executors.newFixedThreadPool(4);

    // 保存
    private ConcurrentMap<String,Integer> map = new ConcurrentHashMap<>();

    private void count(){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 记入运算结果
                map.put(Thread.currentThread().getName(),1);
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 4; i++) {
            executor.execute(runnable);
        }
    }

    @Override
    public void run() {
        int result = 0;
        for (Map.Entry<String,Integer> entry:map.entrySet()) {
            result += entry.getValue();
        }
        //
        map.put("result",result);
        System.out.println(result);
    }

    public static void main(String[] args){
        TestCount testCount = new TestCount();
        testCount.count();
    }

输出:

JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

结果输出4。

CountDownLatch 与CyclicBarrier:

CyclicBarrier 对比CountDownLatch 还可以调用reset方法重置。,让线程重新执行一次。


Semaphore:信号

可以控制并发量工具,比如有100个连接需要获取数据库连接,保存数据,但是数据库连接池最大连接数为10,那么可以通过这个工具类来控制,如下:

private static final int COUNT = 100;

    private static ExecutorService threadPool = Executors.newFixedThreadPool(COUNT);

    private static Semaphore semaphore = new Semaphore(10);

    public  static  void main(String[] args){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println("保存数据");
                    Thread.sleep(2000);
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < COUNT; i++) {
            threadPool.execute(runnable);
        }

        threadPool.shutdown();
    }

输出:
JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

 在代码中执行会发现,每次只有10个线程在执行,这就是信号灯的作用~ acquire 方法意为获取许可证,release方法意为归还许可证。

Exchanger:

线程之间交换数据彼此的数据,比如A线程执行到指定同步点后会等待B线程,当B线程也到达时,交换数据,然后各自拜拜~

private static Exchanger<String> exchanger = new Exchanger<>();

    private static ExecutorService pool = Executors.newFixedThreadPool(2);

    public static void main(String[] args){
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String what = exchanger.exchange("钞票");
                    System.out.println("我是线程A,我拿到了"+what);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String what = exchanger.exchange("香烟");
                    System.out.println("我是线程B,我拿到了"+what);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        pool.shutdown();
    }

输出:

JAVA 多线程(10):join 的哥们和朋友 countDownLatch、CyclicBarrier、Semaphore、Exchanger

 由结果看出,线程A和线程B各自拿到了自己想要的东西~~

 总结:平时使用的最多的应该是CountDownLatch,因为使用场景的关系,用它用的会比较多,这几个工具类各有特点~

像是CyclicBarrier 在执行一些大型计算的时候也许会用到,Semaphore对于一些共享资源的控制,Exchanger对于需要交换信息比较合适,一般是不同的事情并行处理会比较好,而且是必须要交换数据。

对于像是解析excel,json文件执行导入数据的操作个人认为使用CountDownLatch 就足够了~~ 哈哈