Java并发编程之线程治理(Executor框架11)

Java并发编程之线程管理(Executor框架11)

4线程执行器

如果你不得不开发一个需要运行许多并发任务的程序,这种方法有下面这些劣势:


Ø  你不得不实现所有相关代码来管理线程对象(对象的创建,结束,获取结果)。

Ø  每个任务,你需要创建一个线程。如果你不得不执行一个超大量的任务,这将影响到应用程序的吞吐量。处理不好,会影响系统的整体性能。

Ø  你不得不高效地控制和管理计算机的系统资源。如果你创建了太对的线程,你的系统资源会变得不到充分利用。


从Java 5以后,Java 并发API提供了一个机制来解决这些问题。这个机制是称作执行器框架(Executor Framework),它围绕着接口Executor,它的子接口ExecutorService,类ThreadPoolExecutor实现了这两个接口。

使用一个executor,你仅需要实现Runnable,并发送给executor对象。Executor对象负责执行,实例化,和运行必要的线程。它的功能远不止这些,它使用线程池帮助提高了系统的性能。运用线程池,避免了连续不断的产生线程的花费,这样合理利用了系统资源。另外,Executor框架是一个callable接口,调用call方法可以返回执行结果。同时,你将获得一个实现了Future接口的对象,使用这个对象来控制线程的状态和Callable对象的返回值。

1.1   创建一个线程执行器

使用Executor 框架的第一步骤就是创建一个ThreadPoolExecutor类,你能够使用四个参数的构造方法或者使用一个叫做Executors工厂类来创建ThreadPoolExecutor对象。一旦你有一个executor对象,你可以发送Runnable或者Callable对象,执行对应的业务逻辑。

看一个例子,首先创建一个Task类,实现所需要执行的业务逻辑。

 

import java.util.Date;
import java.util.concurrent.TimeUnit;
 
/**
 * This class implements aconcurrent task
 *
 */
public class Task implements Runnable {
 
    /**
     * The start date of the task
     */
    private Date initDate;
    /**
     * The name of the task
     */
    private String name;
   
    /**
     * Constructor of the class. Initializes thename of the task
     * @param name name asigned to the task
     */
    public Task(String name){
        initDate=new Date();
        this.name=name;
    }
   
    /**
     * This method implements the execution of thetask. Waits a random period of time and finish
     */
    @Override
    public voidrun() {
        System.out.printf("%s: Task %s: Created on: %s\n",Thread.currentThread().getName(),name,initDate);
        System.out.printf("%s: Task %s: Started on: %s\n",Thread.currentThread().getName(),name,new Date());
       
        try {
            Long duration=(long)(Math.random()*10);
            System.out.printf("%s: Task %s: Doing a task during %d seconds\n",
                              Thread.currentThread().getName(),
                              name,
                              duration);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
       
        System.out.printf("%s: Task %s: Finished on: %s\n",Thread.currentThread().getName(),name,new Date());
    }
 
}
 

创建Worker类,定义自己的线程池执行器,管理线程的执行情况。

 

import java.util.concurrent.Executors;
importjava.util.concurrent.ThreadPoolExecutor;
 
public class Worker {
 
    /*ThreadPoolExecutorsto manage the execution of the request */
    private ThreadPoolExecutor executor;
   
    public Worker(){
        executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
    }
   
   
    /**
     * This method is called when a request to theserver is made. The
     * server uses the executor to execute therequest that it receives
     * @param task The request made to the server
     */
    public voidexecuteTask(Task task){
        System.out.printf("Server: A new task has arrived\n");
        executor.execute(task);
        System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());
        System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
        System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
    }
   
    /**
     * Ends the task with shutdown() method.
     */
    public voidendTask(){
        executor.shutdown();
    }
   
    public staticvoidmain(String []args){
       
        Worker worker = new Worker();
       
        for(int i = 0; i < 100; i++){
            Task task = new Task("task_"+ i);
            worker.executeTask(task);
        }
       
    }
}

在这个例子中,你通过使用newCachedThreadPool()方法创建了一个缓存线程池。这个方法返回一个ExecutorService对象。因此,它被强制转换成ThreadPoolExecutor。如果缓存的线程池需要去执行一个新任务时,它将使用那些已经运行完的线程来执行新线程的任务。线程的重新利用,有利于减少创建新线程的时间,这是它的优势所在。然而,它的劣势就是总存在为新任务准备的常量线程,这样的话,如果你发送大量的任务给executor,你将使系统超过负荷。