BlockingQueue兑现生产者消费者模式
BlockingQueue实现生产者消费者模式
package com.sunrise.mywork2.concurrent.queue;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class IndexingService
{
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root,final FileFilter fileFilter){
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter()
{
public boolean accept(File pathname)
{
return pathname.isDirectory()||fileFilter.accept(pathname);
}
};
}
private boolean alreadyIndexed(File f){
return false;
}
class CrawlerThread extends Thread{
public void run(){
try{
crawl(root);
}catch (InterruptedException e) {
}finally{
while(true){
try{
queue.put(POISON);
break;
}catch (InterruptedException e1) {
}
}
}
}
private void crawl(File root) throws InterruptedException{
File[] entries = root.listFiles(fileFilter);
if(entries!=null){
for(File entry:entries){
if(entry.isDirectory())
crawl(entry);
else if(!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}
class IndexerThread extends Thread{
public void run(){
try{
while(true){
File file = queue.take();
if(file == POISON)
break;
else
indexFile(file);
}
}catch (InterruptedException e) {
}
}
public void indexFile(File file){
//TODO
System.out.println(file.getName());
}
}
public void start(){
producer.start();
consumer.start();
}
public void stop(){
producer.interrupt();
}
public void awaitTermination() throws InterruptedException{
consumer.join();
}
/**
* test it
* @param args
*/
public static void main(String[] args)
{
File root = new File("F:\\");
FileFilter fileFilter = new FileFilter()
{
public boolean accept(File f)
{
return f.getName().lastIndexOf(".java")>-1;
}
};
IndexingService is = new IndexingService(root, fileFilter);
is.start();
}
}
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class IndexingService
{
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root,final FileFilter fileFilter){
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter()
{
public boolean accept(File pathname)
{
return pathname.isDirectory()||fileFilter.accept(pathname);
}
};
}
private boolean alreadyIndexed(File f){
return false;
}
class CrawlerThread extends Thread{
public void run(){
try{
crawl(root);
}catch (InterruptedException e) {
}finally{
while(true){
try{
queue.put(POISON);
break;
}catch (InterruptedException e1) {
}
}
}
}
private void crawl(File root) throws InterruptedException{
File[] entries = root.listFiles(fileFilter);
if(entries!=null){
for(File entry:entries){
if(entry.isDirectory())
crawl(entry);
else if(!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}
class IndexerThread extends Thread{
public void run(){
try{
while(true){
File file = queue.take();
if(file == POISON)
break;
else
indexFile(file);
}
}catch (InterruptedException e) {
}
}
public void indexFile(File file){
//TODO
System.out.println(file.getName());
}
}
public void start(){
producer.start();
consumer.start();
}
public void stop(){
producer.interrupt();
}
public void awaitTermination() throws InterruptedException{
consumer.join();
}
/**
* test it
* @param args
*/
public static void main(String[] args)
{
File root = new File("F:\\");
FileFilter fileFilter = new FileFilter()
{
public boolean accept(File f)
{
return f.getName().lastIndexOf(".java")>-1;
}
};
IndexingService is = new IndexingService(root, fileFilter);
is.start();
}
}