版本: v1.6 日期: 2026-02-26 状态: 草稿 适用范围: 后端开发团队、架构评审
本系统为智能农业场景下的自动化轮灌控制平台,支持对多个灌区(灌溉组)按照预设规则进行有序、安全的轮流灌溉。系统具备以下核心能力:
轮灌任务支持以下触发方式,各触发方式可共存,同一任务允许同时关联多个触发来源(如既配定时又配联动)。无论哪个触发来源先触发,均创建同一套执行计划;任务正在运行时,其他触发来源的信号将被拒绝(见 §6.6 并发触发防护)。
重要说明:一个任务可以同时配置多个定时规则,模式 A 和模式 B 可以共存,每种模式也可以配置多条。
模式 A:按星期 + 时间重复
模式 B:启动时间 + 执行次数 + 执行周期
多规则配置示例:
{
"taskName": "农场A区智能轮灌",
"scheduleRules": [
{
"ruleName": "每周一三五凌晨2点",
"scheduleType": "CRON",
"cronExpression": "0 0 2 ? * MON,WED,FRI",
"enabled": true
},
{
"ruleName": "每月1号和15号",
"scheduleType": "CRON",
"cronExpression": "0 0 2 1,15 * ?",
"enabled": true
},
{
"ruleName": "连续10天灌溉计划",
"scheduleType": "SIMPLE",
"startTime": "2026-03-10 06:00:00",
"intervalDays": 1,
"totalTimes": 10,
"enabled": true
}
]
}
技术实现:
task_schedule_rule 表中大于(>) / 大于等于(>=) / 小于(<) / 小于等于(<=) / 等于(==) + 阈值。联动规则配置字段:
| 字段 | 说明 |
|---|---|
| 传感器设备 ID | 监听的具体传感器设备 |
| 比较符 | > / >= / < / <= / == |
| 阈值 | 触发条件的数值边界 |
| 冷却时间 | 单位:分钟,触发后的静默时间窗口 |
| 关联任务 ID | 满足条件时触发的轮灌任务 |
用户通过 APP 或后台手动点击“立即执行”,任务即时启动,无需等待定时或传感器触发。
每个轮灌任务可以独立配置以下设备:
灌溉系统的压力控制分为水泵压力和球阀压力两个维度,两者可以同时存在、独立生效:
维度一:水泵压力控制
通过变频恒压水泵控制管路压力,系统下发目标压力值给水泵,水泵自动调频维持恒压。任务创建时选择水泵压力模式:
| 模式 | 说明 | 配置位置 |
|---|---|---|
| 无 | 水泵作为普通水泵启停,不下发压力指令 | 无额外配置 |
| 统一压力 | 所有灌区使用同一压力值,整个任务只下发一次 SET_PUMP_PRESSURE |
任务级配置 targetPressureKpa |
| 分区压力 | 每个灌区可设置不同的水泵目标压力,灌区切换时自动下发新压力值 | 灌溉组级配置 zonePressureKpa |
维度二:球阀压力控制
通过球阀自身的压力控制功能调节管路压力,每个球阀在灌溉组中配置目标压力,系统在开启球阀时同时下发角度和压力指令。球阀压力控制独立于水泵压力控制,两者可同时工作,系统根据不同的配置综合控制水泵和球阀,确保整个灌溉系统的稳定性和安全性。
| 配置位置 | 说明 |
|---|---|
| 灌溉组设备列表中每个球阀 | 配置 targetPressureKpa,系统在 OPEN_GROUP 时同时下发角度+压力指令给球阀 |
共存示例:水泵设为统一压力 300kPa,同时灌区内的球阀各自配置了目标压力,系统会在启动时下发
SET_PUMP_PRESSURE(300)给水泵,同时在OPEN_GROUP时将各球阀的目标压力随开启指令一起下发。
水泵配置参数:
| 参数 | 说明 |
|---|---|
| 水泵压力模式 | NONE(普通启停)/ PUMP_UNIFIED(统一压力)/ PUMP_ZONE(分区压力) |
| 统一目标压力值 | 仅 PUMP_UNIFIED 模式需配置,单位:kPa |
施肥机由以下子设备控制,每个子设备独立接收指令和回传 ACK:
| 子设备 | 说明 |
|---|---|
| 施肥泵 | 将肥料从储肥桶注入灌溉管路的计量泵 |
| 搅拌电机 | 对储肥桶中的肥料持续搅拌,确保肥料均匀溶解 |
注意:施肥机的主水泵与灌溉系统的水泵是同一个水泵,都是灌溉清水的,施肥泵用于抽取肥料,搅拌电机用于搅拌肥料。此设计节省设备成本并简化系统操作。
施肥的配置参数、控制模式、时序规则及异常处理见 §2.7,技术方案见 §5.1.4。
| 参数 | 说明 | 默认值 |
|---|---|---|
switch_stable_seconds |
灌区切换时,开启下一灌区成功后到关闭上一灌区之前的稳压等待时间(秒)。用于平衡管路水压,防止压力突变损坏设备或爆管 | 5 秒 |
设计原则:
switch_stable_seconds由计划生成引擎在每个OPEN_GROUP(ZoneI)与CLOSE_GROUP(ZoneI-1)之间自动插入一个ZONE_SWITCH_WAIT节点,用户仅需配置秒数,无需关心插入位置。
灌溉组是灌区设备的预配置模板,用于统一管理一个灌区内的设备组合及其控制参数。灌溉组独立于任务存在,可被多个任务复用。
灌溉组配置参数:
| 参数 | 说明 |
|---|---|
| 灌区组名称 | 用于标识灌区组,方便管理和报警展示 |
| 分区压力 | 本灌区的水泵目标压力(kPa);仅在任务压力模式为 PUMP_ZONE 时生效,灌区切换时系统自动下发新压力值 |
| 设备列表 | 本灌区包含的设备及其控制参数(见下表) |
设备列表中每个设备的参数:
| 设备类型 | 参数 | 说明 |
|---|---|---|
| 电磁阀 | 设备ID | 开(ON)/ 关(OFF)控制 |
| 球阀 | 设备ID、目标角度 | 角度范围 0~100%,用于流量调节 |
| 球阀 | 目标压力 | 本球阀的目标压力(kPa);若球阀支持压力控制则配置,系统在开启球阀时同时下发角度和压力指令 |
一个灌溉组内可同时包含多个电磁阀和球阀,所有设备同步下发指令,全部 ACK 成功方视为开启/关闭成功。
配置说明:灌溉组中的分区压力仅在水泵压力模式为
PUMP_ZONE时生效;球阀目标压力只要球阀配置了即生效(与水泵压力可共存)。用户在灌溉组中统一配置好设备和压力参数后,创建任务时只需选择灌溉组并配置每个灌区的灌溉时长即可。
创建任务时,用户从已有灌溉组中选择灌区,并为每个灌区配置灌溉时长:
| 参数 | 说明 |
|---|---|
| 灌溉组 | 从已配置的灌溉组中选择,选择顺序即灌溉意图顺序 |
| 灌溉时长 | 本灌区的灌溉持续时间(分钟),每个灌区可设置不同时长 |
施肥配置:
是否施肥由任务是否选择了施肥机设备来决定,灌溉组中不需要额外的“是否施肥”字段。当任务选择了施肥机后,需配置施肥计划参数(任务级,所有灌区共用):
| 参数 | 说明 |
|---|---|
| 施肥延迟时间 | (分钟)灌区开始灌溉后,等待多少分钟再启动搅拌,相对于灌溉开始时间计算 |
| 搅拌提前时长 | (分钟)搅拌电机比施肥泵提前多少分钟启动,一直搅拌到施肥结束 |
| 施肥时长 | (分钟)施肥泵持续运行时长;与施肥量二选一 |
| 施肥量 | (升)施肥目标容积,下发定量指令给施肥机,施肥机自动停止;与施肥时长二选一 |
设计说明:灌溉时长属于任务级配置而非灌溉组模板配置,因为同一灌溉组在不同任务中可能需要不同的灌溉时长。施肥参数为任务级统一配置,所有启用施肥的灌区共用同一套施肥参数,系统根据每个灌区的灌溉时长自动判断施肥时间窗口是否充足。用户在表单中配置的灌溉组列表及顺序代表灌溉意图,实际执行顺序由系统根据安全规则自动编排。
安全执行规则是本系统的核心约束,所有执行计划必须严格遵守以下规则:
| 规则编号 | 规则描述 |
|---|---|
| S-01 | 水泵必须在第一个灌区开启成功之后才能启动,严禁在任何灌区开启前运行水泵 |
| S-02 | 灌区切换时,必须先开启下一个灌区成功,再等待稳压时间(switch_stable_seconds),最后关闭上一个灌区(先开→稳压→后关),确保管路中始终有水流路径且压力平稳 |
| S-03 | 正常结束时,固定执行顺序:先关水泵 → 最后关闭末个灌区电磁阀/球阀 |
| S-04 | 开路优先原则(先开后关,全场景适用):任何涉及设备通断的操作,必须先建立新通路/开启新设备,再断开旧通路/关闭旧设备,防止管路瞬间封闭导致水锤效应或爆管 |
| S-05 | 故障接续启动序列:任务从 PAUSED 恢复执行时,若水泵已在安全关闭中停止,必须按"先开灌区 → 再启水泵"的顺序重建启动序列,严禁直接继续执行 |
示例 1:3 个灌区,水泵普通启停 + 球阀配置了目标压力
步骤 操作
---- ──────────────────────────
1 开启灌区1(电磁阀/球阀,球阀同时下发角度+压力)
2 开启水泵(普通启停,不下发压力)
3 等待 3 分钟(灌溉)
4 开启灌区2
5 等待 5 秒(稳压,switch_stable_seconds) ← S-02/S-04 稳压等待
6 关闭灌区1
7 等待 3 分钟(灌溉)
8 开启灌区3
9 等待 5 秒(稳压)
10 关闭灌区2
11 等待 3 分钟(灌溉)
12 关闭水泵
13 关闭灌区3
示例 2:故障接续执行(泵已停,恢复后重建启动序列)
步骤 操作
---- ──────────────────────────
(故障前已完成)
✅ 开启灌区1 → 开启水泵 → 等待3分钟
(故障发生,安全关闭)
✅ 关闭水泵 → 关闭灌区1
(用户修复完成,调用 Resume API)
(系统检测到泵已停止,插入重建序列)
1 开启灌区1(重新开启,遵循 S-05) ← 先开灌区
2 设置水泵目标压力值 ← PUMP_UNIFIED/PUMP_ZONE 模式时插入
3 开启水泵 ← 再开水泵(S-05)
4 继续执行剩余灌溉(从中断处接续)
每次任务执行需保存完整日志,包括:
当任务发生异常(设备失败超过重试次数、超时、安全关闭等)时,触发多通道报警:
| 通道 | 说明 |
|---|---|
| 短信 | 向绑定手机号发送报警短信 |
| 电话 | 向绑定手机号拨打语音报警电话,适用于紧急告警 |
| 移动端推送 | 通过 APP 消息推送通知移动端用户 |
报警内容包括:
施肥机作为轮灌系统的组成部分,在灌溉过程中实现自动化施肥。是否施肥由任务是否选择了施肥机设备来决定,施肥参数在任务级统一配置(见 §2.3.2),所有灌区共用同一套施肥参数。施肥在灌溉时间窗口内并行执行,与主灌溉流程互不阻塞。设备组成见 §2.2.2。
注意:施肥机的主水泵与灌溉的水泵是同一个水泵,施肥机仅包含施肥泵和搅拌电机两个独立子设备。
施肥参数在创建任务时统一配置(见 §2.3.2),所有灌区共用同一套参数:
| 参数 | 类型 | 说明 |
|---|---|---|
| 施肥延迟时间 | 分钟 | 灌区开始灌溉后,等待多少分钟再启动搅拌电机(相对于本灌区灌溉开始时刻) |
| 搅拌提前时长 | 分钟 | 搅拌电机比施肥泵提前多少分钟启动;搅拌电机从启动后一直运行至施肥结束 |
| 施肥时长 | 分钟 | 施肥泵持续运行时长;与施肥量二选一,两者不可同时配置 |
| 施肥量 | 升 | 施肥目标容积,下发定量指令给施肥机,施肥机达到目标量后自动停止;与施肥时长二选一 |
约束:施肥延迟时间 + 搅拌提前时长 + 施肥时长 ≤ 灌区总灌溉时长。若超出,系统在保存时给出警告。施肥停止后灌溉继续进行至灌区时长结束,无需单独配置肥后清水时间。
以单个灌区为例,各设备的启停时序如下(灌区总时长 30 分钟,施肥延迟 3 分钟,搅拌提前 2 分钟,施肥时长 5 分钟):
灌区开始时刻(T=0)
├─ T+0: 水泵启动 + 灌区球阀/电磁阀开启(随灌溉开始)
├─ T+3: 搅拌电机启动(施肥延迟时间 = 3min)
├─ T+5: 施肥泵启动(T+3 + 搅拌提前 2min = T+5)
├─ T+10: 施肥泵停止 + 搅拌电机停止(T+5 + 施肥时长 5min = T+10)
├─ T+10 ~ T+30: 继续纯水灌溉(施肥已结束,灌溉继续)
└─ T+30: 灌区结束(关水泵、关阀门)
说明:施肥停止后灌溉自然继续至灌区时长结束,无需单独的“肥后清水时间”参数。
时间模式(配置施肥时长):
容量模式(配置施肥量):
搅拌停止规则:时间模式下,施肥泵停止时同步停止搅拌电机;容量模式下,搅拌电机由施肥机定量指令自行控制停止。但无论哪种模式,灌区结束时必须确保搅拌电机已关闭。
用户在创建任务时选择施肥机设备即启用施肥,施肥参数为任务级统一配置(见 §2.3.2),所有灌区共用同一套施肥参数。执行计划生成时:
实现方式:采用独立的施肥子任务(FertTask)模型,在
WAIT(IRRIGATE)节点执行时通过延迟 MQ 消息触发各子设备启停,与主链路并行运行、互不阻塞。技术方案见 §5.1.4。
| 异常类型 | 处理方式 |
|---|---|
| 施肥泵启动失败 | 停止搅拌电机,向主任务报告警告,灌溉继续 |
| 搅拌电机启动失败 | 停止施肥泵(防止未搅匀施肥),向主任务报告警告,灌溉继续 |
| 施肥过程整体异常 | 记录异常原因,本灌区施肥放弃,灌溉继续执行(不触发任务级 PAUSED) |
| 灌区结束时施肥未完成 | 强制关闭施肥泵和搅拌电机,确保安全 |
设计原则:施肥异常不中断灌溉主任务,二者相互独立,施肥失败仅触发告警,不影响灌区轮灌流程。灌溉水泵即施肥主水泵,水泵状态由灌溉主流程统一控制。
┌─────────────────────────────────────────────────────────────────┐
│ 触发层 │
│ ┌──────────────────────┐ ┌───────────────────────────────┐ │
│ │ Quartz 定时调度 │ │ 传感器联动规则引擎 │ │
│ │ (模式A: Cron表达式) │ │ (实时数据 → 规则比对) │ │
│ │ (模式B: Simple调度) │ │ (冷却时间: Redis TTL) │ │
│ └──────────┬───────────┘ └────────────────┬──────────────┘ │
└──────────────┼─────────────────────────────────┼─────────────────┘
│ 触发信号(发MQ消息) │
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ 流程编排层 │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 执行计划生成(任务保存时预生成) │ │
│ │ 安全规则 → 有序节点列表 → JSON存储 │ │
│ └──────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼──────────────────────────────┐ │
│ │ 责任链节点执行器(NodeHandler Chain) │ │
│ │ OPEN_GROUP / CLOSE_GROUP / START_PUMP / STOP_PUMP │ │
│ │ SET_PUMP_PRESSURE / WAIT / ZONE_SWITCH_WAIT │ │
│ └──────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼──────────────────────────────┐ │
│ │ RocketMQ 延迟消息队列 │ │
│ │ CHECK消息(10s) / TIMEOUT兜底(30s) / 推进消息(Nmin) │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ MQTT指令下发
▼
┌─────────────────────────────────────────────────────────────────┐
│ 设备通信层 │
│ │
│ ┌─────────────────────┐ ┌──────────────────────────┐ │
│ │ EMQX MQTT Broker │◄──────►│ 电磁阀 / 球阀 / 水泵 │ │
│ │ (指令下发 + ACK) │ │ 施肥机(4G连接) │ │
│ └─────────────────────┘ └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 存储层 │
│ │
│ ┌──────────────┐ ┌────────────────┐ ┌────────────────────┐ │
│ │ MySQL │ │ Redis │ │ InfluxDB │ │
│ │ 执行计划JSON │ │ 幂等+ACK注册表 │ │ 传感器时序数据 │ │
│ │ 执行记录 │ │ 冷却时间TTL │ │ (可选) │ │
│ │ 报警记录 │ │ │ │ │ │
│ └──────────────┘ └────────────────┘ └────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 监控报警层 │
│ │
│ ┌──────────────┐ ┌─────────────┐ ┌────────────────────────┐ │
│ │ 短信 │ │ 电话 │ │ 移动端推送(APP) │ │
│ │ 报警通知 │ │ 语音报警 │ │ 报警通知 │ │
│ └──────────────┘ └─────────────┘ └────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
0 0 2 ? * MON,WED,FRIDoNothing,错过的任务不补跑(轮灌场景下错过就跳过)。SimpleScheduleBuilder 按间隔天数重复执行。executed_count),每次触发后自增。Job 只做触发,不做执行:
[Quartz Job 触发]
│
├─► 生成 task_execution 记录(status=PENDING)
│ 写入自动生成的 execution_plan JSON
│
├─► 发送 MQ 消息:TASK_START(含 execution_id)
│
└─► 立即返回(Job 线程释放,不做任何等待)
job-store-type: jdbc),支持集群。isClustered: true,自动ID分配。threadCount: 20,Job 触发后立即返回,线程不会被长占。withMisfireHandlingInstructionDoNothing(),服务重启后不补跑错过的任务。[传感器数据上报(MQTT)]
│
▼
[后端实时接收传感器数据]
│
▼
[查询关联此传感器的联动规则(从DB缓存)]
│
▼
[规则比对:设备ID + 比较符 + 阈值]
│
┌────┴────┐
满足条件 不满足
│ │
▼ └─► 忽略
[查询Redis: 冷却时间Key是否存在]
│
┌────┴────┐
Key存在 Key不存在(冷却结束)
│ │
▼ ▼
忽略 [写入冷却时间Key(SET NX + TTL=冷却时长)]
│
▼
[生成 task_execution 记录]
│
▼
[发送 MQ 消息:TASK_START]
linkage:cooldown:{rule_id}SET NX 写入,TTL = 冷却时长(分钟级别),Key 自然过期代表冷却结束。SET NX 原子操作保证并发场景下只允许一次触发成功。湿度 < 30 且 温度 > 35),可引入 Drools 规则引擎。系统在设计上明确将"用户表达的意图"与"系统实际执行的动作"分为三层,彻底解决配置顺序与执行顺序不一致的问题。
┌────────────────────────────────────────────────────────────────┐
│ 第一层:用户配置层(Intent Layer) │
│ 用户只表达"我要什么",不关心怎么做 │
│ 水泵类型+压力 / 灌区组选择+各灌区时长 │
└──────────────────────┬─────────────────────────────────────────┘
│ 任务保存 / 运行时动态调整时
▼
┌────────────────────────────────────────────────────────────────┐
│ 第二层:计划生成引擎(Plan Generator) │
│ 系统根据配置 + 安全规则,自动生成完整有序节点列表 │
│ 处理:先开后关 / 水泵开启时机 / 跳过灌区重算 │
└──────────────────────┬─────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ 第三层:执行层(Execution Layer) │
│ 按物理节点列表顺序执行,支持暂停 / 恢复 / 跳过 / 动态重算 │
└────────────────────────────────────────────────────────────────┘
三层职责对照表:
| 用户配置的(意图) | 计划生成引擎转换为(物理执行节点) |
|---|---|
| "水泵恒压-统一,300kPa" | 自动在 Zone1 成功后插入 SET_PUMP_PRESSURE(300) → START_PUMP,仅下发一次 |
| "水泵恒压-分区" | Zone1 后插入 SET_PUMP_PRESSURE(zone1.压力) → START_PUMP,每次灌区切换时追加 SET_PUMP_PRESSURE(新压力) |
| "球阀配置了目标压力" | 压力参数随 OPEN_GROUP 下发给球阀,与水泵压力模式无关,两者独立生效 |
| "选择灌区组1→2→3,各配时长" | 自动展开为 OPEN(1)→START_PUMP→WAIT(时长1)→OPEN(2)→ZONE_SWITCH_WAIT→CLOSE(1)→WAIT(时长2)→...→STOP_PUMP→CLOSE(N),严格遵循先开后关 |
| "稳压等待 5 秒" | 在每个 OPEN_GROUP(ZoneI) 成功后、CLOSE_GROUP(ZoneI-1) 之前插入 ZONE_SWITCH_WAIT(5s) |
| "跳过灌区2(运行时)" | 截断后续节点,重新生成,安全衔接已开启灌区 |
| "人工修复后继续" | PAUSED 状态接受 Resume API,检测泵状态,自动插入 S-05 重建序列后接续执行 |
执行计划在任务保存时(而非运行时)根据安全规则自动计算并生成完整的有序节点列表,以 JSON 格式存储到 task_execution.execution_plan 字段。
运行时只按顺序逐个执行,不再做任何安全判断逻辑,保证执行路径的确定性和可追溯性。
输入:
灌溉组列表 [Zone1, Zone2, ... ZoneN] ← 任务中选择的灌溉组(§2.3.2)
水泵压力模式 NONE / PUMP_UNIFIED / PUMP_ZONE
统一目标压力值(PUMP_UNIFIED 模式)
每组灌溉时长 ← 任务级配置(§2.3.2),每个灌区可不同
switch_stable_seconds ← 灌区切换稳压等待秒数(任务级配置)
(球阀目标压力在灌溉组设备列表中配置,若有则随 OPEN_GROUP 下发)
输出节点列表:
1. OPEN_GROUP(Zone1) ← 开启第一个灌区(S-01 前置)
│ 球阀配置了目标压力时:同时下发角度+压力给球阀
2. [SET_PUMP_PRESSURE(压力值)] ← PUMP_UNIFIED/PUMP_ZONE 模式插入;NONE 模式不插入
3. START_PUMP ← 开启水泵(S-01:灌区开启后)
循环 i = 2 到 N:
WAIT(灌溉时长)
OPEN_GROUP(ZoneI) ← 先开下一个灌区(S-02/S-04)
[SET_PUMP_PRESSURE(ZoneI.压力值)] ← 仅 PUMP_ZONE 模式:灌区切换时调整水泵压力
ZONE_SWITCH_WAIT(switch_stable_seconds) ← 稳压等待(S-02,防止压力突变)
CLOSE_GROUP(ZoneI-1) ← 再关上一个灌区(S-02:稳压后才关)
最后灌区结束:
WAIT(灌溉时长)
STOP_PUMP ← 关水泵(S-03)
CLOSE_GROUP(ZoneN) ← 最后关最后一个灌区(S-03/S-04)
只有一个灌区:
index=0 OPEN_GROUP 灌区1
index=1 SET_PUMP_PRESSURE (PUMP_UNIFIED/PUMP_ZONE 模式)
index=2 START_PUMP
index=3 WAIT N分钟
index=4 STOP_PUMP
index=5 CLOSE_GROUP 灌区1
→ 没有中间的先开后关逻辑,直接安全关闭
每个灌区等待时长不同:
灌区1等3分钟,灌区2等5分钟,灌区3等10分钟,稳压等待5秒
index=0 OPEN_GROUP 灌区1
index=1 START_PUMP
index=2 WAIT 3分钟 ← 灌区1的灌溉等待
index=3 OPEN_GROUP 灌区2
index=4 ZONE_SWITCH_WAIT 5秒 ← 稳压等待(S-02)
index=5 CLOSE_GROUP 灌区1
index=6 WAIT 5分钟 ← 灌区2的灌溉等待
index=7 OPEN_GROUP 灌区3
index=8 ZONE_SWITCH_WAIT 5秒 ← 稳压等待
index=9 CLOSE_GROUP 灌区2
index=10 WAIT 10分钟 ← 灌区3的灌溉等待
index=11 STOP_PUMP
index=12 CLOSE_GROUP 灌区3
→ ZONE_SWITCH_WAIT 确保两个灌区同时开启的时间窗口内压力趋于平稳后再关旧灌区
本节设计施肥机在轮灌执行过程中的控制方案,包括子任务模型、状态机、触发流程、数据结构及安全约束。
施肥机的两个子设备(搅拌电机、施肥泵)的启停时序完全在单个灌区时间窗口内发生,与主执行计划的节点链路平行运行,互不阻塞。灌溉水泵即施肥主水泵,由灌溉主流程统一控制,施肥子任务不单独管理水泵。
设计采用施肥子任务(FertTask)模型:
主流程节点链路(顺序执行):
OPEN_GROUP(Z1) → START_PUMP → WAIT(30min) → ... → STOP_PUMP → CLOSE_GROUP(Z1)
施肥子任务(并行,由延迟 MQ 驱动):
T+3min → 搅拌电机 ON (延迟MQ:延迟=施肥延迟时间)
T+5min → 施肥泵 ON (延迟MQ:延迟=施肥延迟+搅拌提前)
T+10min→ 施肥泵 OFF + 搅拌电机 OFF (延迟MQ 或 施肥机自动停止)
主流程的 WAIT(30min) 节点执行时,同步向 MQ 发送一条带参数的施肥调度消息,由施肥子任务处理器接管后续的子设备控制,不影响主链路推进。
FertTask 状态:
PENDING → STIR_WAITING → STIRRING → FERT_RUNNING → DONE
│ │ │
│ │ └─ 施肥泵 ON,等待完成
│ └─ 搅拌电机 ON,等待施肥泵启动
└─ 等待施肥延迟时间到
FAILED(任意子设备失败时进入,不影响主任务)
| 状态 | 说明 | 下一状态 |
|---|---|---|
PENDING |
已创建,等待施肥延迟时间到 | STIR_WAITING → 发延迟MQ |
STIR_WAITING |
发出搅拌电机启动指令,等待ACK | STIRRING |
STIRRING |
搅拌中,等待搅拌提前时长到 | FERT_RUNNING → 发延迟MQ |
FERT_RUNNING |
施肥泵运行中,等待时长/容量达到 | DONE |
DONE |
施肥子任务成功完成 | — |
FAILED |
子设备故障,施肥放弃,主任务继续 | — |
[主流程执行 WAIT(IRRIGATE) 节点]
│
├─► 发主链路延迟MQ(N分钟后推进主节点)
│
└─ 检查任务是否选择了施肥机设备?
│ 是
▼
[创建 FertTask 记录,status = PENDING]
[写入 fert_task 表:绑定 execution_id + zone_id]
│
▼
[发施肥延迟MQ]
消息类型:FERT_STIR_START
延迟时间:施肥延迟时间(分钟)
│
─── 施肥延迟时间后 ───────────────
│
[FERT_STIR_START 消息到达]
│
▼
[下发搅拌电机 ON 指令,等待ACK]
├─ ACK成功 → status = STIRRING
│ └─ 发 FERT_PUMP_START 延迟MQ(延迟=搅拌提前时长)
└─ ACK失败 → status = FAILED,发告警,不影响主任务
─── 搅拌提前时长后 ───────────────
│
[FERT_PUMP_START 消息到达]
│
▼
[下发施肥泵 ON 指令,等待ACK]
├─ ACK成功 → status = FERT_RUNNING
│ ├─ [时间模式] 发 FERT_PUMP_STOP 延迟MQ(延迟=施肥时长)
│ └─ [容量模式] 下发定量施肥指令给施肥机(含目标施肥量参数)
└─ ACK失败 → 关搅拌电机 → status = FAILED,发告警
─── 时间模式:施肥时长后 ─────────
│
[FERT_PUMP_STOP 消息到达]
│
▼
[下发施肥泵 OFF 指令]
[下发搅拌电机 OFF 指令](同步停止)
[status = DONE]
─── 容量模式:施肥机自动停止 ─────
│
施肥机达到目标量后自动停止施肥泵
搅拌电机由施肥机定量控制指令自行控制停止
系统无需主动发送停止指令
[status = DONE]
─── 灌区结束 / 灌区切换(两种模式共用)───
│
[主 WAIT 节点完成 或 灌区切换]
│
▼
[检查 FertTask.status]
├─ DONE → 已正常完成,灌区切换时额外发送关闭施肥机指令(安全兜底)
└─ FERT_RUNNING → 施肥未完成
→ 发送关闭施肥机指令(施肥泵 OFF + 搅拌电机 OFF)
→ 记录实际施肥信息
→ status = DONE(标记未完成目标)
→ 发送告警
容量模式(status = FERT_RUNNING 期间):
系统下发定量施肥指令给施肥机(含目标施肥量参数)
施肥机内部通过流量计监控,达到目标量后自动停止施肥泵
搅拌电机的停止由施肥机定量控制指令自行控制
灌区灌溉结束时(主 WAIT 节点完成):
FertTask.status 仍为 FERT_RUNNING?
→ 未达到目标量,发送关闭施肥机指令(施肥泵 OFF + 搅拌电机 OFF)
→ 记录实际施肥量到 fert_task.actual_amount
→ status = DONE(标记未完成目标)
→ 发送告警:本次施肥未达目标量
灌区切换时:
→ 系统额外发送关闭施肥机指令,确保安全性
FertTask {
Long id;
Long executionId; // 关联主任务执行实例
Long zoneId; // 绑定的灌区ID
// 施肥参数(从任务级统一配置冗余存储,防止任务修改影响执行记录)
Integer delayMinutes; // 施肥延迟时间
Integer preStirMinutes; // 搅拌提前时长
Integer fertDurationMinutes; // 施肥时长(时间模式)
Integer fertTargetLiters; // 施肥目标量(容量模式,null=时间模式)
// 执行状态
FertStatus status; // PENDING/STIR_WAITING/STIRRING/FERT_RUNNING/DONE/FAILED
Integer actualFertSeconds; // 实际施肥时长
Integer actualLiters; // 实际施肥量(容量模式)
String failReason; // 失败原因
// 时间戳
DateTime stirStartAt; // 搅拌电机实际启动时间
DateTime pumpStartAt; // 施肥泵实际启动时间
DateTime pumpStopAt; // 施肥泵实际停止时间
DateTime doneAt; // 子任务完成时间
}
施肥参数在创建任务时按任务级统一配置(见 §2.3.2),是否施肥由任务是否选择了施肥机设备来决定,所有灌区共用同一套施肥参数:
任务灌区配置示例(任务已选择施肥机,施肥参数:延迟3min,搅拌提前2min,施肥时长5min):
灌区1(Z1):灌溉时长 30min → 创建 FertTask(时间模式,共用施肥参数)
灌区2(Z2):灌溉时长 20min → 创建 FertTask(时间模式,共用施肥参数)
灌区3(Z3):灌溉时长 25min → 创建 FertTask(时间模式,共用施肥参数)
执行时:
执行 Z1 的 WAIT 节点 → 创建 FertTask(共用施肥参数)
执行 Z2 的 WAIT 节点 → 创建 FertTask(共用施肥参数)
执行 Z3 的 WAIT 节点 → 创建 FertTask(共用施肥参数)
| 约束 | 说明 |
|---|---|
| 施肥子任务不阻塞主链路 | FertTask 失败时,主任务继续;主任务可以无视 FertTask 状态 |
| 灌区结束时强制停止所有施肥设备 | 无论 FertTask 处于什么状态,灌区 WAIT 完成时必须关停施肥泵和搅拌电机 |
| 搅拌先于施肥泵关闭 | 施肥泵停止后,搅拌电机同步停止(防止残肥沉积) |
| 灌区切换时发送关闭指令 | 容量模式下灌区切换时额外发送关闭施肥机指令,确保安全 |
计划生成引擎是系统安全性的核心保障,负责将用户配置的"灌溉意图"转换为"安全有序的物理执行节点列表"。本节详细描述引擎的内部架构、Pipeline 各阶段处理逻辑及调用时机。
引擎在以下三种场景下被调用,每次调用均输出完整或增量的节点列表:
| 调用场景 | 触发时机 | 输出 |
|---|---|---|
| 全量生成 | 任务首次保存 / 任务配置修改 | 完整的 execution_plan.nodes |
| 跳过重算 | 运行时跳过某灌区(5.4 节) | 截断当前 index 后的 PENDING 节点,重新生成后续节点追加 |
| 故障重建 | Resume API 触发(5.5.4 节) | 在 current_index 前插入启动重建序列节点 |
PlanGeneratorInput {
// 基础信息
Long taskId;
Long executionId; // 执行实例ID(全量生成时为新ID)
// 灌区配置(有序列表,顺序即灌溉意图顺序)
// ZoneConfig 合并灌溉组模板(zoneName, zonePressureKpa, deviceList)与任务级配置(durationSeconds)
List<ZoneConfig> zones; // 每个灌区含:zoneId, zoneName, durationSeconds(任务级), zonePressureKpa(模板), deviceList(模板)
// 施肥配置(任务级统一,所有灌区共用;为null表示不施肥)
FertConfig fertConfig; // 含:delayMinutes, preStirMinutes, fertDurationMinutes/fertTargetLiters
Long fertPumpDeviceId; // 施肥泵设备ID(选择了施肥机时有值,否则null)
Long stirDeviceId; // 搅拌电机设备ID(选择了施肥机时有值,否则null)
// 压力控制配置
PressureMode pressureMode; // NONE / PUMP_UNIFIED / PUMP_ZONE
Integer targetPressureKpa; // 仅 PUMP_UNIFIED 模式有值
// 球阀目标压力在各灌溉组的设备列表中配置,若有则随 OPEN_GROUP 下发,与水泵压力模式无关
// 安全参数
Integer switchStableSeconds; // 灌区切换稳压等待时间
// 运行时增量参数(跳过重算 / 故障重建时使用)
GenerateMode mode; // FULL / SKIP_REBUILD / RESUME_REBUILD
Integer fromIndex; // 增量生成时的起始节点索引
Long currentOpenZoneId; // 当前已开启但未关闭的灌区ID(增量时用于衔接)
List<Long> skipZoneIds; // 运行时已跳过的灌区ID列表
}
PlanGeneratorOutput {
List<ExecutionNode> nodes; // 生成的节点列表(全量或增量)
Integer startIndex; // 节点在完整计划中的起始 index
Integer estimatedSeconds; // 本批节点预计总耗时(用于前端展示预估完成时间)
}
引擎内部按固定顺序执行五个阶段,每个阶段向节点列表中插入对应节点:
输入(PlanGeneratorInput)
│
▼
┌──────────────────────────────────────────────────────┐
│ Phase 1:输入校验(Validator) │
│ - 灌区列表不能为空 │
│ - switch_stable_seconds ≥ 0 │
│ - PUMP_UNIFIED 模式必须有 targetPressureKpa │
│ - PUMP_ZONE 模式每个灌区必须有 zonePressureKpa │
│ - NONE 模式无额外压力校验 │
│ - 跳过重算时 currentOpenZoneId 必须不为空 │
└──────────────────────┬───────────────────────────────┘
│ 校验通过
▼
┌──────────────────────────────────────────────────────┐
│ Phase 2:首灌区启动序列生成(StartupBuilder) │
│ │
│ 输出节点(按 S-01 规则): │
│ OPEN_GROUP(Zone1) │
│ ← 球阀配置了目标压力时,压力参数随球阀指令下发 │
│ [SET_PUMP_PRESSURE] ← PUMP_UNIFIED/PUMP_ZONE 模式 │
│ START_PUMP │
│ NONE 模式: │
│ OPEN_GROUP(Zone1) │
│ START_PUMP ← 不插入 SET_PUMP_PRESSURE │
│ │
│ 跳过重算(SKIP_REBUILD): │
│ 直接跳过此阶段(接续之前的启动状态) │
│ 故障重建(RESUME_REBUILD): │
│ 生成 S-05 重建启动序列(先开灌区→再开泵) │
└──────────────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ Phase 3:灌区轮转序列展开(ZoneRotationExpander) │
│ │
│ 输入:有效灌区列表(去掉 skipZoneIds), │
│ 当前已开启灌区(增量模式时用于衔接) │
│ │
│ 对每个灌区(从第2个开始,或增量模式从下一个灌区开始): │
│ 追加: WAIT(zone[i-1].durationSeconds) │
│ 追加: OPEN_GROUP(zone[i]) ← S-04 先开 │
│ 追加: [SET_PUMP_PRESSURE(zone[i].pressure)] │
│ ← 仅 PUMP_ZONE 模式:灌区切换时调整压力 │
│ 追加: ZONE_SWITCH_WAIT(switchStableSeconds) ← S-02 │
│ 追加: CLOSE_GROUP(zone[i-1]) ← S-04 后关 │
│ │
│ 最后一个灌区额外追加: │
│ 追加: WAIT(zone[N].durationSeconds) │
└──────────────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ Phase 4:正常结束序列追加(SafeShutdownBuilder) │
│ │
│ 遵循 S-03(正常结束顺序): │
│ STOP_PUMP │
│ CLOSE_GROUP(ZoneN) ← 最后一个灌区(S-04 最后关) │
│ │
│ 注意:此处为正常结束流程的计划生成,异常安全关闭见§6.4 │
│ 异常时只关水泵+施肥设备,阀门保持当前状态 │
└──────────────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ Phase 5:节点索引分配与元数据填充(NodeIndexer) │
│ │
│ 对所有节点: │
│ 分配连续 index(全量从0开始,增量从 fromIndex 开始) │
│ 填充 nodeName(用于日志和报警展示) │
│ 填充 maxRetry(ACK类节点=3,WAIT类节点=0) │
│ 填充 params.source(IRRIGATE/ZONE_SWITCH/REBUILD) │
│ 初始化 status = PENDING │
│ 初始化 devices(从 DB 查关联设备信息填入) │
│ │
│ 施肥标注(当 fertConfig 不为 null 时): │
│ 对每个 WAIT(source=IRRIGATE) 节点: │
│ 标注 params.fertEnabled = true │
│ 标注 params.fertPumpDeviceId / stirDeviceId │
│ 运行时执行引擎据此创建 FertTask 并发送施肥MQ消息 │
└──────────────────────┬───────────────────────────────┘
│
▼
输出(PlanGeneratorOutput)
| Phase | 全量生成(FULL) | 跳过重算(SKIP_REBUILD) | 故障重建(RESUME_REBUILD) |
|---|---|---|---|
| Phase 1 校验 | 完整校验 | 额外校验 currentOpenZoneId |
额外校验泵状态 |
| Phase 2 首启动序列 | ✅ 生成 Zone1 启动 | ❌ 跳过(接续之前的启动状态) | ✅ 生成重建启动序列(S-05) |
| Phase 3 灌区展开 | 从 Zone2 开始展开 | 从跳过后的下一个灌区开始,以 currentOpenZoneId 为前驱 |
从当前灌区接续,跳过已完成灌区 |
| Phase 4 安全关闭 | ✅ 追加 | ✅ 追加 | ✅ 追加 |
| Phase 5 索引分配 | 从 index=0 | 从 fromIndex 续编 |
插入到 current_index 之前,已有节点 index 不变 |
@Component
public class PlanGenerator {
// 五个 Phase 处理器,按顺序注入
private final InputValidator validator;
private final StartupBuilder startupBuilder;
private final ZoneRotationExpander rotationExpander;
private final SafeShutdownBuilder shutdownBuilder;
private final NodeIndexer indexer;
public PlanGeneratorOutput generate(PlanGeneratorInput input) {
// Phase 1
validator.validate(input);
List<ExecutionNode> nodes = new ArrayList<>();
// Phase 2
if (input.getMode() != SKIP_REBUILD) {
nodes.addAll(startupBuilder.build(input));
}
// Phase 3
nodes.addAll(rotationExpander.expand(input, nodes));
// Phase 4
nodes.addAll(shutdownBuilder.build(input, nodes));
// Phase 5
return indexer.assign(nodes, input);
}
}
每个 Phase 在构建节点前都内嵌对应安全规则检查,违反则抛出异常(不生成计划):
| Phase | 检查点 | 违反时行为 |
|---|---|---|
| Phase 2 | START_PUMP 前必须已有 OPEN_GROUP(S-01);PUMP_UNIFIED/PUMP_ZONE 模式必须有 SET_PUMP_PRESSURE;NONE 模式不插入 SET_PUMP_PRESSURE | 抛出 SafetyViolationException |
| Phase 3 | CLOSE_GROUP 前必须已有对应的 OPEN_GROUP 且中间有 ZONE_SWITCH_WAIT(S-02) | 抛出 SafetyViolationException |
| Phase 4 | CLOSE_GROUP(ZoneN) 必须在 STOP_PUMP 之后(S-03) | 抛出 SafetyViolationException |
设计意图:安全检查在生成时一次性验证,运行时执行引擎无需重复校验,保证执行路径的确定性。
NodeHandler 执行,执行只做下发指令,立即返回,不等待设备响应。[MQ消息到达: TASK_START / NEXT_NODE]
│
▼
[从DB加载 task_execution(含 execution_plan JSON)]
│
▼
[读取 current_index,取当前节点]
│
▼
[查找对应 NodeHandler(按节点类型路由)]
│
▼
[NodeHandler.execute()]
│ 下发 MQTT 指令给所有关联设备
│ 在 Redis 注册 ACK 期望列表(ACK 注册表)
│ 立即返回(不等待设备响应)
│
▼
[发送两条延迟MQ消息]
├─► 消息A: CHECK_ACK(10秒后)
└─► 消息B: ACK_TIMEOUT(30秒后,兜底)
│
▼
[线程释放,等待延迟消息到达]
─────── 10秒后 ──────────────────────────────────────────
[MQ消息到达: CHECK_ACK]
│
▼
[查询 Redis ACK 注册表,检查所有设备状态]
│
┌─────┴──────────────┐
全部成功 有失败 / 未全回
│ │
▼ ▼
[更新节点状态=SUCCESS] [触发重试逻辑(见第6章)]
[发送延迟MQ消息] │
NEXT_NODE(N分钟后) [或5秒后再次CHECK]
│
▼
[线程释放]
─────── N分钟后 ─────────────────────────────────────────
[MQ消息到达: NEXT_NODE]
│
▼
[current_index + 1,更新DB(乐观锁)]
│
▼
[index 超出节点列表长度?]
│ │
否 是
│ │
▼ ▼
[取下一节点] [status = SUCCESS]
[重复执行] [记录完成时间]
WAIT 节点不下发任何设备指令,仅发送一条延迟 MQ 消息(延迟时长 = 配置的灌溉等待时长),消息到达后直接推进到下一节点。线程不做任何等待。
施肥子任务触发:当 WAIT 节点类型为
IRRIGATE(灌区灌溉等待)且任务选择了施肥机设备时,WAIT 节点执行时还会同步创建 FertTask 记录并发送施肥延迟 MQ 消息,由施肥子任务处理器接管后续子设备控制,不阻塞主链路。详见 §5.1.4.3。
Thread.sleep方案:
[线程]═══════════════════════════════════════(占用3小时)
责任链+延迟MQ方案:
[线程]═[释放] [线程]═[释放] [线程]═[释放]
↑延迟消息触发↑ ↑延迟消息触发↑
每次只占用毫秒级线程,等待期间零占用
| 节点类型 | 说明 | 关联设备 | 是否需要 ACK |
|---|---|---|---|
OPEN_GROUP |
开启灌溉组(电磁阀开 + 球阀到目标角度;球阀配置了目标压力时同时下发压力指令) | 电磁阀、球阀 | 是 |
CLOSE_GROUP |
关闭灌溉组(电磁阀关 + 球阀归零) | 电磁阀、球阀 | 是 |
START_PUMP |
启动水泵 | 水泵 | 是 |
STOP_PUMP |
关闭水泵 | 水泵 | 是 |
SET_PUMP_PRESSURE |
设置变频恒压水泵目标压力(PUMP_UNIFIED 首次下发,PUMP_ZONE 每次灌区切换时下发;NONE 模式不插入此节点) | 水泵 | 是 |
WAIT |
灌溉等待指定时长(灌区灌溉中) | 无 | 否 |
ZONE_SWITCH_WAIT |
灌区切换稳压等待(switch_stable_seconds),在 OPEN 后、CLOSE 前执行,防止压力突变 |
无 | 否 |
施肥消息类型:施肥子任务通过独立的 MQ 延迟消息驱动,不在主节点链路中,因此不作为主链路节点类型。施肥相关的 MQ 消息类型定义如下:
| 施肥消息类型 | 说明 | 触发时机 |
|---|---|---|
FERT_STIR_START |
搅拌电机启动 | 施肥延迟时间到达后触发 |
FERT_PUMP_START |
施肥泵启动 | 搅拌提前时长到达后触发 |
FERT_PUMP_STOP |
施肥泵+搅拌电机停止(仅时间模式) | 施肥时长到达后触发 |
FERT_FORCE_STOP |
强制关闭施肥设备(灌区结束/切换时兜底) | 灌区 WAIT 完成或灌区切换时触发 |
NodeHandler 接口:
├── getType() 返回节点类型(用于路由匹配)
├── execute(context) 执行本节点动作(发MQTT指令,立即返回)
├── checkResult() 检查执行结果(查Redis ACK状态)
└── onFailure() 失败处理(记录日志、触发重试或终止)
系统采用策略模式 + 节点类型路由的扩展设计:
NodeHandler 实现类。handle(context) 方法。NodeHandler未来可扩展的节点类型:
| 节点类型 | 说明 |
|---|---|
WEATHER_CHECK |
天气检查节点(恶劣天气自动终止) |
SOIL_SENSOR |
土壤传感器检查节点(湿度达标跳过) |
NOTIFY |
通知节点(流程中间发通知) |
MANUAL_CONFIRM |
人工确认节点(等待人工确认继续) |
当某个灌区的设备开启失败且用户决定跳过时,不能简单标记该节点为跳过,因为先开后关的拓扑关系需要重新计算。系统采用截断重算方案:
在 execution_plan JSON 根节点新增两个字段,用于支持运行时调整:
{
"zone_sequence": [1, 2, 3],
"skip_zones": [],
"nodes": [ ... ]
}
| 字段 | 说明 |
|---|---|
zone_sequence |
当前有效的灌区执行顺序(逻辑顺序,运行时可调整) |
skip_zones |
运行时被跳过的灌区 ID 列表 |
nodes |
物理执行节点列表(由生成引擎根据 zone_sequence 生成) |
[灌区 N 开启失败,用户/系统决定跳过]
│
▼
1. 将 zone_n 写入 skip_zones 列表
│
▼
2. 截断 current_index 之后所有 PENDING 节点
│
▼
3. 以【剩余灌区(去掉 skip_zones)】为输入,
以【当前已开启但未关闭的灌区】为"前一个灌区",
调用 Plan Generator 重新生成后续节点
│
▼
4. 将新节点追加到 execution_plan.nodes 中
│
▼
5. 更新 current_index,继续执行
(先开后关的安全逻辑在重新生成时自动保证)
示例(跳过灌区2):
原始计划:
OPEN(Z1) → START_PUMP → WAIT(10) → OPEN(Z2) → CLOSE(Z1)
→ WAIT(10) → OPEN(Z3) → CLOSE(Z2) → WAIT(10) → STOP_PUMP → CLOSE(Z3)
灌区2开启失败,跳过:
✅ OPEN(Z1) → START_PUMP → WAIT(10)
❌ OPEN(Z2) 失败 → 跳过,截断后续节点
重新生成:→ OPEN(Z3) → CLOSE(Z1) ← 正确衔接已开启的 Zone1
→ WAIT(10) → STOP_PUMP → CLOSE(Z3)
新增 PAUSED 状态,用于区分"可接续的暂停"与"不可恢复的终止":
| 状态 | 含义 | 后续操作 |
|---|---|---|
FAILED |
硬件故障,不可恢复,触发安全关闭 | 无,任务结束 |
PAUSED |
网络/超时类重试耗尽,等待人工介入 | 调用 Resume API 接续执行 |
CANCELLED |
人工主动放弃 | 触发安全关闭 |
重试策略判断(见 6.3):
失败类型为"网络/超时" 且 retryCount >= 3:
→ status = PAUSED
→ 发送多通道报警(提示人工检查设备后接续)
→ 停止发送任何新的 MQ 消息(流程冻结)
→ 看门狗不再 recover(PAUSED 是预期状态)
失败类型为"硬件故障":
→ status = FAILED
→ 触发安全关闭流程
POST /task-execution/{id}/resume
Body:
{
"action": "RETRY_CURRENT" | "SKIP_CURRENT"
}
| Action | 行为 |
|---|---|
RETRY_CURRENT |
重置当前节点状态为 PENDING,重置失败设备 ACK 为 PENDING,重新下发 MQTT 指令,流程继续 |
SKIP_CURRENT |
若当前节点为 OPEN_GROUP,触发 5.4 的跳过流程;否则直接标记当前节点为 SKIPPED,推进到下一节点 |
接续流程:
[人工修复设备完成]
│
▼
[调用 Resume API(action=RETRY_CURRENT)]
│
▼
[校验 status = PAUSED,否则拒绝]
│
▼
[检查安全关闭记录:泵是否已停止?]
├─ 已停止 ──► [触发 5.5.4 故障接续启动序列重建]
└─ 未停止 ──► [直接重置当前节点,继续执行]
│
▼
[status = RUNNING]
[流程从重建序列(或当前节点)继续执行]
当任务从 PAUSED 恢复时,若安全关闭流程已停止水泵,直接继续原节点会导致无泵运行灌区的危险状态。系统必须在继续执行前插入重建序列。
重建序列生成规则:
检查执行计划中已执行的 STOP_PUMP(安全关闭节点):
如果水泵已停止:
→ 检查当前应灌的灌区(从 zone_sequence 和 skip_zones 推算)
→ 生成重建序列:
OPEN_GROUP(当前灌区) ← S-05:幂等确认灌区已开启(安全关闭未关阀门,灌区仍开启)
[SET_PUMP_PRESSURE(压力值)] ← PUMP_UNIFIED/PUMP_ZONE 模式
START_PUMP ← S-05:再开水泵
→ 将重建序列插入 execution_plan.nodes 中 current_index 之前
→ 更新 current_index 指向重建序列首节点
如果水泵未停止(仅部分设备故障):
→ 直接重置当前节点,不插入重建序列
示例(灌区1灌溉中,故障停泵后恢复):
原执行计划(执行到 index=3 的 WAIT 时发生故障):
[0] OPEN_GROUP(Z1) ✅
[1] START_PUMP ✅
[2] WAIT(10min) ← 执行中故障
...
安全关闭(仅关水泵+施肥设备,阀门保持当前状态,见§6.4):
[SC-1] STOP_PUMP ✅
(灌区1的电磁阀/球阀仍处于开启状态)
用户 Resume,系统插入重建序列:
[R-0] OPEN_GROUP(Z1) ← 幂等确认灌区已开启(S-05)
[R-1] SET_PUMP_PRESSURE ← 设置压力(PUMP_UNIFIED/PUMP_ZONE 模式)
[R-2] START_PUMP ← 再开水泵(S-05)
[2'] WAIT(剩余灌溉时间) ← 继续灌区1的剩余灌溉时间
[3] OPEN_GROUP(Z2)
[4] ZONE_SWITCH_WAIT(5s)
[5] CLOSE_GROUP(Z1)
...(后续节点不变)
用户可主动取消正在运行或暂停中的任务,触发安全关闭流程。
POST /task-execution/{id}/cancel
Body:(无参数)
取消流程:
[用户调用 Cancel API]
│
▼
[校验 status = RUNNING 或 PAUSED,否则拒绝]
│
▼
[status = CANCELLED]
│
▼
[触发安全关闭流程(见 §6.4)]
→ 停止水泵 → 关闭施肥设备(阀门保持当前状态)
│
▼
[清理 Redis 运行标记(task:running:{task_id})]
[发送通知:任务已被手动取消]
与 FAILED 的区别:CANCELLED 是用户主动行为,FAILED 是系统判定不可恢复。两者均触发安全关闭,但报警内容和日志类型不同。
执行引擎工作在"物理节点"层次,按顺序逐个执行 OPEN_GROUP、START_PUMP、WAIT、CLOSE_GROUP 等节点。但客户需要看到的是"灌区"层次的进度:现在在灌哪个灌区、已经灌了多久、有没有出错、出错在哪里。
两层之间存在以下核心矛盾:
| 维度 | 物理节点层(执行引擎视图) | 灌区层(客户视图) |
|---|---|---|
| 粒度 | 每个 MQTT 指令为一个节点 | 灌区为单位 |
| 顺序 | 严格线性,含安全插入节点 | 灌区列表顺序 |
| 归属 | START_PUMP、SET_PUMP_PRESSURE 无灌区归属 |
需映射到某灌区 |
| 施肥 | 并行 FertTask,不在主链路 | 作为灌区子状态展示 |
| 失败 | 记录失败节点 index | 需翻译为灌区+失败原因 |
解决方案总览:
物理节点层(execution_plan.nodes)
OPEN_GROUP(Z1) → SET_PUMP_PRESSURE → START_PUMP → WAIT(Z1) → OPEN_GROUP(Z2)
→ ZONE_SWITCH_WAIT → CLOSE_GROUP(Z1) → WAIT(Z2) → ... → STOP_PUMP → CLOSE_GROUP(ZN)
│
│ ZonePlanInterpreter(§5.7)
▼
灌区视图层(zone_logs + 进度 API)
[灌区1: SUCCESS, 实灌3min] [灌区2: IRRIGATING, 已灌1m20s] [灌区3: PENDING]
Phase 5(NodeIndexer)生成节点时,为每个节点增加两个元数据字段,供解析器反向推导灌区视图:
| 节点类型 | zoneIndex 归属 | zonePhase |
|---|---|---|
OPEN_GROUP(Z0)(首个灌区启动) |
0 | STARTING |
SET_PUMP_PRESSURE(Zone1 首次启动后) |
0 | STARTING |
START_PUMP(首次启动) |
0 | STARTING |
WAIT(source=IRRIGATE)(ZI 灌溉等待) |
I | IRRIGATING |
OPEN_GROUP(ZI)(灌区切换时,I≥1) |
I | STARTING |
SET_PUMP_PRESSURE(PUMP_ZONE 切换时) |
I | STARTING |
ZONE_SWITCH_WAIT(ZI-1 → ZI 切换) |
I-1 | SWITCHING |
CLOSE_GROUP(ZI)(非最后灌区) |
I | SWITCHING |
STOP_PUMP(最后灌区结束) |
N-1 | STOPPING |
CLOSE_GROUP(ZN)(最后灌区) |
N-1 | STOPPING |
说明:
zoneIndex是zone_sequence数组的位置(0-based),而非灌区ID。ZONE_SWITCH_WAIT归属 ZI-1,因其语义是"上一灌区切换出去的稳压过渡期",在客户视图中属于上一灌区的最后阶段。
客户视图中每个灌区经历以下阶段,由解析器(§5.7)根据 current_index 对应节点的 zonePhase 字段推导:
| 阶段 | 说明 | 包含的物理节点 | 客户展示文案 |
|---|---|---|---|
PENDING |
未开始 | 无 | 等待中 |
STARTING |
启动中(开阀、启泵) | OPEN_GROUP + SET_PUMP_PRESSURE + START_PUMP |
开启灌区中… |
IRRIGATING |
灌溉中(含施肥子状态) | WAIT(IRRIGATE) |
灌溉中 X分Y秒 |
SWITCHING |
切换中(稳压过渡,属于本灌区收尾) | ZONE_SWITCH_WAIT + CLOSE_GROUP(非末区) |
切换至下一灌区… |
STOPPING |
结束关闭中 | STOP_PUMP + CLOSE_GROUP(ZN) |
正在关闭… |
SUCCESS |
本灌区正常完成 | — | ✅ 已完成,实灌 Xmin |
FAILED |
本灌区失败 | — | ❌ 失败:[原因] |
SKIPPED |
被跳过(§5.4) | — | ⏭ 已跳过 |
施肥机以 FertTask 并行运行,不在主节点链路中,因此不作为独立灌区阶段展示,而是作为灌区 IRRIGATING 阶段的子状态:
灌区2 ── IRRIGATING(已灌溉 1分20秒 / 3分钟)
└─ 施肥子状态: FERT_RUNNING(施肥泵运行中)
FertTask 处理器每次状态变更时,通过以下路径将施肥状态回写到 zone_logs:
FertTask 状态变更
│
▼
[查找 zone_logs 中 zoneId 匹配的记录]
│
▼
[更新 zone_logs[i].fertPhase]
[更新 zone_logs[i].stirStartAt / pumpStartAt / pumpStopAt]
│
▼
[UPDATE task_execution.execution_plan JSON(乐观锁)]
全局节点(START_PUMP/STOP_PUMP/SET_PUMP_PRESSURE)失败时,按以下规则归入对应灌区的失败记录:
| 失败节点 | 归属 zoneIndex | failPhase | 处理策略 |
|---|---|---|---|
START_PUMP 失败 |
0(第一灌区) | STARTING |
触发安全关闭,任务 FAILED |
SET_PUMP_PRESSURE 失败 |
当前节点 zoneIndex | STARTING |
写入警告,灌溉继续(不阻断主流程) |
STOP_PUMP 失败 |
N-1(最后灌区) | STOPPING |
记录报警,尽力而为(§6.4.2) |
| 施肥泵/搅拌电机失败 | FertTask 绑定灌区 | IRRIGATING |
写入 fertFailReason,不影响主流程 |
SET_PUMP_PRESSURE失败策略:压力设置失败不中断灌溉,仅触发告警,在zone_logs对应记录写入pumpPressureWarning字段。灌溉可在非目标压力下继续完成,避免辅助节点失败导致任务整体中止。
GET /task-execution/{id}/progress
响应:
{
"executionId": 10001,
"taskName": "农场A区轮灌",
"status": "RUNNING",
"currentNodeIndex": 7,
"currentNodeType": "WAIT",
// 灌区层进度(由 ZonePlanInterpreter 推导,见 §5.7)
"totalZones": 3,
"currentZoneIndex": 1,
"currentZoneName": "灌区2",
"currentZonePhase": "IRRIGATING",
"irrigationElapsedSeconds": 80,
"irrigationTotalSeconds": 180,
// 施肥子状态(仅 IRRIGATING 阶段有值)
"currentFertPhase": "FERT_RUNNING",
// 各灌区摘要(与客户配置表单的灌区列表一一对应)
"zones": [
{
"zoneId": 101, "zoneName": "灌区1",
"phase": "SUCCESS",
"actualIrrigationSeconds": 180,
"fertPhase": "DONE",
"failReason": null
},
{
"zoneId": 102, "zoneName": "灌区2",
"phase": "IRRIGATING",
"actualIrrigationSeconds": 80,
"fertPhase": "FERT_RUNNING",
"failReason": null
},
{
"zoneId": 103, "zoneName": "灌区3",
"phase": "PENDING",
"actualIrrigationSeconds": null,
"fertPhase": null,
"failReason": null
}
]
}
任务失败时,客户视图精确定位到灌区 + 阶段 + 失败设备:
灌区2 ❌ 失败
阶段: 启动中(STARTING)
失败步骤: 开启灌区2(节点 index=4,类型 OPEN_GROUP)
失败设备: ball-valve-003 → ACK超时,已重试3次
失败时间: 2026-02-25 02:03:35
zone_logs 对应记录:
{
"zoneId": 102,
"phase": "FAILED",
"failPhase": "STARTING",
"failNodeIndex": 4,
"failNodeType": "OPEN_GROUP",
"failReason": "ball-valve-003 ACK超时,重试3次",
"failedAt": "2026-02-25T02:03:35"
}
ZonePlanInterpreter 是连接"物理节点层"与"灌区视图层"的翻译器,不参与任何执行逻辑,只做只读推导。其核心职责是:
execution_plan.nodes 中按 zoneIndex + zonePhase 聚合出每个灌区当前所处的阶段;currentZoneIndex);actualIrrigationSeconds);合并 FertTask 状态到灌区子状态。
输入:
execution_plan.nodes(含 zoneIndex、zonePhase 标注)
execution_plan.zone_logs(灌区摘要日志)
current_index(当前节点游标)
fert_task 列表(按 execution_id 查询)
输出:
ZoneViewResult(进度 API 的响应体,见 §5.6.6)
调用时机:进度查询接口(
GET /task-execution/{id}/progress)、日志查询接口,以及任务完成后生成执行摘要时,均调用此解析器。解析器不写 DB,只读数据后实时计算。
// 解析器输入
ZoneInterpreterInput {
List<ExecutionNode> nodes; // 所有物理节点(含 zoneIndex、zonePhase)
List<ZoneLog> zoneLogs; // zone_logs 数组(from execution_plan JSON)
int currentIndex; // task_execution.current_index
List<ZoneConfig> zoneConfigs; // 灌区配置列表(含 zoneId、zoneName、durationSeconds)
List<Long> skipZones; // 已跳过灌区 ID 列表
List<FertTask> fertTasks; // 施肥子任务列表
}
// 单灌区视图
ZoneView {
int zoneIndex; // 灌区在 zone_sequence 中的位置(0-based)
Long zoneId;
String zoneName;
ZonePhase phase; // PENDING/STARTING/IRRIGATING/SWITCHING/STOPPING/SUCCESS/FAILED/SKIPPED
// 时间统计
DateTime openAt; // 灌区开阀时刻
DateTime irrigationStartAt; // 灌溉等待开始时刻(WAIT(IRRIGATE) 开始执行时)
DateTime irrigationEndAt; // 灌溉等待结束时刻
Integer actualIrrigationSeconds; // 实际灌溉时长(irrigationEndAt - irrigationStartAt)
Integer elapsedSeconds; // 灌溉已进行时长(IRRIGATING 阶段:NOW - irrigationStartAt)
// 失败信息
String failPhase; // 失败时所在阶段
Integer failNodeIndex; // 失败的物理节点 index
String failNodeType; // 失败节点类型
String failReason; // 聚合失败描述(含失败设备列表)
DateTime failedAt;
// 施肥子状态(来自 FertTask)
FertPhase fertPhase; // PENDING/STIRRING/FERT_RUNNING/DONE/FAILED/null
DateTime stirStartAt;
DateTime pumpStartAt;
DateTime pumpStopAt;
String fertFailReason;
// 压力警告(SET_PUMP_PRESSURE 失败时置入)
Boolean pumpPressureWarning;
String pumpPressureWarningReason;
}
Step 1:建立节点分组(按 zoneIndex 聚合)
Step 2:推导每个灌区当前阶段(ZonePhase)
Step 3:计算灌溉时长
Step 4:翻译失败信息
Step 5:合并施肥子状态
按 zoneIndex 将所有节点分组,建立 Map<Integer, List<ExecutionNode>> nodesByZone:
Map<Integer, List<ExecutionNode>> nodesByZone = nodes.stream()
.filter(n -> n.getZoneIndex() >= 0) // 过滤掉 zoneIndex=-1 的全局节点(不应存在,NodeIndexer 已全部分配)
.collect(Collectors.groupingBy(ExecutionNode::getZoneIndex));
注意:
NodeIndexer确保所有节点均有zoneIndex分配(START_PUMP、SET_PUMP_PRESSURE等全局节点归 Zone0 或 ZoneN,见 §5.6.2),无zoneIndex=-1的游离节点。
对每个 zoneIndex I,按以下优先级判断其 ZonePhase:
ZonePhase resolvePhase(int zoneIndex, List<ExecutionNode> zoneNodes, int currentIndex) {
// 1. 如果灌区在 skip_zones 列表中 → SKIPPED
if (skipZones.contains(zoneConfigs.get(zoneIndex).getZoneId())) {
return SKIPPED;
}
// 2. 找到本灌区的 WAIT(IRRIGATE) 节点(主等待节点)
ExecutionNode waitNode = zoneNodes.stream()
.filter(n -> n.getNodeType() == WAIT && "IRRIGATE".equals(n.getParams().getSource()))
.findFirst().orElse(null);
// 3. 找到本灌区的 CLOSE_GROUP 节点(结束节点)
ExecutionNode closeNode = zoneNodes.stream()
.filter(n -> n.getNodeType() == CLOSE_GROUP)
.findFirst().orElse(null);
// 4. 找到本灌区的 OPEN_GROUP 节点(首节点,开始标志)
ExecutionNode openNode = zoneNodes.stream()
.filter(n -> n.getNodeType() == OPEN_GROUP)
.findFirst().orElse(null);
// 5. 检查是否有失败节点(任意 status=FAILED 的节点)
boolean hasFailedNode = zoneNodes.stream()
.anyMatch(n -> n.getStatus() == NodeStatus.FAILED);
if (hasFailedNode) return FAILED;
// 6. 按节点执行状态推导阶段
if (closeNode != null && closeNode.getStatus() == SUCCESS) {
return SUCCESS;
}
if (waitNode != null && waitNode.getStatus() == SUCCESS) {
// WAIT 完成但 CLOSE 未完成 → 切换中(SWITCHING)或 结束中(STOPPING)
return zoneIndex == lastZoneIndex ? STOPPING : SWITCHING;
}
if (waitNode != null && waitNode.getStatus() == RUNNING) {
return IRRIGATING;
}
if (openNode != null && openNode.getStatus() != PENDING) {
return STARTING; // OPEN_GROUP 已开始(RUNNING/SUCCESS),泵可能还在启动
}
return PENDING;
}
从 zone_logs 中直接读取(由执行引擎在 WAIT 节点开始/结束时回写):
Integer resolveIrrigationSeconds(ZoneView view, ZoneLog zoneLog, ZonePhase phase) {
if (phase == IRRIGATING) {
// 灌溉中:计算已进行时长
if (zoneLog.getIrrigationStartAt() != null) {
return (int) Duration.between(zoneLog.getIrrigationStartAt(), now).getSeconds();
}
}
if (phase == SUCCESS || phase == SWITCHING || phase == STOPPING) {
// 灌溉已结束:使用实际完成时长
if (zoneLog.getIrrigationStartAt() != null && zoneLog.getIrrigationEndAt() != null) {
return (int) Duration.between(
zoneLog.getIrrigationStartAt(),
zoneLog.getIrrigationEndAt()
).getSeconds();
}
}
return null;
}
zone_logs回写时机(执行引擎侧):
WAIT(IRRIGATE)节点开始执行 → 回写zone_logs[I].irrigationStartAt = NOW()WAIT(IRRIGATE)节点完成(status=SUCCESS)→ 回写zone_logs[I].irrigationEndAt = NOW(),计算actualIrrigationSecondsOPEN_GROUP(ZI)节点完成 → 回写zone_logs[I].openAt = NOW()CLOSE_GROUP(ZI)节点完成 → 更新zone_logs[I].phase = SUCCESS
void resolveFailInfo(ZoneView view, List<ExecutionNode> zoneNodes) {
zoneNodes.stream()
.filter(n -> n.getStatus() == NodeStatus.FAILED)
.findFirst()
.ifPresent(failedNode -> {
view.setFailPhase(failedNode.getZonePhase().name());
view.setFailNodeIndex(failedNode.getIndex());
view.setFailNodeType(failedNode.getNodeType().name());
view.setFailedAt(failedNode.getFinishedAt());
// 聚合失败设备描述
String failReason = failedNode.getDevices().stream()
.filter(d -> d.getAckStatus() != SUCCESS)
.map(d -> d.getDeviceId() + " → " + translateFailReason(d))
.collect(Collectors.joining(";"));
view.setFailReason(failReason);
// 特殊处理:SET_PUMP_PRESSURE 失败 → 写警告,不写 failReason
if (failedNode.getNodeType() == SET_PUMP_PRESSURE) {
view.setPumpPressureWarning(true);
view.setPumpPressureWarningReason(failReason);
view.setFailReason(null); // 不影响灌区状态
}
});
}
String translateFailReason(DeviceAck d) {
return switch (d.getAckStatus()) {
case TIMEOUT -> "ACK超时,重试" + d.getRetryCount() + "次";
case FAIL -> "设备返回失败:" + d.getFailReason();
default -> "未知原因";
};
}
void mergeFertStatus(ZoneView view, List<FertTask> fertTasks, Long zoneId) {
fertTasks.stream()
.filter(f -> f.getZoneId().equals(zoneId))
.findFirst()
.ifPresent(fert -> {
view.setFertPhase(fert.getStatus().toZonePhase());
view.setStirStartAt(fert.getStirStartAt());
view.setPumpStartAt(fert.getPumpStartAt());
view.setPumpStopAt(fert.getPumpStopAt());
if (fert.getStatus() == FertStatus.FAILED) {
view.setFertFailReason(fert.getFailReason());
}
});
}
当执行到 ZONE_SWITCH_WAIT 或 CLOSE_GROUP(ZI-1) 时,ZI-1 和 ZI 的阀门同时开启:
物理状态:ZI-1 阀门开 + ZI 阀门开 + 泵运行
解析器输出:
ZI-1: phase = SWITCHING(CLOSE_GROUP 还未完成)
ZI: phase = STARTING(OPEN_GROUP 已完成,等待 WAIT(IRRIGATE) 开始)
客户视图展示:
灌区1 ── 切换中(正在稳压)
灌区2 ── 启动中(阀门已开启)
这是预期的正常状态,解析器无需特殊处理,按节点状态顺序推导即可。
跳过 Z2 后,节点列表被截断重算,skip_zones = [102],新生成节点中 CLOSE_GROUP(Z1) 的 zoneIndex 仍为 0(Z1 的结束),OPEN_GROUP(Z3) 的 zoneIndex 为 2。
解析器:
// skip_zones 中的灌区直接返回 SKIPPED,不参与其他推导
if (skipZones.contains(config.getZoneId())) return SKIPPED;
客户视图:
灌区1 ── SUCCESS
灌区2 ── ⏭ 已跳过
灌区3 ── IRRIGATING
Resume 后插入的重建节点(source=RESUME_REBUILD)仍按正常 zoneIndex 归属,解析器不需要区分,正常按节点状态推导即可。但需过滤掉安全关闭期间插入的 STOP_PUMP(source=SAFE_CLOSE),避免误判最后灌区进入 STOPPING:
// 过滤安全关闭节点,不纳入阶段推导
boolean isSafeCloseNode = "SAFE_CLOSE".equals(node.getParams().getSource());
if (isSafeCloseNode) continue;
节点列表:OPEN_GROUP(Z0) → SET_PUMP_PRESSURE → START_PUMP → WAIT(Z0) → STOP_PUMP → CLOSE_GROUP(Z0)
全部 zoneIndex = 0
解析结果:
zones[0] 经历 PENDING → STARTING → IRRIGATING → STOPPING → SUCCESS
无 SWITCHING 阶段(单灌区无切换)
解析器对此无特殊分支,lastZoneIndex = 0,WAIT 完成后判断为 STOPPING,逻辑自洽。
@Component
public class ZonePlanInterpreter {
public ZoneViewResult interpret(ZoneInterpreterInput input) {
// Step 1:节点分组
Map<Integer, List<ExecutionNode>> nodesByZone = groupByZone(input.getNodes());
int lastZoneIndex = input.getZoneConfigs().size() - 1;
List<ZoneView> views = new ArrayList<>();
for (int i = 0; i < input.getZoneConfigs().size(); i++) {
ZoneConfig config = input.getZoneConfigs().get(i);
ZoneLog log = findLog(input.getZoneLogs(), config.getZoneId());
List<ExecutionNode> zoneNodes = nodesByZone.getOrDefault(i, List.of());
ZoneView view = new ZoneView();
view.setZoneIndex(i);
view.setZoneId(config.getZoneId());
view.setZoneName(config.getZoneName());
// Step 2:推导阶段
view.setPhase(resolvePhase(i, zoneNodes, input.getCurrentIndex(), lastZoneIndex, input.getSkipZones(), config));
// Step 3:计算时长
view.setActualIrrigationSeconds(resolveIrrigationSeconds(view, log, view.getPhase()));
view.setElapsedSeconds(resolveElapsedSeconds(view, log, view.getPhase()));
// Step 4:翻译失败
resolveFailInfo(view, zoneNodes);
// Step 5:施肥子状态
mergeFertStatus(view, input.getFertTasks(), config.getZoneId());
views.add(view);
}
// 找出当前灌区(第一个不是 SUCCESS/FAILED/SKIPPED 的灌区)
ZoneView currentZone = views.stream()
.filter(v -> !Set.of(SUCCESS, FAILED, SKIPPED).contains(v.getPhase()))
.findFirst()
.orElse(views.get(lastZoneIndex)); // 全部完成时返回最后灌区
return ZoneViewResult.builder()
.zones(views)
.currentZoneIndex(currentZone.getZoneIndex())
.currentZoneName(currentZone.getZoneName())
.currentZonePhase(currentZone.getPhase())
.irrigationElapsedSeconds(currentZone.getElapsedSeconds())
.irrigationTotalSeconds(
input.getZoneConfigs().get(currentZone.getZoneIndex()).getDurationSeconds()
)
.currentFertPhase(currentZone.getFertPhase())
.build();
}
}
解析器依赖执行引擎在节点执行过程中的回写行为,双方通过 zone_logs 和节点 status 字段通信:
| 执行引擎行为 | zone_logs 回写字段 | 解析器依赖 |
|---|---|---|
OPEN_GROUP(ZI) 成功 |
zone_logs[I].openAt |
推导 STARTING → IRRIGATING 时机 |
WAIT(IRRIGATE) 开始 |
zone_logs[I].irrigationStartAt |
计算 elapsedSeconds |
WAIT(IRRIGATE) 完成 |
zone_logs[I].irrigationEndAt、actualIrrigationSeconds |
展示实际灌溉时长 |
CLOSE_GROUP(ZI) 成功 |
zone_logs[I].phase = SUCCESS |
判断灌区完成 |
| 任意节点 FAILED | 节点 status=FAILED |
触发 Step 4 失败翻译 |
| FertTask 状态变更 | zone_logs[I].fertPhase |
Step 5 施肥子状态 |
设计原则:
zone_logs作为执行引擎与解析器之间的状态缓存层,执行引擎负责写入,解析器只读。避免解析器重新扫描大量节点进行时间计算,降低进度接口的查询开销。
第一层:生成时保障
→ 任务保存时根据安全规则(S-01~S-05)自动生成执行顺序
→ 人工无法配置出错误的执行顺序
→ 先开后关(S-02/S-04)、稳压等待(ZONE_SWITCH_WAIT)、水泵启动时机全部自动处理
第二层:执行时保障
→ 每个节点必须ACK成功才能进入下一个节点
→ ACK失败立即触发重试或终止
→ 不会出现上一步没完成就执行下一步
第三层:故障时保障
→ 安全关闭:关闭水泵 + 施肥设备(施肥泵、搅拌电机),阀门保持当前状态
→ 从执行记录查已开启设备,确保不遗漏
→ 关闭失败的设备加入报警,提示人工处理
→ 故障恢复时自动重建启动序列(S-05):先开灌区→再开泵
发送 MQTT 指令后,在 Redis 中为每个设备写入期望 ACK 记录:
ack:{execution_id}:{node_index}:{device_id}PENDINGSUCCESS 或 FAIL[发送MQTT指令]
│
├─► Redis 注册所有设备为 PENDING
│
├─► 发延迟消息A: CHECK_ACK(10秒后)
└─► 发延迟消息B: ACK_TIMEOUT(30秒后,兜底)
─── 10秒后:CHECK_ACK 消息到达 ──────────────────────
[检查 Redis ACK 注册表]
│
├─ 所有设备 SUCCESS ──► [发 NEXT_NODE 延迟消息(N分钟后)]
│ └─► Redis 写幂等标记(已推进)
│
├─ 有设备 FAIL ──────► [触发重试流程(见6.3)]
│
└─ 有设备仍 PENDING ─► [未超deadline → 再发CHECK_ACK(5秒后)]
[已超deadline → 按超时处理]
─── 30秒后:ACK_TIMEOUT 消息到达 ────────────────────
[检查 Redis 幂等标记]
│
┌────┴────┐
已推进 未推进
│ │
▼ ▼
忽略 [触发超时处理]
(幂等) │
├─► 记录超时设备列表
├─► 进入重试流程(或直接终止)
└─► 更新 task_execution.status
两条消息(CHECK 和 TIMEOUT)的幂等关系:
正常情况:
T+10s: CHECK消息到,ACK全部回来 → 状态推进 → 写幂等标记
T+30s: TIMEOUT消息到 → 发现幂等标记存在 → 忽略 ✅
超时情况:
T+10s: CHECK消息到,ACK没全回 → 再发CHECK(T+15s)
T+15s: CHECK消息到,还没回 → 再发CHECK(T+20s)
T+30s: TIMEOUT消息到 → 无幂等标记 → 触发超时处理 ✅
关键:用 status 字段 + Redis幂等标记 做幂等,两条消息不会重复处理
ACK_FAIL 或 TIMEOUT 的设备重新下发 MQTT 指令。SUCCESS 的设备不重复操作。| 失败次数 | 处理方式 |
|---|---|
| 第 1 次失败 | 立即重试(无等待) |
| 第 2 次失败 | 等待 5 秒后重试 |
| 第 3 次失败 | 等待 15 秒后重试 |
| 超过 3 次(网络/超时类) | 任务进入 PAUSED 状态 → 多通道报警 → 等待人工调用 Resume API 接续执行 |
| 硬件故障类错误 | 不重试,直接 FAILED → 触发安全关闭 → 发送报警 |
[ACK检查发现有失败设备]
│
▼
[检查失败原因]
│
├─► 硬件故障 → 直接 FAILED + 安全关闭 + 报警 ❌
│
└─► 网络/超时 → 检查 retryCount
│
┌──────┴──────┐
retryCount < 3 retryCount >= 3
│ │
▼ ▼
[只对失败设备重发指令] [status = PAUSED]
[retryCount + 1] [发送多通道报警]
[重置失败设备ACK=PENDING] [等待人工调用 Resume API]
[发延迟消息(等待间隔后CHECK)] [见 5.5 故障接续执行]
[更新last_heartbeat_at]
灌区1有5个球阀:
valve_001 → ACK_OK ✅
valve_002 → ACK_OK ✅
valve_003 → TIMEOUT ❌ → 重试
valve_004 → ACK_OK ✅
valve_005 → ACK_FAIL❌ → 重试
重试时:只对 valve_003 和 valve_005 重发指令
valve_001、002、004 不重复发送
当任务因异常(重试耗尽、不可恢复错误)而终止时,必须触发安全关闭流程。
[任务异常终止触发]
│
▼
[从 execution_plan JSON 查询已开启设备]
(已执行且status=SUCCESS的OPEN/START类节点中的设备)
│
▼
步骤1: 关闭水泵(如已启动)
│ 下发 STOP_PUMP MQTT 指令
│ 等待 ACK(最多30秒)
│ 超时也继续(尽力而为原则)
└─► 失败设备记录入报警列表
│
▼
步骤2: 关闭施肥设备(如已启动)
│ 下发施肥泵 OFF、搅拌电机 OFF 指令
│ 等待 ACK(最多30秒)
│ 超时也继续(尽力而为原则)
└─► 失败设备记录入报警列表
│
▼
注意:电磁阀/球阀保持当前状态,不在异常关闭流程中关闭
(管路中残余水自然排出,避免封闭管路导致水锤效应)
│
▼
[汇总安全关闭结果]
│
▼
[更新 task_execution.status = FAILED]
[写入 fail_reason 字段]
│
▼
[发送多通道报警]
├─► 短信:失败原因 + 设备列表 + 安全关闭状态
├─► 电话:语音播报关键失败信息(紧急告警时)
└─► 移动端推送:同短信内容,推送至 APP
安全关闭过程中,即使某个设备关闭失败或超时,也继续执行后续关闭步骤。不因单个设备失败而中断整个安全关闭流程。最终将所有失败设备汇总到报警通知中,提示人工处理。
为什么不关阀门:异常情况下,水泵已关停、管路无水压,阀门保持当前状态不会造成危害。反之,在管路仍有残余压力时关闭阀门可能导致水锤效应或爆管。阀门的正常开关由计划执行引擎(S-03 正常结束序列)统一管理。
轮灌任务执行失败
─────────────────────────────
任务名称:农场A区轮灌
失败节点:开启灌区2
失败时间:2026-02-25 02:03:35
重试次数:3次(已达上限)
失败设备:
valve_003 超时未响应
ball_005 设备离线
成功设备:
valve_001 ✅
valve_002 ✅
valve_004 ✅
安全关闭状态:
水泵 已关闭 ✅
施肥泵 已关闭 ✅
搅拌电机 已关闭 ✅
阀门 保持当前状态(不主动关闭)
请检查失败设备后手动处理
─────────────────────────────
系统依赖 RocketMQ 延迟消息驱动流程推进。当 MQ 消息因 Broker 宕机、网络异常等原因丢失时,流程可能停滞。看门狗机制作为最后一道保障,定期扫描"疑似卡住"的任务并触发恢复。
设计要点:由于灌区灌溉时长可能长达数小时甚至 10 小时以上,采用简单的 12 小时心跳超时方案。每个节点执行时更新
last_heartbeat_at,超过 12 小时未更新则判定为流程卡住。该方案简单可靠,避免了基于预计完成时间的复杂计算,且不会因 PAUSED 状态产生误判逻辑冲突。
每个节点执行时(包括 WAIT 节点开始执行时)更新 last_heartbeat_at = NOW():
| 更新时机 | 说明 |
|---|---|
| 节点开始执行时 | 任何节点(ACK类、WAIT类)开始执行时立即更新 |
| CHECK_ACK 消息处理时 | 每次检查 ACK 状态时更新,表明流程仍在活跃处理中 |
| 重试发生时 | 每次重试下发指令时更新 |
| NEXT_NODE 推进时 | 推进到下一节点时更新 |
通过定时任务(Quartz/ScheduledExecutor)每 30 分钟 扫描一次:
SELECT * FROM task_execution
WHERE status = 'RUNNING'
AND last_heartbeat_at < NOW() - INTERVAL 12 HOUR
为什么是 12 小时:单个灌区灌溉时长最长约 10 小时,加上执行前的 WAIT 启动和切换等待,12 小时是一个合理的安全阈值。正常情况下每个节点执行都会更新心跳,只有 MQ 消息真正丢失时才会超过 12 小时无心跳。
[看门狗发现疑似卡住的任务]
│
▼
[检查 task_execution.status]
│
┌────┴──────────┐
RUNNING PAUSED
│ │
▼ ▼
[重新发送当前节点] [忽略,PAUSED 是预期状态]
[NEXT_NODE 消息] [等待人工 Resume]
│
▼
[查 current_index 对应节点状态]
│
├─ PENDING → 重新触发 execute()
├─ RUNNING → 重新发送 CHECK_ACK
└─ SUCCESS → 发送 NEXT_NODE 推进
last_heartbeat_at,防止下一轮扫描重复处理。同一轮灌任务不允许出现多个同时运行的执行实例。由于同一任务可关联多个触发来源(定时 + 联动 + 手动),任意触发来源在创建新执行实例前,必须检查该任务是否存在未结束的执行实例:
[触发信号到达(Quartz / 联动规则引擎 / 手动触发)]
│
▼
[查询 task_execution 表:同一 task_id 是否存在 status = RUNNING / PAUSED 的记录]
│
┌────┴────────────┐
存在(正在执行) 不存在
│ │
▼ ▼
[本次触发放弃] [正常创建 task_execution]
[记录日志] [发送 TASK_START MQ 消息]
[可选:发送通知]
实现方式:
所有触发来源统一使用 Redis 原子标记(SET NX)作为并发门禁:
task:running:{task_id},值为 execution_id。SET NX,成功则创建执行实例,失败则拒绝本次触发。| 字段名 | 类型 | 说明 |
|---|---|---|
id |
BIGINT PK | 执行实例唯一ID |
task_id |
BIGINT | 关联的轮灌任务ID |
trigger_type |
VARCHAR(20) | 触发类型:SCHEDULED / LINKAGE / MANUAL |
execution_plan |
JSON | 完整有序节点列表(核心字段,见7.2) |
current_index |
INT | 当前正在执行的节点索引 |
status |
VARCHAR(20) | 执行状态:PENDING / RUNNING / SUCCESS / FAILED / PAUSED / CANCELLED |
version |
INT | 乐观锁版本号,防止并发更新冲突 |
started_at |
DATETIME | 实际开始时间 |
finished_at |
DATETIME | 完成时间 |
expected_finish_at |
DATETIME | 预计完成时间(含重试顺延后更新,前端展示用) |
last_heartbeat_at |
DATETIME | 最近一次心跳时间(看门狗主要判据,每个节点执行时更新,超过12小时未更新则判定卡住) |
paused_at |
DATETIME | 进入 PAUSED 状态的时间 |
fail_reason |
TEXT | 失败/暂停原因描述 |
乐观锁使用场景:
UPDATE task_execution
SET execution_plan = ?, current_index = ?, version = version + 1
WHERE id = ? AND version = ?
→ 防止集群下多节点同时更新同一个执行实例
| 字段名 | 类型 | 说明 |
|---|---|---|
id |
BIGINT PK | 报警记录唯一ID |
execution_id |
BIGINT | 关联的执行实例ID |
task_id |
BIGINT | 关联的任务ID |
alarm_type |
VARCHAR(50) | 报警类型:DEVICE_FAIL / TIMEOUT / SAFE_CLOSE 等 |
failed_node |
VARCHAR(100) | 失败节点名称 |
fail_reason |
TEXT | 详细失败原因 |
failed_devices |
JSON | 失败设备列表 |
success_devices |
JSON | 成功设备列表 |
safe_close_status |
VARCHAR(20) | 安全关闭结果:SUCCESS / PARTIAL / FAILED |
notified_channels |
VARCHAR(100) | 已通知渠道(SMS/PHONE/APP) |
created_at |
DATETIME | 报警时间 |
数据结构详见 §5.1.4.5。
| 字段名 | 类型 | 说明 |
|---|---|---|
id |
BIGINT PK | 施肥子任务唯一ID |
execution_id |
BIGINT | 关联主任务执行实例ID |
zone_id |
BIGINT | 绑定的灌区ID |
delay_minutes |
INT | 施肥延迟时间(分钟,来自任务级灌区配置) |
pre_stir_minutes |
INT | 搅拌提前时长(分钟) |
fert_duration_minutes |
INT | 施肥时长(时间模式,分钟) |
fert_target_liters |
INT | 施肥目标量(容量模式,升;NULL 表示时间模式) |
status |
VARCHAR(20) | 子任务状态:PENDING / STIR_WAITING / STIRRING / FERT_RUNNING / DONE / FAILED |
actual_fert_seconds |
INT | 实际施肥时长(秒) |
actual_liters |
INT | 实际施肥量(容量模式) |
fail_reason |
TEXT | 失败原因 |
stir_start_at |
DATETIME | 搅拌电机实际启动时间 |
pump_start_at |
DATETIME | 施肥泵实际启动时间 |
pump_stop_at |
DATETIME | 施肥泵实际停止时间 |
done_at |
DATETIME | 子任务完成时间 |
{
"zone_sequence": [101, 102, 103],
"skip_zones": [],
"zone_logs": [
{
"zoneId": 101,
"zoneName": "灌区1",
"zoneIndex": 0,
"openAt": "2026-02-25T02:00:00",
"irrigationStartAt": "2026-02-25T02:00:10",
"irrigationEndAt": "2026-02-25T02:03:10",
"actualIrrigationSeconds": 180,
"phase": "SUCCESS",
"fertPhase": "DONE",
"stirStartAt": "2026-02-25T02:00:13",
"pumpStartAt": "2026-02-25T02:00:15",
"pumpStopAt": "2026-02-25T02:00:20",
"fertFailReason": null,
"pumpPressureWarning": false,
"failPhase": null,
"failNodeIndex": null,
"failNodeType": null,
"failReason": null,
"failedAt": null
},
{
"zoneId": 102,
"zoneName": "灌区2",
"zoneIndex": 1,
"openAt": "2026-02-25T02:03:10",
"irrigationStartAt": "2026-02-25T02:03:25",
"irrigationEndAt": null,
"actualIrrigationSeconds": null,
"phase": "IRRIGATING",
"fertPhase": "FERT_RUNNING",
"stirStartAt": "2026-02-25T02:03:28",
"pumpStartAt": "2026-02-25T02:03:30",
"pumpStopAt": null,
"fertFailReason": null,
"pumpPressureWarning": false,
"failPhase": null,
"failNodeIndex": null,
"failNodeType": null,
"failReason": null,
"failedAt": null
},
{
"zoneId": 103,
"zoneName": "灌区3",
"zoneIndex": 2,
"openAt": null,
"irrigationStartAt": null,
"irrigationEndAt": null,
"actualIrrigationSeconds": null,
"phase": "PENDING",
"fertPhase": null,
"stirStartAt": null,
"pumpStartAt": null,
"pumpStopAt": null,
"fertFailReason": null,
"pumpPressureWarning": false,
"failPhase": null,
"failNodeIndex": null,
"failNodeType": null,
"failReason": null,
"failedAt": null
}
],
"nodes": [
{
"index": 0,
"nodeType": "OPEN_GROUP",
"nodeName": "开启灌区1",
"refId": 101,
"zoneIndex": 0,
"zonePhase": "STARTING",
"params": {
"targetAngle": 80,
"targetPressureKpa": 250
},
"status": "SUCCESS",
"retryCount": 0,
"maxRetry": 3,
"startedAt": "2026-02-25T02:00:00",
"finishedAt": "2026-02-25T02:00:05",
"devices": [
{
"deviceId": "valve-001",
"deviceType": "SOLENOID_VALVE",
"ackStatus": "SUCCESS",
"retryCount": 0,
"failReason": null,
"ackAt": "2026-02-25T02:00:03"
},
{
"deviceId": "ball-valve-001",
"deviceType": "BALL_VALVE",
"ackStatus": "SUCCESS",
"retryCount": 1,
"failReason": null,
"ackAt": "2026-02-25T02:00:04"
}
]
},
{
"index": 1,
"nodeType": "SET_PUMP_PRESSURE",
"nodeName": "设置水泵压力",
"refId": 201,
"zoneIndex": 0,
"zonePhase": "STARTING",
"params": {
"pressureKpa": 300
},
"status": "SUCCESS",
"retryCount": 0,
"maxRetry": 3,
"startedAt": "2026-02-25T02:00:06",
"finishedAt": "2026-02-25T02:00:08",
"devices": [
{
"deviceId": "pump-001",
"deviceType": "PRESSURE_PUMP",
"ackStatus": "SUCCESS",
"retryCount": 0,
"failReason": null,
"ackAt": "2026-02-25T02:00:07"
}
]
},
{
"index": 2,
"nodeType": "START_PUMP",
"nodeName": "开启水泵",
"refId": 201,
"zoneIndex": 0,
"zonePhase": "STARTING",
"params": {},
"status": "RUNNING",
"retryCount": 0,
"maxRetry": 3,
"startedAt": "2026-02-25T02:00:09",
"finishedAt": null,
"devices": [
{
"deviceId": "pump-001",
"deviceType": "PRESSURE_PUMP",
"ackStatus": "PENDING",
"retryCount": 0,
"failReason": null,
"ackAt": null
}
]
},
{
"index": 3,
"nodeType": "WAIT",
"nodeName": "灌溉灌区1,等待3分钟",
"refId": null,
"zoneIndex": 0,
"zonePhase": "IRRIGATING",
"params": {
"seconds": 180,
"source": "IRRIGATE",
"fertEnabled": true,
"fertPumpDeviceId": "fert-pump-001",
"stirDeviceId": "stir-motor-001"
},
"status": "PENDING",
"retryCount": 0,
"maxRetry": 0,
"startedAt": null,
"finishedAt": null,
"devices": []
}
]
}
字段说明:
根节点字段说明:
| 字段 | 说明 |
|---|---|
zone_sequence |
当前有效的灌区执行顺序(灌区 ID 列表,运行时可调整) |
skip_zones |
运行时被跳过的灌区 ID 列表,跳过后重新生成后续节点 |
zone_logs |
灌区摘要日志数组,由执行引擎回写,供 ZonePlanInterpreter(§5.7)读取 |
nodes |
物理执行节点列表(由计划生成引擎生成,执行引擎逐节点执行并更新状态) |
zone_logs[] 字段说明:
| 字段 | 说明 | 回写时机 |
|---|---|---|
zoneId |
灌区 ID | 计划生成时初始化 |
zoneName |
灌区名称 | 计划生成时初始化 |
zoneIndex |
灌区在 zone_sequence 中的位置(0-based) | 计划生成时初始化 |
openAt |
OPEN_GROUP 节点完成时刻 |
OPEN_GROUP 成功后回写 |
irrigationStartAt |
WAIT(IRRIGATE) 节点开始执行时刻 |
WAIT(IRRIGATE) 开始时回写 |
irrigationEndAt |
WAIT(IRRIGATE) 节点完成时刻 |
WAIT(IRRIGATE) 完成时回写 |
actualIrrigationSeconds |
实际灌溉时长(秒),由 irrigationEndAt - irrigationStartAt 计算 | WAIT(IRRIGATE) 完成时回写 |
phase |
灌区当前阶段(由解析器推导,也在 CLOSE_GROUP 后由引擎置为 SUCCESS) | CLOSE_GROUP 成功 / 解析器推导 |
fertPhase |
施肥子状态(由 FertTask 处理器回写) | FertTask 每次状态变更时回写 |
stirStartAt |
搅拌电机实际启动时刻 | FertTask 回写 |
pumpStartAt |
施肥泵实际启动时刻 | FertTask 回写 |
pumpStopAt |
施肥泵停止时刻 | FertTask 回写 |
fertFailReason |
施肥失败原因 | FertTask 失败时回写 |
pumpPressureWarning |
水泵压力设置失败警告标志(不阻断主流程) | SET_PUMP_PRESSURE 失败时回写 |
failPhase |
失败时所在灌区阶段(STARTING / IRRIGATING / SWITCHING / STOPPING) | 节点 FAILED 时回写 |
failNodeIndex |
失败的物理节点 index | 节点 FAILED 时回写 |
failNodeType |
失败节点类型 | 节点 FAILED 时回写 |
failReason |
聚合失败描述(含失败设备列表,由解析器翻译) | 节点 FAILED 时回写 |
failedAt |
失败时刻 | 节点 FAILED 时回写 |
nodes[] 字段说明:
| 字段 | 说明 |
|---|---|
index |
节点顺序索引,从 0 开始 |
nodeType |
节点类型,对应 Handler 路由 |
nodeName |
可读的节点名称,用于日志和报警展示 |
refId |
关联实体 ID(灌溉组ID / 水泵ID) |
zoneIndex |
节点所属灌区在 zone_sequence 中的位置(0-based);由 NodeIndexer(Phase 5)按§5.6.2 规则分配;供 ZonePlanInterpreter 分组使用 |
zonePhase |
节点在灌区生命周期中的阶段标识:STARTING / IRRIGATING / SWITCHING / STOPPING;由 NodeIndexer 按§5.6.2 规则分配 |
params |
节点执行参数;OPEN_GROUP 含 targetAngle、targetPressureKpa(球阀配置了目标压力时);SET_PUMP_PRESSURE 含 pressureKpa;WAIT 含 seconds |
params.source |
WAIT 类节点来源标识:IRRIGATE(灌溉等待)/ ZONE_SWITCH(稳压等待)/ RESUME_REBUILD(故障接续重建节点)/ SAFE_CLOSE(安全关闭节点) |
params.fertEnabled |
仅 WAIT(source=IRRIGATE) 节点有值;true 表示执行此 WAIT 时需创建 FertTask 并触发施肥流程 |
params.fertPumpDeviceId |
施肥泵设备 ID(fertEnabled=true 时有值) |
params.stirDeviceId |
搅拌电机设备 ID(fertEnabled=true 时有值) |
status |
节点状态:PENDING / RUNNING / SUCCESS / FAILED / SKIPPED |
retryCount |
节点级别已重试次数 |
maxRetry |
最大重试次数(默认3,WAIT/ZONE_SWITCH_WAIT 节点为0) |
startedAt / finishedAt |
节点执行时间记录 |
devices[].ackStatus |
单设备 ACK 状态:PENDING / SUCCESS / FAIL / TIMEOUT |
devices[].retryCount |
单设备已重试次数 |
devices[].failReason |
单设备失败原因 |
查某次执行的完整计划 → JSON一次读完 ✅
查当前执行到哪一步 → current_index字段 ✅
查某次执行是否成功 → status字段 ✅
安全关闭查已开启设备 → 从JSON中内存过滤 ✅
跳过灌区后重算后续计划 → 截断nodes + 重新追加新节点 ✅
故障接续判断是否需要重建序列 → 查JSON中 STOP_PUMP 安全关闭节点 ✅
执行历史归档 → task_execution记录保留,JSON即完整记录 ✅
(新增,支持灌区视图层)
查客户进度视图(当前灌哪个灌区)→ ZonePlanInterpreter 读 nodes[].zoneIndex + zone_logs ✅
查灌区实际灌溉时长 → zone_logs[I].actualIrrigationSeconds ✅
查灌区失败原因及失败节点 → zone_logs[I].failNodeIndex + failReason ✅
查施肥子任务进度 → zone_logs[I].fertPhase + FertTask记录 ✅
压力设置失败不阻断,单独告警 → zone_logs[I].pumpPressureWarning ✅
JSON大小估算:
10个节点,每节点含10个设备,3个灌区 zone_logs → 约8-15KB
完全在MySQL JSON字段承受范围内