| package com.doumee.biz.system.impl; | 
|   | 
| import com.alibaba.fastjson.JSON; | 
| import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; | 
| import com.doumee.biz.system.SystemJobTriggerBiz; | 
| import com.doumee.biz.system.dto.TriggerJobDTO; | 
| import com.doumee.core.constants.Constants; | 
| import com.doumee.core.exception.LockedException; | 
| import com.doumee.core.job.BaseDistributer; | 
| import com.doumee.core.job.BaseJob; | 
| import com.doumee.core.job.JobContext; | 
| import com.doumee.core.job.JobParam; | 
| import com.doumee.core.utils.Utils; | 
| import com.doumee.dao.system.SystemJobLogMapper; | 
| import com.doumee.dao.system.SystemJobMapper; | 
| import com.doumee.dao.system.SystemJobSnippetMapper; | 
| import com.doumee.dao.system.model.SystemJob; | 
| import com.doumee.dao.system.model.SystemJobLog; | 
| import com.doumee.dao.system.model.SystemJobSnippet; | 
| import com.doumee.service.system.SystemJobLogService; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.apache.commons.lang3.RandomStringUtils; | 
| import org.apache.commons.lang3.StringUtils; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.stereotype.Service; | 
|   | 
| import java.util.Date; | 
| import java.util.List; | 
| import java.util.UUID; | 
|   | 
| @Slf4j | 
| @Service | 
| public class SystemJobTriggerBizImpl implements SystemJobTriggerBiz { | 
|   | 
|     @Autowired | 
|     private SystemJobLogMapper systemJobLogMapper; | 
|   | 
|     @Autowired | 
|     private SystemJobMapper systemJobMapper; | 
|   | 
|     @Autowired | 
|     private SystemJobSnippetMapper systemJobSnippetMapper; | 
|   | 
|     private ThreadLocal<String> jobUnique = new ThreadLocal<>(); | 
|   | 
|     @Override | 
|     public void trigger(TriggerJobDTO dto) { | 
|         SystemJob job = null; | 
|         JobParam jobParam = new JobParam(); | 
|         boolean locked = false; | 
|         SystemJobLog systemJobLog = new SystemJobLog(); | 
|         try { | 
|             Integer jobId = dto.getId(); | 
|             job = systemJobMapper.selectById(jobId); | 
|             // 验证JOB | 
|             if (!this.checkJob(job, dto.getBusinessTime())) { | 
|                 return; | 
|             } | 
|             // 初始化JOB数据 | 
|             jobUnique.set(RandomStringUtils.randomAlphabetic(8)); | 
|             log(job, "Job started"); | 
|             log(job, "Job thread name: {}", Thread.currentThread().getName()); | 
|             log(job, "Job scheduled fire time: {}", Utils.Date.format(dto.getScheduledFireTime())); | 
|             log(job, "Job fire time: {}", Utils.Date.format(dto.getFireTime())); | 
|             log(job, "Job business time: {}", Utils.Date.format(dto.getBusinessTime())); | 
|             jobParam.setId(jobId); | 
|             jobParam.setTriggerType(dto.getTriggerType()); | 
|             jobParam.setBusinessTime(dto.getBusinessTime()); | 
|             // 记录日志 | 
|             if (job.getWithLog()) { | 
|                 systemJobLog.setJobId(jobId); | 
|                 systemJobLog.setJobName(job.getJobName()); | 
|                 systemJobLog.setRunTimeStart(new Date()); | 
|                 systemJobLog.setServerIp(Utils.Server.getIP()); | 
|                 systemJobLog.setBusinessTime(dto.getBusinessTime()); | 
|                 systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode()); | 
|                 systemJobLog.setTriggerType(dto.getTriggerType()); | 
|                 systemJobLog.setRemark("任务正在执行中"); | 
|                 systemJobLogMapper.insert(systemJobLog); | 
|             } | 
|             // 任务加锁,防止多服务器重复执行 | 
|             this.lock(job); | 
|             locked = true; | 
|             // 如果开启了异步执行,则创建一个分片 | 
|             if (job.getWithAsync()) { | 
|                 String snippetCode = this.createAsyncSnippet(job, jobParam); | 
|                 systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); | 
|                 systemJobLog.setRemark("异步任务已转为分片任务,分片编码:" + snippetCode); | 
|                 return; | 
|             } | 
|             // 如果存在分片 | 
|             if (StringUtils.isNotBlank(job.getDistributeHandler())) { | 
|                 String distributeGroup = UUID.randomUUID().toString(); | 
|                 int snippetCount = this.distribute(job, jobParam, distributeGroup); | 
|                 systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); | 
|                 systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup); | 
|                 return; | 
|             } | 
|             // 不存在分片,直接执行任务(根据任务处理器名称获取任务处理器实例并执行) | 
|             BaseJob jobBean = Utils.SpringContext.getBean(job.getHandler(), BaseJob.class); | 
|             JobContext jobContext = jobBean.execute(jobParam); | 
|             // 解锁任务 | 
|             this.unlock(job, dto.getNextFireTime()); | 
|             locked = false; | 
|             // 修改日志信息 | 
|             if (job.getWithLog()) { | 
|                 systemJobLog.setRemark("任务执行完成"); | 
|                 systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize()); | 
|                 systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize()); | 
|                 systemJobLog.setContext(jobContext.getContext()); | 
|                 systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); | 
|             } | 
|         } catch (LockedException e) { | 
|             log(job, e.getMessage()); | 
|             // 修改日志状态 | 
|             if (job.getWithLog()) { | 
|                 systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode()); | 
|                 systemJobLog.setRemark("任务正在被其他服务执行或锁定"); | 
|             } | 
|         } catch (Exception e) { | 
|             if (job != null) { | 
|                 error(job, e); | 
|                 // 修改日志状态 | 
|                 if (job.getWithLog()) { | 
|                     systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode()); | 
|                     systemJobLog.setRemark("任务执行失败,出现异常:" + e.getMessage()); | 
|                 } | 
|             } else { | 
|                 e.printStackTrace(); | 
|             } | 
|         } finally { | 
|             // 更新日志 | 
|             if (job != null && job.getWithLog() && systemJobLog.getId() != null) { | 
|                 systemJobLog.setRunTimeEnd(new Date()); | 
|                 systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime()))); | 
|                 systemJobLogMapper.updateById(systemJobLog); | 
|             } | 
|             // 任务解锁 | 
|             if (locked) { | 
|                 this.unlock(job, dto.getNextFireTime()); | 
|                 log(job, "Job execute completed."); | 
|             } | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 锁定JOB | 
|      * | 
|      * @param job JOB | 
|      */ | 
|     private void lock (SystemJob job) throws LockedException{ | 
|         log(job, "Lock job."); | 
|         if (Constants.equalsInteger(job.getStatus(),Constants.Job.JobStatus.RUNNING.getCode())) { | 
|             throw new LockedException("EVA: job is running."); | 
|         } | 
|         SystemJob newJob = new SystemJob(); | 
|         newJob.setStatus(Constants.Job.JobStatus.RUNNING.getCode()); | 
|         newJob.setLockNumber(job.getLockNumber() + 1); | 
|         // 不修改更新人和更新时间 | 
|         newJob.setUpdateUser(Constants.Ignore.IGNORE_USER); | 
|         newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME); | 
|         int result = systemJobMapper.update(newJob, new UpdateWrapper<SystemJob>() | 
|                 .lambda() | 
|                 .eq(SystemJob::getId, job.getId()) | 
|                 .eq(SystemJob::getLockNumber, job.getLockNumber()) | 
|         ); | 
|         if (result == 0) { | 
|             throw new LockedException("EVA: job is locked."); | 
|         } | 
|         log(job, "Lock job completed."); | 
|     } | 
|   | 
|     /** | 
|      * 解锁JOB | 
|      * | 
|      * @param job JOB | 
|      */ | 
|     private void unlock(SystemJob job, Date nextFireTime) { | 
|         log(job, "Unlock job."); | 
|         SystemJob newJob = new SystemJob(); | 
|         newJob.setId(job.getId()); | 
|         newJob.setStatus(Constants.Job.JobStatus.READY.getCode()); | 
|         newJob.setNextFireTime(nextFireTime); | 
|         // 不修改更新人和更新时间 | 
|         newJob.setUpdateUser(Constants.Ignore.IGNORE_USER); | 
|         newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME); | 
|         systemJobMapper.updateById(newJob); | 
|         log(job, "Unlock job completed."); | 
|     } | 
|   | 
|     /** | 
|      * 创建异步分片 | 
|      * | 
|      * @param job JOB | 
|      * @param jobParam JOB参数 | 
|      */ | 
|     private String createAsyncSnippet (SystemJob job, JobParam jobParam) { | 
|         log(job, "Begin create async snippet."); | 
|         String distributeGroup = UUID.randomUUID().toString(); | 
|         SystemJobSnippet snippet = new SystemJobSnippet(); | 
|         snippet.setJobId(job.getId()); | 
|         snippet.setJobName(job.getJobName()); | 
|         snippet.setJobBusinessTime(jobParam.getBusinessTime()); | 
|         snippet.setJobDistributeGroup(distributeGroup); | 
|         snippet.setDistributeHandler(job.getDistributeHandler()); | 
|         snippet.setDistributeLimit(job.getDistributeLimit()); | 
|         snippet.setHandler(job.getHandler()); | 
|         snippet.setSnippetIndex(0); | 
|         snippet.setSnippetCode(UUID.randomUUID().toString()); | 
|         snippet.setWithLog(job.getWithLog()); | 
|         snippet.setAllowServerIps(job.getAllowServerIps()); | 
|         snippet.setStatus(Constants.Job.JobStatus.READY.getCode()); | 
|         systemJobSnippetMapper.insert(snippet); | 
|         log(job, "Create async snippet completed."); | 
|         return snippet.getSnippetCode(); | 
|     } | 
|   | 
|     /** | 
|      * 分发 | 
|      * | 
|      * @param job JOB | 
|      * @param jobParam JOB参数 | 
|      * @param distributeGroup 分发组 | 
|      * @return int | 
|      */ | 
|     private int distribute (SystemJob job, JobParam jobParam, String distributeGroup) { | 
|         log(job, "Begin distribute."); | 
|         BaseDistributer distributer = Utils.SpringContext.getBean(job.getDistributeHandler(), BaseDistributer.class); | 
|         List<List<?>> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(job.getDistributeLimit()), jobParam); | 
|         for (int i = 0; i < snippetDatas.size(); i++) { | 
|             List snippetData = snippetDatas.get(i); | 
|             SystemJobSnippet snippet = new SystemJobSnippet(); | 
|             snippet.setJobId(job.getId()); | 
|             snippet.setJobName(job.getJobName()); | 
|             snippet.setJobBusinessTime(jobParam.getBusinessTime()); | 
|             snippet.setJobDistributeGroup(distributeGroup); | 
|             snippet.setHandler(job.getHandler()); | 
|             snippet.setSnippetIndex(i); | 
|             snippet.setSnippetCode(UUID.randomUUID().toString()); | 
|             snippet.setSnippetData(JSON.toJSONString(snippetData)); | 
|             snippet.setSnippetDataSize(snippetData.size()); | 
|             snippet.setWithLog(job.getWithLog()); | 
|             snippet.setAllowServerIps(job.getAllowServerIps()); | 
|             snippet.setStatus(Constants.Job.JobStatus.READY.getCode()); | 
|             systemJobSnippetMapper.insert(snippet); | 
|         } | 
|         log(job, "distribute completed."); | 
|         return snippetDatas.size(); | 
|     } | 
|   | 
|     /** | 
|      * 验证Job是否可执行 | 
|      * | 
|      * @param job JOB | 
|      */ | 
|     private boolean checkJob (SystemJob job, Date businessTime) { | 
|         // 执行时间验证(任务预计下一次执行<=当前执行时间即可执行) | 
|         if (businessTime != null && job.getNextFireTime() != null && job.getNextFireTime().compareTo(businessTime) == 1) { | 
|             return Boolean.FALSE; | 
|         } | 
|         // 服务器验证 | 
|         if (StringUtils.isNotBlank(job.getAllowServerIps())) { | 
|             return job.getAllowServerIps().contains(Utils.Server.getIP()); | 
|         } | 
|         return Boolean.TRUE; | 
|     } | 
|   | 
|     /** | 
|      * 打印日志 | 
|      * | 
|      * @param job JOB | 
|      * @param text 日志模版内容 | 
|      * @param params 日志参数 | 
|      */ | 
|     private void log (SystemJob job, String text, Object... params) { | 
|         log.trace("{}: " + text, String.format("%s_%s", job.getJobName(), jobUnique.get()), params); | 
|     } | 
|   | 
|     /** | 
|      * 打印错误日志 | 
|      * | 
|      * @param job JOB | 
|      * @param e 异常对象 | 
|      */ | 
|     private void error (SystemJob job, Exception e) { | 
|         log.error(String.format("EVA: %s_%s", job.getJobName(), jobUnique.get()) + ": Handle job throw an exception.", e); | 
|     } | 
| } |