package com.doumee.biz.system.impl;
|
|
import com.alibaba.fastjson.JSON;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.doumee.biz.system.SystemJobSnippetTriggerBiz;
|
import com.doumee.biz.system.dto.TriggerJobSnippetDTO;
|
import com.doumee.core.constants.Constants;
|
import com.doumee.core.exception.LockedException;
|
import com.doumee.core.job.BaseDistributer;
|
import com.doumee.core.job.BaseJob;
|
import com.doumee.core.job.JobContext;
|
import com.doumee.core.job.JobParam;
|
import com.doumee.core.utils.Utils;
|
import com.doumee.dao.system.SystemJobSnippetMapper;
|
import com.doumee.dao.system.model.SystemJobLog;
|
import com.doumee.dao.system.model.SystemJobSnippet;
|
import com.doumee.service.system.SystemJobLogService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.util.Date;
|
import java.util.List;
|
import java.util.UUID;
|
|
@Slf4j
|
@Service
|
public class SystemJobSnippetTriggerBizImpl implements SystemJobSnippetTriggerBiz {
|
|
@Autowired
|
private SystemJobLogService systemJobLogService;
|
|
@Autowired
|
private SystemJobSnippetMapper systemJobSnippetMapper;
|
|
@Override
|
public void trigger(TriggerJobSnippetDTO dto) {
|
Utils.ThreadPool.start(() -> {
|
SystemJobSnippet snippet = systemJobSnippetMapper.selectById(dto.getId());
|
SystemJobLog systemJobLog = new SystemJobLog();
|
// 验证分片
|
if(!this.checkSnippet(snippet)) {
|
return;
|
}
|
boolean locked = false;
|
try {
|
log(snippet, "Snippet started");
|
// 记录日志
|
if (snippet.getWithLog()) {
|
systemJobLog.setRunTimeStart(new Date());
|
systemJobLog.setJobId(snippet.getJobId());
|
systemJobLog.setJobName(snippet.getJobName());
|
systemJobLog.setJobDistributeGroup(snippet.getJobDistributeGroup());
|
systemJobLog.setSnippetId(snippet.getId());
|
systemJobLog.setSnippetCode(snippet.getSnippetCode());
|
systemJobLog.setBusinessTime(snippet.getJobBusinessTime());
|
systemJobLog.setRemark("分片正在执行中");
|
systemJobLog.setServerIp(Utils.Server.getIP());
|
systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode());
|
systemJobLog.setTriggerType(dto.getTriggerType());
|
systemJobLogService.create(systemJobLog);
|
}
|
// 锁定当前分片,防止其他服务器重复执行
|
this.lock(snippet);
|
locked = true;
|
BaseJob jobBean = Utils.SpringContext.getBean(snippet.getHandler(), BaseJob.class);
|
JobParam jobParam = new JobParam();
|
jobParam.setId(snippet.getId());
|
jobParam.setSnippetIndex(snippet.getSnippetIndex());
|
jobParam.setBusinessTime(snippet.getJobBusinessTime());
|
jobParam.setRuntimeData(snippet.getSnippetData());
|
// 如果该分片存在分片器,则执行分片器做进一步分发
|
if (StringUtils.isNotBlank(snippet.getDistributeHandler())) {
|
String distributeGroup = UUID.randomUUID().toString();
|
int snippetCount = this.distribute(snippet, jobParam, distributeGroup);
|
// 更新日志状态
|
if (snippet.getWithLog()) {
|
systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
|
systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup);
|
}
|
// 更新分片状态
|
snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode());
|
return;
|
}
|
// 执行任务
|
JobContext jobContext = jobBean.execute(jobParam);
|
log(snippet, "Snippet context: {}", jobContext);
|
// 更新日志状态
|
if (snippet.getWithLog()) {
|
systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize());
|
systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize());
|
systemJobLog.setContext(jobContext.getContext());
|
systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
|
systemJobLog.setRemark("分片执行完成");
|
}
|
// 更新分片状态
|
snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode());
|
} catch (LockedException e) {
|
log(snippet, "Snippet execute end.");
|
// 更新日志状态
|
if (snippet.getWithLog()) {
|
systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode());
|
systemJobLog.setRemark("分片正在被其他服务执行或锁定");
|
}
|
} catch (Exception e) {
|
error(snippet, e);
|
// 更新分片状态
|
snippet.setStatus(Constants.Job.SnippetStatus.FAILED.getCode());
|
// 更新日志状态
|
if (snippet.getWithLog()) {
|
systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode());
|
systemJobLog.setRemark("出现异常:" + e.getMessage());
|
}
|
} finally {
|
// 更新日志
|
if (systemJobLog.getId() != null && snippet.getWithLog()) {
|
systemJobLog.setRunTimeEnd(new Date());
|
systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime())));
|
systemJobLogService.updateById(systemJobLog);
|
}
|
// 解锁分片
|
if (locked) {
|
this.unlock(snippet);
|
}
|
}
|
});
|
}
|
|
/**
|
* 锁定分片
|
*
|
* @param snippet 分片
|
*/
|
private void lock (SystemJobSnippet snippet) throws LockedException {
|
log(snippet, "Lock snippet.");
|
SystemJobSnippet newSnippet = new SystemJobSnippet();
|
newSnippet.setStatus(Constants.Job.SnippetStatus.RUNNING.getCode());
|
newSnippet.setLockNumber(snippet.getLockNumber() + 1);
|
int result = systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper<SystemJobSnippet>()
|
.eq(SystemJobSnippet::getId, snippet.getId())
|
.eq(SystemJobSnippet::getLockNumber, snippet.getLockNumber())
|
);
|
if (result == 0) {
|
throw new LockedException("EVA: snippet is locked.");
|
}
|
log(snippet, "Lock snippet completed.");
|
}
|
|
/**
|
* 完成分片处理
|
*
|
* @param snippet 分片
|
*/
|
private void unlock (SystemJobSnippet snippet) {
|
SystemJobSnippet newSnippet = new SystemJobSnippet();
|
newSnippet.setStatus(snippet.getStatus());
|
systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper<SystemJobSnippet>()
|
.eq(SystemJobSnippet::getId, snippet.getId())
|
);
|
log(snippet, "Snippet execute completed.");
|
}
|
|
/**
|
* 分发
|
*
|
* @param snippet 分片实例
|
* @param jobParam JOB执行参数
|
* @param distributeGroup 分发组
|
* @return int
|
*/
|
private int distribute (SystemJobSnippet snippet, JobParam jobParam, String distributeGroup) {
|
log(snippet, "Begin distribute.");
|
BaseDistributer distributer = Utils.SpringContext.getBean(snippet.getDistributeHandler(), BaseDistributer.class);
|
List<List<?>> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(snippet.getDistributeLimit()), jobParam);
|
for (int i = 0; i < snippetDatas.size(); i++) {
|
List snippetData = snippetDatas.get(i);
|
SystemJobSnippet subSnippet = new SystemJobSnippet();
|
subSnippet.setJobId(snippet.getJobId());
|
subSnippet.setJobName(snippet.getJobName());
|
subSnippet.setJobBusinessTime(jobParam.getBusinessTime());
|
subSnippet.setJobDistributeGroup(distributeGroup);
|
subSnippet.setHandler(snippet.getHandler());
|
subSnippet.setSnippetIndex(i);
|
subSnippet.setSnippetCode(UUID.randomUUID().toString());
|
subSnippet.setSnippetData(JSON.toJSONString(snippetData));
|
subSnippet.setSnippetDataSize(snippetData.size());
|
subSnippet.setWithLog(snippet.getWithLog());
|
subSnippet.setAllowServerIps(snippet.getAllowServerIps());
|
subSnippet.setStatus(Constants.Job.JobStatus.READY.getCode());
|
systemJobSnippetMapper.insert(subSnippet);
|
}
|
log(snippet, "distribute completed.");
|
return snippetDatas.size();
|
}
|
|
/**
|
* 验证分片是否可执行
|
*
|
* @param snippet 分片
|
* @return boolean
|
*/
|
private boolean checkSnippet (SystemJobSnippet snippet) {
|
// 服务器验证
|
if (StringUtils.isNotBlank(snippet.getAllowServerIps())) {
|
return snippet.getAllowServerIps().contains(Utils.Server.getIP());
|
}
|
return Boolean.TRUE;
|
}
|
|
/**
|
* 打印日志
|
*
|
* @param snippet 分片
|
* @param text 日志模版内容
|
* @param params 日志参数
|
*/
|
private void log (SystemJobSnippet snippet, String text, Object... params) {
|
log.trace("{}: " + text, String.format("%s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()), params);
|
}
|
|
/**
|
* 打印错误日志
|
*
|
* @param snippet 分片
|
* @param e 异常对象
|
*/
|
private void error (SystemJobSnippet snippet, Exception e) {
|
log.error(String.format("EVA: %s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()) + ": Handle snippet throw an exception.", e);
|
}
|
}
|