AsyncTask - Java异步调用框架

AsyncTask -- Java异步调用框架

原创文章,转载请注明作者:jmppok及出处:http://blog.csdn.net/jmppok/article/details/44590991。


AsyncTask是个人编写的一个Java异步调用框架,支持用户:

1)自定义Task,并可设置Task的类型(Type), 子类型(subType),超时时间(TImeout),标识(Flag-可用来区分不同的Task),Task的输入参数(input)等。

2)可通过submitTask,提交 到框架中异步执行,框架查找对应的TaskExectuor,多线程执行。

3)可自定义对应TaskExecutor,通过配置添加到框架中。TaskExecutor支持Execotor Chain, 多个Executor可以组合在一起顺序执行。并且支持在Task执行过程中,实时通知任务调用者Task的状态,进度等。

4)用户可使用TaskCollector通过TaskManager查询所有的Task,支持按Task Id,Task Type, Task SubType, Task State, Task Flag, Task beginTIme, Task finishTime等多种方式的组合查询。

5)支持持久化,用户提交的Task可以被存储在数据库中。即使Task在执行过程中被中断,重新启动后会从数据库中恢复后继续执行。

6)用户可通过查询接口可获取Task的引用ITaskReference,通过ITaskReference可实时获取Task的状态(State)和进度Progress。

7)用户可定义Task的FinishedCallBack回调,在Submit Task时传入,在Task完成后自动回调。

8)通过ITaskReference的waitForTask,支持用户以同步方式使用。

9)用户可通过ITaskReference获取Task的执行结果或错误信息。


代码:https://git.oschina.net/jmpp/AsyncTask


1.为什么需要AsyncTask?与Asyn4J 区别?

1.1Java传统的Thread 和 Runable功能不足

Java提供了Thread,ThreadPool等多线程编程接口。但这些都是基础接口,虽然使用方便,但功能比较简单,很多场景下都无法满足需求。

比如下面的几个场景:

1)我需要提交一个任务,改任务在后台异步执行,同时我要实时观察任务的状态,进度等信息。

2)在提交任务时希望传入参数,任务完成后能主动通知我,并能获取结果。

3)任务持久化,我希望在任务执行完毕后,可以查询到执行的任务列表。或者任务失败后能重新执行。

如果要实现这些场景,Java本身自带的接口显然无法满足,必须要有一个新的框架来实现。


1.2 Asyn4J

Asyn4J也是一个类似的框架,但它目前还不支持任务的超时设置,持久化,任务回调等功能。



2.设计及实现

2.1接口设计

直接上图

AsyncTask - Java异步调用框架


2.2 代码实现

具体实现代码见 Git@OSChttps://git.oschina.net/jmpp/AsyncTask

代码结构如下:

AsyncTask - Java异步调用框架

这里简单说一下实现思路:

1) 整个实现还是基于Java的Thread和ThreadPool,没有用第三方框架。

2)持久化基于Mysql,只有一个数据库表tasks,见tasks.sql.

3)持久化层的实现用到了Mybatis,给予Mybatis的代码生成工具,直接生成了tasks表对应的数据结构。

4)要持久化必然还要用到对象序列化,这里使用了Kryo。为啥用Kryo,见我的另一篇文章:Java对象序列化小结。

5)日志使用了Log4j。


3.测试

具体可见代码:https://git.oschina.net/jmpp/AsyncTask

3.1自定义MyTask

package test.mytask;

import com.lenovo.asynctask.Task;

/**
 * 类 MyTask 的实现描述:TODO 类实现描述
 * 
 * @author ligh4 2015年3月12日下午2:42:56
 */
public class MyTask extends Task {

    /**
     * @param taskType
     * @param inputParam
     * @param timeoutMills
     */
    public MyTask(Object inputParam, int timeoutMills) {
        super(MyTask.class.getSimpleName(), inputParam, timeoutMills);
        setNeedPersistence(true);
    }

}


3.2自定义MyTaskExecutor

package test.mytask;

import com.lenovo.asynctask.ITaskExecutor;
import com.lenovo.asynctask.ITaskReferenceInternal;
import com.lenovo.asynctask.TaskState;
import com.lenovo.asynctask.util.LogHelper;

/**
 * 类 TestTaskExecutor 的实现描述:TODO 类实现描述
 * 
 * @author ligh4 2015年3月12日下午2:43:19
 */
public class MyTaskExecutor extends ITaskExecutor {

    /**
     * @author ligh4 2015年3月12日下午2:46:51
     */
    @Override
    public Object execute(ITaskReferenceInternal taskRef) {
        LogHelper.debug("begin execute MyTask...");

        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                LogHelper.exception(e);
            }
            taskRef.setProgress(i + 1);
        }

        return taskRef.getInput().toString().toUpperCase();
    }

    /**
     * @author ligh4 2015年3月12日下午2:46:51
     */
    @Override
    public Object continue_execute(ITaskReferenceInternal taskRef) {
        if (taskRef.getState() == TaskState.running) {
            int i = taskRef.getProgress();
            for (; i < 100; i++) {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    LogHelper.exception(e);
                }
                taskRef.setProgress(i + 1);
            }

            return taskRef.getInput().toString().toUpperCase();
        } else {
            taskRef.setState(TaskState.failed, "");
            return null;
        }

    }

}


3.3配置MyTaskExecutor

taskexecutors.properties中添加:

MyTask = test.mytask.MyTaskExecutor

其实是task的type     =     task的Executor    

3.4提交Task并实时监听进度

package test.mytask;

import java.util.List;

import com.lenovo.asynctask.ITaskFinishedCallback;
import com.lenovo.asynctask.ITaskReference;
import com.lenovo.asynctask.TaskCollector;
import com.lenovo.asynctask.TaskManager;
import com.lenovo.asynctask.TaskState;
import com.lenovo.asynctask.util.DateUtil;
import com.lenovo.asynctask.util.LogHelper;

/**
 * 类 TestContinueTask 的实现描述:TODO 类实现描述
 * 
 * @author ligh4 2015年3月23日上午9:42:14
 */
public class TestContinueTask {

    /**
     * @author ligh4 2015年3月12日下午2:52:45
     * @param args
     */
    public static void main(String[] args) throws Exception {
        TaskManager.instance().start();

        List<ITaskReference> tasks = queryRunningTasks();
        if (tasks == null || tasks.size() == 0) {
            submitAndWaitTask();
        } else {
            for (ITaskReference taskReference : tasks) {
                queryTaskProgress(taskReference);
            }
        }

        TaskManager.instance().stop();
    }

    public static void submitAndWaitTask() throws Exception {
        MyTask task = new MyTask("liguanghui", 200000);
        ITaskReference taskReference = TaskManager.instance().submitTask(task,
                new ITaskFinishedCallback() {

                    @Override
                    public void onTaskFinished(ITaskReference taskRef) {
                        LogHelper.debug(taskRef.getId() + ";" + taskRef.getState().toString() + ";"
                                + DateUtil.format(taskRef.getStartedTime()) + "  "
                                + DateUtil.format(taskRef.getFinishedTime()) + ";"
                                + taskRef.getResult().toString());

                    }
                });

        queryTaskProgress(taskReference);
    }

    public static void queryTaskProgress(ITaskReference taskReference) throws Exception {
        String taskID = taskReference.getId();
        while (!taskReference.isFinished()) {
            LogHelper.debug(taskID + ": progress " + taskReference.getProgress());
            Thread.sleep(1000);
        }
        LogHelper.debug(taskID + ": finished. ");
    }

    public static List<ITaskReference> queryRunningTasks() {
        TaskCollector collector = new TaskCollector();
        collector.setTaskStateFilter(new TaskState[] { TaskState.running });
        collector.setTaskTypeFilter(new String[] { MyTask.class.getSimpleName() });
        return TaskManager.instance().findTasks(collector);

    }
}


3.5终止Task执行然后重新启动程序,进行Task恢复测试

还是同3.4一样的代码

1)第一次运行没有Task,会提交一个Task。  submitAndWaitTask();

2)如果改Task没有被执行完毕就被终止,第二次启动后该Task就会恢复。

3)这时queryRunningTasks就会查询到正在运行的Task,并且进度到等待该Task的分支。

4)当然如果你停止的时间很长才重新启动,会发现Task超时。

4.不足

1.整体实现比较简单,特别是数据库表中存储Java对象序列化的字段,偷懒用的varchar(2000),可能超出,最好改为Blob。(为啥当时不用Blob,因为偷懒,如果Blob的话mybatis生成的代码就比较复杂了,会有一个XXwithBlob,调用不方便....)

2.线程池个数写死了,应该可以配置。

3.测试比较简单,可能有未知bug。