| package com.doumee.biz.system.impl; | 
|   | 
| import com.alibaba.fastjson.JSON; | 
| import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; | 
| import com.doumee.biz.system.SystemJobSnippetTriggerBiz; | 
| import com.doumee.biz.system.dto.TriggerJobSnippetDTO; | 
| import com.doumee.core.constants.Constants; | 
| import com.doumee.core.exception.LockedException; | 
| import com.doumee.core.job.BaseDistributer; | 
| import com.doumee.core.job.BaseJob; | 
| import com.doumee.core.job.JobContext; | 
| import com.doumee.core.job.JobParam; | 
| import com.doumee.core.utils.Utils; | 
| import com.doumee.dao.system.SystemJobSnippetMapper; | 
| import com.doumee.dao.system.model.SystemJobLog; | 
| import com.doumee.dao.system.model.SystemJobSnippet; | 
| import com.doumee.service.system.SystemJobLogService; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.apache.commons.lang3.StringUtils; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.stereotype.Service; | 
|   | 
| import java.util.Date; | 
| import java.util.List; | 
| import java.util.UUID; | 
|   | 
| @Slf4j | 
| @Service | 
| public class SystemJobSnippetTriggerBizImpl implements SystemJobSnippetTriggerBiz { | 
|   | 
|     @Autowired | 
|     private SystemJobLogService systemJobLogService; | 
|   | 
|     @Autowired | 
|     private SystemJobSnippetMapper systemJobSnippetMapper; | 
|   | 
|     @Override | 
|     public void trigger(TriggerJobSnippetDTO dto) { | 
|         Utils.ThreadPool.start(() -> { | 
|             SystemJobSnippet snippet = systemJobSnippetMapper.selectById(dto.getId()); | 
|             SystemJobLog systemJobLog = new SystemJobLog(); | 
|             // 验证分片 | 
|             if(!this.checkSnippet(snippet)) { | 
|                 return; | 
|             } | 
|             boolean locked = false; | 
|             try { | 
|                 log(snippet, "Snippet started"); | 
|                 // 记录日志 | 
|                 if (snippet.getWithLog()) { | 
|                     systemJobLog.setRunTimeStart(new Date()); | 
|                     systemJobLog.setJobId(snippet.getJobId()); | 
|                     systemJobLog.setJobName(snippet.getJobName()); | 
|                     systemJobLog.setJobDistributeGroup(snippet.getJobDistributeGroup()); | 
|                     systemJobLog.setSnippetId(snippet.getId()); | 
|                     systemJobLog.setSnippetCode(snippet.getSnippetCode()); | 
|                     systemJobLog.setBusinessTime(snippet.getJobBusinessTime()); | 
|                     systemJobLog.setRemark("分片正在执行中"); | 
|                     systemJobLog.setServerIp(Utils.Server.getIP()); | 
|                     systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode()); | 
|                     systemJobLog.setTriggerType(dto.getTriggerType()); | 
|                     systemJobLogService.create(systemJobLog); | 
|                 } | 
|                 // 锁定当前分片,防止其他服务器重复执行 | 
|                 this.lock(snippet); | 
|                 locked = true; | 
|                 BaseJob jobBean = Utils.SpringContext.getBean(snippet.getHandler(), BaseJob.class); | 
|                 JobParam jobParam = new JobParam(); | 
|                 jobParam.setId(snippet.getId()); | 
|                 jobParam.setSnippetIndex(snippet.getSnippetIndex()); | 
|                 jobParam.setBusinessTime(snippet.getJobBusinessTime()); | 
|                 jobParam.setRuntimeData(snippet.getSnippetData()); | 
|                 // 如果该分片存在分片器,则执行分片器做进一步分发 | 
|                 if (StringUtils.isNotBlank(snippet.getDistributeHandler())) { | 
|                     String distributeGroup = UUID.randomUUID().toString(); | 
|                     int snippetCount = this.distribute(snippet, jobParam, distributeGroup); | 
|                     // 更新日志状态 | 
|                     if (snippet.getWithLog()) { | 
|                         systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); | 
|                         systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup); | 
|                     } | 
|                     // 更新分片状态 | 
|                     snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode()); | 
|                     return; | 
|                 } | 
|                 // 执行任务 | 
|                 JobContext jobContext = jobBean.execute(jobParam); | 
|                 log(snippet, "Snippet context: {}", jobContext); | 
|                 // 更新日志状态 | 
|                 if (snippet.getWithLog()) { | 
|                     systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize()); | 
|                     systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize()); | 
|                     systemJobLog.setContext(jobContext.getContext()); | 
|                     systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); | 
|                     systemJobLog.setRemark("分片执行完成"); | 
|                 } | 
|                 // 更新分片状态 | 
|                 snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode()); | 
|             } catch (LockedException e) { | 
|                 log(snippet, "Snippet execute end."); | 
|                 // 更新日志状态 | 
|                 if (snippet.getWithLog()) { | 
|                     systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode()); | 
|                     systemJobLog.setRemark("分片正在被其他服务执行或锁定"); | 
|                 } | 
|             } catch (Exception e) { | 
|                 error(snippet, e); | 
|                 // 更新分片状态 | 
|                 snippet.setStatus(Constants.Job.SnippetStatus.FAILED.getCode()); | 
|                 // 更新日志状态 | 
|                 if (snippet.getWithLog()) { | 
|                     systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode()); | 
|                     systemJobLog.setRemark("出现异常:" + e.getMessage()); | 
|                 } | 
|             } finally { | 
|                 // 更新日志 | 
|                 if (systemJobLog.getId() != null && snippet.getWithLog()) { | 
|                     systemJobLog.setRunTimeEnd(new Date()); | 
|                     systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime()))); | 
|                     systemJobLogService.updateById(systemJobLog); | 
|                 } | 
|                 // 解锁分片 | 
|                 if (locked) { | 
|                     this.unlock(snippet); | 
|                 } | 
|             } | 
|         }); | 
|     } | 
|   | 
|     /** | 
|      * 锁定分片 | 
|      * | 
|      * @param snippet 分片 | 
|      */ | 
|     private void lock (SystemJobSnippet snippet) throws LockedException { | 
|         log(snippet, "Lock snippet."); | 
|         SystemJobSnippet newSnippet = new SystemJobSnippet(); | 
|         newSnippet.setStatus(Constants.Job.SnippetStatus.RUNNING.getCode()); | 
|         newSnippet.setLockNumber(snippet.getLockNumber() + 1); | 
|         int result = systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper<SystemJobSnippet>() | 
|                 .eq(SystemJobSnippet::getId, snippet.getId()) | 
|                 .eq(SystemJobSnippet::getLockNumber, snippet.getLockNumber()) | 
|         ); | 
|         if (result == 0) { | 
|             throw new LockedException("EVA: snippet is locked."); | 
|         } | 
|         log(snippet, "Lock snippet completed."); | 
|     } | 
|   | 
|     /** | 
|      * 完成分片处理 | 
|      * | 
|      * @param snippet 分片 | 
|      */ | 
|     private void unlock (SystemJobSnippet snippet) { | 
|         SystemJobSnippet newSnippet = new SystemJobSnippet(); | 
|         newSnippet.setStatus(snippet.getStatus()); | 
|         systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper<SystemJobSnippet>() | 
|                 .eq(SystemJobSnippet::getId, snippet.getId()) | 
|         ); | 
|         log(snippet, "Snippet execute completed."); | 
|     } | 
|   | 
|     /** | 
|      * 分发 | 
|      * | 
|      * @param snippet 分片实例 | 
|      * @param jobParam JOB执行参数 | 
|      * @param distributeGroup 分发组 | 
|      * @return int | 
|      */ | 
|     private int distribute (SystemJobSnippet snippet, JobParam jobParam, String distributeGroup) { | 
|         log(snippet, "Begin distribute."); | 
|         BaseDistributer distributer = Utils.SpringContext.getBean(snippet.getDistributeHandler(), BaseDistributer.class); | 
|         List<List<?>> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(snippet.getDistributeLimit()), jobParam); | 
|         for (int i = 0; i < snippetDatas.size(); i++) { | 
|             List snippetData = snippetDatas.get(i); | 
|             SystemJobSnippet subSnippet = new SystemJobSnippet(); | 
|             subSnippet.setJobId(snippet.getJobId()); | 
|             subSnippet.setJobName(snippet.getJobName()); | 
|             subSnippet.setJobBusinessTime(jobParam.getBusinessTime()); | 
|             subSnippet.setJobDistributeGroup(distributeGroup); | 
|             subSnippet.setHandler(snippet.getHandler()); | 
|             subSnippet.setSnippetIndex(i); | 
|             subSnippet.setSnippetCode(UUID.randomUUID().toString()); | 
|             subSnippet.setSnippetData(JSON.toJSONString(snippetData)); | 
|             subSnippet.setSnippetDataSize(snippetData.size()); | 
|             subSnippet.setWithLog(snippet.getWithLog()); | 
|             subSnippet.setAllowServerIps(snippet.getAllowServerIps()); | 
|             subSnippet.setStatus(Constants.Job.JobStatus.READY.getCode()); | 
|             systemJobSnippetMapper.insert(subSnippet); | 
|         } | 
|         log(snippet, "distribute completed."); | 
|         return snippetDatas.size(); | 
|     } | 
|   | 
|     /** | 
|      * 验证分片是否可执行 | 
|      * | 
|      * @param snippet 分片 | 
|      * @return boolean | 
|      */ | 
|     private boolean checkSnippet (SystemJobSnippet snippet) { | 
|         // 服务器验证 | 
|         if (StringUtils.isNotBlank(snippet.getAllowServerIps())) { | 
|             return snippet.getAllowServerIps().contains(Utils.Server.getIP()); | 
|         } | 
|         return Boolean.TRUE; | 
|     } | 
|   | 
|     /** | 
|      * 打印日志 | 
|      * | 
|      * @param snippet 分片 | 
|      * @param text 日志模版内容 | 
|      * @param params 日志参数 | 
|      */ | 
|     private void log (SystemJobSnippet snippet, String text, Object... params) { | 
|         log.trace("{}: " + text, String.format("%s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()), params); | 
|     } | 
|   | 
|     /** | 
|      * 打印错误日志 | 
|      * | 
|      * @param snippet 分片 | 
|      * @param e 异常对象 | 
|      */ | 
|     private void error (SystemJobSnippet snippet, Exception e) { | 
|         log.error(String.format("EVA: %s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()) + ": Handle snippet throw an exception.", e); | 
|     } | 
| } |