java架构《并发线程中级篇》

java多线程的三大设计模式

    本章主要记录java常见的三大设计模式,Future、Master-Worker和生产者-消费者模式。

一、Future模式

    使用场景:数据可以不及时返回,到下一次实际要使用结果的之前,后台自动查询并返回。类似与Ajax异步加载。

    原理:客户端发起请求,结果需要返回Data对象,当服务器收到请求以后,FutureData包装类实现Data接口,不查询数据库,直接返回结果。(核心)。然后后台自己开一个线程去查询数据库,RealData真

       实数据类,也实现Data接口,并返回数据。当实际使用时。获取到返回的真实数据。

        java架构《并发线程中级篇》

      代码分析:

      

 1 //  FutureClient 客户端类:
 2 
 3       public class FutureClient {
 4 
 5       public Data request(final String queryStr){
 6           //1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情
 7           final FutureData futureData = new FutureData();
 8           //2 启动一个新的线程,去加载真实的数据,传递给这个代理对象
 9           new Thread(new Runnable() {
10           @Override
11           public void run() {
12               //3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象
13               RealData realData = new RealData(queryStr);
14               futureData.setRealData(realData);
15               }
16           }).start();
17           return futureData;
18           }
19 
20  

    

 1  // Data类:
 2 
 3       
 4 
 5         public interface Data {
 6 
 7           String getRequest();
 8 
 9         }
10 
11   

    

 1  // FutureData类:   
 2 
 3         public class FutureData implements Data{
 4 
 5         private RealData realData ;
 6 
 7           private boolean isReady = false;
 8 
 9           public synchronized void setRealData(RealData realData) {
10           //如果已经装载完毕了,就直接返回
11           if(isReady){
12                 return;
13                  }
14               //如果没装载,进行装载真实对象
15                  this.realData = realData;
16                  isReady = true;
17               //进行通知
18                 notify();
19              }
20 
21           @Override
22           public synchronized String getRequest() {
23           //如果没装载好 程序就一直处于阻塞状态
24             while(!isReady){
25             try {
26               wait();
27               } catch (InterruptedException e) {
28                 e.printStackTrace();
29               }
30               }
31               //装载好直接获取数据即可
32               return this.realData.getRequest();
33               }
34 
35  
36 
37              }
38 
39  

     

 1 // RealData类:   
 2 
 3         public class RealData implements Data{
 4 
 5         private String result ;
 6 
 7         public RealData (String queryStr){
 8             System.out.println("根据" + queryStr + "进行查询,这是一个很耗时的操作..");
 9             try {
10               Thread.sleep(5000);
11               } catch (InterruptedException e) {
12                 e.printStackTrace();
13               }
14                 System.out.println("操作完毕,获取结果");
15                 result = "查询结果";
16               }
17 
18            @Override
19               public String getRequest() {
20               return result;
21              }
22 
23             }
24 
25  

     

 1 // Main测试类:            
 2 
 3         public class Main {
 4 
 5             public static void main(String[] args) throws InterruptedException {
 6 
 7             FutureClient fc = new FutureClient();
 8             Data data = fc.request("请求参数");
 9             System.out.println("请求发送成功!");
10             System.out.println("做其他的事情...");
11             String result = data.getRequest();
12             System.out.println(result);
13           }
14         }

 

二:Master-Worker模式(并行计算模式)      

    使用场景:互不影响的多任务时。返回结果需要共同返回。其好处是讲一个大任务分解成若干个小任务。并行执行,提高系统的吞吐量。

    原理:核心思想是系统由两类进程协作工作;Master进程和Worker进程。Master进程负责接收和分配工作,Worker进程主要负责处理子任务。当各
       个Worker进程处理完后。会将结果返回给Master,由Master做归纳和总结,并返回。

      java架构《并发线程中级篇》

      

      java架构《并发线程中级篇》

    

    

    

    代码分析:

      

 1 //Worker类:
 2 
 3       public class Worker implements Runnable {
 4 
 5         private ConcurrentLinkedQueue<Task> workQueue;
 6         private ConcurrentHashMap<String, Object> resultMap;
 7 
 8         public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
 9           this.workQueue = workQueue;
10         }
11 
12         public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
13           this.resultMap = resultMap;
14         }
15 
16         @Override
17         public void run() {
18           while(true){
19             Task input = this.workQueue.poll();
20               if(input == null) break;
21                 Object output = handle(input);
22                 this.resultMap.put(Integer.toString(input.getId()), output);
23               }
24             }
25 
26         private Object handle(Task input) {
27             Object output = null;
28               try {
29                 //处理任务的耗时。。 比如说进行操作数据库。。。
30                 Thread.sleep(500);
31                 output = input.getPrice();   //模拟把Task类的价格做为结果返回
32                 } catch (InterruptedException e) {
33                 e.printStackTrace();
34                 }
35                 return output;
36              }
37 
38          }
39 
40  

    

 1  //Master类:
 2 
 3       public class Master {
 4 
 5         //1 有一个盛放任务的容器
 6         private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
 7 
 8         //2 需要有一个盛放worker的集合
 9         private HashMap<String, Thread> workers = new HashMap<String, Thread>();
10 
11         //3 需要有一个盛放每一个worker执行任务的结果集合
12         private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
13 
14         //4 构造方法
15         public Master(Worker worker , int workerCount){
16           worker.setWorkQueue(this.workQueue);
17           worker.setResultMap(this.resultMap);
18 
19           for(int i = 0; i < workerCount; i ++){
20               this.workers.put(Integer.toString(i), new Thread(worker));
21             }
22             }
23 
24           //5 需要一个提交任务的方法
25        public void submit(Task task){
26             this.workQueue.add(task);
27             }
28 
29           //6 需要有一个执行的方法,启动所有的worker方法去执行任务
30       public void execute(){
31           for(Map.Entry<String, Thread> me : workers.entrySet()){
32               me.getValue().start();
33             }
34             }
35 
36           //7 判断是否运行结束的方法
37       public boolean isComplete() {
38           for(Map.Entry<String, Thread> me : workers.entrySet()){
39               if(me.getValue().getState() != Thread.State.TERMINATED){
40               return false;
41                 }
42               }    
43             return true;
44           }
45 
46           //8 计算结果方法
47       public int getResult() {
48           int priceResult = 0;
49           for(Map.Entry<String, Object> me : resultMap.entrySet()){
50           priceResult += (Integer)me.getValue();
51           }
52           return priceResult;
53           }
54       }

    

     

 1 // Task类:
 2 
 3         
 4 
 5       public class Task {
 6 
 7         private int id;
 8         private int price ;
 9         public int getId() {
10           return id;
11            }
12         public void setId(int id) {
13           this.id = id;
14           }
15         public int getPrice() {
16           return price;
17           }
18         public void setPrice(int price) {
19           this.price = price;
20         } 
21        }
22 
23  

      

 1 //main测试类:
 2 
 3          public class Main {
 4 
 5           public static void main(String[] args) {
 6 
 7           int Processors= Runtime.getRuntime().availableProcessors(); //获取到当前电脑的线程数
 8           System.out.println("当前电脑是"+Processors+"核");
 9           Master master = new Master(new Worker(),Processors );
10           //Master master = new Master(new Worker(),20 ); //开20个线程
11           Random r = new Random();
12           for(int i = 1; i <= 100; i++){
13             Task t = new Task();
14             t.setId(i);
15             t.setPrice(r.nextInt(1000));
16             master.submit(t);
17             }
18             master.execute();  //执行任务
19             long start = System.currentTimeMillis();
20 
21             while(true){
22             if(master.isComplete()){
23             long end = System.currentTimeMillis() - start;
24             int priceResult = master.getResult();
25             System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
26             break;
27             }
28             }
29           }
30         }
31 
32  

三:生产者-消费者模式

    使用场景:消息中间件。

      java架构《并发线程中级篇》

    代码分析:

       

 1 // main测试类:  
 2 
 3           public class Main {
 4 
 5           public static void main(String[] args) throws Exception {
 6           //内存缓冲区
 7           BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
 8           //生产者
 9           Provider p1 = new Provider(queue);
10           Provider p2 = new Provider(queue);
11           Provider p3 = new Provider(queue);
12           //消费者
13           Consumer c1 = new Consumer(queue);
14           Consumer c2 = new Consumer(queue);
15           Consumer c3 = new Consumer(queue);
16           //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)  
17 
18           ExecutorService cachePool = Executors.newCachedThreadPool();
19 
20           cachePool.execute(p1);
21           cachePool.execute(p2);
22           cachePool.execute(p3);
23           cachePool.execute(c1);
24           cachePool.execute(c2);
25           cachePool.execute(c3);
26 
27           try {
28             Thread.sleep(3000);
29             } catch (InterruptedException e) {
30               e.printStackTrace();
31             }
32               p1.stop();
33               p2.stop();
34               p3.stop();
35           try {
36             Thread.sleep(2000);
37             } catch (InterruptedException e) {
38               e.printStackTrace();
39             }    
40           }
41 
42         }
43 
44  

     

 1  Data类:        
 2 
 3         public final class Data {
 4 
 5         private String id;
 6         private String name;
 7 
 8         public Data(String id, String name){
 9           this.id = id;
10           this.name = name;
11           }
12 
13         public String getId() {
14           return id;
15           }
16 
17         public void setId(String id) {
18           this.id = id;
19           }
20 
21         public String getName() {
22           return name;
23           }
24 
25         public void setName(String name) {
26           this.name = name;
27           }
28 
29         @Override
30         public String toString(){
31           return "{id: " + id + ", name: " + name + "}";
32           }
33 
34         }
35 
36  

     

 1  //Provider成产者类:
 2 
 3         
 4 
 5         public class Provider implements Runnable{
 6 
 7             //共享缓存区
 8           private BlockingQueue<Data> queue;
 9             //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
10           private volatile boolean isRunning = true;
11             //id生成器
12           private static AtomicInteger count = new AtomicInteger();
13             //随机对象
14           private static Random r = new Random(); 
15 
16           public Provider(BlockingQueue queue){
17             this.queue = queue;
18             }
19 
20           @Override
21           public void run() {
22           while(isRunning){
23             try {
24               //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 
25               Thread.sleep(r.nextInt(1000));
26               //获取的数据进行累计...
27               int id = count.incrementAndGet();
28               //比如通过一个getData方法获取了
29               Data data = new Data(Integer.toString(id), "数据" + id);
30               System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
31               if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
32               System.out.println("提交缓冲区数据失败....");
33               //do something... 比如重新提交
34               }
35             } catch (InterruptedException e) {
36               e.printStackTrace();
37             }
38           }
39           }
40 
41           public void stop(){
42             this.isRunning = false;
43           }
44 
45         }
46 
47  

    

 1  // ConSumber消费者类:
 2 
 3         
 4 
 5         public class Consumer implements Runnable{
 6 
 7           private BlockingQueue<Data> queue;
 8 
 9           public Consumer(BlockingQueue queue){
10           this.queue = queue;
11           }
12 
13           //随机对象
14           private static Random r = new Random();
15 
16           @Override
17           public void run() {
18             while(true){
19               try {
20               //获取数据
21               Data data = this.queue.take();
22               //进行数据处理。休眠0 - 1000毫秒模拟耗时
23               Thread.sleep(r.nextInt(1000));
24               System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
25               } catch (InterruptedException e) {
26                 e.printStackTrace();
27               }
28             }
29             }
30          }
31 
32