|
@@ -9,10 +9,14 @@ import cn.sciento.farm.automationv2.domain.enums.AckStatus;
|
|
|
import cn.sciento.farm.automationv2.domain.enums.ExecutionStatus;
|
|
import cn.sciento.farm.automationv2.domain.enums.ExecutionStatus;
|
|
|
import cn.sciento.farm.automationv2.domain.enums.NodeStatus;
|
|
import cn.sciento.farm.automationv2.domain.enums.NodeStatus;
|
|
|
import cn.sciento.farm.automationv2.domain.enums.NodeType;
|
|
import cn.sciento.farm.automationv2.domain.enums.NodeType;
|
|
|
|
|
+import cn.sciento.farm.automationv2.domain.repository.IrrigationTaskRepository;
|
|
|
|
|
+import cn.sciento.farm.automationv2.domain.repository.TaskExecutionRepository;
|
|
|
import cn.sciento.farm.automationv2.domain.valueobject.ExecutionNode;
|
|
import cn.sciento.farm.automationv2.domain.valueobject.ExecutionNode;
|
|
|
|
|
+import cn.sciento.farm.automationv2.domain.valueobject.ExecutionPlan;
|
|
|
import cn.sciento.farm.automationv2.infra.mq.producer.FlowControlProducer;
|
|
import cn.sciento.farm.automationv2.infra.mq.producer.FlowControlProducer;
|
|
|
import cn.sciento.farm.automationv2.infra.redis.AckManager;
|
|
import cn.sciento.farm.automationv2.infra.redis.AckManager;
|
|
|
import cn.sciento.farm.automationv2.infra.redis.IdempotencyManager;
|
|
import cn.sciento.farm.automationv2.infra.redis.IdempotencyManager;
|
|
|
|
|
+import cn.sciento.farm.automationv2.infra.redis.ExecutionPlanStore;
|
|
|
import cn.sciento.farm.automationv2.infra.mapper.IrrigationTaskMapper;
|
|
import cn.sciento.farm.automationv2.infra.mapper.IrrigationTaskMapper;
|
|
|
import cn.sciento.farm.automationv2.infra.mapper.TaskExecutionMapper;
|
|
import cn.sciento.farm.automationv2.infra.mapper.TaskExecutionMapper;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
@@ -41,8 +45,8 @@ import java.util.stream.Collectors;
|
|
|
@RequiredArgsConstructor
|
|
@RequiredArgsConstructor
|
|
|
public class TaskExecutionEngine {
|
|
public class TaskExecutionEngine {
|
|
|
|
|
|
|
|
- private final TaskExecutionMapper taskExecutionMapper;
|
|
|
|
|
- private final IrrigationTaskMapper irrigationTaskMapper;
|
|
|
|
|
|
|
+ private final TaskExecutionRepository taskExecutionRepository;
|
|
|
|
|
+ private final IrrigationTaskRepository irrigationTaskRepository;
|
|
|
private final NodeHandlerFactory nodeHandlerFactory;
|
|
private final NodeHandlerFactory nodeHandlerFactory;
|
|
|
private final FlowControlProducer flowControlProducer;
|
|
private final FlowControlProducer flowControlProducer;
|
|
|
private final AckManager ackManager;
|
|
private final AckManager ackManager;
|
|
@@ -51,6 +55,7 @@ public class TaskExecutionEngine {
|
|
|
private final SafeShutdownService safeShutdownService;
|
|
private final SafeShutdownService safeShutdownService;
|
|
|
private final AlarmRecordService alarmRecordService;
|
|
private final AlarmRecordService alarmRecordService;
|
|
|
private final AlarmNotificationService alarmNotificationService;
|
|
private final AlarmNotificationService alarmNotificationService;
|
|
|
|
|
+ private final ExecutionPlanStore executionPlanStore;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 执行当前节点
|
|
* 执行当前节点
|
|
@@ -62,7 +67,7 @@ public class TaskExecutionEngine {
|
|
|
log.info("开始执行当前节点,executionId={}", executionId);
|
|
log.info("开始执行当前节点,executionId={}", executionId);
|
|
|
|
|
|
|
|
// 加载执行实例
|
|
// 加载执行实例
|
|
|
- TaskExecution execution = taskExecutionMapper.selectById(executionId);
|
|
|
|
|
|
|
+ TaskExecution execution = taskExecutionRepository.selectByPrimaryKey(executionId);
|
|
|
if (execution == null) {
|
|
if (execution == null) {
|
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
|
return;
|
|
return;
|
|
@@ -76,7 +81,14 @@ public class TaskExecutionEngine {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 获取当前节点
|
|
// 获取当前节点
|
|
|
- ExecutionNode currentNode = execution.getCurrentNode();
|
|
|
|
|
|
|
+ ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
|
|
|
+ if (plan == null) {
|
|
|
|
|
+ log.error("执行计划不存在,executionId={}", executionId);
|
|
|
|
|
+ handleNodeFailure(executionId, execution.getCurrentIndex(), "执行计划不存在");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ExecutionNode currentNode = plan.getNode(execution.getCurrentIndex());
|
|
|
if (currentNode == null) {
|
|
if (currentNode == null) {
|
|
|
log.error("当前节点不存在,executionId={}, currentIndex={}",
|
|
log.error("当前节点不存在,executionId={}, currentIndex={}",
|
|
|
executionId, execution.getCurrentIndex());
|
|
executionId, execution.getCurrentIndex());
|
|
@@ -97,15 +109,18 @@ public class TaskExecutionEngine {
|
|
|
execution.setStatus(ExecutionStatus.RUNNING);
|
|
execution.setStatus(ExecutionStatus.RUNNING);
|
|
|
execution.updateHeartbeat();
|
|
execution.updateHeartbeat();
|
|
|
|
|
|
|
|
- int updated = taskExecutionMapper.updateByVersion(execution);
|
|
|
|
|
|
|
+ int updated = taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
|
if (updated == 0) {
|
|
if (updated == 0) {
|
|
|
log.error("更新执行实例失败(乐观锁冲突),executionId={}, version={}",
|
|
log.error("更新执行实例失败(乐观锁冲突),executionId={}, version={}",
|
|
|
- executionId, execution.getVersion());
|
|
|
|
|
|
|
+ executionId, execution.getObjectVersionNumber());
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // 更新执行计划到Redis
|
|
|
|
|
+ executionPlanStore.save(executionId, plan);
|
|
|
|
|
+
|
|
|
// 构建执行上下文
|
|
// 构建执行上下文
|
|
|
- IrrigationTask task = irrigationTaskMapper.selectById(execution.getTaskId());
|
|
|
|
|
|
|
+ IrrigationTask task = irrigationTaskRepository.selectByPrimaryKey(execution.getTaskId());
|
|
|
ExecutionContext context = ExecutionContext.builder()
|
|
ExecutionContext context = ExecutionContext.builder()
|
|
|
.execution(execution)
|
|
.execution(execution)
|
|
|
.currentNode(currentNode)
|
|
.currentNode(currentNode)
|
|
@@ -137,7 +152,10 @@ public class TaskExecutionEngine {
|
|
|
// 更新节点状态为SUCCESS
|
|
// 更新节点状态为SUCCESS
|
|
|
currentNode.setStatus(NodeStatus.SUCCESS.name());
|
|
currentNode.setStatus(NodeStatus.SUCCESS.name());
|
|
|
currentNode.setFinishedAt(LocalDateTime.now());
|
|
currentNode.setFinishedAt(LocalDateTime.now());
|
|
|
- taskExecutionMapper.updateByVersion(execution);
|
|
|
|
|
|
|
+ taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
|
|
|
+
|
|
|
|
|
+ // 更新执行计划到Redis
|
|
|
|
|
+ executionPlanStore.save(executionId, plan);
|
|
|
|
|
|
|
|
// 标记推进(幂等)
|
|
// 标记推进(幂等)
|
|
|
idempotencyManager.markProceed(executionId, nodeIndex);
|
|
idempotencyManager.markProceed(executionId, nodeIndex);
|
|
@@ -166,7 +184,7 @@ public class TaskExecutionEngine {
|
|
|
log.info("检查ACK状态,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
log.info("检查ACK状态,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
|
|
|
|
|
|
// 加载执行实例
|
|
// 加载执行实例
|
|
|
- TaskExecution execution = taskExecutionMapper.selectById(executionId);
|
|
|
|
|
|
|
+ TaskExecution execution = taskExecutionRepository.selectByPrimaryKey(executionId);
|
|
|
if (execution == null) {
|
|
if (execution == null) {
|
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
|
return;
|
|
return;
|
|
@@ -180,7 +198,13 @@ public class TaskExecutionEngine {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 获取节点
|
|
// 获取节点
|
|
|
- ExecutionNode node = execution.getExecutionPlan().getNode(nodeIndex);
|
|
|
|
|
|
|
+ ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
|
|
|
+ if (plan == null) {
|
|
|
|
|
+ log.error("执行计划不存在,executionId={}", executionId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ExecutionNode node = plan.getNode(nodeIndex);
|
|
|
if (node == null) {
|
|
if (node == null) {
|
|
|
log.error("节点不存在,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
log.error("节点不存在,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
|
return;
|
|
return;
|
|
@@ -214,7 +238,10 @@ public class TaskExecutionEngine {
|
|
|
node.setStatus(NodeStatus.SUCCESS.name());
|
|
node.setStatus(NodeStatus.SUCCESS.name());
|
|
|
node.setFinishedAt(LocalDateTime.now());
|
|
node.setFinishedAt(LocalDateTime.now());
|
|
|
execution.updateHeartbeat();
|
|
execution.updateHeartbeat();
|
|
|
- taskExecutionMapper.updateByVersion(execution);
|
|
|
|
|
|
|
+ taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
|
|
|
+
|
|
|
|
|
+ // 更新执行计划到Redis
|
|
|
|
|
+ executionPlanStore.save(executionId, plan);
|
|
|
|
|
|
|
|
// 发送NEXT_NODE消息(立即,延迟=0)
|
|
// 发送NEXT_NODE消息(立即,延迟=0)
|
|
|
flowControlProducer.sendNextNode(executionId, nodeIndex, 0);
|
|
flowControlProducer.sendNextNode(executionId, nodeIndex, 0);
|
|
@@ -262,20 +289,26 @@ public class TaskExecutionEngine {
|
|
|
log.info("推进到下一节点,executionId={}", executionId);
|
|
log.info("推进到下一节点,executionId={}", executionId);
|
|
|
|
|
|
|
|
// 加载执行实例
|
|
// 加载执行实例
|
|
|
- TaskExecution execution = taskExecutionMapper.selectById(executionId);
|
|
|
|
|
|
|
+ TaskExecution execution = taskExecutionRepository.selectByPrimaryKey(executionId);
|
|
|
if (execution == null) {
|
|
if (execution == null) {
|
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 检查是否有下一节点
|
|
// 检查是否有下一节点
|
|
|
- if (!execution.hasNextNode()) {
|
|
|
|
|
|
|
+ ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
|
|
|
+ if (plan == null) {
|
|
|
|
|
+ log.error("执行计划不存在,executionId={}", executionId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (!plan.hasNext(execution.getCurrentIndex())) {
|
|
|
// 全部节点执行完成
|
|
// 全部节点执行完成
|
|
|
log.info("所有节点执行完成,任务成功,executionId={}", executionId);
|
|
log.info("所有节点执行完成,任务成功,executionId={}", executionId);
|
|
|
|
|
|
|
|
execution.markAsSuccess();
|
|
execution.markAsSuccess();
|
|
|
execution.setFinishedAt(LocalDateTime.now());
|
|
execution.setFinishedAt(LocalDateTime.now());
|
|
|
- taskExecutionMapper.updateByVersion(execution);
|
|
|
|
|
|
|
+ taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
|
|
|
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
@@ -288,11 +321,11 @@ public class TaskExecutionEngine {
|
|
|
// 乐观锁更新
|
|
// 乐观锁更新
|
|
|
execution.setCurrentIndex(nextIndex);
|
|
execution.setCurrentIndex(nextIndex);
|
|
|
execution.updateHeartbeat();
|
|
execution.updateHeartbeat();
|
|
|
- int updated = taskExecutionMapper.updateByVersion(execution);
|
|
|
|
|
|
|
+ int updated = taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
|
|
|
|
|
|
if (updated == 0) {
|
|
if (updated == 0) {
|
|
|
log.error("推进节点失败(乐观锁冲突),executionId={}, version={}",
|
|
log.error("推进节点失败(乐观锁冲突),executionId={}, version={}",
|
|
|
- executionId, execution.getVersion());
|
|
|
|
|
|
|
+ executionId, execution.getObjectVersionNumber());
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -313,23 +346,28 @@ public class TaskExecutionEngine {
|
|
|
executionId, nodeIndex, failReason);
|
|
executionId, nodeIndex, failReason);
|
|
|
|
|
|
|
|
// 加载执行实例
|
|
// 加载执行实例
|
|
|
- TaskExecution execution = taskExecutionMapper.selectById(executionId);
|
|
|
|
|
|
|
+ TaskExecution execution = taskExecutionRepository.selectByPrimaryKey(executionId);
|
|
|
if (execution == null) {
|
|
if (execution == null) {
|
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 更新节点状态为FAILED
|
|
// 更新节点状态为FAILED
|
|
|
- ExecutionNode node = execution.getExecutionPlan().getNode(nodeIndex);
|
|
|
|
|
- if (node != null) {
|
|
|
|
|
- node.setStatus(NodeStatus.FAILED.name());
|
|
|
|
|
- node.setFinishedAt(LocalDateTime.now());
|
|
|
|
|
|
|
+ ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
|
|
|
+ if (plan != null) {
|
|
|
|
|
+ ExecutionNode node = plan.getNode(nodeIndex);
|
|
|
|
|
+ if (node != null) {
|
|
|
|
|
+ node.setStatus(NodeStatus.FAILED.name());
|
|
|
|
|
+ node.setFinishedAt(LocalDateTime.now());
|
|
|
|
|
+ // 更新执行计划到Redis
|
|
|
|
|
+ executionPlanStore.save(executionId, plan);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 更新执行实例状态为FAILED
|
|
// 更新执行实例状态为FAILED
|
|
|
execution.markAsFailed(failReason);
|
|
execution.markAsFailed(failReason);
|
|
|
execution.setFinishedAt(LocalDateTime.now());
|
|
execution.setFinishedAt(LocalDateTime.now());
|
|
|
- taskExecutionMapper.updateByVersion(execution);
|
|
|
|
|
|
|
+ taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
|
|
|
|
|
|
// 触发安全关闭
|
|
// 触发安全关闭
|
|
|
log.info("触发安全关闭,executionId={}", executionId);
|
|
log.info("触发安全关闭,executionId={}", executionId);
|
|
@@ -347,7 +385,7 @@ public class TaskExecutionEngine {
|
|
|
.map(d -> "\"" + d + "\"")
|
|
.map(d -> "\"" + d + "\"")
|
|
|
.collect(Collectors.joining(",")));
|
|
.collect(Collectors.joining(",")));
|
|
|
execution.setSafeCloseDetails(closeDetails);
|
|
execution.setSafeCloseDetails(closeDetails);
|
|
|
- taskExecutionMapper.updateByVersion(execution);
|
|
|
|
|
|
|
+ taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
|
|
|
|
|
|
// 创建报警记录
|
|
// 创建报警记录
|
|
|
Long alarmId = alarmRecordService.createTaskFailureAlarm(executionId, failReason);
|
|
Long alarmId = alarmRecordService.createTaskFailureAlarm(executionId, failReason);
|
|
@@ -386,14 +424,20 @@ public class TaskExecutionEngine {
|
|
|
// (已在Consumer中处理)
|
|
// (已在Consumer中处理)
|
|
|
|
|
|
|
|
// 加载执行实例
|
|
// 加载执行实例
|
|
|
- TaskExecution execution = taskExecutionMapper.selectById(executionId);
|
|
|
|
|
|
|
+ TaskExecution execution = taskExecutionRepository.selectByPrimaryKey(executionId);
|
|
|
if (execution == null) {
|
|
if (execution == null) {
|
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
log.error("执行实例不存在,executionId={}", executionId);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 检查ACK状态
|
|
// 检查ACK状态
|
|
|
- ExecutionNode node = execution.getExecutionPlan().getNode(nodeIndex);
|
|
|
|
|
|
|
+ ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
|
|
|
+ if (plan == null) {
|
|
|
|
|
+ log.error("执行计划不存在,executionId={}", executionId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ExecutionNode node = plan.getNode(nodeIndex);
|
|
|
if (node == null) {
|
|
if (node == null) {
|
|
|
log.error("节点不存在,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
log.error("节点不存在,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
|
return;
|
|
return;
|