package com.doumee.biz.system.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.doumee.biz.system.SystemJobTriggerBiz;
import com.doumee.biz.system.dto.TriggerJobDTO;
import com.doumee.core.constants.Constants;
import com.doumee.core.exception.LockedException;
import com.doumee.core.job.BaseDistributer;
import com.doumee.core.job.BaseJob;
import com.doumee.core.job.JobContext;
import com.doumee.core.job.JobParam;
import com.doumee.core.utils.Utils;
import com.doumee.dao.system.SystemJobLogMapper;
import com.doumee.dao.system.SystemJobMapper;
import com.doumee.dao.system.SystemJobSnippetMapper;
import com.doumee.dao.system.model.SystemJob;
import com.doumee.dao.system.model.SystemJobLog;
import com.doumee.dao.system.model.SystemJobSnippet;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;

@Slf4j 
 | 
@Service 
 | 
public class SystemJobTriggerBizImpl implements SystemJobTriggerBiz { 
 | 
  
 | 
    @Autowired 
 | 
    private SystemJobLogMapper systemJobLogMapper; 
 | 
  
 | 
    @Autowired 
 | 
    private SystemJobMapper systemJobMapper; 
 | 
  
 | 
    @Autowired 
 | 
    private SystemJobSnippetMapper systemJobSnippetMapper; 
 | 
  
 | 
    private ThreadLocal<String> jobUnique = new ThreadLocal<>(); 
 | 
  
 | 
    @Override 
 | 
    public void trigger(TriggerJobDTO dto) { 
 | 
        SystemJob job = null; 
 | 
        JobParam jobParam = new JobParam(); 
 | 
        boolean locked = false; 
 | 
        SystemJobLog systemJobLog = new SystemJobLog(); 
 | 
        try { 
 | 
            Integer jobId = dto.getId(); 
 | 
            job = systemJobMapper.selectById(jobId); 
 | 
            // 验证JOB 
 | 
            if (!this.checkJob(job, dto.getBusinessTime())) { 
 | 
                return; 
 | 
            } 
 | 
            // 初始化JOB数据 
 | 
            jobUnique.set(RandomStringUtils.randomAlphabetic(8)); 
 | 
            log(job, "Job started"); 
 | 
            log(job, "Job thread name: {}", Thread.currentThread().getName()); 
 | 
            log(job, "Job scheduled fire time: {}", Utils.Date.format(dto.getScheduledFireTime())); 
 | 
            log(job, "Job fire time: {}", Utils.Date.format(dto.getFireTime())); 
 | 
            log(job, "Job business time: {}", Utils.Date.format(dto.getBusinessTime())); 
 | 
            jobParam.setId(jobId); 
 | 
            jobParam.setTriggerType(dto.getTriggerType()); 
 | 
            jobParam.setBusinessTime(dto.getBusinessTime()); 
 | 
            // 记录日志 
 | 
            if (job.getWithLog()) { 
 | 
                systemJobLog.setJobId(jobId); 
 | 
                systemJobLog.setJobName(job.getJobName()); 
 | 
                systemJobLog.setRunTimeStart(new Date()); 
 | 
                systemJobLog.setServerIp(Utils.Server.getIP()); 
 | 
                systemJobLog.setBusinessTime(dto.getBusinessTime()); 
 | 
                systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode()); 
 | 
                systemJobLog.setTriggerType(dto.getTriggerType()); 
 | 
                systemJobLog.setRemark("任务正在执行中"); 
 | 
                systemJobLogMapper.insert(systemJobLog); 
 | 
            } 
 | 
            // 任务加锁,防止多服务器重复执行 
 | 
            this.lock(job); 
 | 
            locked = true; 
 | 
            // 如果开启了异步执行,则创建一个分片 
 | 
            if (job.getWithAsync()) { 
 | 
                String snippetCode = this.createAsyncSnippet(job, jobParam); 
 | 
                systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); 
 | 
                systemJobLog.setRemark("异步任务已转为分片任务,分片编码:" + snippetCode); 
 | 
                return; 
 | 
            } 
 | 
            // 如果存在分片 
 | 
            if (StringUtils.isNotBlank(job.getDistributeHandler())) { 
 | 
                String distributeGroup = UUID.randomUUID().toString(); 
 | 
                int snippetCount = this.distribute(job, jobParam, distributeGroup); 
 | 
                systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); 
 | 
                systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup); 
 | 
                return; 
 | 
            } 
 | 
            // 不存在分片,直接执行任务(根据任务处理器名称获取任务处理器实例并执行) 
 | 
            BaseJob jobBean = Utils.SpringContext.getBean(job.getHandler(), BaseJob.class); 
 | 
            JobContext jobContext = jobBean.execute(jobParam); 
 | 
            // 解锁任务 
 | 
            this.unlock(job, dto.getNextFireTime()); 
 | 
            locked = false; 
 | 
            // 修改日志信息 
 | 
            if (job.getWithLog()) { 
 | 
                systemJobLog.setRemark("任务执行完成"); 
 | 
                systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize()); 
 | 
                systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize()); 
 | 
                systemJobLog.setContext(jobContext.getContext()); 
 | 
                systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); 
 | 
            } 
 | 
        } catch (LockedException e) { 
 | 
            log(job, e.getMessage()); 
 | 
            // 修改日志状态 
 | 
            if (job.getWithLog()) { 
 | 
                systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode()); 
 | 
                systemJobLog.setRemark("任务正在被其他服务执行或锁定"); 
 | 
            } 
 | 
        } catch (Exception e) { 
 | 
            if (job != null) { 
 | 
                error(job, e); 
 | 
                // 修改日志状态 
 | 
                if (job.getWithLog()) { 
 | 
                    systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode()); 
 | 
                    systemJobLog.setRemark("任务执行失败,出现异常:" + e.getMessage()); 
 | 
                } 
 | 
            } else { 
 | 
                e.printStackTrace(); 
 | 
            } 
 | 
        } finally { 
 | 
            // 更新日志 
 | 
            if (job != null && job.getWithLog() && systemJobLog.getId() != null) { 
 | 
                systemJobLog.setRunTimeEnd(new Date()); 
 | 
                systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime()))); 
 | 
                systemJobLogMapper.updateById(systemJobLog); 
 | 
            } 
 | 
            // 任务解锁 
 | 
            if (locked) { 
 | 
                this.unlock(job, dto.getNextFireTime()); 
 | 
                log(job, "Job execute completed."); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 锁定JOB 
 | 
     * 
 | 
     * @param job JOB 
 | 
     */ 
 | 
    private void lock (SystemJob job) throws LockedException{ 
 | 
        log(job, "Lock job."); 
 | 
        if (Constants.equalsInteger(job.getStatus(),Constants.Job.JobStatus.RUNNING.getCode())) { 
 | 
            throw new LockedException("EVA: job is running."); 
 | 
        } 
 | 
        SystemJob newJob = new SystemJob(); 
 | 
        newJob.setStatus(Constants.Job.JobStatus.RUNNING.getCode()); 
 | 
        newJob.setLockNumber(job.getLockNumber() + 1); 
 | 
        // 不修改更新人和更新时间 
 | 
        newJob.setUpdateUser(Constants.Ignore.IGNORE_USER); 
 | 
        newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME); 
 | 
        int result = systemJobMapper.update(newJob, new UpdateWrapper<SystemJob>() 
 | 
                .lambda() 
 | 
                .eq(SystemJob::getId, job.getId()) 
 | 
                .eq(SystemJob::getLockNumber, job.getLockNumber()) 
 | 
        ); 
 | 
        if (result == 0) { 
 | 
            throw new LockedException("EVA: job is locked."); 
 | 
        } 
 | 
        log(job, "Lock job completed."); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 解锁JOB 
 | 
     * 
 | 
     * @param job JOB 
 | 
     */ 
 | 
    private void unlock(SystemJob job, Date nextFireTime) { 
 | 
        log(job, "Unlock job."); 
 | 
        SystemJob newJob = new SystemJob(); 
 | 
        newJob.setId(job.getId()); 
 | 
        newJob.setStatus(Constants.Job.JobStatus.READY.getCode()); 
 | 
        newJob.setNextFireTime(nextFireTime); 
 | 
        // 不修改更新人和更新时间 
 | 
        newJob.setUpdateUser(Constants.Ignore.IGNORE_USER); 
 | 
        newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME); 
 | 
        systemJobMapper.updateById(newJob); 
 | 
        log(job, "Unlock job completed."); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 创建异步分片 
 | 
     * 
 | 
     * @param job JOB 
 | 
     * @param jobParam JOB参数 
 | 
     */ 
 | 
    private String createAsyncSnippet (SystemJob job, JobParam jobParam) { 
 | 
        log(job, "Begin create async snippet."); 
 | 
        String distributeGroup = UUID.randomUUID().toString(); 
 | 
        SystemJobSnippet snippet = new SystemJobSnippet(); 
 | 
        snippet.setJobId(job.getId()); 
 | 
        snippet.setJobName(job.getJobName()); 
 | 
        snippet.setJobBusinessTime(jobParam.getBusinessTime()); 
 | 
        snippet.setJobDistributeGroup(distributeGroup); 
 | 
        snippet.setDistributeHandler(job.getDistributeHandler()); 
 | 
        snippet.setDistributeLimit(job.getDistributeLimit()); 
 | 
        snippet.setHandler(job.getHandler()); 
 | 
        snippet.setSnippetIndex(0); 
 | 
        snippet.setSnippetCode(UUID.randomUUID().toString()); 
 | 
        snippet.setWithLog(job.getWithLog()); 
 | 
        snippet.setAllowServerIps(job.getAllowServerIps()); 
 | 
        snippet.setStatus(Constants.Job.JobStatus.READY.getCode()); 
 | 
        systemJobSnippetMapper.insert(snippet); 
 | 
        log(job, "Create async snippet completed."); 
 | 
        return snippet.getSnippetCode(); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 分发 
 | 
     * 
 | 
     * @param job JOB 
 | 
     * @param jobParam JOB参数 
 | 
     * @param distributeGroup 分发组 
 | 
     * @return int 
 | 
     */ 
 | 
    private int distribute (SystemJob job, JobParam jobParam, String distributeGroup) { 
 | 
        log(job, "Begin distribute."); 
 | 
        BaseDistributer distributer = Utils.SpringContext.getBean(job.getDistributeHandler(), BaseDistributer.class); 
 | 
        List<List<?>> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(job.getDistributeLimit()), jobParam); 
 | 
        for (int i = 0; i < snippetDatas.size(); i++) { 
 | 
            List snippetData = snippetDatas.get(i); 
 | 
            SystemJobSnippet snippet = new SystemJobSnippet(); 
 | 
            snippet.setJobId(job.getId()); 
 | 
            snippet.setJobName(job.getJobName()); 
 | 
            snippet.setJobBusinessTime(jobParam.getBusinessTime()); 
 | 
            snippet.setJobDistributeGroup(distributeGroup); 
 | 
            snippet.setHandler(job.getHandler()); 
 | 
            snippet.setSnippetIndex(i); 
 | 
            snippet.setSnippetCode(UUID.randomUUID().toString()); 
 | 
            snippet.setSnippetData(JSON.toJSONString(snippetData)); 
 | 
            snippet.setSnippetDataSize(snippetData.size()); 
 | 
            snippet.setWithLog(job.getWithLog()); 
 | 
            snippet.setAllowServerIps(job.getAllowServerIps()); 
 | 
            snippet.setStatus(Constants.Job.JobStatus.READY.getCode()); 
 | 
            systemJobSnippetMapper.insert(snippet); 
 | 
        } 
 | 
        log(job, "distribute completed."); 
 | 
        return snippetDatas.size(); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 验证Job是否可执行 
 | 
     * 
 | 
     * @param job JOB 
 | 
     */ 
 | 
    private boolean checkJob (SystemJob job, Date businessTime) { 
 | 
        // 执行时间验证(任务预计下一次执行<=当前执行时间即可执行) 
 | 
        if (businessTime != null && job.getNextFireTime() != null && job.getNextFireTime().compareTo(businessTime) == 1) { 
 | 
            return Boolean.FALSE; 
 | 
        } 
 | 
        // 服务器验证 
 | 
        if (StringUtils.isNotBlank(job.getAllowServerIps())) { 
 | 
            return job.getAllowServerIps().contains(Utils.Server.getIP()); 
 | 
        } 
 | 
        return Boolean.TRUE; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 打印日志 
 | 
     * 
 | 
     * @param job JOB 
 | 
     * @param text 日志模版内容 
 | 
     * @param params 日志参数 
 | 
     */ 
 | 
    private void log (SystemJob job, String text, Object... params) { 
 | 
        log.trace("{}: " + text, String.format("%s_%s", job.getJobName(), jobUnique.get()), params); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 打印错误日志 
 | 
     * 
 | 
     * @param job JOB 
 | 
     * @param e 异常对象 
 | 
     */ 
 | 
    private void error (SystemJob job, Exception e) { 
 | 
        log.error(String.format("EVA: %s_%s", job.getJobName(), jobUnique.get()) + ": Handle job throw an exception.", e); 
 | 
    } 
 | 
} 
 |