Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
Languages: English · 中文
Two distinct serialization paths exist. Don’t confuse them.
| Method | What it serializes | Typical use |
|---|---|---|
execution.save() / execution.load(saved) |
one execution’s runtime state at a moment in time | resume across process restarts, hand off to another worker |
flow.save_blueprint() / flow.load_blueprint(blueprint) |
the flow definition structure (chunks, branches, conditions) | distribute or version-control a flow as a config artifact |
save() captures everything needed to resume the execution where it stopped:
statepause_for(...) was hit)resource_keys — the names of runtime resources expected on resume, but not the live valuesWhat it does not capture:
runtime_resources themselves (they’re not serializable; see State and Resources)execution = flow.create_execution(auto_close=False)
await execution.async_start("refund request")
saved_state = execution.save()
# persist saved_state somewhere (Redis, DB, file, etc.)
Restore later (possibly in a different process):
restored = flow.create_execution(
auto_close=False,
runtime_resources={"db": new_db_client, "logger": new_logger},
)
restored.load(saved_state)
# Continue: emit, continue_with an interrupt, then close
await restored.async_emit("UserFeedback", {"approved": True})
snapshot = await restored.async_close()
The flow definition must be the same flow (or compatible) on both sides — load() doesn’t reconstruct the chunk graph from saved_state; it expects the flow to already exist.
execution = flow.create_execution(auto_close=False)
await execution.async_start("topic")
# at this point the flow may have called pause_for(...)
saved = execution.save()
# ... days later, in a different worker ...
restored = flow.create_execution(
auto_close=False,
runtime_resources={"search_tool": new_search_function},
)
restored.load(saved)
interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()
get_pending_interrupts() returns ids of interrupts created via pause_for(...). continue_with(id, payload) resolves one interrupt and resumes the graph according to that interrupt’s resume_to target.
A blueprint serializes the structure of the flow — chunk references, branches, conditions — but not the chunk function bodies (those stay in code).
def upper(data):
return str(data.input).upper()
def store(data):
return data.async_set_state("output", data.input)
source = TriggerFlow(name="source")
source.register_chunk_handler(upper)
source.register_chunk_handler(store)
source.to(upper).to(store)
blueprint = source.save_blueprint() # dict, can be JSON / YAML serialized
Restore on the other end:
restored = TriggerFlow(name="restored")
restored.register_chunk_handler(upper) # same function bodies must be available
restored.register_chunk_handler(store)
restored.load_blueprint(blueprint)
Key constraint: any chunk used in the blueprint must be registered by the same handler name on the restored side. Without register_chunk_handler(...), the loader can’t bind names to functions and the load fails.
For service code, prefer this packaging shape:
TriggerFlow(...) object as the flow definition surface.flow.update_runtime_resources(...).runtime_resources={...}.state, not in flow_data.async def analyze(data):
agent_factory = data.require_resource("agent_factory")
prompts_path = data.require_resource("prompts_path")
question = data.input
data.set_state("question", question)
agent = agent_factory()
return agent.load_yaml_prompt(
prompts_path,
prompt_key_path="analyze",
mappings={"question": question},
).start()
async def answer(data):
policy_doc = data.require_resource("policy_doc")
question = data.get_state("question")
return f"{policy_doc}\n\nQ: {question}"
def build_policy_flow() -> TriggerFlow:
flow = TriggerFlow(name="policy")
flow.update_runtime_resources(
agent_factory=make_agent,
prompts_path=PROMPTS_DIR / "policy.yaml",
)
flow.to(analyze).to(answer)
return flow
flow = build_policy_flow()
snapshot = flow.start(
"travel subsidy?",
runtime_resources={"policy_doc": tenant_policy_doc},
)
This keeps business modules light while preserving config/blueprint compatibility. Closures are fine for short scripts, but named top-level handlers are the recommended service shape because they are easier to test, register, export, and reload.
Current behavior: TriggerFlow’s module-safe definition assembly treats
TriggerFlow(...) itself as the planning surface and create_execution(...) /
start_execution(...) as the boundary into one run. There is no separate
TriggerFlow.define(...) mode. Service modules can replay the same definition
assembly safely: named functions keep stable stage identities, and the same
function used as two logical stages should be disambiguated with name=....
For model applications that generate a To-Do List or dependency graph at runtime, keep that graph per plan or per request. Reusable templates such as extract / analyze sub-flows belong at module scope; the per-plan executor should use task ids as dynamic stage identities, write task results to execution state, and avoid mutating the main flow definition.
Flow definition (chunks, branches, conditions)
│
├── save_blueprint() → dict describing graph structure
│
▼
create_execution() ────► one Execution
│
├── save() → dict describing this execution's state
│
▼
async_close() → close snapshot
Both paths return JSON-friendly dicts. Pick storage (Redis, Postgres, S3, file) at the application level — the framework doesn’t ship a backend.
Single-server resume
saved = execution.save()
redis.set(f"flow:{exec_id}", json.dumps(saved))
# later
saved = json.loads(redis.get(f"flow:{exec_id}"))
restored = flow.create_execution(auto_close=False, runtime_resources={...})
restored.load(saved)
Distributed worker pickup
Pair a blueprint (stored once) with an execution save (stored per execution):
blueprint = source_flow.save_blueprint()
db.save("flow_blueprints", blueprint_id, blueprint)
# in worker
flow = TriggerFlow(name="loaded")
register_all_handlers(flow) # whatever your registration entry is
flow.load_blueprint(db.load("flow_blueprints", blueprint_id))
execution = flow.create_execution(auto_close=False, runtime_resources=...)
execution.load(saved)
pause_for / continue_with, the most common reason to save