Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

State and Resources

Languages: English · 中文

A TriggerFlow execution carries three distinct storage layers. They look similar but solve different problems. Mixing them is a common source of subtle bugs.

Three layers at a glance

  state flow_data runtime_resources
Scope execution-local flow-shared (across all executions) execution-local
Serializable yes yes no
Goes into close snapshot yes no no, only resource_keys recorded
Goes into save / load checkpoints yes no no, must be re-injected after load()
Recommended for business state, intermediate values, anything you want back from close() legacy compatibility / explicitly intentional flow-wide sharing live clients, sockets, callbacks, file handles, cache references
Status recommended primary path risky-default — emits RuntimeWarning on every call new concept — use this for anything that can’t be serialized

state — the main path

State is execution-local, serializable, and snapshot-safe. It’s what populates the close snapshot and what save() / load() round-trip.

async def step(data: TriggerFlowRuntimeData):
    await data.async_set_state("greeting", f"hello {data.input}")
    current = data.get_state("greeting")

API:

Reading state is a local sync operation. Writes, appends, and deletes have async variants so async chunks can stay async-first.

Whatever you put in state at the time of close() shows up in the close snapshot.

flow_data — risky shared scope

flow_data is shared across every execution of the same flow. That sounds convenient until you have:

Because of this, every call emits a RuntimeWarning:

flow.set_flow_data("counter", 0)            # RuntimeWarning
flow.set_flow_data("counter", 0, no_warning=True)   # silenced

If you really mean shared scope (read-only config, a long-running cache that all executions are intentionally sharing), pass no_warning=True. For execution-local data — which is what 99% of code wants — use state instead.

API (each emits the warning unless suppressed):

runtime_resources — live objects

Some things can’t go into state because they can’t be serialized: database clients, callback functions, sockets, in-memory caches, anything with a file descriptor or live network connection. Those live in runtime_resources.

Inject at execution creation:

execution = flow.create_execution(
    runtime_resources={
        "db": my_db_client,
        "logger": my_logger,
        "search_tool": search_function,
    },
)

Or update on the flow itself (default for all executions of that flow):

flow.update_runtime_resources(logger=my_logger)

Inside a chunk:

async def step(data: TriggerFlowRuntimeData):
    logger = data.require_resource("logger")
    logger.info(f"received: {data.input}")
    db = data.require_resource("db")
    rows = await db.fetch("SELECT 1")

require_resource(name) raises if the resource isn’t injected — use it when the chunk genuinely depends on the resource. There’s also data.get_resource(name, default=None) for optional cases.

Why resources don’t enter the snapshot

A close snapshot is supposed to be a serializable dict. Live objects can’t survive serialization (no meaningful representation, no way to reconstruct the live state on the other side). What the snapshot does record is resource_keys — the names of resources the execution had — so you know what to re-inject on resume:

saved = execution.save()
# saved contains state, lifecycle metadata, interrupt state, and resource_keys
# but NOT the live objects themselves

restored = flow.create_execution(
    auto_close=False,
    runtime_resources={"db": new_db_client, "logger": new_logger, "search_tool": search_function},
)
restored.load(saved)

The caller is responsible for re-injecting compatible resources after load().

Managed execution resources

runtime_resources can also receive managed resources from Agently.execution_environment when you pass execution_environments=[...] to flow.create_execution(...), flow.start_execution(...), or flow.async_start(...).

Those resources are still read inside chunks through data.require_resource(...). The difference is ownership: the Execution Environment Manager starts/reuses the resource and releases it when the execution closes. Manually passed runtime_resources={...} remain unmanaged.

Decision table

You’re storing Use
A number, string, dict, list, or other JSON-friendly value that the close snapshot should include state
A pydantic model, dataclass, or anything serializable to dict state
A database client, HTTP client, websocket runtime_resources
A function or callback runtime_resources
An in-memory cache that should survive across executions of the same flow runtime_resources injected at the flow level (and accept that resources don’t survive process restarts unless you re-inject)
Configuration shared across executions, intentionally global flow_data with no_warning=True, or runtime_resources if it isn’t serializable

Common mistakes

See also