Runtime Stream¶
运行态流式输出。
源码¶
import asyncio
from agently import Agently, TriggerFlow, TriggerFlowEventData
## TriggerFlow Streaming: runtime stream + agent streaming in flow
def triggerflow_runtime_stream_demo():
# Idea: stream progress updates without waiting for the final result.
# Flow: put_into_stream steps -> end -> get_runtime_stream
# Expect: prints stream events as they arrive.
flow = TriggerFlow()
async def stream_steps(data: TriggerFlowEventData):
for i in range(3):
data.put_into_stream({"step": i + 1, "status": "working"})
await asyncio.sleep(0.05)
data.stop_stream()
return "done"
flow.to(stream_steps).end()
for event in flow.get_runtime_stream("start"):
print("[stream]", event)
# triggerflow_runtime_stream_demo()
def triggerflow_agent_stream_demo():
# Idea: interactive loop with user input + streamed replies.
# Flow: input -> emit Loop -> stream reply -> emit Loop
# Expect: prints streaming tokens for each user query.
agent = Agently.create_agent()
agent.set_settings(
"OpenAICompatible",
{
"base_url": "http://127.0.0.1:11434/v1",
"model": "qwen2.5:7b",
},
)
flow = TriggerFlow()
async def get_input(data: TriggerFlowEventData):
try:
user_input = input("Question (type 'exit' to stop): ").strip()
except EOFError:
user_input = "exit"
if user_input.lower() == "exit":
data.stop_stream()
return "exit"
data.put_into_stream(f"\n[user] {user_input}\n")
await data.async_emit("UserInput", user_input)
return "next"
async def stream_reply(data: TriggerFlowEventData):
data.put_into_stream("[assistant] ")
try:
request = agent.input(data.value)
async for chunk in request.get_async_generator(type="delta"):
data.put_into_stream(chunk)
data.put_into_stream("\n")
await data.async_emit("Loop", None)
return None
except Exception as exc:
data.put_into_stream(f"\n[error] {exc}\n")
data.stop_stream()
return "error"
# This loop is stream-driven, so we don't set a default result with end().
flow.to(get_input)
flow.when("UserInput").to(stream_reply)
flow.when("Loop").to(get_input)
for event in flow.get_runtime_stream("start", timeout=None):
print(event, end="", flush=True)
triggerflow_agent_stream_demo()
讲解¶
- put_into_stream 推送事件。
- stop_stream 控制结束。
注释解读¶
- Idea 表示案例思路
- Flow 表示执行编排路径
- Expect 表示预期输出或行为
你学会了什么¶
- 掌握 runtime stream 的输出方式
练习任务¶
- 加入 stop_stream 并观察结束行为