数据流¶
runtime_data 与 collect 示例。
源码¶
from agently import TriggerFlow, TriggerFlowEventData
## TriggerFlow Data Flow: runtime_data + collect
def triggerflow_data_flow():
# Idea: share execution-scoped data across branches and collect results.
# Flow: set_runtime + set_runtime_context -> collect -> when(runtime_data)
# Expect: prints "[when runtime]" then two collect outputs.
flow = TriggerFlow()
async def set_runtime(data: TriggerFlowEventData):
data.set_runtime_data("user_id", "u-001")
return "runtime ok"
async def set_runtime_context(data: TriggerFlowEventData):
data.set_runtime_data("env", "prod")
return "runtime context ok"
# collect waits for multiple branches to fill values
(
flow.to(set_runtime)
.collect("done", "r1", mode="filled_then_empty")
.to(lambda data: print("[collect runtime]", data.value))
)
(
flow.to(set_runtime_context)
.collect("done", "r2", mode="filled_then_empty")
.to(lambda data: print("[collect runtime context]", data.value))
.end()
)
# when: wait for runtime_data signal
flow.when({"runtime_data": "user_id"}).to(lambda data: print("[when runtime]", data.value))
flow.start()
# triggerflow_data_flow()
讲解¶
- 运行态隔离避免数据串扰。
注释解读¶
- Idea 表示案例思路
- Flow 表示执行编排路径
- Expect 表示预期输出或行为
你学会了什么¶
- 理解 runtime_data 的执行态隔离
- 掌握 collect 的合并机制
练习任务¶
- 添加一个分支并观察 collect 输出