Data Flow

An example of runtime_data and collect.

Source code

from agently import TriggerFlow, TriggerFlowEventData


## TriggerFlow Data Flow: runtime_data + collect
def triggerflow_data_flow():
    # Idea: share execution-scoped data across branches and collect results.
    # Flow: set_runtime + set_runtime_context -> collect -> when(runtime_data)
    # Expect: prints "[when runtime]" then two collect outputs.
    flow = TriggerFlow()

    async def set_runtime(data: TriggerFlowEventData):
        data.set_runtime_data("user_id", "u-001")
        return "runtime ok"

    async def set_runtime_context(data: TriggerFlowEventData):
        data.set_runtime_data("env", "prod")
        return "runtime context ok"

    # collect waits for multiple branches to fill values
    (
        flow.to(set_runtime)
        .collect("done", "r1", mode="filled_then_empty")
        .to(lambda data: print("[collect runtime]", data.value))
    )
    (
        flow.to(set_runtime_context)
        .collect("done", "r2", mode="filled_then_empty")
        .to(lambda data: print("[collect runtime context]", data.value))
        .end()
    )

    # when: wait for runtime_data signal
    flow.when({"runtime_data": "user_id"}).to(lambda data: print("[when runtime]", data.value))

    flow.start()


# triggerflow_data_flow()

Explanation

  • runtime_data is scoped to a single execution, so values set during one run do not leak into another.
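
To make the isolation point concrete, here is a minimal sketch in plain Python. It does not use Agently: the `Execution` class and `run_once` helper are hypothetical stand-ins, assuming only that each execution keeps its own key-value store, as the `set_runtime_data` calls in the source suggest.

```python
import asyncio


class Execution:
    """Hypothetical stand-in for one flow execution with its own runtime data."""

    def __init__(self, name: str):
        self.name = name
        # Each execution owns a private dict; nothing is shared between instances.
        self._runtime_data: dict[str, object] = {}

    def set_runtime_data(self, key: str, value: object) -> None:
        self._runtime_data[key] = value

    def get_runtime_data(self, key: str, default: object = None) -> object:
        return self._runtime_data.get(key, default)


async def run_once(execution: Execution, user_id: str) -> object:
    execution.set_runtime_data("user_id", user_id)
    await asyncio.sleep(0)  # yield control so the two executions interleave
    return execution.get_runtime_data("user_id")


async def main() -> list[object]:
    first, second = Execution("first"), Execution("second")
    # Both executions write the same key concurrently.
    return await asyncio.gather(
        run_once(first, "u-001"),
        run_once(second, "u-002"),
    )


results = asyncio.run(main())
print(results)  # ['u-001', 'u-002']
```

Each execution reads back its own `user_id` even though both wrote the same key while interleaved, which is the behavior the note above describes.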

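Reading the comments

  • Idea describes the thinking behind the example
  • Flow describes the orchestration path that gets executed
  • Expect describes the expected output or behavior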

What you learned

  • How runtime_data is isolated per execution
  • How collect merges results from multiple branches
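
The merge behavior of collect can be pictured with a small stand-alone sketch. This is not Agently's implementation: the `Collector` class is hypothetical, and resetting the stored values after firing is only an assumption about what the `filled_then_empty` mode name implies.

```python
class Collector:
    """Hypothetical collector: fires once every named slot holds a value."""

    def __init__(self, slots, on_complete):
        self._slots = list(slots)
        self._values = {}
        self._on_complete = on_complete

    def fill(self, slot, value):
        if slot not in self._slots:
            raise KeyError(slot)
        self._values[slot] = value
        # Fire only when every expected branch has reported a value.
        if all(name in self._values for name in self._slots):
            merged = {name: self._values[name] for name in self._slots}
            # Assumed reading of "filled_then_empty": clear before the next round.
            self._values.clear()
            self._on_complete(merged)


outputs = []
collector = Collector(["r1", "r2"], outputs.append)

collector.fill("r1", "runtime ok")           # only one slot filled, nothing fires
collector.fill("r2", "runtime context ok")   # both filled, callback runs once

print(outputs)  # [{'r1': 'runtime ok', 'r2': 'runtime context ok'}]
```

In this sketch the slot names `r1` and `r2` play the role of the branch identifiers passed to `collect("done", ...)` in the source, and the merged dictionary stands in for the value handed to the next step.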

Exercise

  • Add another branch and observe how the collect output changes