package com.doumee.config.quartz; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.doumee.core.constants.Constants; import com.doumee.core.job.SchedulerProxy; import com.doumee.dao.system.SystemJobMapper; import com.doumee.dao.system.SystemJobSnippetMapper; import com.doumee.dao.system.model.SystemJob; import com.doumee.dao.system.model.SystemJobSnippet; import com.doumee.service.system.SystemJobService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.List; /** * 任务初始化 * @author Eva.Caesar Liu * @since 2025/03/31 16:44 */ @Slf4j @Component public class JobInitializer { @Autowired private SystemJobService systemJobService; @Autowired private SystemJobMapper systemJobMapper; @Autowired private SystemJobSnippetMapper systemJobSnippetMapper; @Autowired private SchedulerProxy schedulerProxy; /** * 初始化Jobs */ @PostConstruct public void initJobs () { log.debug("Initializing jobs"); // 修复JOB this.repairJobs(); // 初始化JOB long startTime = System.currentTimeMillis(); SystemJob queryJobDto = new SystemJob(); queryJobDto.setStatus(Constants.Job.JobStatus.READY.getCode()); queryJobDto.setDisabled(Boolean.FALSE); queryJobDto.setDeleted(Boolean.FALSE); List jobs = systemJobService.findList(queryJobDto); for (SystemJob job : jobs) { schedulerProxy.createJob(new SchedulerProxy.Job(job.getId(), job.getHandler(), job.getCronExpression(), job.getDisallowConcurrent())); log.debug("job '{}' initialized.", job.getJobName()); } log.debug("Jobs initialization completed in {} ms", System.currentTimeMillis() - startTime); } /** * 修复Jobs和Snippets,防止因服务器宕机导致正在执行中的Job或Snippet状态未得到处理,导致任务或分片丢失 */ private void repairJobs () { log.debug("Repair jobs"); // 将运行中的JOB调整为准备状态 & 将预计下一次执行时间调整为null systemJobMapper.update(null, new LambdaUpdateWrapper() .set(SystemJob::getStatus, Constants.Job.JobStatus.READY.getCode()) .set(SystemJob::getNextFireTime, null) .eq(SystemJob::getStatus, Constants.Job.JobStatus.RUNNING.getCode()) .eq(SystemJob::getDeleted, Boolean.FALSE) ); log.debug("Repair job snippets"); // 将运行中的SNIPPET调整为准备状态 SystemJobSnippet newSnippet = new SystemJobSnippet(); newSnippet.setStatus(Constants.Job.SnippetStatus.READY.getCode()); systemJobSnippetMapper.update(newSnippet, new LambdaUpdateWrapper() .eq(SystemJobSnippet::getStatus, Constants.Job.SnippetStatus.READY.getCode()) ); } }