Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

Pause 与 Resume

语言:English · 中文

pause_for(...) 让 chunk 停在可持久化的 interrupt barrier,把控制权交回框架等待外部事件。execution 保持 alive 但空闲。pause_for 期间 auto-close 暂停。外部调 continue_with(...) 后,TriggerFlow 按该 interrupt 的恢复目标继续图。

用 pause_for 挂起

async def ask(data: TriggerFlowRuntimeData):
    return await data.async_pause_for(
        type="human_input",
        payload={"question": f"批准 {data.input} 的操作?"},
        resume_to="next",
    )

pause_for 做:

参数 含义
type= 字符串标签(如 "human_input""approval""webhook")。应用据此决定如何呈现 interrupt。
payload= 给负责恢复方的结构化细节(UI 渲染问题、webhook 接收方等)。
resume_to= 可选恢复目标:"next""self"{"event": "EventName"}
resume_event= 兼容快捷方式。未显式设置 resume_to 时,continue_with 与匹配的 emit(...) 会路由到该事件。
interrupt_id= 可选。自己指定 id;否则框架生成。

用 continue_with 恢复

interrupt_id = next(iter(execution.get_pending_interrupts()))
await execution.async_continue_with(interrupt_id, {"approved": True})

使用 resume_to="next" 时,payload 成为暂停 chunk 的输出,下一段 .to(...) 收到它。

使用 resume_to="self" 时,同一个 chunk 会再次运行。用 data.is_resumedata.resume.value 读取恢复上下文:

async def gate(data: TriggerFlowRuntimeData):
    if data.is_resume:
        return {"decision": data.resume.value}
    return await data.async_pause_for(
        type="approval",
        payload={"question": "批准?"},
        resume_to="self",
    )

使用 resume_to={"event": "ApprovalGiven"} 时,TriggerFlow 用恢复 payload 发出该事件。resume_event="ApprovalGiven" 保留旧的事件式恢复行为。

完整例子

import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData


async def main():
    flow = TriggerFlow(name="approval")

    async def ask(data: TriggerFlowRuntimeData):
        return await data.async_pause_for(
            type="approval",
            payload={"question": f"批准工单 {data.input} 退款?"},
            resume_to="next",
        )

    async def commit(data: TriggerFlowRuntimeData):
        await data.async_set_state("decision", data.input)

    flow.to(ask).to(commit)

    execution = flow.create_execution(auto_close=False)
    await execution.async_start("T-001")

    # 真实系统里 UI / webhook 后续调 continue_with。
    # 这里在同一协程里恢复仅作 demo。
    interrupt_id = next(iter(execution.get_pending_interrupts()))
    await execution.async_continue_with(interrupt_id, {"approved": True})

    snapshot = await execution.async_close()
    print(snapshot["decision"])  # {'approved': True}


asyncio.run(main())

注意:这个 flow 用了 pause_for(...)。必须用 flow.create_execution(...)(或 flow.start_execution(...)),不要flow.start(...) —— 隐式 execution 没有外部可用的 handle 来调 continue_with,走到 pause_for(...) 时 TriggerFlow 会直接报错。

模型自主决定中断的文档审查例子见 examples/step_by_step/11-triggerflow-19_document_review_pause_resume.py:模型拥有的 gate 先判断是否需要人工复核,需要时调用 pause_for(..., resume_to="self"),恢复后同一 gate 通过 data.is_resumedata.resume 继续。

跨进程重启的 pause

pause_for(...)save / load 配合得很好:

execution = flow.create_execution(auto_close=False)
await execution.async_start("topic")
# 此时已碰到 pause_for;存在 pending interrupt

saved = execution.save()
# 持久化 saved

# 后续在另一进程 / worker:
restored = flow.create_execution(
    auto_close=False,
    runtime_resources={...},   # chunk 需要的全部重新注入
)
restored.load(saved)
interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()

interrupt 是 saved state 的一部分,新进程知道有什么待处理。详见 持久化与 Blueprint

多个并发 pause

单 execution 可有多个未决 interrupt(如两个并行分支各等人工输入)。get_pending_interrupts() 返回全部;continue_with(id, payload) 一次解一个。

需要指定 id 时,给 pause_for(...)interrupt_id="my-id"continue_with 用同 id。

Pause vs emit

模式 用途
pause_for(..., resume_to="next") + continue_with 下一个图步骤应收到恢复 payload
pause_for(..., resume_to="self") + continue_with 同一 chunk 应带 data.resume 上下文再次运行
emit + when(...) 单独的 handler 在事件发生时跑;原 chunk 不必等

人工介入用 pause —— chunk 逻辑依赖人工回应。fan-out 副作用用 emit/when。

auto_close 互动

只要存在未决 pause_forauto_close=True 不触发。continue_with 解掉最后一个 pending interrupt 后 execution 重新进入空闲,auto-close 计时从零重启。

希望等待时永不 auto-close 用 auto_close_timeout=None(记得显式 close())。

async_close() 默认拒绝关闭仍有 pending interrupt 的 execution。应先恢复这些 interrupt;如果确实要放弃等待,必须显式取消:

snapshot = await execution.async_close(pending_interrupts="cancel")

另见