package com.doumee.biz.system.impl;
|
|
import com.alibaba.fastjson.JSON;
|
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
import com.doumee.biz.system.SystemJobTriggerBiz;
|
import com.doumee.biz.system.dto.TriggerJobDTO;
|
import com.doumee.core.constants.Constants;
|
import com.doumee.core.exception.LockedException;
|
import com.doumee.core.job.BaseDistributer;
|
import com.doumee.core.job.BaseJob;
|
import com.doumee.core.job.JobContext;
|
import com.doumee.core.job.JobParam;
|
import com.doumee.core.utils.Utils;
|
import com.doumee.dao.system.SystemJobLogMapper;
|
import com.doumee.dao.system.SystemJobMapper;
|
import com.doumee.dao.system.SystemJobSnippetMapper;
|
import com.doumee.dao.system.model.SystemJob;
|
import com.doumee.dao.system.model.SystemJobLog;
|
import com.doumee.dao.system.model.SystemJobSnippet;
|
import com.doumee.service.system.SystemJobLogService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.util.Date;
|
import java.util.List;
|
import java.util.UUID;
|
|
@Slf4j
|
@Service
|
public class SystemJobTriggerBizImpl implements SystemJobTriggerBiz {
|
|
@Autowired
|
private SystemJobLogMapper systemJobLogMapper;
|
|
@Autowired
|
private SystemJobMapper systemJobMapper;
|
|
@Autowired
|
private SystemJobSnippetMapper systemJobSnippetMapper;
|
|
private ThreadLocal<String> jobUnique = new ThreadLocal<>();
|
|
@Override
|
public void trigger(TriggerJobDTO dto) {
|
SystemJob job = null;
|
JobParam jobParam = new JobParam();
|
boolean locked = false;
|
SystemJobLog systemJobLog = new SystemJobLog();
|
try {
|
Integer jobId = dto.getId();
|
job = systemJobMapper.selectById(jobId);
|
// 验证JOB
|
if (!this.checkJob(job, dto.getBusinessTime())) {
|
return;
|
}
|
// 初始化JOB数据
|
jobUnique.set(RandomStringUtils.randomAlphabetic(8));
|
log(job, "Job started");
|
log(job, "Job thread name: {}", Thread.currentThread().getName());
|
log(job, "Job scheduled fire time: {}", Utils.Date.format(dto.getScheduledFireTime()));
|
log(job, "Job fire time: {}", Utils.Date.format(dto.getFireTime()));
|
log(job, "Job business time: {}", Utils.Date.format(dto.getBusinessTime()));
|
jobParam.setId(jobId);
|
jobParam.setTriggerType(dto.getTriggerType());
|
jobParam.setBusinessTime(dto.getBusinessTime());
|
// 记录日志
|
if (job.getWithLog()) {
|
systemJobLog.setJobId(jobId);
|
systemJobLog.setJobName(job.getJobName());
|
systemJobLog.setRunTimeStart(new Date());
|
systemJobLog.setServerIp(Utils.Server.getIP());
|
systemJobLog.setBusinessTime(dto.getBusinessTime());
|
systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode());
|
systemJobLog.setTriggerType(dto.getTriggerType());
|
systemJobLog.setRemark("任务正在执行中");
|
systemJobLogMapper.insert(systemJobLog);
|
}
|
// 任务加锁,防止多服务器重复执行
|
this.lock(job);
|
locked = true;
|
// 如果开启了异步执行,则创建一个分片
|
if (job.getWithAsync()) {
|
String snippetCode = this.createAsyncSnippet(job, jobParam);
|
systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
|
systemJobLog.setRemark("异步任务已转为分片任务,分片编码:" + snippetCode);
|
return;
|
}
|
// 如果存在分片
|
if (StringUtils.isNotBlank(job.getDistributeHandler())) {
|
String distributeGroup = UUID.randomUUID().toString();
|
int snippetCount = this.distribute(job, jobParam, distributeGroup);
|
systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
|
systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup);
|
return;
|
}
|
// 不存在分片,直接执行任务(根据任务处理器名称获取任务处理器实例并执行)
|
BaseJob jobBean = Utils.SpringContext.getBean(job.getHandler(), BaseJob.class);
|
JobContext jobContext = jobBean.execute(jobParam);
|
// 解锁任务
|
this.unlock(job, dto.getNextFireTime());
|
locked = false;
|
// 修改日志信息
|
if (job.getWithLog()) {
|
systemJobLog.setRemark("任务执行完成");
|
systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize());
|
systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize());
|
systemJobLog.setContext(jobContext.getContext());
|
systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
|
}
|
} catch (LockedException e) {
|
log(job, e.getMessage());
|
// 修改日志状态
|
if (job.getWithLog()) {
|
systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode());
|
systemJobLog.setRemark("任务正在被其他服务执行或锁定");
|
}
|
} catch (Exception e) {
|
if (job != null) {
|
error(job, e);
|
// 修改日志状态
|
if (job.getWithLog()) {
|
systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode());
|
systemJobLog.setRemark("任务执行失败,出现异常:" + e.getMessage());
|
}
|
} else {
|
e.printStackTrace();
|
}
|
} finally {
|
// 更新日志
|
if (job != null && job.getWithLog() && systemJobLog.getId() != null) {
|
systemJobLog.setRunTimeEnd(new Date());
|
systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime())));
|
systemJobLogMapper.updateById(systemJobLog);
|
}
|
// 任务解锁
|
if (locked) {
|
this.unlock(job, dto.getNextFireTime());
|
log(job, "Job execute completed.");
|
}
|
}
|
}
|
|
/**
|
* 锁定JOB
|
*
|
* @param job JOB
|
*/
|
private void lock (SystemJob job) throws LockedException{
|
log(job, "Lock job.");
|
if (Constants.equalsInteger(job.getStatus(),Constants.Job.JobStatus.RUNNING.getCode())) {
|
throw new LockedException("EVA: job is running.");
|
}
|
SystemJob newJob = new SystemJob();
|
newJob.setStatus(Constants.Job.JobStatus.RUNNING.getCode());
|
newJob.setLockNumber(job.getLockNumber() + 1);
|
// 不修改更新人和更新时间
|
newJob.setUpdateUser(Constants.Ignore.IGNORE_USER);
|
newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME);
|
int result = systemJobMapper.update(newJob, new UpdateWrapper<SystemJob>()
|
.lambda()
|
.eq(SystemJob::getId, job.getId())
|
.eq(SystemJob::getLockNumber, job.getLockNumber())
|
);
|
if (result == 0) {
|
throw new LockedException("EVA: job is locked.");
|
}
|
log(job, "Lock job completed.");
|
}
|
|
/**
|
* 解锁JOB
|
*
|
* @param job JOB
|
*/
|
private void unlock(SystemJob job, Date nextFireTime) {
|
log(job, "Unlock job.");
|
SystemJob newJob = new SystemJob();
|
newJob.setId(job.getId());
|
newJob.setStatus(Constants.Job.JobStatus.READY.getCode());
|
newJob.setNextFireTime(nextFireTime);
|
// 不修改更新人和更新时间
|
newJob.setUpdateUser(Constants.Ignore.IGNORE_USER);
|
newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME);
|
systemJobMapper.updateById(newJob);
|
log(job, "Unlock job completed.");
|
}
|
|
/**
|
* 创建异步分片
|
*
|
* @param job JOB
|
* @param jobParam JOB参数
|
*/
|
private String createAsyncSnippet (SystemJob job, JobParam jobParam) {
|
log(job, "Begin create async snippet.");
|
String distributeGroup = UUID.randomUUID().toString();
|
SystemJobSnippet snippet = new SystemJobSnippet();
|
snippet.setJobId(job.getId());
|
snippet.setJobName(job.getJobName());
|
snippet.setJobBusinessTime(jobParam.getBusinessTime());
|
snippet.setJobDistributeGroup(distributeGroup);
|
snippet.setDistributeHandler(job.getDistributeHandler());
|
snippet.setDistributeLimit(job.getDistributeLimit());
|
snippet.setHandler(job.getHandler());
|
snippet.setSnippetIndex(0);
|
snippet.setSnippetCode(UUID.randomUUID().toString());
|
snippet.setWithLog(job.getWithLog());
|
snippet.setAllowServerIps(job.getAllowServerIps());
|
snippet.setStatus(Constants.Job.JobStatus.READY.getCode());
|
systemJobSnippetMapper.insert(snippet);
|
log(job, "Create async snippet completed.");
|
return snippet.getSnippetCode();
|
}
|
|
/**
|
* 分发
|
*
|
* @param job JOB
|
* @param jobParam JOB参数
|
* @param distributeGroup 分发组
|
* @return int
|
*/
|
private int distribute (SystemJob job, JobParam jobParam, String distributeGroup) {
|
log(job, "Begin distribute.");
|
BaseDistributer distributer = Utils.SpringContext.getBean(job.getDistributeHandler(), BaseDistributer.class);
|
List<List<?>> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(job.getDistributeLimit()), jobParam);
|
for (int i = 0; i < snippetDatas.size(); i++) {
|
List snippetData = snippetDatas.get(i);
|
SystemJobSnippet snippet = new SystemJobSnippet();
|
snippet.setJobId(job.getId());
|
snippet.setJobName(job.getJobName());
|
snippet.setJobBusinessTime(jobParam.getBusinessTime());
|
snippet.setJobDistributeGroup(distributeGroup);
|
snippet.setHandler(job.getHandler());
|
snippet.setSnippetIndex(i);
|
snippet.setSnippetCode(UUID.randomUUID().toString());
|
snippet.setSnippetData(JSON.toJSONString(snippetData));
|
snippet.setSnippetDataSize(snippetData.size());
|
snippet.setWithLog(job.getWithLog());
|
snippet.setAllowServerIps(job.getAllowServerIps());
|
snippet.setStatus(Constants.Job.JobStatus.READY.getCode());
|
systemJobSnippetMapper.insert(snippet);
|
}
|
log(job, "distribute completed.");
|
return snippetDatas.size();
|
}
|
|
/**
|
* 验证Job是否可执行
|
*
|
* @param job JOB
|
*/
|
private boolean checkJob (SystemJob job, Date businessTime) {
|
// 执行时间验证(任务预计下一次执行<=当前执行时间即可执行)
|
if (businessTime != null && job.getNextFireTime() != null && job.getNextFireTime().compareTo(businessTime) == 1) {
|
return Boolean.FALSE;
|
}
|
// 服务器验证
|
if (StringUtils.isNotBlank(job.getAllowServerIps())) {
|
return job.getAllowServerIps().contains(Utils.Server.getIP());
|
}
|
return Boolean.TRUE;
|
}
|
|
/**
|
* 打印日志
|
*
|
* @param job JOB
|
* @param text 日志模版内容
|
* @param params 日志参数
|
*/
|
private void log (SystemJob job, String text, Object... params) {
|
log.trace("{}: " + text, String.format("%s_%s", job.getJobName(), jobUnique.get()), params);
|
}
|
|
/**
|
* 打印错误日志
|
*
|
* @param job JOB
|
* @param e 异常对象
|
*/
|
private void error (SystemJob job, Exception e) {
|
log.error(String.format("EVA: %s_%s", job.getJobName(), jobUnique.get()) + ": Handle job throw an exception.", e);
|
}
|
}
|