Spring + Quartz: distributed scheduling with custom annotations

    This article uses Spring + Quartz, with MySQL as the persistent job store, so the scheduler can run across multiple nodes.

Custom annotations

  1. Enabling scheduled tasks

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(QuartzConfig.class)   // import the scheduling configuration
@Documented
public @interface EnableMScheduling {

}

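This annotation goes on the application's startup class to enable scheduled tasks. Its job is to import the configuration, parse the tasks, and start the scheduler.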

 2. Marking a schedule class

@Target(value = ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MScheduleClass {

    /**
     * Task group, used for display in the UI.
     * @return the group name
     */
    String module() default "System";
    
    
    /**
     * Description, for information only.
     * @return the description
     */
    String desc() default "";
    
}
This annotation goes on a class and marks it as a group of scheduled tasks. The task group and a description can be set on it.

 3. Marking the methods to execute

@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MSchedule {


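    /**
     * Task name.
     * @return the task name
     */
    String title() default "";

    
    /**
     * Cron expression that triggers the job (the attribute is spelled "corn" in this code).
     * Only one trigger expression per job is supported for now.
     */
    String corn();

    /**
     * Description.
     */
    String desc() default "";

    /**
     * Argument(s) passed to the task method.
     */
    String param() default "";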
}
This annotation goes on methods of a class marked with @MScheduleClass and marks each such method as a task to execute.
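Both annotations use RetentionPolicy.RUNTIME, which is what makes them visible to reflection when the application starts. The sketch below shows that lookup with nothing but the JDK. DemoScheduleClass, DemoSchedule, and DemoTasks are hypothetical stand-ins invented for this example, not the project's real types, and the scan is a simplified picture of the idea rather than the project's actual parser.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class AnnotationScanSketch {

    // Hypothetical stand-in for @MScheduleClass, reduced to one attribute.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoScheduleClass {
        String module() default "System";
    }

    // Hypothetical stand-in for @MSchedule; the attribute keeps the "corn" spelling.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoSchedule {
        String corn();
    }

    @DemoScheduleClass(module = "demo")
    static class DemoTasks {
        @DemoSchedule(corn = "0/10 * * * * ?")
        public void tick() { }

        public void notScheduled() { }
    }

    /** Returns "Group.method -> cron" for every annotated method, sorted for stable output. */
    static List<String> scan(Class<?> type) {
        List<String> found = new ArrayList<>();
        if (type.getAnnotation(DemoScheduleClass.class) == null) {
            return found; // not a schedule class, nothing to register
        }
        for (Method method : type.getDeclaredMethods()) {
            DemoSchedule schedule = method.getAnnotation(DemoSchedule.class);
            if (schedule != null) {
                found.add(type.getSimpleName() + "." + method.getName() + " -> " + schedule.corn());
            }
        }
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(DemoTasks.class));
    }
}
```

With RetentionPolicy.CLASS or SOURCE the getAnnotation calls would return null, so the runtime retention on both annotations is not optional.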

 Configuration class

import javax.sql.DataSource;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;

public class QuartzConfig {

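    /**
     * Configures the scheduler factory.
     * The project's own data source doubles as the Quartz data source.
     * @param dataSource the project's data source
     * @param transactionManager the transaction manager, if exactly one is available
     * @return the scheduler factory bean
     * @throws Exception if the scheduler cannot be configured
     */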
    @Bean(destroyMethod = "destroy")
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource,
        ObjectProvider<PlatformTransactionManager> transactionManager) throws Exception {
        
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        // Optionally delay scheduler startup by 10 seconds after the application has started
        //schedulerFactoryBean.setStartupDelay(10);
        // Disable auto-startup; MScheduleBeanPostProcessor starts the scheduler once the tasks are registered
        schedulerFactoryBean.setAutoStartup(false);
        // Use the same data source as the rest of the project
        schedulerFactoryBean.setDataSource(dataSource);

        PlatformTransactionManager txManager = transactionManager.getIfUnique();
        if (txManager != null) {
            schedulerFactoryBean.setTransactionManager(txManager);
        }
        // Key under which the Spring ApplicationContext is exposed in the SchedulerContext
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
        // Location of the Quartz configuration file
        schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
        return schedulerFactoryBean;
    }
    
    @Bean
    public MScheduleBeanPostProcessor mScheduleBeanPostProcessor() {
        return new MScheduleBeanPostProcessor();
    }
}

For the dataSource I use Alibaba's Druid; configure yours however suits your project.

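 quartz.properties

# Scheduler instance name
org.quartz.scheduler.instanceName = quartzScheduler

# Generate the scheduler instance id automatically
org.quartz.scheduler.instanceId = AUTO

# Job store implementation (JDBC persistence)
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

# JDBC driver delegate; StdJDBCDelegate is used here for MySQL
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

# Prefix of the Quartz tables
org.quartz.jobStore.tablePrefix = QRTZ_

# Enable clustering
org.quartz.jobStore.isClustered = true
# Whether JobDataMap values are stored as string properties only (false allows serialized objects)
org.quartz.jobStore.useProperties = false

# Interval between cluster node check-ins, in milliseconds
org.quartz.jobStore.clusterCheckinInterval = 20000

# Thread pool implementation
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

# Number of worker threads, i.e. the maximum number of jobs running concurrently
org.quartz.threadPool.threadCount = 10

# Thread priority
org.quartz.threadPool.threadPriority = 5

# Whether pool threads inherit the context class loader of the initializing thread
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true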

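The SQL scripts that create the Quartz tables are at org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql, where @@platform@@ is a placeholder for the database platform (mysql in this setup).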

Annotation-parsing BeanPostProcessor

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.mustr.cluster.annotation.MSchedule;
import com.mustr.cluster.annotation.MScheduleClass;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MScheduleBeanPostProcessor implements BeanPostProcessor, ApplicationListener<ContextRefreshedEvent>, DisposableBean {

    @Autowired
    private Scheduler scheduler;
    
    private List<MustrTask> tasks = new ArrayList<>();
    
    
    @Override
    public void destroy() throws Exception {
        scheduler.shutdown();
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        log.info("all scheduler tasks total {}", tasks.size());
        
        try {
            // Delete all previously stored jobs first
            Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup());
            scheduler.deleteJobs(new ArrayList<>(jobKeys));
        } catch (SchedulerException e1) {
            log.error("failed to delete existing jobs", e1);
        }
        
        // Then register the jobs collected from the annotations
        tasks.forEach(task -> {
            try {
                scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
            } catch (SchedulerException e) {
                log.error("failed to schedule job {}", task.getJobDetail().getKey(), e);
            }
        });
        
        try {
            scheduler.start(); // start the scheduler
        } catch (SchedulerException e) {
            log.error("failed to start the scheduler", e);
        }
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        MScheduleClass msClass = bean.getClass().getAnnotation(MScheduleClass.class);
        if (msClass == null) {
            return bean;
        }
        
        String group = bean.getClass().getSimpleName();
        Method[] methods = bean.getClass().getDeclaredMethods();
        if (methods == null) {
            return bean;
        }
        
        
        for (Method method : methods) {
            MSchedule mSchedule = method.getAnnotation(MSchedule.class);
            if (mSchedule == null) {
                continue;
            }
            hanlderSchedule(group, mSchedule, method, bean);
        }
        
        return bean;
    }

    private void hanlderSchedule(String group, MSchedule mSchedule, Method method, Object bean) {
        String jobName = method.getName();
        
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("targetClass", bean);
        jobDataMap.put("targetMethod", method.getName());
        jobDataMap.put("arguments", mSchedule.param());
        
        JobDetail jobDetail = JobBuilder.newJob(MustrCommonJob.class)
            .setJobData(jobDataMap)
            .withIdentity(new JobKey(jobName, group))
            .withDescription(mSchedule.desc())
            .storeDurably()
            .build();
        
        Trigger trigger = TriggerBuilder.newTrigger()
            .withIdentity(new TriggerKey(jobName, group))
            .withDescription(mSchedule.desc())
            .withSchedule(CronScheduleBuilder.cronSchedule(mSchedule.corn()).withMisfireHandlingInstructionDoNothing())
            .forJob(jobDetail)
            .build();
        
        tasks.add(new MustrTask(jobDetail, trigger));
    }
}

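This class parses the tasks marked with the custom annotations @MScheduleClass and @MSchedule and wraps each one in a JobDetail and a Trigger.

Every task uses the same job class, MustrCommonJob, which uses reflection to call the configured method on the configured class.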
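One consequence of this design is worth spelling out. The processor puts the bean instance itself into the JobDataMap under "targetClass", and with a JDBC job store and useProperties = false the map is, as far as I understand it, persisted with Java serialization. That is presumably why the demo task class implements Serializable. The JDK-only sketch below shows what a serialization round trip does to an object: the result is a copy, and transient state does not travel with it. Task and injectedDependency are hypothetical names made up for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializedBeanSketch {

    /** Hypothetical task bean; it must be Serializable to survive the round trip. */
    static class Task implements Serializable {
        private static final long serialVersionUID = 1L;
        int runs;                                           // ordinary state is copied
        transient Object injectedDependency = new Object(); // transient state is not
    }

    /** Serializes the value to bytes and reads it back, as a persistent store would. */
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Task original = new Task();
        original.runs = 3;
        Task copy = (Task) roundTrip(original);
        System.out.println(copy != original);                // a different instance
        System.out.println(copy.runs);                       // ordinary field survives
        System.out.println(copy.injectedDependency == null); // transient field is dropped
    }
}
```

If a task method depends on injected Spring beans, a copy restored this way may not have them. A possible alternative, not used in this article, is to store only the bean name in the JobDataMap and look the bean up from the ApplicationContext when the job runs.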

 

MustrTask

import org.quartz.JobDetail;
import org.quartz.Trigger;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
@AllArgsConstructor
public class MustrTask {

    private JobDetail jobDetail;
    private Trigger trigger;
}

Job class

import java.io.Serializable;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.util.MethodInvoker;

public class MustrCommonJob implements Job, Serializable{
    private static final long serialVersionUID = 8651275978441122356L;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        Object targetClass = context.getMergedJobDataMap().get("targetClass");
        String targetMethod = context.getMergedJobDataMap().getString("targetMethod");
        String param = context.getMergedJobDataMap().getString("arguments");
        
        // Pre-processing
        // do ....
        
        try {
            MethodInvoker methodInvoker = new MethodInvoker();
            methodInvoker.setTargetObject(targetClass);
            methodInvoker.setTargetMethod(targetMethod);
            if (param != null && !"".equals(param)) {
                String[] params = param.split(",");
                Object[] temp = new Object[params.length];
                for (int i = 0; i < params.length; i++) {
                    temp[i] = params[i];
                }
                methodInvoker.setArguments(temp);
            }
            methodInvoker.prepare();
            methodInvoker.invoke();
        }  catch (Exception e) {
            e.printStackTrace();
        } finally {
            
            // Post-processing
            // do  ... for example, write an execution log
        }
    }

}
This class implements Quartz's Job interface and invokes the configured method through reflection.
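To make the reflective call concrete without Spring on the classpath, here is a minimal JDK-only sketch of the same idea: split the comma-separated parameter string, treat every piece as a String argument, and invoke the named method. It uses java.lang.reflect directly instead of Spring's MethodInvoker, and Greeter and call are hypothetical names chosen for the example.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

class ReflectiveCallSketch {

    /** Hypothetical target with a no-arg method and a two-String-arg method. */
    static class Greeter {
        public String hello() { return "hello"; }
        public String join(String a, String b) { return a + "+" + b; }
    }

    /** Invokes target.methodName, passing the comma-separated param as String arguments. */
    static Object call(Object target, String methodName, String param) {
        Object[] args = (param == null || param.isEmpty()) ? new Object[0] : param.split(",");
        Class<?>[] types = new Class<?>[args.length];
        Arrays.fill(types, String.class); // every argument is passed as a String
        try {
            Method method = target.getClass().getMethod(methodName, types);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Greeter greeter = new Greeter();
        System.out.println(call(greeter, "hello", ""));   // no arguments
        System.out.println(call(greeter, "join", "a,b")); // two String arguments
    }
}
```

Because the arguments always arrive as Strings, a task method that takes parameters needs String parameters, one per comma-separated value.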

A simple demo

import java.io.Serializable;
import java.time.LocalDateTime;

import com.mustr.cluster.annotation.MSchedule;
import com.mustr.cluster.annotation.MScheduleClass;

@MScheduleClass(module = "System", desc = "test group")
public class HelloSchedule implements Serializable{
    private static final long serialVersionUID = 3619058186885794136L;

    /*@MSchedule(corn = "0/30 * * * * ?", desc = "print hello world")
    public void hello() {
        System.out.println("hello mustr..... <<<:::>>>" + LocalDateTime.now());
    }*/
    
    @MSchedule(corn = "0/10 * * * * ?", desc = "print hello world")
    public void hello1() {
        System.out.println("<<<<hello1 mustr..... <<<:::>>>" + LocalDateTime.now());
    }
}
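A note on the expressions used in the demo: a Quartz cron expression starts with a seconds field, and "0/10" in that field means "start at second 0, then every 10 seconds", so hello1 fires six times a minute. The sketch below expands that one field by hand to show which seconds match. CronSecondsSketch and expand are hypothetical helpers written for this illustration; they handle only the start/step form and are not Quartz's parser.

```java
import java.util.ArrayList;
import java.util.List;

class CronSecondsSketch {

    /** Expands a "start/step" seconds field such as "0/10" into the matching seconds (0-59). */
    static List<Integer> expand(String field) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(expand("0/10")); // seconds field of hello1
        System.out.println(expand("0/30")); // seconds field of the commented-out hello
    }
}
```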

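Final step

Add the @EnableMScheduling annotation to the application's startup class and start the project; the task output then appears in the console.

Source code for this article: https://github.com/Mustr/mustr-quartz-boot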