并发¶
批量与 for_each 并发控制。
源码¶
import asyncio
from agently import TriggerFlow, TriggerFlowEventData
## TriggerFlow Concurrency: batch + for_each
def triggerflow_concurrency():
# Idea: show concurrency limits for batch and for_each.
# Flow: batch(echo, concurrency=2) -> print, then for_each(list, concurrency=2) -> print
# Expect: outputs two results with interleaved timing.
flow = TriggerFlow()
async def echo(data: TriggerFlowEventData):
await asyncio.sleep(0.1)
return f"echo: {data.value}"
# batch: run chunks in parallel with a concurrency limit
flow.batch(
("a", echo),
("b", echo),
("c", echo),
concurrency=2,
).end()
result = flow.start("hello")
print(result)
# for_each: process list items with concurrency control
flow_2 = TriggerFlow()
(
flow_2.to(lambda _: [1, 2, 3, 4])
.for_each(concurrency=2)
.to(echo)
.end_for_each()
.end()
)
result_2 = flow_2.start()
print(result_2)
# triggerflow_concurrency()
讲解¶
- 使用 concurrency 进行上限控制。
注释解读¶
- Idea 表示案例思路
- Flow 表示执行编排路径
- Expect 表示预期输出或行为
你学会了什么¶
- 理解 batch/for_each 并发控制
- 知道 concurrency 上限的作用
练习任务¶
- 把 concurrency 改成 1 观察耗时变化