大家好,我是十三!欢迎来到十三Tech。
AI Agent 的核心工程挑战之一是长周期、多轮深度交互:用户上传文档后让 AI 总结,然后要确认满意度、补充反馈、再让 AI 继续生成。传统无状态 HTTP 在"中途暂停等输入"面前毫无办法。Coze Studio 的工作流引擎给了一套相当优雅的答案——允许开发者用拖拽搭建可暂停、可等待、可从断点恢复的复杂流程。这篇文章拆开它的后端源码,重点回答三个问题:API 怎么支撑长连接流式通信、可中断工作流在整洁架构里如何流转、引擎怎么用状态持久化实现执行现场的"冻结"与"解冻"。
1. 通信的基石:基于 SSE 的可中断流式 API
服务端要"随时推送进度、必要时暂停等输入",传统一问一答的 HTTP 显然不够用。Coze 的选择是 SSE (Server-Sent Events)。
传统模式下客户端发请求后只能死等,超时就崩;SSE 模式下客户端建立一条长连接,服务端可以持续推 message 事件、需要时抛 interrupt 事件、收到客户端 stream_resume 后继续推送直到 done。服务端拿到主动权,"暂停-恢复"就不再需要客户端轮询。
两个核心 API 支撑这套交互:
POST /v1/workflow/stream_run:启动新工作流实例,建立 SSE 长连接。响应事件类型包括message(进度)、interrupt(中断)、done(结束)、error(错误)。POST /v1/workflow/stream_resume:收到interrupt后,客户端用这个接口提交反馈,必须携带唯一的executeID来定位被暂停的工作流。
为什么是 SSE 而不是 WebSocket——这是个"够用且优雅"的工程决策。SSE 基于标准 HTTP,协议开销小,自带断线重连,对防火墙友好;WebSocket 在这个服务端单向推送的场景下属于杀鸡用牛刀。Coze 在这里把"功能最强"和"最合适"区分得很清楚。
2. 深入引擎:一次请求的生命周期
揭开 API 之后正式进入后端的深水区。一个 stream_run 请求进来,Coze 是怎么一步步把它驱动起来的?
2.1 数据转换:从前端画布到后端可执行图
系统需要把前端可视化画布的 JSON 定义转换成后端可执行的逻辑图,这条转换链路是 Canvas → WorkflowSchema → Workflow。
三个角色很清晰:Canvas 是前端保存的原始 JSON(人能读,机器不能直接跑);WorkflowSchema 是标准化的图结构(Nodes + Edges + Variables);Workflow 是带执行器、上下文和状态的真正可执行实例。衔接前后端的是适配器函数 adaptor.CanvasToWorkflowSchema():
func CanvasToWorkflowSchema(ctx context.Context, canvas *vo.Canvas) (*compose.WorkflowSchema, error) {
// 格式转换:JSON → Go 结构体
nodes := make(map[string]*compose.Node)
for _, canvasNode := range canvas.Nodes {
node, err := convertCanvasNode(canvasNode)
if err != nil {
return nil, err
}
nodes[canvasNode.ID] = node
}
// 语义理解:节点配置 → 执行逻辑
edges := make([]*compose.Edge, 0, len(canvas.Edges))
for _, canvasEdge := range canvas.Edges {
edge := &compose.Edge{
From: canvasEdge.From,
To: canvasEdge.To,
}
edges = append(edges, edge)
}
// 依赖分析:连接关系 → 执行顺序
return &compose.WorkflowSchema{
Nodes: nodes,
Edges: edges,
}, nil
}
2.2 分层架构:请求的优雅流转
Coze 的后端遵循经典的整洁架构,请求在 Handler → Application → Domain 三层之间清晰流转。
Handler 层(api)只做协议转换和数据搬运,完全不碰业务逻辑:
// 路径: coze/coze-studio/backend/api/handler/workflow/openapi.go
func OpenAPIStreamRunFlow(c *app.RequestContext) {
// 1. 解析请求参数
var req workflow.OpenAPIStreamRunFlowRequest
if err := c.BindAndValidate(&req); err != nil {
ResponseError(c, err)
return
}
// 2. 建立 SSE 长连接
w := sse.NewWriter(c)
c.SetContentType("text/event-stream; charset=utf-8")
c.Response.Header.Set("Connection", "keep-alive")
c.Response.Header.Set("Cache-Control", "no-cache")
// 3. 调用业务逻辑
ctx := context.Background()
sr, err := appworkflow.SVC.OpenAPIStreamRun(ctx, &req)
if err != nil {
sendErrorEvent(w, err)
return
}
// 4. 持续转发消息
sendStreamRunSSE(ctx, w, sr)
}
转发循环本身极简——从执行引擎的 StreamReader 读消息,转成 sse.Event 写回客户端,遇到 io.EOF 就收尾:
func sendStreamRunSSE(ctx context.Context, w *sse.Writer, sr *StreamReader) {
defer w.Close()
for {
message, err := sr.Recv()
if err != nil {
if err == io.EOF {
return
}
sendErrorEvent(w, err)
return
}
event := sse.Event{
ID: message.ID,
Type: message.Type, // "message" | "interrupt" | "done" | "error"
Data: message.Data,
}
if err := w.Write(event); err != nil {
return // 客户端断开
}
}
}
Application 层负责业务编排——权限检查、构造执行配置、把领域事件格式转成 API 格式。这里有个值得拆开看的细节:事件转换器让内部领域模型与对外 API 模型解耦。
// 路径: coze/coze-studio/backend/application/workflow/service.go
func (svc *ApplicationService) OpenAPIStreamRun(ctx context.Context, req *Request) (*StreamReader, error) {
// 1. 权限检查
if err := svc.checkPermission(ctx, req.WorkflowID, req.UserID); err != nil {
return nil, errors.Wrap(err, "permission denied")
}
// 2. 制定执行计划
config := &vo.ExecuteConfig{
From: vo.FromSpecificVersion,
Mode: vo.ExecuteModeRelease,
SyncPattern: vo.SyncPatternStream, // 关键:流式执行
WorkflowID: req.WorkflowID,
Parameters: req.Parameters,
UserID: req.UserID,
}
// 3. 委托给领域层
sr, err := svc.domainSVC.StreamExecute(ctx, config)
if err != nil {
return nil, errors.Wrap(err, "domain execute failed")
}
// 4. 事件转换:内部格式 → API 格式
return svc.convertStreamEvents(sr), nil
}
事件转换器的好处是内外隔离:领域层可以自由演进内部数据结构,API 层的契约保持稳定,同时所有日志、监控这类横切关注点都能在转换层统一加。
2.3 领域核心:中断与恢复的实现机制
引擎的心脏在 Domain 层。Coze 在这里做了一个非常聪明的决策:不重新发明图执行引擎,而是基于字节自家的 cloudwego/eino/compose:
import "github.com/cloudwego/eino/compose"
wf, err := compose.NewWorkflow(ctx, workflowSchema, options...)
把精力投在 AI Agent 业务逻辑上(中断、恢复、状态管理),把并行执行、错误处理这些通用图执行能力交给成熟框架。这是"识别核心价值"的典型例子。
核心执行流程串起来长这样:
func (svc *DomainService) StreamExecute(ctx context.Context, config *ExecuteConfig) (*StreamReader, error) {
// 第一步:获取工作流定义
wfEntity, err := svc.repo.GetWorkflow(ctx, config.WorkflowID)
if err != nil {
return nil, errors.Wrap(err, "get workflow failed")
}
// 第二步:Canvas JSON → WorkflowSchema
canvas := &vo.Canvas{}
if err := sonic.UnmarshalString(wfEntity.Canvas, canvas); err != nil {
return nil, errors.Wrap(err, "unmarshal canvas failed")
}
workflowSchema, err := adaptor.CanvasToWorkflowSchema(ctx, canvas)
if err != nil {
return nil, errors.Wrap(err, "convert canvas to schema failed")
}
// 第三步:WorkflowSchema → 可执行的 Workflow
workflow, err := svc.createExecutableWorkflow(ctx, workflowSchema, config)
if err != nil {
return nil, errors.Wrap(err, "create workflow failed")
}
// 第四步:开始执行
return svc.startStreamExecution(ctx, workflow, config.Parameters)
}
流式执行的秘密:生产者-消费者模式
工作流要"后台异步执行"同时"实时推送进度",Coze 用 内存管道 Pipe 把这两件事解耦:
schema.Pipe[*entity.Message](10) 返回一对读写端 sr/sw,缓冲区为 10。后台 goroutine 跑 workflow.AsyncRun 作为生产者,每个节点的输出 sw.Send 写入管道;上层从 sr.Recv() 读消息转发给 SSE 客户端,作为消费者。错误也走同一条管道——这就是为什么 Handler 那个 for 循环里 io.EOF 是正常结束、其他 err 都会被转成 error 事件。
func (svc *DomainService) startStreamExecution(ctx context.Context, workflow *Workflow, params map[string]any) (*StreamReader, error) {
// 创建内存管道:sr 读端,sw 写端
sr, sw := schema.Pipe[*entity.Message](10)
// 把写端注入工作流执行器
workflow, err := compose.NewWorkflow(ctx, schema,
compose.WithStreamWriter(sw))
if err != nil {
sw.Close()
return nil, err
}
// 启动异步执行(生产者)
go func() {
defer sw.Close()
if err := workflow.AsyncRun(ctx, params); err != nil {
sw.Send(&entity.Message{
Type: "error",
Data: err.Error(),
})
}
}()
// 立即返回读端给上层(消费者)
return sr, nil
}
中断与恢复:状态快照的魔法
这是整套架构最关键的特性——工作流执行到一半要"冻结",等用户输入后再"解冻"续上。机制看下面这张图:
中断靠异常驱动。问答节点(QuestionAnswerNode)的唯一职责就是构造一份完整的状态快照,然后抛 InterruptException:
// 问答节点执行时
func (node *QuestionAnswerNode) Execute(ctx context.Context, input map[string]any) (map[string]any, error) {
return nil, &InterruptException{
EventID: generateEventID(),
Question: node.Config.Question,
State: svc.captureCurrentState(ctx), // 当前执行状态快照
ExecuteID: ctx.Value("execute_id").(string),
}
}
// 状态快照的数据结构
type ExecutionState struct {
WorkflowID string `json:"workflow_id"`
ExecuteID string `json:"execute_id"`
CurrentNodeID string `json:"current_node_id"`
Variables map[string]interface{} `json:"variables"`
NodeStates map[string]interface{} `json:"node_states"`
CreatedAt time.Time `json:"created_at"`
}
执行引擎捕获异常后做两件事:把 State 持久化到数据库,然后把中断事件通过 streamWriter 推给客户端:
func (executor *WorkflowExecutor) executeNode(ctx context.Context, node Node, input map[string]any) error {
result, err := node.Execute(ctx, input)
if err != nil {
if interruptErr, ok := err.(*InterruptException); ok {
// 遇到中断:保存状态
if err := executor.saveExecutionState(ctx, interruptErr.ExecuteID, interruptErr.State); err != nil {
return errors.Wrap(err, "save execution state failed")
}
// 推送中断事件给前端
executor.streamWriter.Send(&entity.Message{
Type: "interrupt",
EventID: interruptErr.EventID,
Data: interruptErr.Question,
ExecuteID: interruptErr.ExecuteID,
})
return interruptErr // 暂停执行
}
return errors.Wrap(err, "node execution failed")
}
return executor.executeNextNode(ctx, result)
}
恢复靠 ExecuteID 定位。stream_resume 接口拿到用户回答后,按 ID 从数据库加载快照、重建 Workflow 实例、注入用户回答和历史变量、调用 ResumeFrom 从断点继续:
func (svc *DomainService) StreamResume(ctx context.Context, req *ResumeRequest) (*StreamReader, error) {
// 1. 找到被冻结的执行状态
executionState, err := svc.repo.GetExecutionState(ctx, req.ExecuteID)
if err != nil {
return nil, errors.Wrap(err, "get execution state failed")
}
// 2. 重新构建执行环境
workflow, err := svc.rebuildWorkflow(ctx, executionState)
if err != nil {
return nil, errors.Wrap(err, "rebuild workflow failed")
}
// 3. 注入用户回答,从中断点继续
resumeInput := map[string]any{
"user_answer": req.UserAnswer,
"_variables": executionState.Variables,
}
return workflow.ResumeFrom(executionState.CurrentNodeID, resumeInput)
}
这套设计的精妙之处在三点:状态外置(不依赖内存,进程崩了也能恢复)、精确恢复(从任意中断点续上,不重跑已完成节点)、多次交互(支持多轮问答,每次都能准确恢复)。把"中断"当作正常的业务流程而不是错误情况,用异常做控制流——这让暂停和正常执行走的是同一条路径。
节点系统:插件化的统一接口
所有节点都遵循同一个接口,差异只在 Execute 内部:
type Node interface {
Execute(ctx context.Context, input map[string]any) (output map[string]any, err error)
Stream(ctx context.Context, input map[string]any) (*StreamReader, error)
}
LLMNode 调大模型并流式输出,QuestionAnswerNode 抛中断异常,CodeNode 在沙箱里跑用户代码(详见本系列的代码块节点文章)。上一个节点的输出自动成为下一个节点的输入——这种"搭积木"的设计让复杂业务逻辑的可组合性非常高。
3. 实践串联:一次完整交互的生命周期
把"智能文档助手"这个场景放进这套架构跑一遍,所有概念就串起来了。
整个链路分三阶段:
阶段一 · 启动执行:用户发起 stream_run 上传文档,Handler 建立 SSE 长连接,Application 把 SyncPattern 设为 Stream 后调领域服务;Domain 把 Canvas 转 Workflow,用 Pipe + 后台 goroutine 异步执行;LLM 节点的"分析中..."进度通过管道推回客户端。
阶段二 · 中断暂停:执行到问答节点时,节点抛出携带 ExecutionState 的 InterruptException;引擎把 CurrentNodeID、Variables、NodeStates 全部持久化到数据库,然后推送 interrupt 事件携带问题——客户端收到"总结是否满意?",工作流挂起。
阶段三 · 反馈恢复:用户提交 stream_resume 带 executeID 和回答;Domain 按 ID 加载快照、RestoreState 恢复 NodeStates、把用户回答和历史变量注入 resumeInput,然后 ResumeFrom("confirm", ...) 从断点续上。后续节点继续产出 message 事件,最终推 done 结束。
这种交互在传统 HTTP 模式下根本写不出来——你没法在 callLLM(doc) 之后突然"等用户确认"。Coze 用 SSE + Pipe + 状态快照的组合,把"长周期 + 可干预 + 状态保持"变成了工程上可解的题。
4. 结论:架构是体验的基石
读完 Coze 工作流源码,能提炼出三条可直接迁移的工程思想:
- 流式通信是长周期交互的基础设施。
SSE + 异步管道 + 生产者消费者这个组合,让服务端掌握主动权,把"暂停-恢复"从协议层面就内建进去。 - 状态外置让中断恢复成为工程实践。异常驱动 + 状态快照 + 精确恢复三位一体,执行现场可以像冰箱里的食物一样存取——这套机制不靠魔法,靠的是把状态从内存搬到数据库。
- 在正确的地方创新。Coze 没有重新发明图执行引擎,而是基于成熟的
eino/compose,把精力集中在中断、恢复、状态管理这些真正构成差异化能力的地方。
这套架构对任何要支持长周期人机协作的系统都有参考价值——AI Agent 只是它的载体。把它拆解透,能让我们在面对类似需求时少走很多弯路。
总结
Coze 工作流引擎的架构是现代分布式系统的一个完整范本:SSE 流式通信打破传统请求-响应的束缚、状态外置让中断恢复成为可靠工程、分层解耦保证了系统的可扩展性。这些设计思想不只适用于 AI Agent 工作流,对任何需要长周期交互的系统都有直接参考价值。
Make Open Source Great Again!
关于十三Tech
资深服务端研发工程师,AI 编程实践者。 专注分享真实的技术实践经验,相信 AI 是程序员的最佳搭档。 希望能和大家一起写出更优雅的代码!
联系方式:569893882@qq.com GitHub:@TriTechAI 微信:TriTechAI(备注:十三 Tech)
