Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

事件与流

语言:English · 中文

TriggerFlow 这里讨论两条和 flow 执行直接相关的通道。不要混淆

通道 flow 内部 flow 外部
emit / when chunk emit 一个事件,挂在 when(event) 上的 chunk 被触发 外部代码也可在 execution 还 openexecution.async_emit(...)
runtime stream chunk 通过 put_into_stream(...) 推 item 外部通过 execution.get_async_runtime_stream(...) 消费给 UI / SSE / 日志

emit 是图内的控制流。runtime stream 是把数据推到外部。

emit / when —— 控制流

import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData


async def main():
    flow = TriggerFlow(name="emit-when")

    async def prepare(data: TriggerFlowRuntimeData):
        await data.async_set_state("flag", "ready")
        await data.async_emit("Prepared", {"flag": "ready"})

    async def route(data: TriggerFlowRuntimeData):
        await data.async_set_state("when_payload", data.input)

    flow.to(prepare)
    flow.when("Prepared").to(route)

    snapshot = await flow.async_start(None)
    print(snapshot["when_payload"])  # {'flag': 'ready'}


asyncio.run(main())

机制:

Definition 安全 vs runtime signal

TriggerFlow 的 module-safe definition 工作解决的是服务模块被 import、reload 或重复组装时,不要把同一条图边或同一个生成的 when(...) gate 声明两遍。 它不是 runtime signal 去重。

在一次 execution 中,每一次 emit / emit_nowait 调用仍然是一次业务事件。 如果某个 chunk 发三次 Tickwhen("Tick") 就应该响应三次。这正是 emit_nowait(...) + when(...) 能支撑动态 To-Do executor、依赖 join、side branch 和 reflection loop 的原因。

多依赖 join 使用:

flow.when({"event": ["done:a", "done:b"]}, mode="and").to(continue_after_both)

join 状态属于单个 execution,不能跨 execution 泄漏,也不应放进共享 flow data。

外部 emit

execution 还 open 时外部也可 emit:

await execution.async_emit("UserClicked", {"id": 42})
execution.emit_nowait("UserClicked", {"id": 42})

seal()close() 后外部 emit 被拒。

Runtime stream —— 数据出

async def main():
    flow = TriggerFlow(name="runtime-stream")

    async def stream_steps(data: TriggerFlowRuntimeData):
        await data.async_put_into_stream("step-1")
        await data.async_put_into_stream("step-2")
        await data.async_set_state("done", True)

    flow.to(stream_steps)

    execution = flow.create_execution(auto_close=False)
    await execution.async_start("start")

    close_task = asyncio.create_task(execution.async_close())
    items = [item async for item in execution.get_async_runtime_stream(timeout=None)]
    snapshot = await close_task

    print(items)        # ['step-1', 'step-2']
    print(snapshot)     # {'done': True}

机制:

Stream timeout vs auto-close timeout

两者独立:

Timeout 控制
get_async_runtime_stream(timeout=N) 消费者等下一 item 多久后抛/停
execution 上的 auto_close_timeout execution 空闲多久后自动 close

stream timeout 设 None 意味着消费者等到 stream 真正关(即 close() 完成)才停。收集所有 item 时通常这么用。

隐式 stream 语法糖

flow.get_async_runtime_stream(...)flow.get_runtime_stream(...) 在内部建一个隐式 execution 并 stream。和 flow.start() 一样,仅适用于自闭合 flow(无 pause_for、无外部 emit)。如果隐式 stream execution 走到 pause_for(...),TriggerFlow 会 fail fast,因为外部没有可恢复 execution handle;需要等待/恢复时应创建显式 execution,再调用 execution.get_async_runtime_stream(...)

不要把 live item 放进 state

大或 live 的 item 走 runtime stream,不进 state。state 是给 close snapshot 用的 —— 应该小且可序列化。put_into_stream 让消费者一来就处理,不撑大 snapshot。

Observation events 不属于这条控制流

Agently 还会通过 Event Center 发出 observation event(观测事件),例如 TriggerFlow 生命周期、Session 应用、观察日志等。那是框架级观测通道,不是 emit / when 控制流,也不是 runtime stream 数据流。见 Event Center

另见