Java线程类1
一、java.util.concurrent.CountDownLatch
通常线程是并发运行,并且不容易预测到哪个线程先执行,哪些后执行,所以通常在执行顺序上是相当公平的。但是,因为创建对象或者执行某些操作总是需要一定的时间,所以还是很难保证真正的公平。这时就需要某种机制来控制线程的运行时机。就比如说赛跑,必须等待所有的选手都已经站在起跑线上才能开始,而java.util.concurrent.CountDownLatch就是这样一个控制器,用于保证所有选手都在起跑线上。
例如以下程序:
for(int i = 0;i< 10;i++){ new Thread("Thread "+i){ public void run() { System.out.println(getName()+" started"); } }.start(); Thread.sleep(50); }
如果没有某种保证机制,那基本上可以预测结果如下:
Thread 0 started Thread 1 started Thread 2 started Thread 3 started Thread 4 started Thread 5 started Thread 6 started Thread 7 started Thread 8 started Thread 9 started
因为后面的线程还没开始创建,前面的线程已经启跑了。
为了让他们在同一时间开跑,而不管创建的时机,可以如下:
//创建CountDownLatch,并指定countDown必须被调用10次 final CountDownLatch countDownLatch = new CountDownLatch(10); for(int i = 0;i< 10;i++){ new Thread("Thread "+i){ public void run() { try { //在继续运行之前等待条件满足,也即所有的线程都准备好 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName()+" started"); } }.start(); Thread.sleep(50); //每创建完一个线程,调用一次countDown()方法,当高用到10次时,即条件满足 countDownLatch.countDown(); }
下面是某次运行的结果:
Thread 0 started Thread 8 started Thread 7 started Thread 6 started Thread 5 started Thread 1 started Thread 3 started Thread 2 started Thread 4 started Thread 9 started
同上面的结果比较,可以看出,结果基本已经是不可测的了。并且,所以的线程会在等待了差不多半秒之后同步运行。
简单的说 java.util.concurrent.CountDownLatch 就是一个倒数器,倒数到 0 后就开始执行所有等待的操作。
二、java.util.concurrent.CyclicBarrier
上面的 java.util.concurrent.CountDownLatch 可以用于控制线程同时执行,而 java.util.concurrent.CyclicBarrier 则可以用来控制线程收尾工作。例如,还是上面那个线程,如果我们想在所有的线程执行完后,再打印一下 Done ,则可以如下实现:
final CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("Done"); } }); for(int i = 0; i<10;i++){ new Thread("Thread "+i){ public void run() { System.out.println(getName()); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }.start(); }
运行结果如下:
Thread 0 Thread 1 Thread 5 Thread 3 Thread 6 Thread 4 Thread 7 Thread 8 Thread 2 Thread 9 Done
根据提示可以看到,它可以用于做收尾工作,比如说我们有一个查找程序,其中使用了多线程去分别查找,然后可以最后使用 CyclicBarrier 来将结果合并(当然也有其他的方法,这里只是说它可以做这件事)。
三、java.util.concurrent.Semaphore
信号量用于控制同时访问某一资源的线程数,例如系统上的某个文件,只想被最多5个线程同时使用,就可以用信号量在控制了。每次同过 acquire() 方法,获取访问权限,同时信号量减1,当信号量减为0时则不能有新的线程再获得权限;用release()方法释放权限,信号量加1,则可以有新的线程获得权限,例如:
final Semaphore semaphore = new Semaphore(2, true); for(int i = 0;i<5;i++){ new Thread("Thread "+i){ public void run() { try { String threadName = getName(); System.out.println(threadName+"\t is waiting"); semaphore.acquire(); System.out.println(threadName+"\t is running"); sleep(30000); System.out.println(threadName+"\t is finished"); } catch (InterruptedException e) { e.printStackTrace(); }finally{ semaphore.release(); } } }.start(); }
某次运行的结果:
Thread 0 is waiting Thread 1 is waiting Thread 1 is running Thread 3 is waiting Thread 4 is waiting Thread 0 is running Thread 2 is waiting
可以看到,5个线程都启动了,但是只有2个线程进入了running状态,其他3个处于waiting状态,30秒之后,先前2个running状态的线程finish了之后,剩下其中2个等待的线程才有机会开始进入运行状态:
Thread 1 is finished Thread 0 is finished Thread 3 is running Thread 4 is running
四、java.util.concurrent.Phaser
Java 7中新增了Phaser类,它有点类似于上面介绍的CyclicBarrier和CountDownLatch,但是它支持多阶段的运行/等待,并且支持增加移除观察者。比如说某个操作需要3步完成,其中每一步都需要多个线程配合:
final Phaser phaser = new Phaser(5); for(int i = 0;i<5;i++){ new Thread("Thread "+i){ public void run() { System.out.println(getName()+" are ready"); phaser.arriveAndAwaitAdvance(); System.out.println(getName()+" finished phase1"); phaser.arriveAndAwaitAdvance(); System.out.println(getName()+" finished phase2"); } }.start(); }
某次运行的结果可能如下:
Thread 0 are ready Thread 2 are ready Thread 1 are ready Thread 3 are ready Thread 4 are ready Thread 4 finished phase1 Thread 1 finished phase1 Thread 3 finished phase1 Thread 0 finished phase1 Thread 2 finished phase1 Thread 2 finished phase2 Thread 1 finished phase2 Thread 3 finished phase2 Thread 0 finished phase2 Thread 4 finished phase2
也可以像CyclicBarrier一样,当满足某个条件时才开始工作,例如:
final Phaser phaser = new Phaser(5); phaser.register(); for(int i = 0;i<5;i++){ new Thread("Thread "+i){ public void run() { System.out.println(getName()+" are ready"); phaser.arriveAndAwaitAdvance(); System.out.println(getName()+" finished phase1"); phaser.arriveAndAwaitAdvance(); System.out.println(getName()+" finished phase2"); } }.start(); } System.out.println("Wait for a moment before start"); Thread.sleep(2000); phaser.arriveAndDeregister();
一开始可能输出:
Thread 0 are ready Thread 2 are ready Thread 1 are ready Wait for a moment before start Thread 3 are ready Thread 4 are ready
2秒后接着输出:
Thread 3 finished phase1 Thread 0 finished phase1 Thread 2 finished phase1 Thread 4 finished phase1 Thread 1 finished phase1 Thread 1 finished phase2 Thread 3 finished phase2 Thread 4 finished phase2 Thread 2 finished phase2 Thread 0 finished phase2
可以通过重写它的onAdvance()方法来实现在每个阶段结束后执行某个自定义的操作:
final Phaser phaser = new Phaser(5){ @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("============="+phase+"============="); return super.onAdvance(phase, registeredParties); } }; phaser.register(); for(int i = 0;i<5;i++){ new Thread("Thread "+i){ public void run() { System.out.println(getName()+" are ready"); phaser.arriveAndAwaitAdvance(); System.out.println(getName()+" finished phase1"); phaser.arriveAndAwaitAdvance(); System.out.println(getName()+" finished phase2"); } }.start(); } System.out.println("Wait for a moment before start"); Thread.sleep(2000); phaser.arriveAndDeregister();
这里在每个阶段的条件满足后插入一段分隔符,输出结果可能为:
Thread 0 are ready Thread 2 are ready Thread 1 are ready Wait for a moment before start Thread 4 are ready Thread 3 are ready =============0============= Thread 3 finished phase1 Thread 1 finished phase1 Thread 2 finished phase1 Thread 0 finished phase1 Thread 4 finished phase1 =============1============= Thread 4 finished phase2 Thread 3 finished phase2 Thread 2 finished phase2 Thread 0 finished phase2 Thread 1 finished phase2