doum
2025-09-23 bcadfb6de692c138774688efd7928a973d59860f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package com.doumee.biz.system.impl;
 
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.doumee.biz.system.SystemJobTriggerBiz;
import com.doumee.biz.system.dto.TriggerJobDTO;
import com.doumee.core.constants.Constants;
import com.doumee.core.exception.LockedException;
import com.doumee.core.job.BaseDistributer;
import com.doumee.core.job.BaseJob;
import com.doumee.core.job.JobContext;
import com.doumee.core.job.JobParam;
import com.doumee.core.utils.Utils;
import com.doumee.dao.system.SystemJobLogMapper;
import com.doumee.dao.system.SystemJobMapper;
import com.doumee.dao.system.SystemJobSnippetMapper;
import com.doumee.dao.system.model.SystemJob;
import com.doumee.dao.system.model.SystemJobLog;
import com.doumee.dao.system.model.SystemJobSnippet;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import java.util.Date;
import java.util.List;
import java.util.UUID;
 
@Slf4j
@Service
public class SystemJobTriggerBizImpl implements SystemJobTriggerBiz {
 
    @Autowired
    private SystemJobLogMapper systemJobLogMapper;
 
    @Autowired
    private SystemJobMapper systemJobMapper;
 
    @Autowired
    private SystemJobSnippetMapper systemJobSnippetMapper;
 
    private ThreadLocal<String> jobUnique = new ThreadLocal<>();
 
    @Override
    public void trigger(TriggerJobDTO dto) {
        SystemJob job = null;
        JobParam jobParam = new JobParam();
        boolean locked = false;
        SystemJobLog systemJobLog = new SystemJobLog();
        try {
            Integer jobId = dto.getId();
            job = systemJobMapper.selectById(jobId);
            // 验证JOB
            if (!this.checkJob(job, dto.getBusinessTime())) {
                return;
            }
            // 初始化JOB数据
            jobUnique.set(RandomStringUtils.randomAlphabetic(8));
            log(job, "Job started");
            log(job, "Job thread name: {}", Thread.currentThread().getName());
            log(job, "Job scheduled fire time: {}", Utils.Date.format(dto.getScheduledFireTime()));
            log(job, "Job fire time: {}", Utils.Date.format(dto.getFireTime()));
            log(job, "Job business time: {}", Utils.Date.format(dto.getBusinessTime()));
            jobParam.setId(jobId);
            jobParam.setTriggerType(dto.getTriggerType());
            jobParam.setBusinessTime(dto.getBusinessTime());
            // 记录日志
            if (job.getWithLog()) {
                systemJobLog.setJobId(jobId);
                systemJobLog.setJobName(job.getJobName());
                systemJobLog.setRunTimeStart(new Date());
                systemJobLog.setServerIp(Utils.Server.getIP());
                systemJobLog.setBusinessTime(dto.getBusinessTime());
                systemJobLog.setStatus(Constants.Job.LogStatus.NONE.getCode());
                systemJobLog.setTriggerType(dto.getTriggerType());
                systemJobLog.setRemark("任务正在执行中");
                systemJobLogMapper.insert(systemJobLog);
            }
            // 任务加锁,防止多服务器重复执行
            this.lock(job);
            locked = true;
            // 如果开启了异步执行,则创建一个分片
            if (job.getWithAsync()) {
                String snippetCode = this.createAsyncSnippet(job, jobParam);
                systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
                systemJobLog.setRemark("异步任务已转为分片任务,分片编码:" + snippetCode);
                return;
            }
            // 如果存在分片
            if (StringUtils.isNotBlank(job.getDistributeHandler())) {
                String distributeGroup = UUID.randomUUID().toString();
                int snippetCount = this.distribute(job, jobParam, distributeGroup);
                systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
                systemJobLog.setRemark("任务已分发成" + snippetCount + "个分片任务,分发组:" + distributeGroup);
                return;
            }
            // 不存在分片,直接执行任务(根据任务处理器名称获取任务处理器实例并执行)
            BaseJob jobBean = Utils.SpringContext.getBean(job.getHandler(), BaseJob.class);
            JobContext jobContext = jobBean.execute(jobParam);
            // 解锁任务
            this.unlock(job, dto.getNextFireTime());
            locked = false;
            // 修改日志信息
            if (job.getWithLog()) {
                systemJobLog.setRemark("任务执行完成");
                systemJobLog.setHandleTotalSize(jobContext.getHandleTotalSize());
                systemJobLog.setHandleSuccessSize(jobContext.getHandleSuccessSize());
                systemJobLog.setContext(jobContext.getContext());
                systemJobLog.setStatus(Constants.Job.LogStatus.SUCCESS.getCode());
            }
        } catch (LockedException e) {
            log(job, e.getMessage());
            // 修改日志状态
            if (job.getWithLog()) {
                systemJobLog.setStatus(Constants.Job.LogStatus.IGNORE.getCode());
                systemJobLog.setRemark("任务正在被其他服务执行或锁定");
            }
        } catch (Exception e) {
            if (job != null) {
                error(job, e);
                // 修改日志状态
                if (job.getWithLog()) {
                    systemJobLog.setStatus(Constants.Job.LogStatus.FAILED.getCode());
                    systemJobLog.setRemark("任务执行失败,出现异常:" + e.getMessage());
                }
            } else {
                e.printStackTrace();
            }
        } finally {
            // 更新日志
            if (job != null && job.getWithLog() && systemJobLog.getId() != null) {
                systemJobLog.setRunTimeEnd(new Date());
                systemJobLog.setRunSpendTime(Integer.valueOf("" + (systemJobLog.getRunTimeEnd().getTime() - systemJobLog.getRunTimeStart().getTime())));
                systemJobLogMapper.updateById(systemJobLog);
            }
            // 任务解锁
            if (locked) {
                this.unlock(job, dto.getNextFireTime());
                log(job, "Job execute completed.");
            }
        }
    }
 
    /**
     * 锁定JOB
     *
     * @param job JOB
     */
    private void lock (SystemJob job) throws LockedException{
        log(job, "Lock job.");
        if (Constants.equalsInteger(job.getStatus(),Constants.Job.JobStatus.RUNNING.getCode())) {
            throw new LockedException("EVA: job is running.");
        }
        SystemJob newJob = new SystemJob();
        newJob.setStatus(Constants.Job.JobStatus.RUNNING.getCode());
        newJob.setLockNumber(job.getLockNumber() + 1);
        // 不修改更新人和更新时间
        newJob.setUpdateUser(Constants.Ignore.IGNORE_USER);
        newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME);
        int result = systemJobMapper.update(newJob, new UpdateWrapper<SystemJob>()
                .lambda()
                .eq(SystemJob::getId, job.getId())
                .eq(SystemJob::getLockNumber, job.getLockNumber())
        );
        if (result == 0) {
            throw new LockedException("EVA: job is locked.");
        }
        log(job, "Lock job completed.");
    }
 
    /**
     * 解锁JOB
     *
     * @param job JOB
     */
    private void unlock(SystemJob job, Date nextFireTime) {
        log(job, "Unlock job.");
        SystemJob newJob = new SystemJob();
        newJob.setId(job.getId());
        newJob.setStatus(Constants.Job.JobStatus.READY.getCode());
        newJob.setNextFireTime(nextFireTime);
        // 不修改更新人和更新时间
        newJob.setUpdateUser(Constants.Ignore.IGNORE_USER);
        newJob.setUpdateTime(Constants.Ignore.IGNORE_TIME);
        systemJobMapper.updateById(newJob);
        log(job, "Unlock job completed.");
    }
 
    /**
     * 创建异步分片
     *
     * @param job JOB
     * @param jobParam JOB参数
     */
    private String createAsyncSnippet (SystemJob job, JobParam jobParam) {
        log(job, "Begin create async snippet.");
        String distributeGroup = UUID.randomUUID().toString();
        SystemJobSnippet snippet = new SystemJobSnippet();
        snippet.setJobId(job.getId());
        snippet.setJobName(job.getJobName());
        snippet.setJobBusinessTime(jobParam.getBusinessTime());
        snippet.setJobDistributeGroup(distributeGroup);
        snippet.setDistributeHandler(job.getDistributeHandler());
        snippet.setDistributeLimit(job.getDistributeLimit());
        snippet.setHandler(job.getHandler());
        snippet.setSnippetIndex(0);
        snippet.setSnippetCode(UUID.randomUUID().toString());
        snippet.setWithLog(job.getWithLog());
        snippet.setAllowServerIps(job.getAllowServerIps());
        snippet.setStatus(Constants.Job.JobStatus.READY.getCode());
        systemJobSnippetMapper.insert(snippet);
        log(job, "Create async snippet completed.");
        return snippet.getSnippetCode();
    }
 
    /**
     * 分发
     *
     * @param job JOB
     * @param jobParam JOB参数
     * @param distributeGroup 分发组
     * @return int
     */
    private int distribute (SystemJob job, JobParam jobParam, String distributeGroup) {
        log(job, "Begin distribute.");
        BaseDistributer distributer = Utils.SpringContext.getBean(job.getDistributeHandler(), BaseDistributer.class);
        List<List<?>> snippetDatas = distributer.getSnippetData(new BaseDistributer.DistributeDTO(job.getDistributeLimit()), jobParam);
        for (int i = 0; i < snippetDatas.size(); i++) {
            List snippetData = snippetDatas.get(i);
            SystemJobSnippet snippet = new SystemJobSnippet();
            snippet.setJobId(job.getId());
            snippet.setJobName(job.getJobName());
            snippet.setJobBusinessTime(jobParam.getBusinessTime());
            snippet.setJobDistributeGroup(distributeGroup);
            snippet.setHandler(job.getHandler());
            snippet.setSnippetIndex(i);
            snippet.setSnippetCode(UUID.randomUUID().toString());
            snippet.setSnippetData(JSON.toJSONString(snippetData));
            snippet.setSnippetDataSize(snippetData.size());
            snippet.setWithLog(job.getWithLog());
            snippet.setAllowServerIps(job.getAllowServerIps());
            snippet.setStatus(Constants.Job.JobStatus.READY.getCode());
            systemJobSnippetMapper.insert(snippet);
        }
        log(job, "distribute completed.");
        return snippetDatas.size();
    }
 
    /**
     * 验证Job是否可执行
     *
     * @param job JOB
     */
    private boolean checkJob (SystemJob job, Date businessTime) {
        // 执行时间验证(任务预计下一次执行<=当前执行时间即可执行)
        if (businessTime != null && job.getNextFireTime() != null && job.getNextFireTime().compareTo(businessTime) == 1) {
            return Boolean.FALSE;
        }
        // 服务器验证
        if (StringUtils.isNotBlank(job.getAllowServerIps())) {
            return job.getAllowServerIps().contains(Utils.Server.getIP());
        }
        return Boolean.TRUE;
    }
 
    /**
     * 打印日志
     *
     * @param job JOB
     * @param text 日志模版内容
     * @param params 日志参数
     */
    private void log (SystemJob job, String text, Object... params) {
        log.trace("{}: " + text, String.format("%s_%s", job.getJobName(), jobUnique.get()), params);
    }
 
    /**
     * 打印错误日志
     *
     * @param job JOB
     * @param e 异常对象
     */
    private void error (SystemJob job, Exception e) {
        log.error(String.format("EVA: %s_%s", job.getJobName(), jobUnique.get()) + ": Handle job throw an exception.", e);
    }
}