Design discussion: distributing tasks and collecting results with multiple threads
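Suppose the task is this: given a string such as "threadTest", append "@" to it and return the result. The individual tasks do not depend on one another in any way.
Suppose there are many such strings, say 1000, and no more than 35 threads may be created.
That raises the question of how to assign work once the threads exist: how can the code elegantly hand a given batch of tasks to a thread pool and then collect the results the threads produce?
I wanted to use Future from java.util.concurrent, but it does not seem to support collecting results in batch?
So I wrote a crude version with raw threads that assigns the tasks and collects the results by brute force. It is ugly. How would you improve or redesign it?
======== My crude implementation follows ========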
package rmn.thread;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskAssignAndResultCollection {
    private final static int DEFAULT_THREAD_NUM = 5;
    private int threadNum = DEFAULT_THREAD_NUM;
    private Worker[] threads = null;

    public TaskAssignAndResultCollection(int threadNum) {
        super();
        // Keep the default unless a usable thread count is given.
        if (threadNum > 0) {
            this.threadNum = threadNum;
        }
    }

    public Map<String, String> processStringBatchly(String[] datas) {
        if (threads == null) {
            synchronized (this) {
                threads = new Worker[threadNum];
                for (int i = 0; i < threadNum; i++) {
                    threads[i] = new Worker();
                }
            }
        }

        // How should the domain names be handed to the threads? Split them evenly.
        int domainSize = datas.length;
        int domainNamePerThread = domainSize / threadNum;
        int leftDomainName = domainSize % threadNum;
        List<String> listDomainName = Arrays.asList(datas);

        // First give every thread domainNamePerThread domain names.
        int endIndex = 0;
        for (int i = 0; i < threadNum; i++) {
            int beginIndex = i * domainNamePerThread;
            int step = domainNamePerThread;
            endIndex = beginIndex + step;
            List<String> subDomainNames =
                    new ArrayList<String>(listDomainName.subList(beginIndex, endIndex));
            threads[i].setDomainNameList(subDomainNames);
        }

        // Then hand out the remainder one by one.
        for (int i = 0; i < leftDomainName; i++) {
            threads[i].addDomainName(listDomainName.get(endIndex + i));
        }

        // Start every worker first, then wait for all of them. Joining inside
        // the start loop would run the workers one after another.
        for (Worker thread : threads) {
            thread.start();
        }
        for (Worker thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        Map<String, String> totalResult = new HashMap<String, String>();
        for (Worker thread : threads) {
            totalResult.putAll(thread.getResultCollector());
        }
        return totalResult;
    }

    public static void main(String[] args) {
        String[] datas = new String[] {"baidu.com", "sohu.com", "163.com", "iteye.com"};
        TaskAssignAndResultCollection c = new TaskAssignAndResultCollection(3);
        Map<String, String> resultCollector = c.processStringBatchly(datas);
        c.showMsg(resultCollector);
    }

    private void showMsg(Map<String, String> result) {
        for (Map.Entry<String, String> me : result.entrySet()) {
            String data = me.getKey();
            String r = me.getValue();
            String msg = "original [" + data + "]" + " processed [" + r + "]";
            System.out.println(msg);
        }
    }
}

class Worker extends Thread {
    private List<String> datas;
    private Map<String, String> resultCollector = new HashMap<String, String>();

    public void run() {
        for (String d : datas) {
            String result = d + "@";
            resultCollector.put(d, result);
        }
    }

    public void setDomainNameList(List<String> subDomainNames) {
        datas = subDomainNames;
    }

    public void addDomainName(String domainName) {
        if (datas == null) {
            datas = new ArrayList<String>();
        }
        datas.add(domainName);
    }

    public Map<String, String> getResultCollector() {
        return resultCollector;
    }
}
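On the question of whether Future can collect results in batch: ExecutorService.invokeAll accepts a whole collection of Callables and returns one Future per task, in the same order as the tasks. The following is only a minimal sketch under that approach, not the poster's code; the class name InvokeAllSketch and the helper appendAtAll are made up for illustration, and it assumes Java 8 or later for the lambda.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    // Hypothetical helper: appends "@" to every input using at most maxThreads threads.
    static List<String> appendAtAll(List<String> inputs, int maxThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (final String s : inputs) {
                tasks.add(() -> s + "@");
            }
            // invokeAll blocks until all tasks are done; the futures come back
            // in the same order as the tasks list.
            List<Future<String>> futures = pool.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // let the pool threads end so the JVM can exit
        }
    }

    public static void main(String[] args) {
        List<String> inputs = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            inputs.add("threadTest" + i);
        }
        List<String> out = appendAtAll(inputs, 35);
        System.out.println(out.size() + " results, first = " + out.get(0));
    }
}
```

Because the results keep the input order, no request/result pairing object is needed in this variant; whether that matters depends on how the caller consumes the results.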
#1
rmn190
2011-07-27
I have improved it somewhat, but it is still not ideal. The implementation is as follows:
package rmn.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;

public class TaskAssignAndResultCollection2 {
    private final static int DEFAULT_THREAD_NUM = 5;
    private int threadNum = DEFAULT_THREAD_NUM;

    public TaskAssignAndResultCollection2(int threadNum) {
        super();
        if (threadNum == 0) {
            threadNum = DEFAULT_THREAD_NUM;
        } else {
            this.threadNum = threadNum;
        }
    }

    public List<RequestAndResult> processStringBatchlyCompl(String[] datas)
            throws InterruptedException, ExecutionException {
        Executor e = Executors.newFixedThreadPool(threadNum);
        CompletionService<RequestAndResult> ecs =
                new ExecutorCompletionService<RequestAndResult>(e);
        for (String d : datas) {
            // A CallableWorker is created for every single data item, which is
            // wasteful. How can this be improved? Could one Callable be enough?
            ecs.submit(new CallableWorker(d));
        }
        List<RequestAndResult> results = new ArrayList<RequestAndResult>();
        for (int i = 0; i < datas.length; i++) {
            RequestAndResult r = ecs.take().get();
            results.add(r);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        String[] datas = new String[] {"baidu.com", "sohu.com", "163.com", "iteye.com"};
        TaskAssignAndResultCollection2 c = new TaskAssignAndResultCollection2(3);
        List<RequestAndResult> rs = c.processStringBatchlyCompl(datas);
        for (RequestAndResult rr : rs) {
            String data = rr.data;
            String r = rr.afterProcessed;
            String msg = "original [" + data + "]" + " processed [" + r + "]";
            System.out.println(msg);
        }
    }
}

class RequestAndResult {
    String data;
    String afterProcessed;

    public RequestAndResult(String data, String afterProcessed) {
        super();
        this.data = data;
        this.afterProcessed = afterProcessed;
    }
}

class CallableWorker implements Callable<RequestAndResult> {
    String data;

    public CallableWorker(String data) {
        super();
        this.data = data;
    }

    @Override
    public RequestAndResult call() throws Exception {
        String afterProcessed = data + "@";
        RequestAndResult r = new RequestAndResult(data, afterProcessed);
        return r;
    }
}
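On the comment's question about creating one Callable per data item: one possible answer is to submit one Callable per chunk of inputs rather than per item, so the number of task objects is bounded by the thread count. This is a rough sketch of that idea, not something proposed in the thread; ChunkedTasksSketch, process and the chunk-size formula are illustrative assumptions, and it assumes Java 8 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChunkedTasksSketch {
    // Hypothetical helper: one Callable per chunk instead of one per item.
    static Map<String, String> process(List<String> inputs, int maxThreads) {
        // Ceiling division, so at most maxThreads chunks are produced.
        int chunkSize = Math.max(1, (inputs.size() + maxThreads - 1) / maxThreads);
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        CompletionService<Map<String, String>> ecs = new ExecutorCompletionService<>(pool);
        try {
            int submitted = 0;
            for (int from = 0; from < inputs.size(); from += chunkSize) {
                int to = Math.min(from + chunkSize, inputs.size());
                // subList is a read-only view here; inputs must not change meanwhile.
                final List<String> chunk = inputs.subList(from, to);
                ecs.submit(() -> {
                    Map<String, String> part = new HashMap<>();
                    for (String s : chunk) {
                        part.put(s, s + "@");
                    }
                    return part;
                });
                submitted++;
            }
            // Merge the partial maps in whatever order the chunks finish.
            Map<String, String> total = new HashMap<>();
            for (int i = 0; i < submitted; i++) {
                total.putAll(ecs.take().get());
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For work as cheap as appending one character, the per-item Callable allocation is unlikely to dominate, so chunking mostly pays off when there are very many tiny items; that trade-off should be measured rather than assumed.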
#2
rmn190
2011-07-27
After switching to ExecutorCompletionService, I noticed in Eclipse that the program does not fully exit once it has finished.
With the crude implementation from the original post, Eclipse shows that it exits completely.
What is going on?
1. Is Eclipse's display wrong?
2. Or am I using ExecutorCompletionService incorrectly?
#3
fjjiaboming
2011-07-27
Hadoop
MapReduce
#4
rmn190
2011-07-27
fjjiaboming wrote:
Hadoop
MapReduce
The idea does feel close to Hadoop, but a task this small is not worth bringing in Hadoop for.
#5
admires
2011-07-27
You need to call Executor.shutdown();
You are not shutting down the thread pool after the tasks have finished.
#6
rmn190
2011-07-27
admires wrote:
You need to call Executor.shutdown();
You are not shutting down the thread pool after the tasks have finished.

Executor e = Executors.newFixedThreadPool(threadNum);
CompletionService<RequestAndResult> ecs =
        new ExecutorCompletionService<RequestAndResult>(e);
for (String d : datas) {
    // A CallableWorker is created for every single data item, which is
    // wasteful. How can this be improved? Could one Callable be enough?
    ecs.submit(new CallableWorker(d));
}
List<RequestAndResult> results = new ArrayList<RequestAndResult>();
for (int i = 0; i < datas.length; i++) {
    RequestAndResult r = ecs.take().get();
    results.add(r);
}

A question: in this code of mine, which object can I call shutdown on?
#7
admires
2011-07-28
rmn190 wrote:
admires wrote:
You need to call Executor.shutdown();
You are not shutting down the thread pool after the tasks have finished.
A question: in this code of mine, which object can I call shutdown on?

Oh, I see: the interface you are using is Executor. You can declare the variable as ExecutorService instead, which has the shutdown() method. Also, for tasks like this, from a performance point of view you could consider using Executors.newCachedThreadPool();
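The suggestion above can be sketched roughly as follows: declare the pool as ExecutorService (which Executors.newFixedThreadPool already returns) and call shutdown() in a finally block. The pool's worker threads are non-daemon by default, which is why the JVM keeps running until the pool is shut down. The class name ShutdownSketch and the method run are illustrative only, plain String results stand in for RequestAndResult, and Java 8 or later is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ShutdownSketch {
    // Hypothetical helper mirroring the snippet in the thread, plus shutdown().
    static List<String> run(String[] datas, int threadNum) {
        // ExecutorService extends Executor and adds shutdown().
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        List<String> results = new ArrayList<>();
        try {
            for (final String d : datas) {
                ecs.submit(() -> d + "@");
            }
            // take() hands back futures in completion order, not submission order.
            for (int i = 0; i < datas.length; i++) {
                results.add(ecs.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // No new tasks are accepted; idle workers terminate afterwards,
            // so the program can exit.
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        String[] datas = {"baidu.com", "sohu.com", "163.com", "iteye.com"};
        for (String r : run(datas, 3)) {
            System.out.println(r);
        }
    }
}
```

The sketch keeps newFixedThreadPool because the original question caps the thread count at 35; newCachedThreadPool places no upper bound on the number of threads it creates.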
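#8
rmn190
2011-07-29
There is also the problem of clearing the data left over from the previous call in the thread-pool object threads, and of thread safety while clearing it.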
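Assuming this refers to the threads field reused across calls in the first implementation, one way to sidestep the clearing problem is to keep no per-call state in fields at all: each call creates its own workers and its own result map, so there is nothing left over to clear. The sketch below is only an illustration of that idea; PerCallStateSketch and its round-robin index split are assumptions, not the poster's design, and Java 8 or later is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PerCallStateSketch {
    // Hypothetical helper: all state lives in local variables of this call.
    static Map<String, String> process(List<String> datas, int threadNum) {
        // ConcurrentHashMap lets all workers write into one map safely.
        final Map<String, String> results = new ConcurrentHashMap<>();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threadNum; t++) {
            final int offset = t;
            Thread w = new Thread(() -> {
                // Worker t handles indices t, t + threadNum, t + 2 * threadNum, ...
                for (int i = offset; i < datas.size(); i += threadNum) {
                    String d = datas.get(i);
                    results.put(d, d + "@");
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return results;
    }
}
```

Creating fresh Thread objects per call also avoids restarting a Thread that has already run, which Java does not allow; the cost is thread creation on every call, which a shared ExecutorService would avoid.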