Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
Dynamic Task is a first-class Agently task surface for model-generated or
app-generated DAGs. It exposes a compact app-facing API, validates a TaskDAG,
resolves task handlers, and compiles the graph to ordinary TriggerFlow
execution as an implementation substrate.
task = Agently.create_dynamic_task(target="review policy")
result = await task.async_start()
When the caller already has a plan, pass the TaskDAG directly and skip model
planning:
async def local_handler(context):
return {
"task_id": context.task.id,
"deps": dict(context.dependency_results),
}
task = Agently.create_dynamic_task(
target="review policy",
plan={
"graph_id": "review",
"task_schema_version": "task_dag/v1",
"tasks": [
{"id": "extract", "kind": "local", "binding": "local_handler"},
{
"id": "final",
"kind": "local",
"binding": "local_handler",
"depends_on": ["extract"],
},
],
"semantic_outputs": {"final": "final"},
},
handlers={"local_handler": local_handler},
)
snapshot = await task.async_start(timeout=10)
Submitted DAG inputs may reference runtime data with placeholders. A whole
string placeholder preserves the original value type; embedded placeholders are
rendered into the surrounding string. Slot names are case-insensitive, but docs
use uppercase:
plan = {
"graph_id": "review",
"task_schema_version": "task_dag/v1",
"tasks": [
{"id": "lookup", "kind": "local", "binding": "local_handler"},
{
"id": "final",
"kind": "local",
"binding": "local_handler",
"depends_on": ["lookup"],
"inputs": {
"account": "${INIT.account}",
"ticket": "${DEPS.lookup.ticket}",
"summary": "Ticket ${STATE.task_results.lookup.ticket.id} for ${INIT.account}",
},
},
],
}
${INIT} points at the submitted graph input / initial execution input.
${DEPS...} points at completed dependency results. ${STATE...} reads
execution state, for example ${STATE.task_results.lookup}. ${TRIGGER...}
points at the raw TriggerFlow trigger payload (data.value) and is mainly for
advanced debugging or executor-level integrations. Missing runtime paths fail
closed during task execution instead of staying as unresolved strings.
When a submitted DAG runs through agent.use_dynamic_task(...).create_execution(),
${INIT...} first reads an explicit use_dynamic_task(graph_input=...) value.
If that argument is omitted, it reads the execution prompt snapshot input slot
captured by create_execution(). Only when neither source exists does the Agent
route fall back to {"target": task_target}.
If create_dynamic_task(..., output_schema=..., ensure_keys=...) supplies the
frontstage contract for a semantic-output model node, that host contract wins
over an incompatible planner-chosen node format. For multi-field structured
contracts, a planner’s inputs.output_format="flat_markdown" is coerced back to
auto so the output parser can choose a compatible structured format.
Submitted plans can also be kept as YAML or JSON config artifacts. Load the
config into TaskDAG, then pass it through the same facade:
from agently.core import TaskDAG
graph = TaskDAG.from_yaml("examples/dynamic_task/config_policy_review.yaml")
task = Agently.create_dynamic_task(
target="review policy",
plan=graph,
handlers={"local_handler": local_handler},
)
snapshot = await task.async_run(graph_input={"doc": "policy"}, timeout=10)
TaskDAG.from_json(...) accepts a file path or raw JSON/JSON5 content. Both
from_yaml(...) and from_json(...) support task_dag_key_path="plans.review"
for selecting one DAG inside a larger config file. Use graph.get_yaml(path)
or graph.get_json(path) to export a normalized graph.
Agent instances expose the same facade:
task = agent.create_dynamic_task(target="review policy")
Agent prompt methods are configuration. agent.create_dynamic_task() consumes
the same prompt snapshot as agent.start() / agent.create_execution():
task = (
agent
.info({"customer": "Acme"})
.instruct("Focus on renewal risk and account-team actions.")
.input({"account": "Acme", "ticket": "T-42"})
.output({
"summary": (str, "risk summary", True),
"risks": ([str], "risk bullets", True),
}, format="json")
.create_dynamic_task()
)
The prompt snapshot is rendered through the normal Prompt generator to become
the Dynamic Task target. The output slot becomes the facade-level
output_schema, and output_format becomes the default model-task format.
set_agent_prompt(...) / always=True values are inherited; request prompt
values from set_request_prompt(...) / quick prompt calls are frozen into the
new task and then cleared from the pending request. Explicit
create_dynamic_task(target=..., output_schema=..., output_format=...)
arguments override prompt-derived defaults.
For model tasks, use Agently’s request output pipeline instead of parsing model
text in handlers or examples. output_schema applies to semantic output model
tasks; node-level inputs.output_schema can override it for a specific model
task. Each model task may also set inputs.output_format:
json: compact machine-control outputs, action arguments, routing flags,
numeric or boolean facts, model judges, dense nested arrays/objects, and
strict extraction.flat_markdown: flat string fields with long HTML, Markdown, code, SVG,
SQL, templates, or report sections.hybrid: explicit opt-in for long prose with structured lists, tables,
citations, metadata, or nested evidence when retry latency is acceptable.auto: structural schema-driven selection when retry latency is acceptable.task = Agently.create_dynamic_task(
target="write an incident briefing",
output_schema={
"brief": (str, "customer-facing briefing", True),
"next_update": (str, "next update timing", True),
},
)
snapshot = await task.async_start(timeout=120)
_, output = next(iter(snapshot["semantic_outputs"].items()))
brief = output["result"]["brief"]
For submitted DAGs, put the task-specific strategy on the model task itself:
{
"id": "render_html",
"kind": "model",
"inputs": {
"output_schema": {"html": (str, "render-ready HTML", True)},
"output_format": "flat_markdown",
},
}
Submitted DAG placeholders use the same uppercase naming style as Prompt
references, but they are a TriggerFlow runtime namespace rather than Prompt
slot references. ${INIT.foo} points at initial input, ${DEPS.task.path}
points at completed dependency results, ${STATE.task_results.task.path} points
at execution state, and ${TRIGGER.result} points at the raw TriggerFlow
trigger payload. In DAG task inputs, whole-string placeholders preserve the
original runtime value type; embedded placeholders stringify into the
surrounding text.
Dynamic Task is split into four stages:
AgentlyTaskDAGPlanner generates deterministic TaskDAG data with Agently
output schema, ensure_keys, and validation retry.TaskDAGValidator validates DAG syntax, dependencies, schema version,
semantic outputs, side-effect policy, and resolver availability.DynamicTaskResolver maps task.binding, task.id, then task.kind to a
runnable handler.TaskDAGExecutor compiles the validated DAG to ordinary TriggerFlow chunks
and runs it through TriggerFlow lifecycle, stream, pause/resume, result, and
runtime resource mechanics.bindings is not part of the public facade. Use handlers for custom local
functions. Use explicit resource slots such as planner, model, actions,
and skills when a task may use them; actions and skills are not exposed to
the planner unless passed by the caller.
Custom handlers should use clear names ending in _handler:
task = Agently.create_dynamic_task(
target="review policy",
plan=task_dag,
handlers={"risk_check_handler": risk_check_handler},
)
In the DAG:
{"id": "check_risk", "kind": "local", "binding": "risk_check_handler"}
Unknown optional handlers may be safely pruned by the validator when they do not
affect required semantic outputs, required downstream nodes, approvals, or
side-effect policy. Pruned nodes are recorded in diagnostics; unknown required
handlers fail closed before execution.
Use the low-level classes only when a framework integration needs staged control:
from agently.builtins.plugins import AgentlyTaskDAGPlanner
from agently.core import DynamicTaskResolver, TaskDAGExecutor, TaskDAGValidator
resolver = DynamicTaskResolver({"risk_check_handler": risk_check_handler})
validator = TaskDAGValidator(resolver)
planner = AgentlyTaskDAGPlanner(validator=validator)
graph = await planner.async_plan(planner_agent, {"target": "review policy"})
validation = validator.validate(graph, strict_schema_version=True)
snapshot = await TaskDAGExecutor(resolver, validator=validator).async_run(graph)
The executor does not depend on Agent. Model and action access belong to the facade or resolver adapters, while TriggerFlow remains the execution substrate under Dynamic Task rather than the owner API.
Use the examples in examples/dynamic_task/ by layer:
01_dynamic_task_basic.py: submitted TaskDAG smoke example with local
handlers only.02_support_response_module_model.py: model-powered support module with a
simple SupportResponseModule.respond(ticket) facade, backend fan-out/join
stages, structured model outputs, mocked business-system lookups, and a
printed customer-facing response.03_contract_risk_review_business.py: contract risk review business example
with a simple ContractRiskReviewService.review(contract) facade,
deterministic local handlers, backend risk scoring, and a printed risk memo.04_incident_briefing_auto_plan.py: auto-planned incident briefing example
with a simple IncidentBriefingService.brief(report) facade. The model
creates the TaskDAG; Dynamic Task validates and executes it, while the
frontstage briefing shape is enforced through Agently output_schema.05_enterprise_renewal_complex_auto_plan.py: complex auto-planned renewal
example where the model planner creates several independent analysis roots,
joins them into a synthesis stage, and produces a structured recovery package.06_dynamic_task_config_plan.py: submitted TaskDAG loaded from YAML config
through TaskDAG.from_yaml(...).