package com.doumee.biz.system.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.doumee.biz.system.SystemJobTriggerBiz; import com.doumee.biz.system.dto.TriggerJobDTO; import com.doumee.core.constants.Constants; import com.doumee.core.exception.LockedException; import com.doumee.core.job.BaseDistributer; import com.doumee.core.job.BaseJob; import com.doumee.core.job.JobContext; import com.doumee.core.job.JobParam; import com.doumee.core.utils.Utils; import com.doumee.dao.system.SystemJobLogMapper; import com.doumee.dao.system.SystemJobMapper; import com.doumee.dao.system.SystemJobSnippetMapper; import com.doumee.dao.system.model.SystemJob; import com.doumee.dao.system.model.SystemJobLog; import com.doumee.dao.system.model.SystemJobSnippet; import com.doumee.service.system.SystemJobLogService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Date; import java.util.List; import java.util.UUID; @Slf4j @Service public class SystemJobTriggerBizImpl implements SystemJobTriggerBiz { @Autowired private SystemJobLogMapper systemJobLogMapper; @Autowired private SystemJobMapper systemJobMapper; @Autowired private SystemJobSnippetMapper systemJobSnippetMapper; private ThreadLocal jobUnique = new ThreadLocal<>(); @Override public void trigger(TriggerJobDTO dto) { SystemJob job = null; JobParam jobParam = new JobParam(); boolean locked = false; SystemJobLog systemJobLog = new SystemJobLog(); try { Integer jobId = dto.getId(); job = systemJobMapper.selectById(jobId); // 验证JOB if (!this.checkJob(job, dto.getBusinessTime())) { return; } // 初始化JOB数据 jobUnique.set(RandomStringUtils.randomAlphabetic(8)); log(job, "Job started"); log(job, "Job thread name: {}", Thread.currentThread().getName()); log(job, "Job scheduled fire time: {}", Utils.Date.format(dto.getScheduledFireTime())); log(job, "Job fire time: {}", Utils.Date.format(dto.getFireTime())); log(job, "Job business time: {}", Utils.Date.format(dto.getBusinessTime())); jobParam.setId(jobId); jobParam.setTriggerType(dto.getTriggerType()); jobParam.setBusinessTime(dto.getBusinessTime()); // 记录日志 if (job.getWithLog()) { systemJobLog.setJobId(jobId); systemJobLog.setJobName(job.getJobName()); systemJobLog.setRunTimeStart(new Date()); systemJobLog.setServerIp(Utils.Server.getIP()); systemJobLog.setBusinessTime(dto.getBusinessTime()); systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode()); systemJobLog.setTriggerType(dto.getTriggerType()); systemJobLog.setRemark("任务正在执行中"); systemJobLogMapper.insert(systemJobLog); } // 任务加锁,防止多服务器重复执行 this.lock(job); locked = true; // 如果开启了异步执行,则创建一个分片 if (job.getWithAsync()) { String snippetCode = this.createAsyncSnippet(job, jobParam); systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); systemJobLog.setRemark("异步任务已转为分片任务,分片编码:" + snippetCode); return; } // 如果存在分片 if (StringUtils.isNotBlank(job.getDistributeHandler())) { String distributeGroup = UUID.randomUUID().toString(); int snippetCount = this.distribute(job, jobParam, distributeGroup); systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup); return; } // 不存在分片,直接执行任务(根据任务处理器名称获取任务处理器实例并执行) BaseJob jobBean = Utils.SpringContext.getBean(job.getHandler(), BaseJob.class); JobContext jobContext = jobBean.execute(jobParam); // 解锁任务 this.unlock(job, dto.getNextFireTime()); locked = false; // 修改日志信息 if (job.getWithLog()) { systemJobLog.setRemark("任务执行完成"); systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize()); systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize()); systemJobLog.setContext(jobContext.getContext()); systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); } } catch (LockedException e) { log(job, e.getMessage()); // 修改日志状态 if (job.getWithLog()) { systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode()); systemJobLog.setRemark("任务正在被其他服务执行或锁定"); } } catch (Exception e) { if (job != null) { error(job, e); // 修改日志状态 if (job.getWithLog()) { systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode()); systemJobLog.setRemark("任务执行失败,出现异常:" + e.getMessage()); } } else { e.printStackTrace(); } } finally { // 更新日志 if (job != null && job.getWithLog() && systemJobLog.getId() != null) { systemJobLog.setRunTimeEnd(new Date()); systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime()))); systemJobLogMapper.updateById(systemJobLog); } // 任务解锁 if (locked) { this.unlock(job, dto.getNextFireTime()); log(job, "Job execute completed."); } } } /** * 锁定JOB * * @param job JOB */ private void lock (SystemJob job) throws LockedException{ log(job, "Lock job."); if (Constants.equalsInteger(job.getStatus(),Constants.Job.JobStatus.RUNNING.getCode())) { throw new LockedException("EVA: job is running."); } SystemJob newJob = new SystemJob(); newJob.setStatus(Constants.Job.JobStatus.RUNNING.getCode()); newJob.setLockNumber(job.getLockNumber() + 1); // 不修改更新人和更新时间 newJob.setUpdateUser(Constants.Ignore.IGNORE_USER); newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME); int result = systemJobMapper.update(newJob, new UpdateWrapper() .lambda() .eq(SystemJob::getId, job.getId()) .eq(SystemJob::getLockNumber, job.getLockNumber()) ); if (result == 0) { throw new LockedException("EVA: job is locked."); } log(job, "Lock job completed."); } /** * 解锁JOB * * @param job JOB */ private void unlock(SystemJob job, Date nextFireTime) { log(job, "Unlock job."); SystemJob newJob = new SystemJob(); newJob.setId(job.getId()); newJob.setStatus(Constants.Job.JobStatus.READY.getCode()); newJob.setNextFireTime(nextFireTime); // 不修改更新人和更新时间 newJob.setUpdateUser(Constants.Ignore.IGNORE_USER); newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME); systemJobMapper.updateById(newJob); log(job, "Unlock job completed."); } /** * 创建异步分片 * * @param job JOB * @param jobParam JOB参数 */ private String createAsyncSnippet (SystemJob job, JobParam jobParam) { log(job, "Begin create async snippet."); String distributeGroup = UUID.randomUUID().toString(); SystemJobSnippet snippet = new SystemJobSnippet(); snippet.setJobId(job.getId()); snippet.setJobName(job.getJobName()); snippet.setJobBusinessTime(jobParam.getBusinessTime()); snippet.setJobDistributeGroup(distributeGroup); snippet.setDistributeHandler(job.getDistributeHandler()); snippet.setDistributeLimit(job.getDistributeLimit()); snippet.setHandler(job.getHandler()); snippet.setSnippetIndex(0); snippet.setSnippetCode(UUID.randomUUID().toString()); snippet.setWithLog(job.getWithLog()); snippet.setAllowServerIps(job.getAllowServerIps()); snippet.setStatus(Constants.Job.JobStatus.READY.getCode()); systemJobSnippetMapper.insert(snippet); log(job, "Create async snippet completed."); return snippet.getSnippetCode(); } /** * 分发 * * @param job JOB * @param jobParam JOB参数 * @param distributeGroup 分发组 * @return int */ private int distribute (SystemJob job, JobParam jobParam, String distributeGroup) { log(job, "Begin distribute."); BaseDistributer distributer = Utils.SpringContext.getBean(job.getDistributeHandler(), BaseDistributer.class); List> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(job.getDistributeLimit()), jobParam); for (int i = 0; i < snippetDatas.size(); i++) { List snippetData = snippetDatas.get(i); SystemJobSnippet snippet = new SystemJobSnippet(); snippet.setJobId(job.getId()); snippet.setJobName(job.getJobName()); snippet.setJobBusinessTime(jobParam.getBusinessTime()); snippet.setJobDistributeGroup(distributeGroup); snippet.setHandler(job.getHandler()); snippet.setSnippetIndex(i); snippet.setSnippetCode(UUID.randomUUID().toString()); snippet.setSnippetData(JSON.toJSONString(snippetData)); snippet.setSnippetDataSize(snippetData.size()); snippet.setWithLog(job.getWithLog()); snippet.setAllowServerIps(job.getAllowServerIps()); snippet.setStatus(Constants.Job.JobStatus.READY.getCode()); systemJobSnippetMapper.insert(snippet); } log(job, "distribute completed."); return snippetDatas.size(); } /** * 验证Job是否可执行 * * @param job JOB */ private boolean checkJob (SystemJob job, Date businessTime) { // 执行时间验证(任务预计下一次执行<=当前执行时间即可执行) if (businessTime != null && job.getNextFireTime() != null && job.getNextFireTime().compareTo(businessTime) == 1) { return Boolean.FALSE; } // 服务器验证 if (StringUtils.isNotBlank(job.getAllowServerIps())) { return job.getAllowServerIps().contains(Utils.Server.getIP()); } return Boolean.TRUE; } /** * 打印日志 * * @param job JOB * @param text 日志模版内容 * @param params 日志参数 */ private void log (SystemJob job, String text, Object... params) { log.trace("{}: " + text, String.format("%s_%s", job.getJobName(), jobUnique.get()), params); } /** * 打印错误日志 * * @param job JOB * @param e 异常对象 */ private void error (SystemJob job, Exception e) { log.error(String.format("EVA: %s_%s", job.getJobName(), jobUnique.get()) + ": Handle job throw an exception.", e); } }