Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

持久化与 Blueprint

语言:English · 中文

两条独立的序列化路径,不要混淆。

方法 序列化什么 典型用途
execution.save() / execution.load(saved) 一次 execution 在某个时刻的运行时 state 跨进程重启恢复,交给另一 worker
flow.save_blueprint() / flow.load_blueprint(blueprint) flow 定义的结构(chunk、分支、条件) 把 flow 当配置 artifact 分发或版本控制

Execution save / load

save() 捕获恢复 execution 所需的全部内容:

捕获:

execution = flow.create_execution(auto_close=False)
await execution.async_start("refund request")

saved_state = execution.save()
# 把 saved_state 持久化到某处(Redis、DB、文件等)

后续恢复(可能是另一个进程):

restored = flow.create_execution(
    auto_close=False,
    runtime_resources={"db": new_db_client, "logger": new_logger},
)
restored.load(saved_state)

# 继续:emit、continue_with interrupt,再 close
await restored.async_emit("UserFeedback", {"approved": True})
snapshot = await restored.async_close()

flow 定义两端必须一致(或兼容)—— load() 不会从 saved_state 重建 chunk 图,要求 flow 已存在。

跨 pause_for 的恢复

execution = flow.create_execution(auto_close=False)
await execution.async_start("topic")

# 此时 flow 可能已调 pause_for(...)
saved = execution.save()

# 几天后,另一 worker
restored = flow.create_execution(
    auto_close=False,
    runtime_resources={"search_tool": new_search_function},
)
restored.load(saved)

interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()

get_pending_interrupts() 返回 pause_for(...) 创建的 interrupt id 集合。continue_with(id, payload) 解析一个 interrupt,并按该 interrupt 的 resume_to 目标继续图。

Flow blueprint save / load

blueprint 序列化 flow 的结构 —— chunk 引用、分支、条件 —— 但不含 chunk 函数体(仍在代码里)。

def upper(data):
    return str(data.input).upper()

def store(data):
    return data.async_set_state("output", data.input)

source = TriggerFlow(name="source")
source.register_chunk_handler(upper)
source.register_chunk_handler(store)
source.to(upper).to(store)

blueprint = source.save_blueprint()  # dict,可 JSON / YAML 序列化

另一端恢复:

restored = TriggerFlow(name="restored")
restored.register_chunk_handler(upper)   # 同名函数体必须可用
restored.register_chunk_handler(store)
restored.load_blueprint(blueprint)

关键约束:blueprint 用到的 chunk 必须在恢复端按相同 handler 名注册。没 register_chunk_handler(...) loader 无法把名映回函数,load 失败。

服务化推荐封装

服务代码优先使用这种封装形态:

  1. chunks 和 conditions 写成模块顶层 named functions。
  2. 把普通 TriggerFlow(...) 对象视为 flow definition surface。
  3. 稳定 live 依赖用 flow.update_runtime_resources(...) 注入。
  4. 请求级或租户级依赖用每次 execution 的 runtime_resources={...} 注入。
  5. 单次请求的业务中间值放 execution state,不要放 flow_data
async def analyze(data):
    agent_factory = data.require_resource("agent_factory")
    prompts_path = data.require_resource("prompts_path")
    question = data.input
    data.set_state("question", question)
    agent = agent_factory()
    return agent.load_yaml_prompt(
        prompts_path,
        prompt_key_path="analyze",
        mappings={"question": question},
    ).start()


async def answer(data):
    policy_doc = data.require_resource("policy_doc")
    question = data.get_state("question")
    return f"{policy_doc}\n\nQ: {question}"


def build_policy_flow() -> TriggerFlow:
    flow = TriggerFlow(name="policy")
    flow.update_runtime_resources(
        agent_factory=make_agent,
        prompts_path=PROMPTS_DIR / "policy.yaml",
    )
    flow.to(analyze).to(answer)
    return flow


flow = build_policy_flow()
snapshot = flow.start(
    "travel subsidy?",
    runtime_resources={"policy_doc": tenant_policy_doc},
)

这种写法让业务模块尽量轻,同时保留 config / blueprint 兼容性。闭包适合短脚本,但服务化推荐模块顶层 named handlers:更容易测试、注册、导出和重载。

当前行为:TriggerFlow 的 module-safe definition assembly 把 TriggerFlow(...) 本身视为规划面,把 create_execution(...) / start_execution(...) 视为进入一次运行的边界。 没有额外的 TriggerFlow.define(...) 模式。服务模块可以安全重放同一段 definition assembly: named function 保持稳定 stage identity;同一个函数承担两个业务 stage 时,用 name=... 显式区分。

对于运行时由模型生成 To-Do List 或依赖图的模型应用,动态图应按 plan 或 request 局部生成。 extract / analyze 这类可复用 sub-flow template 可以放在模块级;per-plan executor 应用 task id 作为动态 stage identity,把 task 结果写入 execution state,并避免修改 main flow definition。

何时用 blueprint

何时用 blueprint

save vs save_blueprint 对照

Flow 定义(chunk、分支、条件)
        │
        ├── save_blueprint()  →  描述图结构的 dict
        │
        ▼
   create_execution()  ────►  一个 Execution
                                  │
                                  ├── save()  →  描述该 execution 状态的 dict
                                  │
                                  ▼
                              async_close() → close snapshot

两条路径都返回 JSON 友好 dict。存储后端(Redis、Postgres、S3、文件)由应用层选 —— 框架不带后端。

实用模式

单服务器恢复

saved = execution.save()
redis.set(f"flow:{exec_id}", json.dumps(saved))

# 后续
saved = json.loads(redis.get(f"flow:{exec_id}"))
restored = flow.create_execution(auto_close=False, runtime_resources={...})
restored.load(saved)

分布式 worker 拉起

把 blueprint(存一次)和 execution save(每个 execution 存一份)配对:

blueprint = source_flow.save_blueprint()
db.save("flow_blueprints", blueprint_id, blueprint)

# worker 中
flow = TriggerFlow(name="loaded")
register_all_handlers(flow)            # 你的注册入口
flow.load_blueprint(db.load("flow_blueprints", blueprint_id))

execution = flow.create_execution(auto_close=False, runtime_resources=...)
execution.load(saved)

另见