Java澶氱嚎绋?闃诲闃熷垪BlockingQueue

Java澶氱嚎绋?闃诲闃熷垪BlockingQueue .
  • 鍓嶈█锛?/strong>聽聽聽聽聽鍦ㄦ柊澧炵殑Concurrent鍖呬腑锛孊lockingQueue寰堝ソ鐨勮В鍐充簡澶氱嚎绋嬩腑锛屽浣曢珮鏁堝畨鍏ㄢ€滀紶杈撯€濇暟鎹殑闂銆傞€氳繃杩欎簺楂樻晥骞朵笖绾跨▼瀹夊叏鐨勯槦鍒楃被锛屼负鎴戜滑蹇€熸惌寤洪珮璐ㄩ噺鐨勫绾跨▼绋嬪簭甯︽潵鏋佸ぇ鐨勪究鍒┿€傛湰鏂囪缁嗕粙缁嶄簡BlockingQueue瀹跺涵涓殑鎵€鏈夋垚鍛橈紝鍖呮嫭浠栦滑鍚勮嚜鐨勫姛鑳戒互鍙婂父瑙佷娇鐢ㄥ満鏅€?br>Java澶氱嚎绋?闃诲闃熷垪BlockingQueue

聽聽聽

浠庝笂鍥炬垜浠彲浠ュ緢娓呮鐪嬪埌锛岄€氳繃涓€涓叡浜殑闃熷垪锛屽彲浠ヤ娇寰楁暟鎹敱闃熷垪鐨勪竴绔緭鍏ワ紝浠庡彟澶栦竴绔緭鍑猴紱甯哥敤鐨勯槦鍒椾富瑕佹湁浠ヤ笅涓ょ锛氾紙褰撶劧閫氳繃涓嶅悓鐨勫疄鐜版柟寮忥紝杩樺彲浠ュ欢浼稿嚭寰堝涓嶅悓绫诲瀷鐨勯槦鍒楋紝DelayQueue灏辨槸鍏朵腑鐨勪竴绉嶏級聽聽鍏堣繘鍏堝嚭锛團IFO锛夛細鍏堟彃鍏ョ殑闃熷垪鐨勫厓绱犱篃鏈€鍏堝嚭闃熷垪锛岀被浼间簬鎺掗槦鐨勫姛鑳姐€備粠鏌愮绋嬪害涓婃潵璇磋繖绉嶉槦鍒椾篃浣撶幇浜嗕竴绉嶅叕骞虫€с€偮犅犲悗杩涘厛鍑猴紙LIFO锛夛細鍚庢彃鍏ラ槦鍒楃殑鍏冪礌鏈€鍏堝嚭闃熷垪锛岃繖绉嶉槦鍒椾紭鍏堝鐞嗘渶杩戝彂鐢熺殑浜嬩欢銆偮犅犅犅犅犅犲绾跨▼鐜涓紝閫氳繃闃熷垪鍙互寰堝鏄撳疄鐜版暟鎹叡浜紝姣斿缁忓吀鐨勨€滅敓浜ц€呪€濆拰鈥滄秷璐硅€呪€濇ā鍨嬩腑锛岄€氳繃闃熷垪鍙互寰堜究鍒╁湴瀹炵幇涓よ€呬箣闂寸殑鏁版嵁鍏变韩銆傚亣璁炬垜浠湁鑻ュ共鐢熶骇鑰呯嚎绋嬶紝鍙﹀鍙堟湁鑻ュ共涓秷璐硅€呯嚎绋嬨€傚鏋滅敓浜ц€呯嚎绋嬮渶瑕佹妸鍑嗗濂界殑鏁版嵁鍏变韩缁欐秷璐硅€呯嚎绋嬶紝鍒╃敤闃熷垪鐨勬柟寮忔潵浼犻€掓暟鎹紝灏卞彲浠ュ緢鏂逛究鍦拌В鍐充粬浠箣闂寸殑鏁版嵁鍏变韩闂銆備絾濡傛灉鐢熶骇鑰呭拰娑堣垂鑰呭湪鏌愪釜鏃堕棿娈靛唴锛屼竾涓€鍙戠敓鏁版嵁澶勭悊閫熷害涓嶅尮閰嶇殑鎯呭喌鍛紵鐞嗘兂鎯呭喌涓嬶紝濡傛灉鐢熶骇鑰呬骇鍑烘暟鎹殑閫熷害澶т簬娑堣垂鑰呮秷璐圭殑閫熷害锛屽苟涓斿綋鐢熶骇鍑烘潵鐨勬暟鎹疮绉埌涓€瀹氱▼搴︾殑鏃跺€欙紝閭d箞鐢熶骇鑰呭繀椤绘殏鍋滅瓑寰呬竴涓嬶紙闃诲鐢熶骇鑰呯嚎绋嬶級锛屼互渚跨瓑寰呮秷璐硅€呯嚎绋嬫妸绱Н鐨勬暟鎹鐞嗗畬姣曪紝鍙嶄箣浜︾劧銆傜劧鑰岋紝鍦╟oncurrent鍖呭彂甯冧互鍓嶏紝鍦ㄥ绾跨▼鐜涓嬶紝鎴戜滑姣忎釜绋嬪簭鍛橀兘蹇呴』鍘昏嚜宸辨帶鍒惰繖浜涚粏鑺傦紝灏ゅ叾杩樿鍏奸【鏁堢巼鍜岀嚎绋嬪畨鍏紝鑰岃繖浼氱粰鎴戜滑鐨勭▼搴忓甫鏉ヤ笉灏忕殑澶嶆潅搴︺€傚ソ鍦ㄦ鏃讹紝寮哄ぇ鐨刢oncurrent鍖呮í绌哄嚭涓栦簡锛岃€屼粬涔熺粰鎴戜滑甯︽潵浜嗗己澶х殑BlockingQueue銆傦紙鍦ㄥ绾跨▼棰嗗煙锛氭墍璋撻樆濉烇紝鍦ㄦ煇浜涙儏鍐典笅浼氭寕璧风嚎绋嬶紙鍗抽樆濉烇級锛屼竴鏃︽潯浠舵弧瓒筹紝琚寕璧风殑绾跨▼鍙堜細鑷姩琚敜閱掞級涓嬮潰涓ゅ箙鍥炬紨绀轰簡BlockingQueue鐨勪袱涓父瑙侀樆濉炲満鏅細

Java澶氱嚎绋?闃诲闃熷垪BlockingQueue濡傚乏鍥炬墍绀猴細褰撻槦鍒椾腑娌℃湁鏁版嵁鐨勬儏鍐典笅锛屾秷璐硅€呯鐨勬墍鏈夌嚎绋嬮兘浼氳鑷姩闃诲锛堟寕璧凤級锛岀洿鍒版湁鏁版嵁鏀惧叆闃熷垪

Java澶氱嚎绋?闃诲闃熷垪BlockingQueue濡傚乏鍥炬墍绀猴細褰撻槦鍒椾腑濉弧鏁版嵁鐨勬儏鍐典笅锛岀敓浜ц€呯鐨勬墍鏈夌嚎绋嬮兘浼氳鑷姩闃诲锛堟寕璧凤級锛岀洿鍒伴槦鍒椾腑鏈夌┖鐨勪綅缃紝绾跨▼琚嚜鍔ㄥ敜閱掋€?/em>聽聽聽聽聽杩欎篃鏄垜浠湪澶氱嚎绋嬬幆澧冧笅锛屼负浠€涔堥渶瑕丅lockingQueue鐨勫師鍥犮€備綔涓築lockingQueue鐨勪娇鐢ㄨ€咃紝鎴戜滑鍐嶄篃涓嶉渶瑕佸叧蹇冧粈涔堟椂鍊欓渶瑕侀樆濉炵嚎绋嬶紝浠€涔堟椂鍊欓渶瑕佸敜閱掔嚎绋嬶紝鍥犱负杩欎竴鍒嘊lockingQueue閮界粰浣犱竴鎵嬪寘鍔炰簡銆傛棦鐒禕lockingQueue濡傛绁為€氬箍澶э紝璁╂垜浠竴璧锋潵瑙佽瘑涓嬪畠鐨勫父鐢ㄦ柟娉曪細BlockingQueue鐨勬牳蹇冩柟娉曪細鏀惧叆鏁版嵁锛毬犅爋ffer(anObject):琛ㄧず濡傛灉鍙兘鐨勮瘽,灏哸nObject鍔犲埌BlockingQueue閲?鍗冲鏋淏lockingQueue鍙互瀹圭撼,聽聽聽聽鍒欒繑鍥瀟rue,鍚﹀垯杩斿洖false.锛堟湰鏂规硶涓嶉樆濉炲綋鍓嶆墽琛屾柟娉曠殑绾跨▼锛壜犅爋ffer(E聽o,聽long聽timeout,聽TimeUnit聽unit),鍙互璁惧畾绛夊緟鐨勬椂闂达紝濡傛灉鍦ㄦ寚瀹氱殑鏃堕棿鍐咃紝杩樹笉鑳藉線闃熷垪涓犅犅犅犲姞鍏lockingQueue锛屽垯杩斿洖澶辫触銆偮犅爌ut(anObject):鎶奱nObject鍔犲埌BlockingQueue閲?濡傛灉BlockQueue娌℃湁绌洪棿,鍒欒皟鐢ㄦ鏂规硶鐨勭嚎绋嬭闃绘柇聽聽聽聽鐩村埌BlockingQueue閲岄潰鏈夌┖闂村啀缁х画.鑾峰彇鏁版嵁锛毬犅爌oll(time):鍙栬蛋BlockingQueue閲屾帓鍦ㄩ浣嶇殑瀵硅薄,鑻ヤ笉鑳界珛鍗冲彇鍑?鍒欏彲浠ョ瓑time鍙傛暟瑙勫畾鐨勬椂闂?聽聽聽聽鍙栦笉鍒版椂杩斿洖null;聽聽poll(long聽timeout,聽TimeUnit聽unit)锛氫粠BlockingQueue鍙栧嚭涓€涓槦棣栫殑瀵硅薄锛屽鏋滃湪鎸囧畾鏃堕棿鍐咃紝聽聽聽聽闃熷垪涓€鏃︽湁鏁版嵁鍙彇锛屽垯绔嬪嵆杩斿洖闃熷垪涓殑鏁版嵁銆傚惁鍒欑煡閬撴椂闂磋秴鏃惰繕娌℃湁鏁版嵁鍙彇锛岃繑鍥炲け璐ャ€偮犅爐ake():鍙栬蛋BlockingQueue閲屾帓鍦ㄩ浣嶇殑瀵硅薄,鑻lockingQueue涓虹┖,闃绘柇杩涘叆绛夊緟鐘舵€佺洿鍒奥犅犅犅燘lockingQueue鏈夋柊鐨勬暟鎹鍔犲叆;聽聽聽drainTo():涓€娆℃€т粠BlockingQueue鑾峰彇鎵€鏈夊彲鐢ㄧ殑鏁版嵁瀵硅薄锛堣繕鍙互鎸囧畾鑾峰彇鏁版嵁鐨勪釜鏁帮級锛屄犅犅犅犅犻€氳繃璇ユ柟娉曪紝鍙互鎻愬崌鑾峰彇鏁版嵁鏁堢巼锛涗笉闇€瑕佸娆″垎鎵瑰姞閿佹垨閲婃斁閿併€?/p>

  • 甯歌BlockingQueue鍦ㄤ簡瑙d簡BlockingQueue鐨勫熀鏈姛鑳藉悗锛岃鎴戜滑鏉ョ湅鐪婤lockingQueue瀹跺涵澶ц嚧鏈夊摢浜涙垚鍛橈紵 Java澶氱嚎绋?闃诲闃熷垪BlockingQueue - 閲忓瓙浜虹敓 - 閲忓瓙浜虹敓鐨勫崥瀹?/li>
  • BlockingQueue鎴愬憳璇︾粏浠嬬粛1. ArrayBlockingQueue聽聽聽聽聽 鍩轰簬鏁扮粍鐨勯樆濉為槦鍒楀疄鐜帮紝鍦ˋrrayBlockingQueue鍐呴儴锛岀淮鎶や簡涓€涓畾闀挎暟缁勶紝浠ヤ究缂撳瓨闃熷垪涓殑鏁版嵁瀵硅薄锛岃繖鏄竴涓父鐢ㄧ殑闃诲闃熷垪锛岄櫎浜嗕竴涓畾闀挎暟缁勫锛孉rrayBlockingQueue鍐呴儴杩樹繚瀛樼潃涓や釜鏁村舰鍙橀噺锛屽垎鍒爣璇嗙潃闃熷垪鐨勫ご閮ㄥ拰灏鹃儴鍦ㄦ暟缁勪腑鐨勪綅缃€偮?ArrayBlockingQueue鍦ㄧ敓浜ц€呮斁鍏ユ暟鎹拰娑堣垂鑰呰幏鍙栨暟鎹紝閮芥槸鍏辩敤鍚屼竴涓攣瀵硅薄锛岀敱姝や篃鎰忓懗鐫€涓よ€呮棤娉曠湡姝e苟琛岃繍琛岋紝杩欑偣灏ゅ叾涓嶅悓浜嶭inkedBlockingQueue锛涙寜鐓у疄鐜板師鐞嗘潵鍒嗘瀽锛孉rrayBlockingQueue瀹屽叏鍙互閲囩敤鍒嗙閿侊紝浠庤€屽疄鐜扮敓浜ц€呭拰娑堣垂鑰呮搷浣滅殑瀹屽叏骞惰杩愯銆侱oug Lea涔嬫墍浠ユ病杩欐牱鍘诲仛锛屼篃璁告槸鍥犱负ArrayBlockingQueue鐨勬暟鎹啓鍏ュ拰鑾峰彇鎿嶄綔宸茬粡瓒冲杞诲阀锛屼互鑷充簬寮曞叆鐙珛鐨勯攣鏈哄埗锛岄櫎浜嗙粰浠g爜甯︽潵棰濆鐨勫鏉傛€у锛屽叾鍦ㄦ€ц兘涓婂畬鍏ㄥ崰涓嶅埌浠讳綍渚垮疁銆?ArrayBlockingQueue鍜孡inkedBlockingQueue闂磋繕鏈変竴涓槑鏄剧殑涓嶅悓涔嬪鍦ㄤ簬锛屽墠鑰呭湪鎻掑叆鎴栧垹闄ゅ厓绱犳椂涓嶄細浜х敓鎴栭攢姣佷换浣曢澶栫殑瀵硅薄瀹炰緥锛岃€屽悗鑰呭垯浼氱敓鎴愪竴涓澶栫殑Node瀵硅薄銆傝繖鍦ㄩ暱鏃堕棿鍐呴渶瑕侀珮鏁堝苟鍙戝湴澶勭悊澶ф壒閲忔暟鎹殑绯荤粺涓紝鍏跺浜嶨C鐨勫奖鍝嶈繕鏄瓨鍦ㄤ竴瀹氱殑鍖哄埆銆傝€屽湪鍒涘缓ArrayBlockingQueue鏃讹紝鎴戜滑杩樺彲浠ユ帶鍒跺璞$殑鍐呴儴閿佹槸鍚﹂噰鐢ㄥ叕骞抽攣锛岄粯璁ら噰鐢ㄩ潪鍏钩閿併€?. LinkedBlockingQueue聽聽聽聽聽 鍩轰簬閾捐〃鐨勯樆濉為槦鍒楋紝鍚孉rrayListBlockingQueue绫讳技锛屽叾鍐呴儴涔熺淮鎸佺潃涓€涓暟鎹紦鍐查槦鍒楋紙璇ラ槦鍒楃敱涓€涓摼琛ㄦ瀯鎴愶級锛屽綋鐢熶骇鑰呭線闃熷垪涓斁鍏ヤ竴涓暟鎹椂锛岄槦鍒椾細浠庣敓浜ц€呮墜涓幏鍙栨暟鎹紝骞剁紦瀛樺湪闃熷垪鍐呴儴锛岃€岀敓浜ц€呯珛鍗宠繑鍥烇紱鍙湁褰撻槦鍒楃紦鍐插尯杈惧埌鏈€澶у€肩紦瀛樺閲忔椂锛圠inkedBlockingQueue鍙互閫氳繃鏋勯€犲嚱鏁版寚瀹氳鍊硷級锛屾墠浼氶樆濉炵敓浜ц€呴槦鍒楋紝鐩村埌娑堣垂鑰呬粠闃熷垪涓秷璐规帀涓€浠芥暟鎹紝鐢熶骇鑰呯嚎绋嬩細琚敜閱掞紝鍙嶄箣瀵逛簬娑堣垂鑰呰繖绔殑澶勭悊涔熷熀浜庡悓鏍风殑鍘熺悊銆傝€孡inkedBlockingQueue涔嬫墍浠ヨ兘澶熼珮鏁堢殑澶勭悊骞跺彂鏁版嵁锛岃繕鍥犱负鍏跺浜庣敓浜ц€呯鍜屾秷璐硅€呯鍒嗗埆閲囩敤浜嗙嫭绔嬬殑閿佹潵鎺у埗鏁版嵁鍚屾锛岃繖涔熸剰鍛崇潃鍦ㄩ珮骞跺彂鐨勬儏鍐典笅鐢熶骇鑰呭拰娑堣垂鑰呭彲浠ュ苟琛屽湴鎿嶄綔闃熷垪涓殑鏁版嵁锛屼互姝ゆ潵鎻愰珮鏁翠釜闃熷垪鐨勫苟鍙戞€ц兘銆備綔涓哄紑鍙戣€咃紝鎴戜滑闇€瑕佹敞鎰忕殑鏄紝濡傛灉鏋勯€犱竴涓狶inkedBlockingQueue瀵硅薄锛岃€屾病鏈夋寚瀹氬叾瀹归噺澶у皬锛孡inkedBlockingQueue浼氶粯璁や竴涓被浼兼棤闄愬ぇ灏忕殑瀹归噺锛圛nteger.MAX_VALUE锛夛紝杩欐牱鐨勮瘽锛屽鏋滅敓浜ц€呯殑閫熷害涓€鏃﹀ぇ浜庢秷璐硅€呯殑閫熷害锛屼篃璁歌繕娌℃湁绛夊埌闃熷垪婊¢樆濉炰骇鐢燂紝绯荤粺鍐呭瓨灏辨湁鍙兘宸茶娑堣€楁畣灏戒簡銆侫rrayBlockingQueue鍜孡inkedBlockingQueue鏄袱涓渶鏅€氫篃鏄渶甯哥敤鐨勯樆濉為槦鍒楋紝涓€鑸儏鍐典笅锛屽湪澶勭悊澶氱嚎绋嬮棿鐨勭敓浜ц€呮秷璐硅€呴棶棰橈紝浣跨敤杩欎袱涓被瓒充互銆備笅闈㈢殑浠g爜婕旂ず浜嗗浣曚娇鐢˙lockingQueue锛?/li>
  • [java] view plaincopyprint?
    1. import聽java.util.concurrent.BlockingQueue;聽聽
    2. import聽java.util.concurrent.ExecutorService;聽聽
    3. import聽java.util.concurrent.Executors;聽聽
    4. import聽java.util.concurrent.LinkedBlockingQueue;聽聽
    5. 聽聽聽
    6. /**
    7. 聽*聽@author聽jackyuj
    8. 聽*/聽聽
    9. publicclass聽BlockingQueueTest聽{聽聽
    10. 聽聽聽
    11. 聽聽聽聽publicstaticvoid聽main(String[]聽args)聽throws聽InterruptedException聽{聽聽
    12. 聽聽聽聽聽聽聽聽//聽澹版槑涓€涓閲忎负10鐨勭紦瀛橀槦鍒?聽聽
    13. 聽聽聽聽聽聽聽聽BlockingQueue<String>聽queue聽=聽new聽LinkedBlockingQueue<String>(10);聽聽
    14. 聽聽聽
    15. 聽聽聽聽聽聽聽聽Producer聽producer1聽=聽new聽Producer(queue);聽聽
    16. 聽聽聽聽聽聽聽聽Producer聽producer2聽=聽new聽Producer(queue);聽聽
    17. 聽聽聽聽聽聽聽聽Producer聽producer3聽=聽new聽Producer(queue);聽聽
    18. 聽聽聽聽聽聽聽聽Consumer聽consumer聽=聽new聽Consumer(queue);聽聽
    19. 聽聽聽
    20. 聽聽聽聽聽聽聽聽//聽鍊熷姪Executors 聽聽
    21. 聽聽聽聽聽聽聽聽ExecutorService聽service聽=聽Executors.newCachedThreadPool();聽聽
    22. 聽聽聽聽聽聽聽聽//聽鍚姩绾跨▼ 聽聽
    23. 聽聽聽聽聽聽聽聽service.execute(producer1);聽聽
    24. 聽聽聽聽聽聽聽聽service.execute(producer2);聽聽
    25. 聽聽聽聽聽聽聽聽service.execute(producer3);聽聽
    26. 聽聽聽聽聽聽聽聽service.execute(consumer);聽聽
    27. 聽聽聽
    28. 聽聽聽聽聽聽聽聽//聽鎵ц10s 聽聽
    29. 聽聽聽聽聽聽聽聽Thread.sleep(10聽*聽1000);聽聽
    30. 聽聽聽聽聽聽聽聽producer1.stop();聽聽
    31. 聽聽聽聽聽聽聽聽producer2.stop();聽聽
    32. 聽聽聽聽聽聽聽聽producer3.stop();聽聽
    33. 聽聽聽
    34. 聽聽聽聽聽聽聽聽Thread.sleep(2000);聽聽
    35. 聽聽聽聽聽聽聽聽//聽閫€鍑篍xecutor 聽聽
    36. 聽聽聽聽聽聽聽聽service.shutdown();聽聽
    37. 聽聽聽聽}聽聽
    38. }聽聽
    39. 聽聽
    40. import聽java.util.Random;聽聽
    41. import聽java.util.concurrent.BlockingQueue;聽聽
    42. import聽java.util.concurrent.TimeUnit;聽聽
    43. /**
    44. 聽*聽娑堣垂鑰呯嚎绋?/span>聽
    45. 聽*
    46. 聽*聽@author聽jackyuj
    47. 聽*/聽聽
    48. publicclass聽Consumer聽implements聽Runnable聽{聽聽
    49. 聽聽聽聽public聽Consumer(BlockingQueue<String>聽queue)聽{聽聽
    50. 聽聽聽聽聽聽聽聽this.queue聽=聽queue;聽聽
    51. 聽聽聽聽}聽聽
    52. 聽聽聽聽publicvoid聽run()聽{聽聽
    53. 聽聽聽聽聽聽聽聽System.out.println("鍚姩娑堣垂鑰呯嚎绋嬶紒");聽聽
    54. 聽聽聽聽聽聽聽聽Random聽r聽=聽new聽Random();聽聽
    55. 聽聽聽聽聽聽聽聽boolean聽isRunning聽=聽true;聽聽
    56. 聽聽聽聽聽聽聽聽try聽{聽聽
    57. 聽聽聽聽聽聽聽聽聽聽聽聽while聽(isRunning)聽{聽聽
    58. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("姝d粠闃熷垪鑾峰彇鏁版嵁...");聽聽
    59. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽String聽data聽=聽queue.poll(2,聽TimeUnit.SECONDS);聽聽
    60. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(null聽!=聽data)聽{聽聽
    61. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("鎷垮埌鏁版嵁锛?聽+聽data);聽聽
    62. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("姝e湪娑堣垂鏁版嵁锛?聽+聽data);聽聽
    63. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));聽聽
    64. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽else聽{聽聽
    65. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽瓒呰繃2s杩樻病鏁版嵁锛岃涓烘墍鏈夌敓浜х嚎绋嬮兘宸茬粡閫€鍑猴紝鑷姩閫€鍑烘秷璐圭嚎绋嬨€?聽聽
    66. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽isRunning聽=聽false;聽聽
    67. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
    68. 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
    69. 聽聽聽聽聽聽聽聽}聽catch聽(InterruptedException聽e)聽{聽聽
    70. 聽聽聽聽聽聽聽聽聽聽聽聽e.printStackTrace();聽聽
    71. 聽聽聽聽聽聽聽聽聽聽聽聽Thread.currentThread().interrupt();聽聽
    72. 聽聽聽聽聽聽聽聽}聽finally聽{聽聽
    73. 聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("閫€鍑烘秷璐硅€呯嚎绋嬶紒");聽聽
    74. 聽聽聽聽聽聽聽聽}聽聽
    75. 聽聽聽聽}聽聽
    76. 聽聽聽聽private聽BlockingQueue<String>聽queue;聽聽
    77. 聽聽聽聽privatestaticfinalint聽聽聽聽聽聽DEFAULT_RANGE_FOR_SLEEP聽=聽1000;聽聽
    78. }聽聽
    79. import聽java.util.Random;聽聽
    80. import聽java.util.concurrent.BlockingQueue;聽聽
    81. import聽java.util.concurrent.TimeUnit;聽聽
    82. import聽java.util.concurrent.atomic.AtomicInteger;聽聽
    83. /**
    84. 聽*聽鐢熶骇鑰呯嚎绋?/span>聽
    85. 聽*
    86. 聽*聽@author聽jackyuj
    87. 聽*/聽聽
    88. publicclass聽Producer聽implements聽Runnable聽{聽聽
    89. 聽聽聽聽public聽Producer(BlockingQueue聽queue)聽{聽聽
    90. 聽聽聽聽聽聽聽聽this.queue聽=聽queue;聽聽
    91. 聽聽聽聽}聽聽
    92. 聽聽聽聽publicvoid聽run()聽{聽聽
    93. 聽聽聽聽聽聽聽聽String聽data聽=聽null;聽聽
    94. 聽聽聽聽聽聽聽聽Random聽r聽=聽new聽Random();聽聽
    95. 聽聽聽聽聽聽聽聽System.out.println("鍚姩鐢熶骇鑰呯嚎绋嬶紒");聽聽
    96. 聽聽聽聽聽聽聽聽try聽{聽聽
    97. 聽聽聽聽聽聽聽聽聽聽聽聽while聽(isRunning)聽{聽聽
    98. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("姝e湪鐢熶骇鏁版嵁...");聽聽
    99. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));聽聽
    100. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽data聽=聽"data:"聽+聽count.incrementAndGet();聽聽
    101. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("灏嗘暟鎹細"聽+聽data聽+聽"鏀惧叆闃熷垪...");聽聽
    102. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(!queue.offer(data,聽2,聽TimeUnit.SECONDS))聽{聽聽
    103. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("鏀惧叆鏁版嵁澶辫触锛?聽+聽data);聽聽
    104. 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
    105. 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
    106. 聽聽聽聽聽聽聽聽}聽catch聽(InterruptedException聽e)聽{聽聽
    107. 聽聽聽聽聽聽聽聽聽聽聽聽e.printStackTrace();聽聽
    108. 聽聽聽聽聽聽聽聽聽聽聽聽Thread.currentThread().interrupt();聽聽
    109. 聽聽聽聽聽聽聽聽}聽finally聽{聽聽
    110. 聽聽聽聽聽聽聽聽聽聽聽聽System.out.println("閫€鍑虹敓浜ц€呯嚎绋嬶紒");聽聽
    111. 聽聽聽聽聽聽聽聽}聽聽
    112. 聽聽聽聽}聽聽
    113. 聽聽聽聽publicvoid聽stop()聽{聽聽
    114. 聽聽聽聽聽聽聽聽isRunning聽=聽false;聽聽
    115. 聽聽聽聽}聽聽
    116. 聽聽聽聽privatevolatileboolean聽聽聽聽聽聽isRunning聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽=聽true;聽聽
    117. 聽聽聽聽private聽BlockingQueue聽queue;聽聽
    118. 聽聽聽聽privatestatic聽AtomicInteger聽聽count聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽=聽new聽AtomicInteger();聽聽
    119. 聽聽聽聽privatestaticfinalint聽聽聽聽聽聽DEFAULT_RANGE_FOR_SLEEP聽=聽1000;聽聽
    120. }聽聽
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
     
    /**
     * @author jackyuj
     */
    public class BlockingQueueTest {
     
        public static void main(String[] args) throws InterruptedException {
            // 澹版槑涓€涓閲忎负10鐨勭紦瀛橀槦鍒?
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
     
            Producer producer1 = new Producer(queue);
            Producer producer2 = new Producer(queue);
            Producer producer3 = new Producer(queue);
            Consumer consumer = new Consumer(queue);
     
            // 鍊熷姪Executors
            ExecutorService service = Executors.newCachedThreadPool();
            // 鍚姩绾跨▼
            service.execute(producer1);
            service.execute(producer2);
            service.execute(producer3);
            service.execute(consumer);
     
            // 鎵ц10s
            Thread.sleep(10 * 1000);
            producer1.stop();
            producer2.stop();
            producer3.stop();
     
            Thread.sleep(2000);
            // 閫€鍑篍xecutor
            service.shutdown();
        }
    }
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    /**
     * 娑堣垂鑰呯嚎绋?
     *
     * @author jackyuj
     */
    public class Consumer implements Runnable {
        public Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }
        public void run() {
            System.out.println("鍚姩娑堣垂鑰呯嚎绋嬶紒");
            Random r = new Random();
            boolean isRunning = true;
            try {
                while (isRunning) {
                    System.out.println("姝d粠闃熷垪鑾峰彇鏁版嵁...");
                    String data = queue.poll(2, TimeUnit.SECONDS);
                    if (null != data) {
                        System.out.println("鎷垮埌鏁版嵁锛? + data);
                        System.out.println("姝e湪娑堣垂鏁版嵁锛? + data);
                        Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                    } else {
                        // 瓒呰繃2s杩樻病鏁版嵁锛岃涓烘墍鏈夌敓浜х嚎绋嬮兘宸茬粡閫€鍑猴紝鑷姩閫€鍑烘秷璐圭嚎绋嬨€?
                        isRunning = false;
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } finally {
                System.out.println("閫€鍑烘秷璐硅€呯嚎绋嬶紒");
            }
        }
        private BlockingQueue<String> queue;
        private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    }
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
     * 鐢熶骇鑰呯嚎绋?
     *
     * @author jackyuj
     */
    public class Producer implements Runnable {
        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }
        public void run() {
            String data = null;
            Random r = new Random();
            System.out.println("鍚姩鐢熶骇鑰呯嚎绋嬶紒");
            try {
                while (isRunning) {
                    System.out.println("姝e湪鐢熶骇鏁版嵁...");
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                    data = "data:" + count.incrementAndGet();
                    System.out.println("灏嗘暟鎹細" + data + "鏀惧叆闃熷垪...");
                    if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                        System.out.println("鏀惧叆鏁版嵁澶辫触锛? + data);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } finally {
                System.out.println("閫€鍑虹敓浜ц€呯嚎绋嬶紒");
            }
        }
        public void stop() {
            isRunning = false;
        }
        private volatile boolean      isRunning               = true;
        private BlockingQueue queue;
        private static AtomicInteger  count                   = new AtomicInteger();
        private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    }
    
  • 3.聽DelayQueue聽聽聽聽聽聽DelayQueue涓殑鍏冪礌鍙湁褰撳叾鎸囧畾鐨勫欢杩熸椂闂村埌浜嗭紝鎵嶈兘澶熶粠闃熷垪涓幏鍙栧埌璇ュ厓绱犮€侱elayQueue鏄竴涓病鏈夊ぇ灏忛檺鍒剁殑闃熷垪锛屽洜姝ゅ線闃熷垪涓彃鍏ユ暟鎹殑鎿嶄綔锛堢敓浜ц€咃級姘歌繙涓嶄細琚樆濉烇紝鑰屽彧鏈夎幏鍙栨暟鎹殑鎿嶄綔锛堟秷璐硅€咃級鎵嶄細琚樆濉炪€備娇鐢ㄥ満鏅細聽聽DelayQueue浣跨敤鍦烘櫙杈冨皯锛屼絾閮界浉褰撳阀濡欙紝甯歌鐨勪緥瀛愭瘮濡備娇鐢ㄤ竴涓狣elayQueue鏉ョ鐞嗕竴涓秴鏃舵湭鍝嶅簲鐨勮繛鎺ラ槦鍒椼€?.聽PriorityBlockingQueue聽聽聽聽聽聽鍩轰簬浼樺厛绾х殑闃诲闃熷垪锛堜紭鍏堢骇鐨勫垽鏂€氳繃鏋勯€犲嚱鏁颁紶鍏ョ殑Compator瀵硅薄鏉ュ喅瀹氾級锛屼絾闇€瑕佹敞鎰忕殑鏄疨riorityBlockingQueue骞朵笉浼氶樆濉炴暟鎹敓浜ц€咃紝鑰屽彧浼氬湪娌℃湁鍙秷璐圭殑鏁版嵁鏃讹紝闃诲鏁版嵁鐨勬秷璐硅€呫€傚洜姝や娇鐢ㄧ殑鏃跺€欒鐗瑰埆娉ㄦ剰锛岀敓浜ц€呯敓浜ф暟鎹殑閫熷害缁濆涓嶈兘蹇簬娑堣垂鑰呮秷璐规暟鎹殑閫熷害锛屽惁鍒欐椂闂翠竴闀匡紝浼氭渶缁堣€楀敖鎵€鏈夌殑鍙敤鍫嗗唴瀛樼┖闂淬€傚湪瀹炵幇PriorityBlockingQueue鏃讹紝鍐呴儴鎺у埗绾跨▼鍚屾鐨勯攣閲囩敤鐨勬槸鍏钩閿併€?.聽SynchronousQueue聽聽聽聽聽聽涓€绉嶆棤缂撳啿鐨勭瓑寰呴槦鍒楋紝绫讳技浜庢棤涓粙鐨勭洿鎺ヤ氦鏄擄紝鏈夌偣鍍忓師濮嬬ぞ浼氫腑鐨勭敓浜ц€呭拰娑堣垂鑰咃紝鐢熶骇鑰呮嬁鐫€浜у搧鍘婚泦甯傞攢鍞粰浜у搧鐨勬渶缁堟秷璐硅€咃紝鑰屾秷璐硅€呭繀椤讳翰鑷幓闆嗗競鎵惧埌鎵€瑕佸晢鍝佺殑鐩存帴鐢熶骇鑰咃紝濡傛灉涓€鏂规病鏈夋壘鍒板悎閫傜殑鐩爣锛岄偅涔堝涓嶈捣锛屽ぇ瀹堕兘鍦ㄩ泦甯傜瓑寰呫€傜浉瀵逛簬鏈夌紦鍐茬殑BlockingQueue鏉ヨ锛屽皯浜嗕竴涓腑闂寸粡閿€鍟嗙殑鐜妭锛堢紦鍐插尯锛夛紝濡傛灉鏈夌粡閿€鍟嗭紝鐢熶骇鑰呯洿鎺ユ妸浜у搧鎵瑰彂缁欑粡閿€鍟嗭紝鑰屾棤闇€鍦ㄦ剰缁忛攢鍟嗘渶缁堜細灏嗚繖浜涗骇鍝佸崠缁欓偅浜涙秷璐硅€咃紝鐢变簬缁忛攢鍟嗗彲浠ュ簱瀛樹竴閮ㄥ垎鍟嗗搧锛屽洜姝ょ浉瀵逛簬鐩存帴浜ゆ槗妯″紡锛屾€讳綋鏉ヨ閲囩敤涓棿缁忛攢鍟嗙殑妯″紡浼氬悶鍚愰噺楂樹竴浜涳紙鍙互鎵归噺涔板崠锛夛紱浣嗗彟涓€鏂归潰锛屽張鍥犱负缁忛攢鍟嗙殑寮曞叆锛屼娇寰椾骇鍝佷粠鐢熶骇鑰呭埌娑堣垂鑰呬腑闂村鍔犱簡棰濆鐨勪氦鏄撶幆鑺傦紝鍗曚釜浜у搧鐨勫強鏃跺搷搴旀€ц兘鍙兘浼氶檷浣庛€偮犅犲0鏄庝竴涓猄ynchronousQueue鏈変袱绉嶄笉鍚岀殑鏂瑰紡锛屽畠浠箣闂存湁鐫€涓嶅お涓€鏍风殑琛屼负銆傚叕骞虫ā寮忓拰闈炲叕骞虫ā寮忕殑鍖哄埆:聽聽濡傛灉閲囩敤鍏钩妯″紡锛歋ynchronousQueue浼氶噰鐢ㄥ叕骞抽攣锛屽苟閰嶅悎涓€涓狥IFO闃熷垪鏉ラ樆濉炲浣欑殑鐢熶骇鑰呭拰娑堣垂鑰咃紝浠庤€屼綋绯绘暣浣撶殑鍏钩绛栫暐锛浡犅犱絾濡傛灉鏄潪鍏钩妯″紡锛圫ynchronousQueue榛樿锛夛細SynchronousQueue閲囩敤闈炲叕骞抽攣锛屽悓鏃堕厤鍚堜竴涓狶IFO闃熷垪鏉ョ鐞嗗浣欑殑鐢熶骇鑰呭拰娑堣垂鑰咃紝鑰屽悗涓€绉嶆ā寮忥紝濡傛灉鐢熶骇鑰呭拰娑堣垂鑰呯殑澶勭悊閫熷害鏈夊樊璺濓紝鍒欏緢瀹规槗鍑虹幇楗ユ复鐨勬儏鍐碉紝鍗冲彲鑳芥湁鏌愪簺鐢熶骇鑰呮垨鑰呮槸娑堣垂鑰呯殑鏁版嵁姘歌繙閮藉緱涓嶅埌澶勭悊銆?/li>
  • 灏忕粨聽聽BlockingQueue涓嶅厜瀹炵幇浜嗕竴涓畬鏁撮槦鍒楁墍鍏锋湁鐨勫熀鏈姛鑳斤紝鍚屾椂鍦ㄥ绾跨▼鐜涓嬶紝浠栬繕鑷姩绠$悊浜嗗绾块棿鐨勮嚜鍔ㄧ瓑寰呬簬鍞ら啋鍔熻兘锛屼粠鑰屼娇寰楃▼搴忓憳鍙互蹇界暐杩欎簺缁嗚妭锛屽叧娉ㄦ洿楂樼骇鐨勫姛鑳姐€?/li>