分布式定时任务—xxl-job学习(二):源码分析——执行器的启动过程

从上一篇搭建一个简单的分布式demo任务调度项目可以知道,主要是三个部分:

配置并启动任务调度中心(xxl-job-admin)
配置并启动业务系统(执行器)
在调度中心web页面配置执行器及任务
本篇咱们先从业务系统的执行器的配置和启动的源码进行深度分析。
xxl.job.version使用的是 2.2.1-SNAPSHOT版本

一、执行器的启动
在业务定时任务系统

引入xxl-job的依赖配置
新增执行器组件配置类XxlJobConfig.java,其中配置了核心类XxlJobSpringExecutor
新增jobhandler类,类中有带 @XxlJob("xxx")注解的方法
1.1 分析核心类XxlJobSpringExecutor
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {

@Override
public void afterSingletonsInstantiated() {
//。。。。。。。。暂时省略这个方法的具体内容
}

@Override
public void destroy() {
super.destroy();
}

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
//。。。。。。。。暂时省略这个方法的具体内容
}

private static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
在源码中我们可以看到这个类继承了XxlJobExecutor类,实现了ApplicationContextAware、SmartInitializingSingleton、DisposableBean。

这个对象初始化的时候会调用afterSingletonsInstantiated()方法。

@Override
public void afterSingletonsInstantiated() {

// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/

// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);

// refresh GlueFactory
GlueFactory.refreshInstance(1);

// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
initJobHandlerRepository()和initJobHandlerMethodRepository()是将项目中配置的任务保存在项目的内存中,使用ConcurrentMap<String, IJobHandler>保存,使用 springbean的id为key,具体的任务实例对象为 value。;
刷新GlueFactory(glue执行工厂),把它刷新为 SpringGlueFactory,在执行 glue 模式的任务时使用 spring 来加载相应实例。
会调用执行器的核心XxlJobExecutor中的start()方法。
1.1.1 initJobHandlerRepository()
这个方法是旧版本中用来注册带有 @JobHandler 注解的bean的Java类, 2.2.1-SNAPSHOT版本已经不支持该种方式;

1.1.2 initJobHandlerMethodRepository()
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);

Map<Method, XxlJob> annotatedMethods = null;
// referred to : org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}

for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method method = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}

String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}

// execute method
if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like " public ReturnT<String> execute(String param) " .");
}
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like " public ReturnT<String> execute(String param) " .");
}
method.setAccessible(true);

// init and destory
Method initMethod = null;
Method destroyMethod = null;

if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}

// registry jobhandler 向`ConcurrentMap<String, IJobHandler>`中保存当前定时任务实例。
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
分析:

从applicationContext中获取所有的bean对象;
利用MethodIntrospector工具类的selectMethods方法和MetadataLookup接口得到Map<Method, XxlJob>(下边学习下这个工具类核心方法selectMethods的源码)
public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) {
final Map<Method, T> methodMap = new LinkedHashMap<>();
Set<Class<?>> handlerTypes = new LinkedHashSet<>();
Class<?> specificHandlerType = null;
//判断是否是代理类
if (!Proxy.isProxyClass(targetType)) {
//如果是代理类,找到实际的类型
specificHandlerType = ClassUtils.getUserClass(targetType);
handlerTypes.add(specificHandlerType);
}
handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType));
//遍历所有找到的class对象
for (Class<?> currentHandlerType : handlerTypes) {
final Class<?> targetClass = (specificHandlerType != null ? specificHandlerType : currentHandlerType);

ReflectionUtils.doWithMethods(currentHandlerType, method -> {
//获取指定的method
Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass);
//获取方法关联的元数据,一般是指注解
T result = metadataLookup.inspect(specificMethod);
if (result != null) {
Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) {
methodMap.put(specificMethod, result);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
}

return methodMap;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
循环第二步得到的Map<Method, XxlJob>,key就是注解的id,value是注解元数据。
校验注解元数据的name属性,如果为空则抛出异常;
根据name从内存ConcurrentMap<String, IJobHandler>(这其实是注册的时候存的所有任务的仓库)获取对应任务实例,如果已经存在,则抛出异常(任务冲突);
校验入参,必须为String param,因为 2.2.1-SNAPSHOT指定了开发Job方法,方式格式要求为 “public ReturnT< String> execute(String param)”。
校验出参,必须是ReturnT< String>格式;
注入元数据中配置的init()和destroy()方法;
向ConcurrentMap<String, IJobHandler>中保存当前定时任务实例。
1.1.3 GlueFactory.refreshInstance(1)
刷新GlueFactory为 SpringGlueFactory,在执行 glue 模式的任务时使用 spring 来加载相应实例。

1.1.4 super.start()
调用XxlJobExecutor.start() 。

1.2 分析核心类XxlJobExecutor
XxlJobExecutor的属性有:

// ---------------------- param ----------------------
private String adminAddresses;
private String accessToken;
private String appname;
private String address;
private String ip;
private int port;
private String logPath;
private int logRetentionDays;
1
2
3
4
5
6
7
8
9
上一步介绍了最终XxlJobSpringExecutor会调用XxlJobExecutor的start()方法,下边我们继续看看这个方法做些什么:

public void start() throws Exception {

// init logpath
XxlJobFileAppender.initLogPath(logPath);

// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);


// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);

// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();

// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1.2.1 XxlJobFileAppender.initLogPath(logPath)
logPath是我们配置执行器组件里的xxl.job.executor.logpath日志路径。

public static void initLogPath(String logPath){
// init
if (logPath!=null && logPath.trim().length()>0) {
logBasePath = logPath;
}
// mk base dir
File logPathDir = new File(logBasePath);
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();

// mk glue dir
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
如果配置了日志路径,那么logBasePath就是我们配置文件里的地址;
判断这个日志路径是否存在,如果不存在则创建日志目录;
生成gluesource子文件夹;
1.2.2 initAdminBizList(adminAddresses, accessToken)
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {

AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这个方法是根据调度中心部署跟地址adminAddresses和执行器通讯TOKENaccessToken初始化AdminBizClient,AdminBizClient这个类有三个核心方法

@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}

@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
提供callback(回调)、registry(注册)以及registryRemove(注册移除)到调度中心的方法。

1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)
这个方法是初始化日志清除线程,过期日志自动清理(清理N天前的日志文件)。

public class JobLogFileCleanThread {
private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);

private static JobLogFileCleanThread instance = new JobLogFileCleanThread();
public static JobLogFileCleanThread getInstance(){
return instance;
}

private Thread localThread;
private volatile boolean toStop = false;
public void start(final long logRetentionDays){

// limit min value
if (logRetentionDays < 3 ) {
return;
}

localThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// clean log dir, over logRetentionDays
File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
if (childDirs!=null && childDirs.length>0) {

// today
Calendar todayCal = Calendar.getInstance();
todayCal.set(Calendar.HOUR_OF_DAY,0);
todayCal.set(Calendar.MINUTE,0);
todayCal.set(Calendar.SECOND,0);
todayCal.set(Calendar.MILLISECOND,0);

Date todayDate = todayCal.getTime();

for (File childFile: childDirs) {

// valid
if (!childFile.isDirectory()) {
continue;
}
if (childFile.getName().indexOf("-") == -1) {
continue;
}

// file create date
Date logFileCreateDate = null;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
logFileCreateDate = simpleDateFormat.parse(childFile.getName());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}

if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
FileUtil.deleteRecursively(childFile);
}

}
}

} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}

}

try {
TimeUnit.DAYS.sleep(1);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");

}
});
localThread.setDaemon(true);
localThread.setName("xxl-job, executor JobLogFileCleanThread");
localThread.start();
}

public void toStop() {
toStop = true;

if (localThread == null) {
return;
}

// interrupt and wait
localThread.interrupt();
try {
localThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}  郑州看妇科哪家医院好:http://www.zztongjifk.com/郑州妇科医院哪里好:http://www.zztongjifk.com/郑州做妇科检查多少钱:http://www.zztongjifk.com/

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
我们主要关注下start()方法:

执行器组件配置的执行器日志文件保存天数必须大于3,否则不清理;
创建一个守护线程,每天执行一次(TimeUnit.DAYS.sleep(1););
获取日志路径根目录下的所有日期文件目录;
循环判断当前时间(当天的0时0分0秒0毫秒)和日期目录对应的"yyyy-MM-dd"时间差值是否大于配置的执行器日志文件保存天数参数;
(todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)
1
如果超过日志保存天数,则删除该时间目录及其目录下所有文件。