package com.doumee.config.quartz;
|
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.doumee.core.job.SchedulerProxy;
|
import com.doumee.core.constants.Constants;
|
import com.doumee.dao.system.SystemJobMapper;
|
import com.doumee.dao.system.SystemJobSnippetMapper;
|
import com.doumee.dao.system.model.SystemJob;
|
import com.doumee.dao.system.model.SystemJobSnippet;
|
import com.doumee.service.system.SystemJobService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.PostConstruct;
|
import java.util.List;
|
|
/**
|
* 任务初始化
|
* @author dm
|
* @since 2025/03/31 16:44
|
*/
|
@Slf4j
|
@Component
|
public class JobInitializer {
|
|
@Autowired
|
private SystemJobService systemJobService;
|
|
@Autowired
|
private SystemJobMapper systemJobMapper;
|
|
@Autowired
|
private SystemJobSnippetMapper systemJobSnippetMapper;
|
|
@Autowired
|
private SchedulerProxy schedulerProxy;
|
|
/**
|
* 初始化Jobs
|
*/
|
@PostConstruct
|
public void initJobs () {
|
log.debug("Initializing jobs");
|
// 修复JOB
|
this.repairJobs();
|
// 初始化JOB
|
long startTime = System.currentTimeMillis();
|
SystemJob queryJobDto = new SystemJob();
|
queryJobDto.setStatus(Constants.Job.JobStatus.READY.getCode());
|
queryJobDto.setDisabled(Boolean.FALSE);
|
queryJobDto.setDeleted(Boolean.FALSE);
|
List<SystemJob> jobs = systemJobService.findList(queryJobDto);
|
for (SystemJob job : jobs) {
|
schedulerProxy.createJob(new SchedulerProxy.Job(job.getId(), job.getHandler(), job.getCronExpression(), job.getDisallowConcurrent()));
|
log.debug("job '{}' initialized.", job.getJobName());
|
}
|
log.debug("Jobs initialization completed in {} ms", System.currentTimeMillis() - startTime);
|
}
|
|
/**
|
* 修复Jobs和Snippets,防止因服务器宕机导致正在执行中的Job或Snippet状态未得到处理,导致任务或分片丢失
|
*/
|
private void repairJobs () {
|
log.debug("Repair jobs");
|
// 将运行中的JOB调整为准备状态 & 将预计下一次执行时间调整为null
|
systemJobMapper.update(null, new LambdaUpdateWrapper<SystemJob>()
|
.set(SystemJob::getStatus, Constants.Job.JobStatus.READY.getCode())
|
.set(SystemJob::getNextFireTime, null)
|
.eq(SystemJob::getStatus, Constants.Job.JobStatus.RUNNING.getCode())
|
.eq(SystemJob::getDeleted, Boolean.FALSE)
|
);
|
log.debug("Repair job snippets");
|
// 将运行中的SNIPPET调整为准备状态
|
SystemJobSnippet newSnippet = new SystemJobSnippet();
|
newSnippet.setStatus(Constants.Job.SnippetStatus.READY.getCode());
|
systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper<SystemJobSnippet>()
|
.eq(SystemJobSnippet::getStatus, Constants.Job.SnippetStatus.READY.getCode())
|
);
|
}
|
}
|