package com.doumee.biz.system.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.doumee.biz.system.SystemJobSnippetTriggerBiz; import com.doumee.biz.system.dto.TriggerJobSnippetDTO; import com.doumee.core.constants.Constants; import com.doumee.core.exception.LockedException; import com.doumee.core.job.BaseDistributer; import com.doumee.core.job.BaseJob; import com.doumee.core.job.JobContext; import com.doumee.core.job.JobParam; import com.doumee.core.utils.Utils; import com.doumee.dao.system.SystemJobSnippetMapper; import com.doumee.dao.system.model.SystemJobLog; import com.doumee.dao.system.model.SystemJobSnippet; import com.doumee.service.system.SystemJobLogService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Date; import java.util.List; import java.util.UUID; @Slf4j @Service public class SystemJobSnippetTriggerBizImpl implements SystemJobSnippetTriggerBiz { @Autowired private SystemJobLogService systemJobLogService; @Autowired private SystemJobSnippetMapper systemJobSnippetMapper; @Override public void trigger(TriggerJobSnippetDTO dto) { Utils.ThreadPool.start(() -> { SystemJobSnippet snippet = systemJobSnippetMapper.selectById(dto.getId()); SystemJobLog systemJobLog = new SystemJobLog(); // 验证分片 if(!this.checkSnippet(snippet)) { return; } boolean locked = false; try { log(snippet, "Snippet started"); // 记录日志 if (snippet.getWithLog()) { systemJobLog.setRunTimeStart(new Date()); systemJobLog.setJobId(snippet.getJobId()); systemJobLog.setJobName(snippet.getJobName()); systemJobLog.setJobDistributeGroup(snippet.getJobDistributeGroup()); systemJobLog.setSnippetId(snippet.getId()); systemJobLog.setSnippetCode(snippet.getSnippetCode()); systemJobLog.setBusinessTime(snippet.getJobBusinessTime()); systemJobLog.setRemark("分片正在执行中"); systemJobLog.setServerIp(Utils.Server.getIP()); systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode()); systemJobLog.setTriggerType(dto.getTriggerType()); systemJobLogService.create(systemJobLog); } // 锁定当前分片,防止其他服务器重复执行 this.lock(snippet); locked = true; BaseJob jobBean = Utils.SpringContext.getBean(snippet.getHandler(), BaseJob.class); JobParam jobParam = new JobParam(); jobParam.setId(snippet.getId()); jobParam.setSnippetIndex(snippet.getSnippetIndex()); jobParam.setBusinessTime(snippet.getJobBusinessTime()); jobParam.setRuntimeData(snippet.getSnippetData()); // 如果该分片存在分片器,则执行分片器做进一步分发 if (StringUtils.isNotBlank(snippet.getDistributeHandler())) { String distributeGroup = UUID.randomUUID().toString(); int snippetCount = this.distribute(snippet, jobParam, distributeGroup); // 更新日志状态 if (snippet.getWithLog()) { systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup); } // 更新分片状态 snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode()); return; } // 执行任务 JobContext jobContext = jobBean.execute(jobParam); log(snippet, "Snippet context: {}", jobContext); // 更新日志状态 if (snippet.getWithLog()) { systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize()); systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize()); systemJobLog.setContext(jobContext.getContext()); systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); systemJobLog.setRemark("分片执行完成"); } // 更新分片状态 snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode()); } catch (LockedException e) { log(snippet, "Snippet execute end."); // 更新日志状态 if (snippet.getWithLog()) { systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode()); systemJobLog.setRemark("分片正在被其他服务执行或锁定"); } } catch (Exception e) { error(snippet, e); // 更新分片状态 snippet.setStatus(Constants.Job.SnippetStatus.FAILED.getCode()); // 更新日志状态 if (snippet.getWithLog()) { systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode()); systemJobLog.setRemark("出现异常:" + e.getMessage()); } } finally { // 更新日志 if (systemJobLog.getId() != null && snippet.getWithLog()) { systemJobLog.setRunTimeEnd(new Date()); systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime()))); systemJobLogService.updateById(systemJobLog); } // 解锁分片 if (locked) { this.unlock(snippet); } } }); } /** * 锁定分片 * * @param snippet 分片 */ private void lock (SystemJobSnippet snippet) throws LockedException { log(snippet, "Lock snippet."); SystemJobSnippet newSnippet = new SystemJobSnippet(); newSnippet.setStatus(Constants.Job.SnippetStatus.RUNNING.getCode()); newSnippet.setLockNumber(snippet.getLockNumber() + 1); int result = systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper() .eq(SystemJobSnippet::getId, snippet.getId()) .eq(SystemJobSnippet::getLockNumber, snippet.getLockNumber()) ); if (result == 0) { throw new LockedException("EVA: snippet is locked."); } log(snippet, "Lock snippet completed."); } /** * 完成分片处理 * * @param snippet 分片 */ private void unlock (SystemJobSnippet snippet) { SystemJobSnippet newSnippet = new SystemJobSnippet(); newSnippet.setStatus(snippet.getStatus()); systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper() .eq(SystemJobSnippet::getId, snippet.getId()) ); log(snippet, "Snippet execute completed."); } /** * 分发 * * @param snippet 分片实例 * @param jobParam JOB执行参数 * @param distributeGroup 分发组 * @return int */ private int distribute (SystemJobSnippet snippet, JobParam jobParam, String distributeGroup) { log(snippet, "Begin distribute."); BaseDistributer distributer = Utils.SpringContext.getBean(snippet.getDistributeHandler(), BaseDistributer.class); List> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(snippet.getDistributeLimit()), jobParam); for (int i = 0; i < snippetDatas.size(); i++) { List snippetData = snippetDatas.get(i); SystemJobSnippet subSnippet = new SystemJobSnippet(); subSnippet.setJobId(snippet.getJobId()); subSnippet.setJobName(snippet.getJobName()); subSnippet.setJobBusinessTime(jobParam.getBusinessTime()); subSnippet.setJobDistributeGroup(distributeGroup); subSnippet.setHandler(snippet.getHandler()); subSnippet.setSnippetIndex(i); subSnippet.setSnippetCode(UUID.randomUUID().toString()); subSnippet.setSnippetData(JSON.toJSONString(snippetData)); subSnippet.setSnippetDataSize(snippetData.size()); subSnippet.setWithLog(snippet.getWithLog()); subSnippet.setAllowServerIps(snippet.getAllowServerIps()); subSnippet.setStatus(Constants.Job.JobStatus.READY.getCode()); systemJobSnippetMapper.insert(subSnippet); } log(snippet, "distribute completed."); return snippetDatas.size(); } /** * 验证分片是否可执行 * * @param snippet 分片 * @return boolean */ private boolean checkSnippet (SystemJobSnippet snippet) { // 服务器验证 if (StringUtils.isNotBlank(snippet.getAllowServerIps())) { return snippet.getAllowServerIps().contains(Utils.Server.getIP()); } return Boolean.TRUE; } /** * 打印日志 * * @param snippet 分片 * @param text 日志模版内容 * @param params 日志参数 */ private void log (SystemJobSnippet snippet, String text, Object... params) { log.trace("{}: " + text, String.format("%s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()), params); } /** * 打印错误日志 * * @param snippet 分片 * @param e 异常对象 */ private void error (SystemJobSnippet snippet, Exception e) { log.error(String.format("EVA: %s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()) + ": Handle snippet throw an exception.", e); } }