利用数据库锁使定时任务在分布式环境中只执行一次
最编程
2024-03-21 07:27:14
...
我们有个task服务,这个服务是双节点,即两台服务器上都部署有这个服务,但是每天的定时任务我们只想执行一次,于是我们想到用数据库的行锁来实现这种功能。
核心代码是JobLocksMo lock = brJobLocksMongoDaoImpl.lock(jobLocksMo.getId(), jobLocksMo.getVersion());这块,当拿到任务Id后,判断当前任务是否被执行,如果没有被执行则给任务上锁,并把版本加1,这样下一个线程进来就没法更新数据库行记录
(在 MongoDB 中,findAndModify 方法用于查找文档并对其进行修改。如果查询条件找不到文档,findAndModify 方法将返回 null,如果更新成功则返回最新的对象)。
@Async("taskExecutor")
@Scheduled(cron = "0 0 5 9 12 ?")
public void updateAgentReport() {
log.info("处理顾问历史数据,开始....");
JobLocksMo jobLocksMo = brJobLocksMongoDaoImpl.findById("br_data_job_lock_report_agent");
if (null == jobLocksMo || jobLocksMo.getStatus() == 1) {
log.info("任务锁未释放,不执行后续操作");
return;
}
StopWatch watch = new StopWatch();
watch.start();
try {
log.info("br_data_job_lock_report_agent status : " + jobLocksMo.getStatus());
if (jobLocksMo.getStatus() == 0) {
// 如果当前任务处于空闲状态则给当前任务加锁,核对版本正确才更新
JobLocksMo lock = brJobLocksMongoDaoImpl.lock(jobLocksMo.getId(), jobLocksMo.getVersion());
if (null != lock) {
LocalDateRangeBean localDateRangeBean = new LocalDateRangeBean();
localDateRangeBean.setStartDate(LocalDate.of(2019, 8, 1));
localDateRangeBean.setEndDate(LocalDate.now());
historyServiceImpl.updateAgentReport(localDateRangeBean);
}
}
} catch (Exception e) {
log.error("12.9 5点处理顾问历史数据异常:", e);
} finally {
watch.stop();
// 只要是给当前任务加上了锁最后必须释放锁
if (jobLocksMo.getStatus() == 0) {
brJobLocksMongoDaoImpl.unlock(jobLocksMo.getId());
}
}
}
public JobLocksMo lock(String id, Long version) {
Query query = new Query();
Criteria criteria = new Criteria();
criteria.and("id").is(id);
criteria.and("status").is(0);
criteria.and("version").is(version);
query.addCriteria(criteria);
Update update = new Update();
update.set("status", 1);
update.inc("version", 1);
update.set("updateId", "system_job_lock");
update.set("updateTime", LocalDateTime.now());
return brMongoTemplate.findAndModify(query, update, JobLocksMo.class);
}
public JobLocksMo unlock(String id) {
Query query = new Query();
Criteria criteria = new Criteria();
criteria.and("id").is(id);
criteria.and("status").is(1);
query.addCriteria(criteria);
Update update = new Update();
update.set("status", 0);
update.set("updateId", "system_job_unlock");
update.set("updateTime", LocalDateTime.now());
return brMongoTemplate.findAndModify(query, update, JobLocksMo.class);