package com.doumee.biz.system.impl; 
 | 
  
 | 
import com.alibaba.fastjson.JSON; 
 | 
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; 
 | 
import com.doumee.biz.system.SystemJobSnippetTriggerBiz; 
 | 
import com.doumee.biz.system.dto.TriggerJobSnippetDTO; 
 | 
import com.doumee.core.constants.Constants; 
 | 
import com.doumee.core.exception.LockedException; 
 | 
import com.doumee.core.job.BaseDistributer; 
 | 
import com.doumee.core.job.BaseJob; 
 | 
import com.doumee.core.job.JobContext; 
 | 
import com.doumee.core.job.JobParam; 
 | 
import com.doumee.core.utils.Utils; 
 | 
import com.doumee.dao.system.SystemJobSnippetMapper; 
 | 
import com.doumee.dao.system.model.SystemJobLog; 
 | 
import com.doumee.dao.system.model.SystemJobSnippet; 
 | 
import com.doumee.service.system.SystemJobLogService; 
 | 
import lombok.extern.slf4j.Slf4j; 
 | 
import org.apache.commons.lang3.StringUtils; 
 | 
import org.springframework.beans.factory.annotation.Autowired; 
 | 
import org.springframework.stereotype.Service; 
 | 
  
 | 
import java.util.Date; 
 | 
import java.util.List; 
 | 
import java.util.UUID; 
 | 
  
 | 
@Slf4j 
 | 
@Service 
 | 
public class SystemJobSnippetTriggerBizImpl implements SystemJobSnippetTriggerBiz { 
 | 
  
 | 
    @Autowired 
 | 
    private SystemJobLogService systemJobLogService; 
 | 
  
 | 
    @Autowired 
 | 
    private SystemJobSnippetMapper systemJobSnippetMapper; 
 | 
  
 | 
    @Override 
 | 
    public void trigger(TriggerJobSnippetDTO dto) { 
 | 
        Utils.ThreadPool.start(() -> { 
 | 
            SystemJobSnippet snippet = systemJobSnippetMapper.selectById(dto.getId()); 
 | 
            SystemJobLog systemJobLog = new SystemJobLog(); 
 | 
            // 验证分片 
 | 
            if(!this.checkSnippet(snippet)) { 
 | 
                return; 
 | 
            } 
 | 
            boolean locked = false; 
 | 
            try { 
 | 
                log(snippet, "Snippet started"); 
 | 
                // 记录日志 
 | 
                if (snippet.getWithLog()) { 
 | 
                    systemJobLog.setRunTimeStart(new Date()); 
 | 
                    systemJobLog.setJobId(snippet.getJobId()); 
 | 
                    systemJobLog.setJobName(snippet.getJobName()); 
 | 
                    systemJobLog.setJobDistributeGroup(snippet.getJobDistributeGroup()); 
 | 
                    systemJobLog.setSnippetId(snippet.getId()); 
 | 
                    systemJobLog.setSnippetCode(snippet.getSnippetCode()); 
 | 
                    systemJobLog.setBusinessTime(snippet.getJobBusinessTime()); 
 | 
                    systemJobLog.setRemark("分片正在执行中"); 
 | 
                    systemJobLog.setServerIp(Utils.Server.getIP()); 
 | 
                    systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode()); 
 | 
                    systemJobLog.setTriggerType(dto.getTriggerType()); 
 | 
                    systemJobLogService.create(systemJobLog); 
 | 
                } 
 | 
                // 锁定当前分片,防止其他服务器重复执行 
 | 
                this.lock(snippet); 
 | 
                locked = true; 
 | 
                BaseJob jobBean = Utils.SpringContext.getBean(snippet.getHandler(), BaseJob.class); 
 | 
                JobParam jobParam = new JobParam(); 
 | 
                jobParam.setId(snippet.getId()); 
 | 
                jobParam.setSnippetIndex(snippet.getSnippetIndex()); 
 | 
                jobParam.setBusinessTime(snippet.getJobBusinessTime()); 
 | 
                jobParam.setRuntimeData(snippet.getSnippetData()); 
 | 
                // 如果该分片存在分片器,则执行分片器做进一步分发 
 | 
                if (StringUtils.isNotBlank(snippet.getDistributeHandler())) { 
 | 
                    String distributeGroup = UUID.randomUUID().toString(); 
 | 
                    int snippetCount = this.distribute(snippet, jobParam, distributeGroup); 
 | 
                    // 更新日志状态 
 | 
                    if (snippet.getWithLog()) { 
 | 
                        systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); 
 | 
                        systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup); 
 | 
                    } 
 | 
                    // 更新分片状态 
 | 
                    snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode()); 
 | 
                    return; 
 | 
                } 
 | 
                // 执行任务 
 | 
                JobContext jobContext = jobBean.execute(jobParam); 
 | 
                log(snippet, "Snippet context: {}", jobContext); 
 | 
                // 更新日志状态 
 | 
                if (snippet.getWithLog()) { 
 | 
                    systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize()); 
 | 
                    systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize()); 
 | 
                    systemJobLog.setContext(jobContext.getContext()); 
 | 
                    systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode()); 
 | 
                    systemJobLog.setRemark("分片执行完成"); 
 | 
                } 
 | 
                // 更新分片状态 
 | 
                snippet.setStatus(Constants.Job.SnippetStatus.SUCCESS.getCode()); 
 | 
            } catch (LockedException e) { 
 | 
                log(snippet, "Snippet execute end."); 
 | 
                // 更新日志状态 
 | 
                if (snippet.getWithLog()) { 
 | 
                    systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode()); 
 | 
                    systemJobLog.setRemark("分片正在被其他服务执行或锁定"); 
 | 
                } 
 | 
            } catch (Exception e) { 
 | 
                error(snippet, e); 
 | 
                // 更新分片状态 
 | 
                snippet.setStatus(Constants.Job.SnippetStatus.FAILED.getCode()); 
 | 
                // 更新日志状态 
 | 
                if (snippet.getWithLog()) { 
 | 
                    systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode()); 
 | 
                    systemJobLog.setRemark("出现异常:" + e.getMessage()); 
 | 
                } 
 | 
            } finally { 
 | 
                // 更新日志 
 | 
                if (systemJobLog.getId() != null && snippet.getWithLog()) { 
 | 
                    systemJobLog.setRunTimeEnd(new Date()); 
 | 
                    systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime()))); 
 | 
                    systemJobLogService.updateById(systemJobLog); 
 | 
                } 
 | 
                // 解锁分片 
 | 
                if (locked) { 
 | 
                    this.unlock(snippet); 
 | 
                } 
 | 
            } 
 | 
        }); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 锁定分片 
 | 
     * 
 | 
     * @param snippet 分片 
 | 
     */ 
 | 
    private void lock (SystemJobSnippet snippet) throws LockedException { 
 | 
        log(snippet, "Lock snippet."); 
 | 
        SystemJobSnippet newSnippet = new SystemJobSnippet(); 
 | 
        newSnippet.setStatus(Constants.Job.SnippetStatus.RUNNING.getCode()); 
 | 
        newSnippet.setLockNumber(snippet.getLockNumber() + 1); 
 | 
        int result = systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper<SystemJobSnippet>() 
 | 
                .eq(SystemJobSnippet::getId, snippet.getId()) 
 | 
                .eq(SystemJobSnippet::getLockNumber, snippet.getLockNumber()) 
 | 
        ); 
 | 
        if (result == 0) { 
 | 
            throw new LockedException("EVA: snippet is locked."); 
 | 
        } 
 | 
        log(snippet, "Lock snippet completed."); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 完成分片处理 
 | 
     * 
 | 
     * @param snippet 分片 
 | 
     */ 
 | 
    private void unlock (SystemJobSnippet snippet) { 
 | 
        SystemJobSnippet newSnippet = new SystemJobSnippet(); 
 | 
        newSnippet.setStatus(snippet.getStatus()); 
 | 
        systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper<SystemJobSnippet>() 
 | 
                .eq(SystemJobSnippet::getId, snippet.getId()) 
 | 
        ); 
 | 
        log(snippet, "Snippet execute completed."); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 分发 
 | 
     * 
 | 
     * @param snippet 分片实例 
 | 
     * @param jobParam JOB执行参数 
 | 
     * @param distributeGroup 分发组 
 | 
     * @return int 
 | 
     */ 
 | 
    private int distribute (SystemJobSnippet snippet, JobParam jobParam, String distributeGroup) { 
 | 
        log(snippet, "Begin distribute."); 
 | 
        BaseDistributer distributer = Utils.SpringContext.getBean(snippet.getDistributeHandler(), BaseDistributer.class); 
 | 
        List<List<?>> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(snippet.getDistributeLimit()), jobParam); 
 | 
        for (int i = 0; i < snippetDatas.size(); i++) { 
 | 
            List snippetData = snippetDatas.get(i); 
 | 
            SystemJobSnippet subSnippet = new SystemJobSnippet(); 
 | 
            subSnippet.setJobId(snippet.getJobId()); 
 | 
            subSnippet.setJobName(snippet.getJobName()); 
 | 
            subSnippet.setJobBusinessTime(jobParam.getBusinessTime()); 
 | 
            subSnippet.setJobDistributeGroup(distributeGroup); 
 | 
            subSnippet.setHandler(snippet.getHandler()); 
 | 
            subSnippet.setSnippetIndex(i); 
 | 
            subSnippet.setSnippetCode(UUID.randomUUID().toString()); 
 | 
            subSnippet.setSnippetData(JSON.toJSONString(snippetData)); 
 | 
            subSnippet.setSnippetDataSize(snippetData.size()); 
 | 
            subSnippet.setWithLog(snippet.getWithLog()); 
 | 
            subSnippet.setAllowServerIps(snippet.getAllowServerIps()); 
 | 
            subSnippet.setStatus(Constants.Job.JobStatus.READY.getCode()); 
 | 
            systemJobSnippetMapper.insert(subSnippet); 
 | 
        } 
 | 
        log(snippet, "distribute completed."); 
 | 
        return snippetDatas.size(); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 验证分片是否可执行 
 | 
     * 
 | 
     * @param snippet 分片 
 | 
     * @return boolean 
 | 
     */ 
 | 
    private boolean checkSnippet (SystemJobSnippet snippet) { 
 | 
        // 服务器验证 
 | 
        if (StringUtils.isNotBlank(snippet.getAllowServerIps())) { 
 | 
            return snippet.getAllowServerIps().contains(Utils.Server.getIP()); 
 | 
        } 
 | 
        return Boolean.TRUE; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 打印日志 
 | 
     * 
 | 
     * @param snippet 分片 
 | 
     * @param text 日志模版内容 
 | 
     * @param params 日志参数 
 | 
     */ 
 | 
    private void log (SystemJobSnippet snippet, String text, Object... params) { 
 | 
        log.trace("{}: " + text, String.format("%s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()), params); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 打印错误日志 
 | 
     * 
 | 
     * @param snippet 分片 
 | 
     * @param e 异常对象 
 | 
     */ 
 | 
    private void error (SystemJobSnippet snippet, Exception e) { 
 | 
        log.error(String.format("EVA: %s-SNIPPET-%s", snippet.getJobName(), snippet.getSnippetCode()) + ": Handle snippet throw an exception.", e); 
 | 
    } 
 | 
} 
 |