Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

Dynamic Task

Dynamic Task is a first-class Agently task surface for model-generated or app-generated DAGs. It exposes a compact app-facing API, validates a TaskDAG, resolves task handlers, and compiles the graph to ordinary TriggerFlow execution as an implementation substrate.

task = Agently.create_dynamic_task(target="review policy")
result = await task.async_start()

When the caller already has a plan, pass the TaskDAG directly and skip model planning:

async def local_handler(context):
    return {
        "task_id": context.task.id,
        "deps": dict(context.dependency_results),
    }

task = Agently.create_dynamic_task(
    target="review policy",
    plan={
        "graph_id": "review",
        "task_schema_version": "task_dag/v1",
        "tasks": [
            {"id": "extract", "kind": "local", "binding": "local_handler"},
            {
                "id": "final",
                "kind": "local",
                "binding": "local_handler",
                "depends_on": ["extract"],
            },
        ],
        "semantic_outputs": {"final": "final"},
    },
    handlers={"local_handler": local_handler},
)
snapshot = await task.async_start(timeout=10)

Submitted DAG inputs may reference runtime data with placeholders. A whole string placeholder preserves the original value type; embedded placeholders are rendered into the surrounding string. Slot names are case-insensitive, but docs use uppercase:

plan = {
    "graph_id": "review",
    "task_schema_version": "task_dag/v1",
    "tasks": [
        {"id": "lookup", "kind": "local", "binding": "local_handler"},
        {
            "id": "final",
            "kind": "local",
            "binding": "local_handler",
            "depends_on": ["lookup"],
            "inputs": {
                "account": "${INIT.account}",
                "ticket": "${DEPS.lookup.ticket}",
                "summary": "Ticket ${STATE.task_results.lookup.ticket.id} for ${INIT.account}",
            },
        },
    ],
}

${INIT} points at the submitted graph input / initial execution input. ${DEPS...} points at completed dependency results. ${STATE...} reads execution state, for example ${STATE.task_results.lookup}. ${TRIGGER...} points at the raw TriggerFlow trigger payload (data.value) and is mainly for advanced debugging or executor-level integrations. Missing runtime paths fail closed during task execution instead of staying as unresolved strings.

When a submitted DAG runs through agent.use_dynamic_task(...).create_execution(), ${INIT...} first reads an explicit use_dynamic_task(graph_input=...) value. If that argument is omitted, it reads the execution prompt snapshot input slot captured by create_execution(). Only when neither source exists does the Agent route fall back to {"target": task_target}.

If create_dynamic_task(..., output_schema=..., ensure_keys=...) supplies the frontstage contract for a semantic-output model node, that host contract wins over an incompatible planner-chosen node format. For multi-field structured contracts, a planner’s inputs.output_format="flat_markdown" is coerced back to auto so the output parser can choose a compatible structured format.

Submitted plans can also be kept as YAML or JSON config artifacts. Load the config into TaskDAG, then pass it through the same facade:

from agently.core import TaskDAG

graph = TaskDAG.from_yaml("examples/dynamic_task/config_policy_review.yaml")
task = Agently.create_dynamic_task(
    target="review policy",
    plan=graph,
    handlers={"local_handler": local_handler},
)
snapshot = await task.async_run(graph_input={"doc": "policy"}, timeout=10)

TaskDAG.from_json(...) accepts a file path or raw JSON/JSON5 content. Both from_yaml(...) and from_json(...) support task_dag_key_path="plans.review" for selecting one DAG inside a larger config file. Use graph.get_yaml(path) or graph.get_json(path) to export a normalized graph.

Agent instances expose the same facade:

task = agent.create_dynamic_task(target="review policy")

Agent prompt methods are configuration. agent.create_dynamic_task() consumes the same prompt snapshot as agent.start() / agent.create_execution():

task = (
    agent
    .info({"customer": "Acme"})
    .instruct("Focus on renewal risk and account-team actions.")
    .input({"account": "Acme", "ticket": "T-42"})
    .output({
        "summary": (str, "risk summary", True),
        "risks": ([str], "risk bullets", True),
    }, format="json")
    .create_dynamic_task()
)

The prompt snapshot is rendered through the normal Prompt generator to become the Dynamic Task target. The output slot becomes the facade-level output_schema, and output_format becomes the default model-task format. set_agent_prompt(...) / always=True values are inherited; request prompt values from set_request_prompt(...) / quick prompt calls are frozen into the new task and then cleared from the pending request. Explicit create_dynamic_task(target=..., output_schema=..., output_format=...) arguments override prompt-derived defaults.

For model tasks, use Agently’s request output pipeline instead of parsing model text in handlers or examples. output_schema applies to semantic output model tasks; node-level inputs.output_schema can override it for a specific model task. Each model task may also set inputs.output_format:

task = Agently.create_dynamic_task(
    target="write an incident briefing",
    output_schema={
        "brief": (str, "customer-facing briefing", True),
        "next_update": (str, "next update timing", True),
    },
)
snapshot = await task.async_start(timeout=120)
_, output = next(iter(snapshot["semantic_outputs"].items()))
brief = output["result"]["brief"]

For submitted DAGs, put the task-specific strategy on the model task itself:

{
    "id": "render_html",
    "kind": "model",
    "inputs": {
        "output_schema": {"html": (str, "render-ready HTML", True)},
        "output_format": "flat_markdown",
    },
}

Submitted DAG placeholders use the same uppercase naming style as Prompt references, but they are a TriggerFlow runtime namespace rather than Prompt slot references. ${INIT.foo} points at initial input, ${DEPS.task.path} points at completed dependency results, ${STATE.task_results.task.path} points at execution state, and ${TRIGGER.result} points at the raw TriggerFlow trigger payload. In DAG task inputs, whole-string placeholders preserve the original runtime value type; embedded placeholders stringify into the surrounding text.

Architecture

Dynamic Task is split into four stages:

bindings is not part of the public facade. Use handlers for custom local functions. Use explicit resource slots such as planner, model, actions, and skills when a task may use them; actions and skills are not exposed to the planner unless passed by the caller.

Resolver Semantics

Custom handlers should use clear names ending in _handler:

task = Agently.create_dynamic_task(
    target="review policy",
    plan=task_dag,
    handlers={"risk_check_handler": risk_check_handler},
)

In the DAG:

{"id": "check_risk", "kind": "local", "binding": "risk_check_handler"}

Unknown optional handlers may be safely pruned by the validator when they do not affect required semantic outputs, required downstream nodes, approvals, or side-effect policy. Pruned nodes are recorded in diagnostics; unknown required handlers fail closed before execution.

Lower-Level Control

Use the low-level classes only when a framework integration needs staged control:

from agently.builtins.plugins import AgentlyTaskDAGPlanner
from agently.core import DynamicTaskResolver, TaskDAGExecutor, TaskDAGValidator

resolver = DynamicTaskResolver({"risk_check_handler": risk_check_handler})
validator = TaskDAGValidator(resolver)
planner = AgentlyTaskDAGPlanner(validator=validator)

graph = await planner.async_plan(planner_agent, {"target": "review policy"})
validation = validator.validate(graph, strict_schema_version=True)
snapshot = await TaskDAGExecutor(resolver, validator=validator).async_run(graph)

The executor does not depend on Agent. Model and action access belong to the facade or resolver adapters, while TriggerFlow remains the execution substrate under Dynamic Task rather than the owner API.

Examples

Use the examples in examples/dynamic_task/ by layer: