|
@@ -147,7 +147,6 @@ public class TaskExecutionEngine {
|
|
|
} else {
|
|
} else {
|
|
|
// WAIT节点:不需要ACK,直接推进
|
|
// WAIT节点:不需要ACK,直接推进
|
|
|
int waitSeconds = handler.getWaitSeconds(context);
|
|
int waitSeconds = handler.getWaitSeconds(context);
|
|
|
- int waitMinutes = waitSeconds / 60;
|
|
|
|
|
|
|
|
|
|
// 更新节点状态为SUCCESS
|
|
// 更新节点状态为SUCCESS
|
|
|
currentNode.setStatus(NodeStatus.SUCCESS.name());
|
|
currentNode.setStatus(NodeStatus.SUCCESS.name());
|
|
@@ -157,14 +156,14 @@ public class TaskExecutionEngine {
|
|
|
// 更新执行计划到Redis
|
|
// 更新执行计划到Redis
|
|
|
executionPlanStore.save(executionId, plan);
|
|
executionPlanStore.save(executionId, plan);
|
|
|
|
|
|
|
|
- // 标记推进(幂等)
|
|
|
|
|
- idempotencyManager.markProceed(executionId, nodeIndex);
|
|
|
|
|
|
|
+ // 幂等性保护:标记节点已推进
|
|
|
|
|
+// idempotencyManager.markProceed(executionId, nodeIndex);
|
|
|
|
|
|
|
|
// 发送NEXT_NODE延迟消息
|
|
// 发送NEXT_NODE延迟消息
|
|
|
- flowControlProducer.sendNextNode(executionId, nodeIndex, waitMinutes);
|
|
|
|
|
|
|
+ flowControlProducer.sendNextNode(executionId, nodeIndex, waitSeconds);
|
|
|
|
|
|
|
|
- log.info("WAIT节点执行完成,等待{}分钟后推进,executionId={}, nodeIndex={}",
|
|
|
|
|
- waitMinutes, executionId, nodeIndex);
|
|
|
|
|
|
|
+ log.info("WAIT节点执行完成,等待{}秒后推进,executionId={}, nodeIndex={}",
|
|
|
|
|
+ waitSeconds, executionId, nodeIndex);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -226,13 +225,13 @@ public class TaskExecutionEngine {
|
|
|
log.info("设备ACK全部成功,准备推进,executionId={}, nodeIndex={}",
|
|
log.info("设备ACK全部成功,准备推进,executionId={}, nodeIndex={}",
|
|
|
executionId, nodeIndex);
|
|
executionId, nodeIndex);
|
|
|
|
|
|
|
|
- // 标记推进(幂等)
|
|
|
|
|
- boolean canProceed = idempotencyManager.markProceed(executionId, nodeIndex);
|
|
|
|
|
- if (!canProceed) {
|
|
|
|
|
- log.warn("节点已被推进,忽略重复推进,executionId={}, nodeIndex={}",
|
|
|
|
|
- executionId, nodeIndex);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // 幂等性保护:防止多个CHECK_ACK消息重复发送NEXT_NODE
|
|
|
|
|
+// boolean canProceed = idempotencyManager.markProceed(executionId, nodeIndex);
|
|
|
|
|
+// if (!canProceed) {
|
|
|
|
|
+// log.info("节点已被标记推进(可能是重复的CHECK_ACK消息),忽略,executionId={}, nodeIndex={}",
|
|
|
|
|
+// executionId, nodeIndex);
|
|
|
|
|
+// return;
|
|
|
|
|
+// }
|
|
|
|
|
|
|
|
// 更新节点状态为SUCCESS
|
|
// 更新节点状态为SUCCESS
|
|
|
node.setStatus(NodeStatus.SUCCESS.name());
|
|
node.setStatus(NodeStatus.SUCCESS.name());
|
|
@@ -305,7 +304,7 @@ public class TaskExecutionEngine {
|
|
|
if (!plan.hasNext(execution.getCurrentIndex())) {
|
|
if (!plan.hasNext(execution.getCurrentIndex())) {
|
|
|
// 全部节点执行完成
|
|
// 全部节点执行完成
|
|
|
log.info("所有节点执行完成,任务成功,executionId={}", executionId);
|
|
log.info("所有节点执行完成,任务成功,executionId={}", executionId);
|
|
|
-
|
|
|
|
|
|
|
+ execution.setExecutionPlan(plan);
|
|
|
execution.markAsSuccess();
|
|
execution.markAsSuccess();
|
|
|
execution.setFinishedAt(LocalDateTime.now());
|
|
execution.setFinishedAt(LocalDateTime.now());
|
|
|
taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
taskExecutionRepository.updateByPrimaryKeySelective(execution);
|
|
@@ -355,6 +354,7 @@ public class TaskExecutionEngine {
|
|
|
// 更新节点状态为FAILED
|
|
// 更新节点状态为FAILED
|
|
|
ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
|
if (plan != null) {
|
|
if (plan != null) {
|
|
|
|
|
+ execution.setExecutionPlan(plan);
|
|
|
ExecutionNode node = plan.getNode(nodeIndex);
|
|
ExecutionNode node = plan.getNode(nodeIndex);
|
|
|
if (node != null) {
|
|
if (node != null) {
|
|
|
node.setStatus(NodeStatus.FAILED.name());
|
|
node.setStatus(NodeStatus.FAILED.name());
|
|
@@ -393,7 +393,7 @@ public class TaskExecutionEngine {
|
|
|
// 发送报警通知(异步,不阻塞)
|
|
// 发送报警通知(异步,不阻塞)
|
|
|
if (alarmId != null) {
|
|
if (alarmId != null) {
|
|
|
try {
|
|
try {
|
|
|
- alarmNotificationService.sendAlarm(executionId);
|
|
|
|
|
|
|
+// alarmNotificationService.sendAlarm(executionId);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("发送报警通知异常,executionId={}, alarmId={}", executionId, alarmId, e);
|
|
log.error("发送报警通知异常,executionId={}, alarmId={}", executionId, alarmId, e);
|
|
|
// 不抛出异常,避免影响主流程
|
|
// 不抛出异常,避免影响主流程
|
|
@@ -413,6 +413,19 @@ public class TaskExecutionEngine {
|
|
|
public void handleAckTimeout(Long executionId, Integer nodeIndex) {
|
|
public void handleAckTimeout(Long executionId, Integer nodeIndex) {
|
|
|
log.warn("ACK超时兜底处理,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
log.warn("ACK超时兜底处理,executionId={}, nodeIndex={}", executionId, nodeIndex);
|
|
|
|
|
|
|
|
|
|
+ // 加载执行实例
|
|
|
|
|
+ TaskExecution execution = taskExecutionRepository.selectByPrimaryKey(executionId);
|
|
|
|
|
+ if (execution == null) {
|
|
|
|
|
+ log.error("执行实例不存在,executionId={}", executionId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (execution.isTerminal()) {
|
|
|
|
|
+ log.warn("执行已终止,忽略超时处理,executionId={}, status={}",
|
|
|
|
|
+ executionId, execution.getStatus());
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// 幂等性检查1:如果节点已推进,忽略超时
|
|
// 幂等性检查1:如果节点已推进,忽略超时
|
|
|
if (idempotencyManager.isProceed(executionId, nodeIndex)) {
|
|
if (idempotencyManager.isProceed(executionId, nodeIndex)) {
|
|
|
log.info("节点已推进,忽略超时处理,executionId={}, nodeIndex={}",
|
|
log.info("节点已推进,忽略超时处理,executionId={}, nodeIndex={}",
|
|
@@ -423,13 +436,6 @@ public class TaskExecutionEngine {
|
|
|
// 幂等性检查2:如果超时已处理,忽略重复
|
|
// 幂等性检查2:如果超时已处理,忽略重复
|
|
|
// (已在Consumer中处理)
|
|
// (已在Consumer中处理)
|
|
|
|
|
|
|
|
- // 加载执行实例
|
|
|
|
|
- TaskExecution execution = taskExecutionRepository.selectByPrimaryKey(executionId);
|
|
|
|
|
- if (execution == null) {
|
|
|
|
|
- log.error("执行实例不存在,executionId={}", executionId);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
// 检查ACK状态
|
|
// 检查ACK状态
|
|
|
ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
ExecutionPlan plan = executionPlanStore.get(executionId);
|
|
|
if (plan == null) {
|
|
if (plan == null) {
|