Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

TriggerFlow 编排 Playbook

语言:English · 中文

何时用本 playbook

过程有 3 个或更多离散阶段。下列任一为真:

都不沾,留在 request 层 —— 见 快速开始输出控制

推荐结构

应用
   │
   ▼
TriggerFlow 定义(一个 flow 一个 Python 模块)
   ├── prepare         ← 校验 / 归一化输入
   ├── classify        ← 模型调用:按类型路由
   ├── (按分类分支)
   │     ├── handle_A → … → finalize
   │     ├── handle_B → … → finalize
   │     └── handle_C → … → finalize
   ├── for_each items  ← 任一 handler 返回 list 时 fan out
   ├── pause_for(...)  ← 可选人工批准
   └── finalize        ← 写最终 state,推到 runtime stream

flow 之外:
   • create_execution(auto_close=False, runtime_resources={...})
   • async_start(...)
   • 消费 runtime stream 给 live UI
   • async_close() → 给 API 响应的 close snapshot

骨架

from agently import TriggerFlow, TriggerFlowRuntimeData


def build_flow():
    flow = TriggerFlow(name="orchestration")

    async def prepare(data: TriggerFlowRuntimeData):
        # 校验 / 归一化输入
        await data.async_set_state("input", data.input)
        return data.input

    async def classify(data: TriggerFlowRuntimeData):
        agent = data.require_resource("agent")
        return await agent.input(data.input).output({
            "category": (str, "分类", True),
        }).async_start()

    async def handle_default(data: TriggerFlowRuntimeData):
        # ...
        await data.async_set_state("answer", "...")

    (
        flow.to(prepare)
            .to(classify)
            .match()
                .case("A").to(handle_default)
                .case("B").to(handle_default)
                .case_else().to(handle_default)
            .end_match()
    )

    return flow


async def run(input_value, agent):
    flow = build_flow()
    execution = flow.create_execution(
        auto_close=False,
        runtime_resources={"agent": agent},
    )
    await execution.async_start(input_value)
    return await execution.async_close()

骨架里的几个选择:

变体

需要 fan out

把单 handler 替换为 for_each

async def list_subtasks(data):
    return data.input["subtasks"]   # list

async def handle_one(data):
    return await some_agent.input(data.input).async_start()

(
    flow.to(list_subtasks)
        .for_each(concurrency=4)
            .to(handle_one)
        .end_for_each()
        .to(collect)
)

batchfor_each 与并发上限见 模式

需要人工批准

pause_for 步骤。execution 必须用 auto_close=False 创建;通过 continue_with 和显式 resume_to 目标恢复。

async def ask(data):
    return await data.async_pause_for(
        type="approval",
        payload={"summary": data.input["summary"]},
        resume_to="next",
    )

Pause 与 Resume

需要跨重启存活

在有意义的 checkpoint 保存 execution state(通常在 pause_for 未决时),把结果持久化到耐久存储,用 flow.create_execution(...).load(saved) 恢复。

saved = execution.save()
db.put(execution_id, saved)

# 后续,可能在另一进程:
restored = flow.create_execution(auto_close=False, runtime_resources={...})
restored.load(db.get(execution_id))

恢复侧重新注入 runtime resource。见 持久化与 Blueprint

需要流式 UI

在 FastAPI / WebSocket handler 内消费 execution.get_async_runtime_stream(...)。chunk 通过 data.async_put_into_stream(...) 推 item。见 FastAPI 服务封装事件与流

不要做什么

交叉链接