java线程模型Master-Worker

这样的模型是最经常使用的并行模式之中的一个,在Nginx源代码中有涉及到有想看的能够去这个大神的博客了解一下http://blog.csdn.net/marcky/article/details/6014733,这位大神写的有些简洁。

从思想的角度来说。它主要由两类进程进行协作:各自是Master进程和Worker进程。Master进程负责接受和分配任务,Worker进程负责处理子任务,当Worker将子任务处理完毕后。将结果返回给Master进程。由Master进程做归纳和汇总。得到终于结果。详细流程能够看此图

java线程模型Master-Worker

这样的模式可以将一个大任务分解成若干个小任务去运行,适合一些耗时比較久的任务,可以提高系统的吞吐量。

一个相对完整的模型应该具备下面功能

java线程模型Master-Worker

在借鉴了java性能优化书上的列子。上面实现了一个简单的Master-Worker模式

package com.thread;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Master_Worker {
	public static void main(String args[])
	 {
		  long start = System.currentTimeMillis();
		 Master_Worker master_Worker = new Master_Worker(new PlusWorker(), 11);
		 for (int i = 0; i < 100; i++) {
			master_Worker.submit(i);
		}
		 master_Worker.execute();
		 int re = 0;
		 Map<String, Object> result_Map = master_Worker.getResultMap();
		 while (result_Map.size()>0||!master_Worker.isComplete()) {
			 Set<String> keysSet = result_Map.keySet();
			 String keyString = null;
			 for (String string : keysSet) {
				keyString = string;
				break;
			}
			 Integer i = null;
			 if (keyString !=null) {
				i = (Integer) result_Map.get(keyString);
			}
			 if (i!=null) {
				re+=i;
			}
			 if (keyString!=null) {
				result_Map.remove(keyString);
			}
		}
		 long end = System.currentTimeMillis();
		 System.out.println("结果:"+re+"-运行之间"+(end-start));
		 int sum = 0;
		 start = System.currentTimeMillis();
		 for (int i = 1; i <= 100; i++) {
			sum+=i*i*i;
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		 end = System.currentTimeMillis();
		 System.out.println("结果:"+sum+"-运行之间"+(end-start));
	 }
	// 任务队列
	protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<>();
	// Worker进程队列
	protected Map<String, Thread> threadMap = new HashMap<>();
	// 子任务处理结果集
	protected Map<String, Object> resultMap = new ConcurrentHashMap<>();

	// 是否全部的子任务都结束了
	public boolean isComplete() {
		for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
			if (entry.getValue().getState() != Thread.State.TERMINATED) {
				return false;
			}
		}
		return true;
	}

	// Master的构造,须要一个Worker进程逻辑,和须要的Worker进程数量
	public Master_Worker(Worker woker, int countWorker) {
		woker.setWorkQueue(workerQueue);
		woker.setResultMap(resultMap);
		for (int i = 0; i < countWorker; i++) {
			threadMap.put(Integer.toString(i),
					new Thread(woker, Integer.toString(i)));
		}

	}

	//返回子任务结果集
	public Map<String, Object> getResultMap()
	{
		return resultMap;
	}
	//提交任务
	public void submit(Object job) {
		workerQueue.add(job);
	}
	public void execute()
	{
		for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
			if (entry.getValue().getState() != Thread.State.TERMINATED) {
				entry.getValue().start();
			}
		}
	}
}

class Worker implements Runnable {

	// 任务队列,用于取得子任务
	protected Queue<Object> workQueue;
	// 子任务处理结果集
	protected Map<String, Object> resultMap;

	public void setWorkQueue(Queue<Object> workQueue) {
		this.workQueue = workQueue;
	}

	public void setResultMap(Map<String, Object> resultMap) {
		this.resultMap = resultMap;
	}

	// 子任务处理逻辑,在子类中实现详细逻辑
	public Object handle(Object input) {
		/* 这里能够写自己想要做的事情 */
		return input;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		while (true) {
			// 获取子任务
			Object inputObject = workQueue.poll();
			if (inputObject == null) {
				break;
			}
			// 处理子任务
			Object reObject = handle(inputObject);
			resultMap.put(Integer.toString(inputObject.hashCode()), reObject);
		}
	}
}

/*
 * 扩展自己的类
 * */
 class PlusWorker extends Worker{
	 @Override
	public Object handle(Object input) {
		// TODO Auto-generated method stub
		 //在这里能够自己实现自己的业务逻辑等,在这里我让线程睡眠了100毫秒,模拟任务运行
		 
		 try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		Integer i = (Integer) input;
		return i*i*i;
	}
 }
 
 

这里的大多数都是借鉴java性能优化一书,加上自己的改编和简单介绍。