Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
语言:English · 中文
两条独立的序列化路径,不要混淆。
| 方法 | 序列化什么 | 典型用途 |
|---|---|---|
execution.save() / execution.load(saved) |
一次 execution 在某个时刻的运行时 state | 跨进程重启恢复,交给另一 worker |
flow.save_blueprint() / flow.load_blueprint(blueprint) |
flow 定义的结构(chunk、分支、条件) | 把 flow 当配置 artifact 分发或版本控制 |
save() 捕获恢复 execution 所需的全部内容:
statepause_for(...))resource_keys —— 恢复时期望的 runtime resource 名,但不含 live 值它不捕获:
runtime_resources 本体(不可序列化;见 State 与 Resources)execution = flow.create_execution(auto_close=False)
await execution.async_start("refund request")
saved_state = execution.save()
# 把 saved_state 持久化到某处(Redis、DB、文件等)
后续恢复(可能是另一个进程):
restored = flow.create_execution(
auto_close=False,
runtime_resources={"db": new_db_client, "logger": new_logger},
)
restored.load(saved_state)
# 继续:emit、continue_with interrupt,再 close
await restored.async_emit("UserFeedback", {"approved": True})
snapshot = await restored.async_close()
flow 定义两端必须一致(或兼容)—— load() 不会从 saved_state 重建 chunk 图,要求 flow 已存在。
execution = flow.create_execution(auto_close=False)
await execution.async_start("topic")
# 此时 flow 可能已调 pause_for(...)
saved = execution.save()
# 几天后,另一 worker
restored = flow.create_execution(
auto_close=False,
runtime_resources={"search_tool": new_search_function},
)
restored.load(saved)
interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()
get_pending_interrupts() 返回 pause_for(...) 创建的 interrupt id 集合。continue_with(id, payload) 解析一个 interrupt,并按该 interrupt 的 resume_to 目标继续图。
blueprint 序列化 flow 的结构 —— chunk 引用、分支、条件 —— 但不含 chunk 函数体(仍在代码里)。
def upper(data):
return str(data.input).upper()
def store(data):
return data.async_set_state("output", data.input)
source = TriggerFlow(name="source")
source.register_chunk_handler(upper)
source.register_chunk_handler(store)
source.to(upper).to(store)
blueprint = source.save_blueprint() # dict,可 JSON / YAML 序列化
另一端恢复:
restored = TriggerFlow(name="restored")
restored.register_chunk_handler(upper) # 同名函数体必须可用
restored.register_chunk_handler(store)
restored.load_blueprint(blueprint)
关键约束:blueprint 用到的 chunk 必须在恢复端按相同 handler 名注册。没 register_chunk_handler(...) loader 无法把名映回函数,load 失败。
服务代码优先使用这种封装形态:
TriggerFlow(...) 对象视为 flow definition surface。flow.update_runtime_resources(...) 注入。runtime_resources={...} 注入。state,不要放 flow_data。async def analyze(data):
agent_factory = data.require_resource("agent_factory")
prompts_path = data.require_resource("prompts_path")
question = data.input
data.set_state("question", question)
agent = agent_factory()
return agent.load_yaml_prompt(
prompts_path,
prompt_key_path="analyze",
mappings={"question": question},
).start()
async def answer(data):
policy_doc = data.require_resource("policy_doc")
question = data.get_state("question")
return f"{policy_doc}\n\nQ: {question}"
def build_policy_flow() -> TriggerFlow:
flow = TriggerFlow(name="policy")
flow.update_runtime_resources(
agent_factory=make_agent,
prompts_path=PROMPTS_DIR / "policy.yaml",
)
flow.to(analyze).to(answer)
return flow
flow = build_policy_flow()
snapshot = flow.start(
"travel subsidy?",
runtime_resources={"policy_doc": tenant_policy_doc},
)
这种写法让业务模块尽量轻,同时保留 config / blueprint 兼容性。闭包适合短脚本,但服务化推荐模块顶层 named handlers:更容易测试、注册、导出和重载。
当前行为:TriggerFlow 的 module-safe definition assembly 把 TriggerFlow(...)
本身视为规划面,把 create_execution(...) / start_execution(...) 视为进入一次运行的边界。
没有额外的 TriggerFlow.define(...) 模式。服务模块可以安全重放同一段 definition assembly:
named function 保持稳定 stage identity;同一个函数承担两个业务 stage 时,用 name=...
显式区分。
对于运行时由模型生成 To-Do List 或依赖图的模型应用,动态图应按 plan 或 request 局部生成。 extract / analyze 这类可复用 sub-flow template 可以放在模块级;per-plan executor 应用 task id 作为动态 stage identity,把 task 结果写入 execution state,并避免修改 main flow definition。
Flow 定义(chunk、分支、条件)
│
├── save_blueprint() → 描述图结构的 dict
│
▼
create_execution() ────► 一个 Execution
│
├── save() → 描述该 execution 状态的 dict
│
▼
async_close() → close snapshot
两条路径都返回 JSON 友好 dict。存储后端(Redis、Postgres、S3、文件)由应用层选 —— 框架不带后端。
单服务器恢复
saved = execution.save()
redis.set(f"flow:{exec_id}", json.dumps(saved))
# 后续
saved = json.loads(redis.get(f"flow:{exec_id}"))
restored = flow.create_execution(auto_close=False, runtime_resources={...})
restored.load(saved)
分布式 worker 拉起
把 blueprint(存一次)和 execution save(每个 execution 存一份)配对:
blueprint = source_flow.save_blueprint()
db.save("flow_blueprints", blueprint_id, blueprint)
# worker 中
flow = TriggerFlow(name="loaded")
register_all_handlers(flow) # 你的注册入口
flow.load_blueprint(db.load("flow_blueprints", blueprint_id))
execution = flow.create_execution(auto_close=False, runtime_resources=...)
execution.load(saved)
pause_for / continue_with,最常见的保存场景