SSE 流式响应在 AI 对话系统中的使用

1. 对应简历段落

这篇文章对应简历中“在内部 AI 客服系统中基于 SSE 实现大模型对话流式响应,支持模型 token 增量输出、工具调用状态推送、RAG 引用展示、异常中断处理和前端会话体验优化”的经历。

面试时这部分不要只说“用了 SseEmitter”。资深 Java / AI 工程化面试会继续问:为什么选择 SSE 而不是 WebSocket,流式输出如何和工具调用结合,断线如何处理,后端线程怎么释放,网关和 Nginx 会不会缓冲,怎么做超时、取消、审计和敏感词拦截。

可以把项目描述成:

在 AI 客服对话链路中设计 SSE 流式响应协议,将模型 token、检索状态、工具调用进度、引用来源和最终审计结果以事件流形式返回前端,提升坐席等待体验,并在 Java 后端实现连接生命周期管理、异常兜底、用户取消、超时控制和代理缓冲配置。

这段经历的价值在于把“模型慢”变成“用户可感知的渐进式反馈”。

2. 业务背景

AI 客服系统和传统业务系统最大的体验差异之一是响应时间。传统接口通常几百毫秒到一两秒返回,而大模型回答可能需要数秒甚至十几秒。如果中间还要做 RAG 检索、rerank、工具调用、结果汇总,用户等待时间会更长。

客服坐席的工作节奏很快。如果系统在十秒内没有任何反馈,坐席会以为卡住了,可能重复点击、刷新页面或转去手工查询。AI 系统即使最终答案正确,体验也会很差。

SSE 的作用是让后端可以持续向浏览器推送事件。模型生成一个 token 或一小段文本,就推给前端展示;工具调用开始、完成、失败,也可以推送状态;最后再推送完整回答、引用来源和审计 ID。

为什么很多 AI 对话系统选择 SSE?因为 AI 文本生成通常是服务端到客户端的单向流,前端不需要频繁向后端推送消息。SSE 基于 HTTP,浏览器原生支持 EventSource,穿透代理比 WebSocket 简单,和现有 Spring MVC / WebFlux 系统集成成本较低。

当然,SSE 不是万能的。如果需要双向实时协作、多人编辑、语音实时交互,WebSocket 更合适。但内部客服文本对话场景,SSE 通常足够。

3. 核心原理

SSE,全称 Server-Sent Events,是一种基于 HTTP 长连接的服务端推送机制。响应头通常是:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

服务端按特定格式写入事件:

event: token
data: {"content":"您好"}

event: token
data: {"content":",请稍等"}

event: done
data: {"messageId":"m123"}

每个事件之间用空行分隔。浏览器端通过 EventSource 监听不同事件类型。相比一次性 JSON 返回,SSE 可以在后端处理过程中不断给用户反馈。

AI 对话里的事件不应该只有 token。更成熟的事件协议可以包含:

  • start:会话开始,返回 messageId。
  • retrieval_start:开始检索知识库。
  • retrieval_done:检索完成,返回引用数量。
  • tool_call_start:开始调用某个工具。
  • tool_call_done:工具调用完成。
  • token:模型文本增量。
  • citation:引用来源。
  • warning:安全提示或证据不足提示。
  • error:可恢复或不可恢复错误。
  • done:完整结束。

这样前端可以展示“正在检索条款”“正在查询保单状态”“正在生成回答”,用户不会面对一个空白等待。

在 Java 后端实现上,Spring MVC 可以使用 SseEmitter,Spring WebFlux 可以使用 Flux<ServerSentEvent>。如果系统原来是 Spring MVC,SseEmitter 改造成本低;如果整个链路已经是响应式,WebFlux 更自然。

4. 项目落地

项目落地时建议先定义事件协议,而不是直接把模型 token 原样丢给前端。因为业务对话不只是文本输出,还包括 RAG、工具调用、安全拦截和审计。

后端可以把一次对话拆成 ConversationTask。Controller 创建 SseEmitter 后,把任务交给线程池或异步执行器。任务内部按步骤推送事件:先推 start,再执行意图识别,推 retrieval_start,执行 RAG,推 retrieval_done,必要时推 tool_call_start 和 tool_call_done,最后调用大模型流式接口,把 token 一段段转发给前端。

前端收到事件后,按 messageId 更新当前 AI 消息。token 事件追加文本,citation 事件更新引用区,tool_call 事件更新状态提示,done 事件标记完成。用户如果点击停止生成,前端调用取消接口,后端根据 conversationId 或 requestId 取消上游模型请求和本地任务。

工程上要特别注意连接生命周期。SseEmitter 默认超时可能不适合大模型长回答,需要配置合理超时时间。任务结束时必须 complete,异常时 completeWithError 或发送 error 后 complete。用户断开连接时要释放上游请求,否则后端还在生成没人看的内容。

代理配置也很关键。Nginx 默认可能缓冲响应,导致后端已经分片写了,前端却等到缓冲区满才看到。需要关闭 proxy_buffering,并确认网关、负载均衡、压缩配置不会破坏流式输出。

审计方面,不建议只记录 token 流。可以在内存或消息表中累积完整回答,done 时落库:问题、检索证据、工具调用、完整回答、用户、耗时、traceId。这样既有实时体验,也有完整审计。

5. 流程或伪代码

@PostMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter chat(@RequestBody ChatRequest request, Principal principal) {
    UserContext user = userContextFactory.from(principal);
    SseEmitter emitter = new SseEmitter(120_000L);
    String requestId = idGenerator.next();

    emitter.onCompletion(() -> conversationTaskRegistry.finish(requestId));
    emitter.onTimeout(() -> conversationTaskRegistry.cancel(requestId));
    emitter.onError(ex -> conversationTaskRegistry.cancel(requestId));

    conversationExecutor.submit(() -> {
        try {
            send(emitter, "start", Map.of("requestId", requestId));

            send(emitter, "retrieval_start", Map.of("message", "正在检索知识库"));
            List<Evidence> evidence = ragService.retrieve(request, user);
            send(emitter, "retrieval_done", Map.of("count", evidence.size()));

            ToolPlan plan = toolPlanner.plan(request, user);
            if (plan.needCall()) {
                send(emitter, "tool_call_start", Map.of("tool", plan.toolName()));
                ToolResult result = toolRuntime.execute(plan, user);
                send(emitter, "tool_call_done", result.safeSummary());
            }

            StringBuilder fullAnswer = new StringBuilder();
            llmClient.stream(promptBuilder.build(request, evidence), token -> {
                fullAnswer.append(token);
                send(emitter, "token", Map.of("content", token));
            });

            send(emitter, "citation", citationBuilder.build(evidence));
            auditLogger.record(request, user, evidence, fullAnswer.toString());
            send(emitter, "done", Map.of("requestId", requestId));
            emitter.complete();
        } catch (Exception ex) {
            sendQuietly(emitter, "error", errorMapper.toClientMessage(ex));
            emitter.complete();
        }
    });

    return emitter;
}

这个伪代码省略了很多细节,但面试时可以重点讲:SSE 连接和实际任务要解耦;每个阶段都有事件;异常要转成业务可读错误;任务结束要释放资源;审计记录完整结果而不是零散 token。

6. 安全边界

第一,用户身份边界。SSE 连接建立时必须完成认证,并把用户上下文绑定到整个任务。不能因为连接是长连接就跳过权限校验。

第二,输出内容边界。模型 token 流式返回时,敏感词、隐私字段、违规话术可能已经分片输出。高风险场景可以采用小缓冲策略:先缓存若干 token 做轻量检测,再批量发送;或者对工具结果先脱敏,降低模型输出敏感信息概率。

第三,工具结果边界。工具调用状态可以展示给用户,但不要把内部错误、堆栈、接口地址、系统名称直接推送到前端。

第四,取消边界。用户取消生成后,后端应停止上游模型流和工具任务。否则可能浪费资源,甚至继续执行不该执行的动作。

第五,审计边界。流式输出也要落完整审计。不能因为是 token 流,就没有最终回答记录。

第六,连接资源边界。SSE 是长连接,会占用连接数和线程资源。必须设置超时、并发限制和队列保护,防止大量对话拖垮服务。

第七,代理边界。中间网关必须正确配置,否则可能出现“后端流式,前端不流式”的假象。

7. 常见坑

第一个坑是 Nginx 缓冲没关。后端看日志是一段段发送,用户却最后一次性收到,通常就是代理缓冲或压缩导致。

第二个坑是没有心跳。长时间执行工具或检索时,如果没有任何数据发送,代理可能认为连接空闲而断开。可以定期发送 comment 或 ping 事件。

第三个坑是 SseEmitter 使用公共线程池。模型流式回答耗时长,如果占满业务线程池,会影响普通接口。应该使用独立线程池并限制并发。

第四个坑是异常没有完整结束。异常后不 complete,前端一直显示生成中;或者 complete 前没有发送 error,用户不知道失败原因。

第五个坑是无法取消。用户点击停止只是前端不显示,后端仍然在调用模型。这会浪费 token 和资源。

第六个坑是 token 级审计缺失。只流式输出不保存完整回答,后续质检和纠错会很困难。

第七个坑是事件协议混乱。前端每接一个字段都写特殊逻辑,后期很难维护。应该先定义稳定事件类型。

8. 面试追问

面试官可能问:为什么用 SSE 不用 WebSocket?可以回答:AI 文本生成主要是服务端到客户端单向推送,SSE 基于 HTTP、实现简单、浏览器原生支持、代理兼容性较好;如果需要强双向实时交互再考虑 WebSocket。

可能问:SSE 如何处理工具调用?回答:把工具调用开始、完成、失败作为独立事件推给前端,模型 token 只是事件流的一部分。

可能问:用户断开连接怎么办?回答:监听 completion、timeout、error,取消本地任务和上游模型请求,释放资源。

可能问:如何做敏感内容拦截?回答:优先在工具结果和 Prompt 输入前脱敏;输出侧可做小窗口缓冲检测和最终回答质检,高风险问题不直接流式承诺。

可能问:如何保证审计?回答:后端累积完整回答,同时记录检索证据、工具调用、用户上下文和 traceId,done 时落库。

可能问:线上遇到不流式怎么办?回答:排查响应头、代理 buffering、gzip、网关超时、浏览器 EventSource、后端 flush 和模型 SDK 是否真流式。

9. 推荐回答

推荐回答如下:

我们在 AI 客服里用 SSE 主要是为了解决大模型响应慢带来的等待体验问题。这个场景是典型的服务端向浏览器单向推送,SSE 基于 HTTP,和现有 Spring 系统、网关、浏览器兼容比较好,比 WebSocket 改造成本低。

实现上我们不是只推 token,而是定义了事件协议,包括 start、retrieval_start、tool_call_start、token、citation、error、done 等。这样前端可以展示检索、工具调用和生成状态。后端用 SseEmitter 管理连接,把实际对话任务放到独立线程池,支持超时、断线取消和用户主动停止。

安全上,工具结果先脱敏再进入模型,流式输出会记录完整回答和引用证据,异常也会转换成可读事件返回。线上还要注意 Nginx buffering、gzip 和网关超时,否则很容易后端流式但前端一次性收到。

这段回答能体现你既懂协议,也懂 AI 对话系统的工程细节。

10. 延伸学习路线

第一阶段学习 SSE 协议本身:event-stream 格式、EventSource、重连机制、心跳、响应头。

第二阶段学习 Spring 实现:SseEmitter、ResponseBodyEmitter、WebFlux Flux,比较阻塞和响应式模型的差异。

第三阶段学习大模型流式 API:token delta、tool call delta、finish reason、取消请求、超时处理。

第四阶段学习前端交互:增量渲染、Markdown 流式解析、引用延迟展示、停止生成、错误提示。

第五阶段学习生产配置:Nginx buffering、网关超时、连接池、线程池、限流、监控指标。

最后可以做一个小项目:Spring Boot + SSE + RAG + 模型流式输出,前端显示检索状态、工具调用状态和引用来源。这个项目很适合用来说明 AI 对话体验优化能力。