java.util.concurrent鍖呭簲鐢
java.util.concurrent鍖呭簲鐢?
Executor聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽 锛氬叿浣揜unnable浠诲姟鐨勬墽琛岃€呫€?br>ExecutorService聽聽聽聽聽聽聽聽聽 锛氫竴涓嚎绋嬫睜绠$悊鑰咃紝鍏跺疄鐜扮被鏈夊绉嶏紝鎴戜細浠嬬粛涓€閮ㄥ垎銆傛垜浠兘鎶奟unnable,Callable鎻愪氦鍒版睜涓鍏惰皟搴︺€?br>Semaphore聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽 锛氫竴涓鏁颁俊鍙烽噺
ReentrantLock聽聽聽聽聽聽聽聽聽聽聽 锛氫竴涓彲閲嶅叆鐨勪簰鏂ラ攣瀹?Lock锛屽姛鑳界被浼約ynchronized锛屼絾瑕佸己澶х殑澶氥€?br>Future聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽 锛氭槸涓嶳unnable,Callable杩涜浜や簰鐨勬帴鍙o紝姣斿涓€涓嚎绋嬫墽琛岀粨鏉熷悗鍙栬繑鍥炵殑缁撴灉绛夌瓑锛岃繕鎻愪緵浜哻ancel缁堟绾跨▼銆?br>BlockingQueue聽聽聽聽聽聽聽聽聽聽聽 锛氶樆濉為槦鍒椼€?br>CompletionService聽聽聽聽聽聽聽 : ExecutorService鐨勬墿灞曪紝鍙互鑾峰緱绾跨▼鎵ц缁撴灉鐨?br>CountDownLatch聽聽聽聽聽聽聽聽聽聽 锛氫竴涓悓姝ヨ緟鍔╃被锛屽湪瀹屾垚涓€缁勬鍦ㄥ叾浠栫嚎绋嬩腑鎵ц鐨勬搷浣滀箣鍓嶏紝瀹冨厑璁镐竴涓垨澶氫釜绾跨▼涓€鐩寸瓑寰呫€?
CyclicBarrier聽聽聽聽聽聽聽聽聽聽聽 锛氫竴涓悓姝ヨ緟鍔╃被锛屽畠鍏佽涓€缁勭嚎绋嬩簰鐩哥瓑寰咃紝鐩村埌鍒拌揪鏌愪釜鍏叡灞忛殰鐐?
Future聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽 锛欶uture 琛ㄧず寮傛璁$畻鐨勭粨鏋溿€?br>ScheduledExecutorService 锛氫竴涓?ExecutorService锛屽彲瀹夋帓鍦ㄧ粰瀹氱殑寤惰繜鍚庤繍琛屾垨瀹氭湡鎵ц鐨勫懡浠ゃ€?/p>
鎺ヤ笅鏉ラ€愪竴浠嬬粛
Executors涓昏鏂规硶璇存槑 newCachedThreadPool锛堟棤鐣岀嚎绋嬫睜锛屽彲浠ヨ繘琛岃嚜鍔ㄧ嚎绋嬪洖鏀讹級 newSingleThreadExecutor锛堝崟涓悗鍙扮嚎绋嬶級 杩欎簺鏂规硶杩斿洖鐨勯兘鏄疎xecutorService瀵硅薄锛岃繖涓璞″彲浠ョ悊瑙d负灏辨槸涓€涓嚎绋嬫睜銆?br>杩欎釜绾跨▼姹犵殑鍔熻兘杩樻槸姣旇緝瀹屽杽鐨勩€傚彲浠ユ彁浜や换鍔ubmit()鍙互缁撴潫绾跨▼姹爏hutdown()銆?/p>
import java.util.concurrent.ExecutorService; public class MyExecutor extends Thread { 铏界劧鎵撳嵃浜嗕竴浜涗俊鎭紝浣嗘槸鐪嬬殑涓嶆槸闈炲父娓呮櫚锛岃繖涓嚎绋嬫睜鏄浣曞伐浣滅殑锛屾垜浠潵灏嗕紤鐪犵殑鏃堕棿璋冮暱10鍊嶃€?br>Thread.sleep((int)(Math.random()*10000)); 鍐嶆潵鐪嬶紝浼氭竻妤氱湅鍒板彧鑳芥墽琛?涓嚎绋嬨€傚綋鎵ц瀹屼竴涓嚎绋嬪悗锛屾墠浼氬張鎵ц涓€涓柊鐨勭嚎绋嬶紝涔熷氨鏄锛屾垜浠皢鎵€鏈夌殑绾跨▼鎻愪氦鍚庯紝绾跨▼姹犱細绛夊緟鎵ц瀹屾渶鍚巗hutdown銆傛垜浠篃浼氬彂鐜帮紝鎻愪氦鐨勭嚎绋嬭鏀惧埌涓€涓€滄棤鐣岄槦鍒楅噷鈥濄€傝繖鏄竴涓湁搴忛槦鍒楋紙BlockingQueue锛岃繖涓笅闈細璇村埌锛夈€?/p>
鍙﹀瀹冧娇鐢ㄤ簡Executors鐨勯潤鎬佸嚱鏁扮敓鎴愪竴涓浐瀹氱殑绾跨▼姹狅紝椤惧悕鎬濅箟锛岀嚎绋嬫睜鐨勭嚎绋嬫槸涓嶄細閲婃斁鐨勶紝鍗充娇瀹冩槸Idle銆?br>杩欏氨浼氫骇鐢熸€ц兘闂锛屾瘮濡傚鏋滅嚎绋嬫睜鐨勫ぇ灏忎负200锛屽綋鍏ㄩ儴浣跨敤瀹屾瘯鍚庯紝鎵€鏈夌殑绾跨▼浼氱户缁暀鍦ㄦ睜涓紝鐩稿簲鐨勫唴瀛樺拰绾跨▼鍒囨崲锛坵hile(true)+sleep寰幆锛夐兘浼氬鍔犮€?br>濡傛灉瑕侀伩鍏嶈繖涓棶棰橈紝灏卞繀椤荤洿鎺ヤ娇鐢═hreadPoolExecutor()鏉ユ瀯閫犮€傚彲浠ュ儚閫氱敤鐨勭嚎绋嬫睜涓€鏍疯缃€滄渶澶х嚎绋嬫暟鈥濄€佲€滄渶灏忕嚎绋嬫暟鈥濆拰鈥滅┖闂茬嚎绋媖eepAlive鐨勬椂闂粹€濄€?/p>
Semaphore 閫氬父鐢ㄤ簬闄愬埗鍙互璁块棶鏌愪簺璧勬簮锛堢墿鐞嗘垨閫昏緫鐨勶級鐨勭嚎绋嬫暟鐩€備緥濡傦紝涓嬮潰鐨勭被浣跨敤淇″彿閲忔帶鍒跺鍐呭姹犵殑璁块棶锛?/p>
杩欓噷鏄竴涓疄闄呯殑鎯呭喌锛屽ぇ瀹舵帓闃熶笂鍘曟墍锛屽帟鎵€鍙湁涓や釜浣嶇疆锛屾潵浜?0涓汉闇€瑕佹帓闃熴€?/p>
import java.util.concurrent.ExecutorService; public class MySemaphore extends Thread { 聽 ReentrantLock ReentrantLock 灏嗙敱鏈€杩戞垚鍔熻幏寰楅攣瀹氾紝骞朵笖杩樻病鏈夐噴鏀捐閿佸畾鐨勭嚎绋嬫墍鎷ユ湁銆傚綋閿佸畾娌℃湁琚彟涓€涓嚎绋嬫墍鎷ユ湁鏃讹紝璋冪敤 lock 鐨勭嚎绋嬪皢鎴愬姛鑾峰彇璇ラ攣瀹氬苟杩斿洖銆傚鏋滃綋鍓嶇嚎绋嬪凡缁忔嫢鏈夎閿佸畾锛屾鏂规硶灏嗙珛鍗宠繑鍥炪€傚彲浠ヤ娇鐢?isHeldByCurrentThread() 鍜?getHoldCount() 鏂规硶鏉ユ鏌ユ鎯呭喌鏄惁鍙戠敓銆?/p>
姝ょ被鐨勬瀯閫犳柟娉曟帴鍙椾竴涓彲閫夌殑鍏钩鍙傛暟銆?br>褰撹缃负 true鏃讹紝鍦ㄥ涓嚎绋嬬殑浜夌敤涓嬶紝杩欎簺閿佸畾鍊惧悜浜庡皢璁块棶鏉冩巿浜堢瓑寰呮椂闂存渶闀跨殑绾跨▼銆傚惁鍒欐閿佸畾灏嗘棤娉曚繚璇佷换浣曠壒瀹氳闂『搴忋€?br>涓庨噰鐢ㄩ粯璁よ缃紙浣跨敤涓嶅叕骞抽攣瀹氾級鐩告瘮锛屼娇鐢ㄥ叕骞抽攣瀹氱殑绋嬪簭鍦ㄨ澶氱嚎绋嬭闂椂琛ㄧ幇涓哄緢浣庣殑鎬讳綋鍚炲悙閲忥紙鍗抽€熷害寰堟參锛屽父甯告瀬鍏舵參锛夛紝浣嗘槸鍦ㄨ幏寰楅攣瀹氬拰淇濊瘉閿佸畾鍒嗛厤鐨勫潎琛℃€ф椂宸紓杈冨皬銆備笉杩囪娉ㄦ剰鐨勬槸锛屽叕骞抽攣瀹氫笉鑳戒繚璇佺嚎绋嬭皟搴︾殑鍏钩鎬с€傚洜姝わ紝浣跨敤鍏钩閿佸畾鐨勪紬澶氱嚎绋嬩腑鐨勪竴鍛樺彲鑳借幏寰楀鍊嶇殑鎴愬姛鏈轰細锛岃繖绉嶆儏鍐靛彂鐢熷湪鍏朵粬娲诲姩绾跨▼娌℃湁琚鐞嗗苟涓旂洰鍓嶅苟鏈寔鏈夐攣瀹氭椂銆傝繕瑕佹敞鎰忕殑鏄紝鏈畾鏃剁殑 tryLock 鏂规硶骞舵病鏈変娇鐢ㄥ叕骞宠缃€傚洜涓哄嵆浣垮叾浠栫嚎绋嬫鍦ㄧ瓑寰咃紝鍙璇ラ攣瀹氭槸鍙敤鐨勶紝姝ゆ柟娉曞氨鍙互鑾峰緱鎴愬姛銆?/p>
寤鸿鎬绘槸 绔嬪嵆瀹炶返锛屼娇鐢?try 鍧楁潵璋冪敤 lock锛屽湪涔嬪墠/涔嬪悗鐨勬瀯閫犱腑锛屾渶鍏稿瀷鐨勪唬鐮佸涓嬶細 聽聽 public void m() { 鎴戠殑渚嬪瓙锛?br>import java.util.concurrent.ExecutorService; public class MyReentrantLock extends Thread{ public static void main(String args[]){
newFixedThreadPool锛?/strong>鍥哄畾澶у皬绾跨▼姹狅級
鍒涘缓涓€涓彲閲嶇敤鍥哄畾绾跨▼闆嗗悎鐨勭嚎绋嬫睜锛屼互鍏变韩鐨勬棤鐣岄槦鍒楁柟寮忔潵杩愯杩欎簺绾跨▼锛堝彧鏈夎璇锋眰鐨勮繃鏉ワ紝灏变細鍦ㄤ竴涓槦鍒楅噷绛夊緟鎵ц锛夈€傚鏋滃湪鍏抽棴鍓嶇殑鎵ц鏈熼棿鐢变簬澶辫触鑰屽鑷翠换浣曠嚎绋嬬粓姝紝閭d箞涓€涓柊绾跨▼灏嗕唬鏇垮畠鎵ц鍚庣画鐨勪换鍔★紙濡傛灉闇€瑕侊級銆?/p>
鍒涘缓涓€涓彲鏍规嵁闇€瑕佸垱寤烘柊绾跨▼鐨勭嚎绋嬫睜锛屼絾鏄湪浠ュ墠鏋勯€犵殑绾跨▼鍙敤鏃跺皢閲嶇敤瀹冧滑銆傚浜庢墽琛屽緢澶氱煭鏈熷紓姝ヤ换鍔$殑绋嬪簭鑰岃█锛岃繖浜涚嚎绋嬫睜閫氬父鍙彁楂樼▼搴忔€ц兘銆傝皟鐢?execute 灏嗛噸鐢ㄤ互鍓嶆瀯閫犵殑绾跨▼锛堝鏋滅嚎绋嬪彲鐢級銆傚鏋滅幇鏈夌嚎绋嬫病鏈夊彲鐢ㄧ殑锛屽垯鍒涘缓涓€涓柊绾跨▼骞舵坊鍔犲埌姹犱腑銆傜粓姝㈠苟浠庣紦瀛樹腑绉婚櫎閭d簺宸叉湁 60 绉掗挓鏈浣跨敤鐨勭嚎绋嬨€傚洜姝わ紝闀挎椂闂翠繚鎸佺┖闂茬殑绾跨▼姹犱笉浼氫娇鐢ㄤ换浣曡祫婧愩€傛敞鎰忥紝鍙互浣跨敤 ThreadPoolExecutor 鏋勯€犳柟娉曞垱寤哄叿鏈夌被浼煎睘鎬т絾缁嗚妭涓嶅悓锛堜緥濡傝秴鏃跺弬鏁帮級鐨勭嚎绋嬫睜銆?/p>
鍒涘缓涓€涓娇鐢ㄥ崟涓?worker 绾跨▼鐨?Executor锛屼互鏃犵晫闃熷垪鏂瑰紡鏉ヨ繍琛岃绾跨▼銆傦紙娉ㄦ剰锛屽鏋滃洜涓哄湪鍏抽棴鍓嶇殑鎵ц鏈熼棿鍑虹幇澶辫触鑰岀粓姝簡姝ゅ崟涓嚎绋嬶紝閭d箞濡傛灉闇€瑕侊紝涓€涓柊绾跨▼灏嗕唬鏇垮畠鎵ц鍚庣画鐨勪换鍔★級銆傚彲淇濊瘉椤哄簭鍦版墽琛屽悇涓换鍔★紝骞朵笖鍦ㄤ换鎰忕粰瀹氱殑鏃堕棿涓嶄細鏈夊涓嚎绋嬫槸娲诲姩鐨勩€備笌鍏朵粬绛夋晥鐨?newFixedThreadPool(1) 涓嶅悓锛屽彲淇濊瘉鏃犻渶閲嶆柊閰嶇疆姝ゆ柟娉曟墍杩斿洖鐨勬墽琛岀▼搴忓嵆鍙娇鐢ㄥ叾浠栫殑绾跨▼銆?/p>
import java.util.concurrent.Executors;
private int index;
public MyExecutor(int i){
聽聽 this.index=i;
}
public void run(){
聽聽 try{
聽聽聽 System.out.println("["+this.index+"] start....");
聽聽聽 Thread.sleep((int)(Math.random()*1000));
聽聽聽 System.out.println("["+this.index+"] end.");
聽聽 }
聽聽 catch(Exception e){
聽聽聽 e.printStackTrace();
聽聽 }
}
public static void main(String args[]){
聽聽 ExecutorService service=Executors.newFixedThreadPool(4);
聽聽 for(int i=0;i<10;i++){
聽聽聽 service.execute(new MyExecutor(i));
聽聽聽 //service.submit(new MyExecutor(i));
聽聽 }
聽聽 System.out.println("submit finish");
聽聽 service.shutdown();
}
}
杩欎釜灏辨槸绾跨▼姹犲熀鏈敤娉曘€?br>
Semaphore
涓€涓鏁颁俊鍙烽噺銆備粠姒傚康涓婅锛屼俊鍙烽噺缁存姢浜嗕竴涓鍙泦鍚堛€傚鏈夊繀瑕侊紝鍦ㄨ鍙彲鐢ㄥ墠浼氶樆濉炴瘡涓€涓?acquire()锛岀劧鍚庡啀鑾峰彇璇ヨ鍙€傛瘡涓?release() 娣诲姞涓€涓鍙紝浠庤€屽彲鑳介噴鏀句竴涓鍦ㄩ樆濉炵殑鑾峰彇鑰呫€備絾鏄紝涓嶄娇鐢ㄥ疄闄呯殑璁稿彲瀵硅薄锛孲emaphore 鍙鍙敤璁稿彲鐨勫彿鐮佽繘琛岃鏁帮紝骞堕噰鍙栫浉搴旂殑琛屽姩銆?/p>
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
Semaphore position;
private int id;
public MySemaphore(int i,Semaphore s){
聽聽 this.id=i;
聽聽 this.position=s;
}
public void run(){
聽聽 try{
聽聽聽 if(position.availablePermits()>0){
聽聽聽聽 System.out.println("椤惧["+this.id+"]杩涘叆鍘曟墍锛屾湁绌轰綅");
聽聽聽 }
聽聽聽 else{
聽聽聽聽 System.out.println("椤惧["+this.id+"]杩涘叆鍘曟墍锛屾病绌轰綅锛屾帓闃?);
聽聽聽 }
聽聽聽 position.acquire();
聽聽聽 System.out.println("椤惧["+this.id+"]鑾峰緱鍧戜綅");
聽聽聽 Thread.sleep((int)(Math.random()*1000));
聽聽聽 System.out.println("椤惧["+this.id+"]浣跨敤瀹屾瘯");
聽聽聽 position.release();
聽聽 }
聽聽 catch(Exception e){
聽聽聽 e.printStackTrace();
聽聽 }
}
public static void main(String args[]){
聽聽 ExecutorService list=Executors.newCachedThreadPool();
聽聽 Semaphore position=new Semaphore(2);
聽聽 for(int i=0;i<10;i++){
聽聽聽 list.submit(new MySemaphore(i+1,position));
聽聽 }
聽聽 list.shutdown();
聽聽 position.acquireUninterruptibly(2);
聽聽 System.out.println("浣跨敤瀹屾瘯锛岄渶瑕佹竻鎵簡");
聽聽 position.release(2);
}
}
涓€涓彲閲嶅叆鐨勪簰鏂ラ攣瀹?Lock锛屽畠鍏锋湁涓庝娇鐢?synchronized 鏂规硶鍜岃鍙ユ墍璁块棶鐨勯殣寮忕洃瑙嗗櫒閿佸畾鐩稿悓鐨勪竴浜涘熀鏈涓哄拰璇箟锛屼絾鍔熻兘鏇村己澶с€?/p>
class X {
聽聽 private final ReentrantLock lock = new ReentrantLock();
聽聽 // ...
聽聽聽聽 lock.lock(); // block until condition holds
聽聽聽聽 try {
聽聽聽聽聽聽 // ... method body
聽聽聽聽 } finally {
聽聽聽聽聽聽 lock.unlock()
聽聽聽聽 }
聽聽 }
}
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
TestReentrantLock lock;
private int id;
public MyReentrantLock(int i,TestReentrantLock test){
聽聽 this.id=i;
聽聽 this.lock=test;
}
public void run(){
聽聽 lock.print(id);
}
聽聽 ExecutorService service=Executors.newCachedThreadPool();
聽聽 TestReentrantLock lock=new TestReentrantLock();
聽聽 for(int i=0;i<10;i++){
聽聽聽 service.submit(new MyReentrantLock(i,lock));
聽聽 }
聽聽 service.shutdown();
}
}
class TestReentrantLock{
private ReentrantLock lock=new ReentrantLock();
public void print(int str){
聽聽 try{
聽聽聽 lock.lock();
聽聽聽 System.out.println(str+"鑾峰緱");
聽聽聽 Thread.sleep((int)(Math.random()*1000));
聽聽 }
聽聽 catch(Exception e){
聽聽聽 e.printStackTrace();
聽聽 }
聽聽 finally{
聽聽聽 System.out.println(str+"閲婃斁");
聽聽聽 lock.unlock();
聽聽 }
}
}