Emit + When
Event routing and branch collection.
Source
from agently import TriggerFlow, TriggerFlowEventData


## TriggerFlow emit + when: custom event routing
def emit_when_demo():
    # Idea: use emit + when to fan out work and collect results.
    # Flow: planner emits Plan.Read/Plan.Write -> when listeners -> collect
    # Expect: prints collected results; start() does not return a final value here.
    flow = TriggerFlow()

    async def planner(data: TriggerFlowEventData):
        # emit two custom events for downstream branches
        await data.async_emit("Plan.Read", {"task": "read"})
        await data.async_emit("Plan.Write", {"task": "write"})
        return "plan done"

    async def reader(data: TriggerFlowEventData):
        return f"read: {data.value['task']}"

    async def writer(data: TriggerFlowEventData):
        return f"write: {data.value['task']}"

    # when listens to custom events emitted by planner
    flow.to(planner).end()
    flow.when("Plan.Read").to(reader).collect("plan", "read")
    (
        flow.when("Plan.Write")
        .to(writer)
        .collect("plan", "write")
        .to(lambda d: print("[collect]", d.value))
        .end()
    )

    flow.start("go", wait_for_result=False)


# emit_when_demo()
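Explanation
- emit + when implements fan-out and fan-in: planner emits custom events, each when listener handles its event on a separate branch, and collect gathers the branch results.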
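
The fan-out and fan-in idea can be sketched without the library. The block below is a toy illustration in plain asyncio, not Agently's implementation: `MiniRouter`, its `when` and `emit` methods, and the `group`/`branch` parameters are hypothetical names chosen to mirror the example above. Handlers are registered per event name, and a result set is recorded once every branch of a group has reported.

```python
import asyncio
from collections import defaultdict


class MiniRouter:
    """Toy event router (hypothetical, for illustration only)."""

    def __init__(self):
        self._listeners = defaultdict(list)  # event name -> [(handler, group, branch)]
        self._expected = defaultdict(set)    # group -> branch names that must report
        self._results = defaultdict(dict)    # group -> {branch: result}
        self.collected = []                  # one dict per completed fan-in

    def when(self, event, handler, group, branch):
        # Register a listener and declare its branch as part of the group.
        self._listeners[event].append((handler, group, branch))
        self._expected[group].add(branch)

    async def emit(self, event, value):
        # Fan-out: run every handler registered for this event.
        for handler, group, branch in self._listeners[event]:
            self._results[group][branch] = await handler(value)
            # Fan-in: record the group once all of its branches have a result.
            if set(self._results[group]) == self._expected[group]:
                self.collected.append(dict(self._results[group]))


async def reader(value):
    return f"read: {value['task']}"


async def writer(value):
    return f"write: {value['task']}"


async def run_demo():
    router = MiniRouter()
    router.when("Plan.Read", reader, group="plan", branch="read")
    router.when("Plan.Write", writer, group="plan", branch="write")
    # The "planner" step: emit one event per downstream branch.
    await router.emit("Plan.Read", {"task": "read"})
    await router.emit("Plan.Write", {"task": "write"})
    return router.collected


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

In this sketch nothing is recorded after the first emit, because the "plan" group still waits for its "write" branch. That waiting is the fan-in half of the pattern.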
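Reading the comments
- Idea describes the thinking behind the example
- Flow describes the execution and orchestration path
- Expect describes the expected output or behavior
What you learned
- How emit + when routes events
Exercise
- Add a new event branch and collect its result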