流式输出
大约 4 分钟
第四章:流式输出(SSE)
4.1 为什么要做流式?
流式输出不是“锦上添花”,而是很多产品体验的底座:
- 用户能更快看到第一屏,交互感明显提升
- 长输出不容易被单次请求超时卡死
- 天然支持取消:用户不想看了可以关闭连接
在 Spring AI Alibaba 中,核心能力仍然是 Spring AI 的编程模型:ChatClient.stream() 把模型输出映射为响应式数据流。
4.2 最小可运行 SSE 示例
这一节你会得到一个可复制运行的链路:
- 后端:
/stream/chat返回Flux<ServerSentEvent<String>> - 前端:
EventSource接收 token 并逐段渲染
4.2.1 pom.xml(关键:WebFlux + DashScope Starter)
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud.ai</groupId>
<artifactId>spring-ai-alibaba-starter-dashscope</artifactId>
</dependency>
</dependencies>4.2.2 application.yml
spring:
ai:
dashscope:
api-key: ${AI_DASHSCOPE_API_KEY:}
chat:
options:
model: ${DASHSCOPE_CHAT_MODEL:qwen-plus}
temperature: 0.24.2.3 Controller:把模型 token 流映射为 SSE
package com.example.saa.api;
import java.time.Duration;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class StreamingChatController {
private final ChatClient chatClient;
public StreamingChatController(ChatClient chatClient) {
this.chatClient = chatClient;
}
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> stream(@RequestParam String message) {
Flux<ServerSentEvent<String>> payload = chatClient.prompt()
.system("你是一个简洁但专业的助手。输出尽量分段,不要一次性输出超长段落。")
.user(message)
.stream()
.flatMapIterable(response -> response.getResults())
.map(result -> result.getOutput().getContent())
.map(token -> ServerSentEvent.builder(token).event("token").build());
Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(10))
.map(i -> ServerSentEvent.<String>builder()
.event("ping")
.data("keep-alive")
.build());
return Flux.merge(payload, heartbeat);
}
}4.2.4 前端:EventSource(复制到本地 html 直接打开)
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<title>Spring AI Alibaba SSE Demo</title>
<style>
body { font-family: Arial, sans-serif; margin: 24px; }
textarea { width: 100%; height: 90px; }
pre { white-space: pre-wrap; border: 1px solid #ddd; padding: 12px; min-height: 160px; }
</style>
</head>
<body>
<h1>Spring AI Alibaba SSE 流式输出 Demo</h1>
<textarea id="q">用小白能懂的方式解释一下什么是 RAG,并给一个最小实现思路。</textarea>
<button id="btn">开始</button>
<button id="stop">停止</button>
<pre id="out"></pre>
<script>
let es;
const out = document.getElementById('out');
document.getElementById('btn').onclick = () => {
out.textContent = '';
const q = encodeURIComponent(document.getElementById('q').value);
es = new EventSource(`/stream/chat?message=${q}`);
es.addEventListener('token', (e) => { out.textContent += e.data; });
es.addEventListener('ping', () => {});
es.onerror = () => { out.textContent += "\n\n[连接中断]"; es.close(); };
};
document.getElementById('stop').onclick = () => {
if (es) es.close();
};
</script>
</body>
</html>4.3 工程化要点:心跳、取消、背压
4.3.1 心跳
很多代理与网关会在连接长时间无数据时主动断开。心跳的作用是“保持连接活跃”,同时也能让你在前端判断连接是否健康。
4.3.2 取消
用户点击“停止”,浏览器会关闭 SSE 连接。WebFlux 会把 cancel 信号向上传播,后端不需要你手写线程中断逻辑,但你需要确保:
- 不要把流式链路阻塞在不可取消的操作上
- 下游调用(HTTP)要有超时,避免资源长期占用
4.3.3 背压
当输出 token 很快、客户端很慢时,服务端要避免内存堆积。建议:
- token 合并:每 20~50 个 token 合并成 chunk 再推送
- 单请求上限:限制最大输出字符/最大 token
- 并发控制:限制同一用户/同一 IP 的 SSE 连接数
4.4 本章小结
你已经拥有一条可上线的流式链路:ChatClient.stream() → WebFlux → SSE → 浏览器渲染。
下一章我们会进一步把模型“接入业务动作”:用函数调用(Tools)让模型在需要实时数据或执行动作时,调用你提供的后端工具,并在安全边界内完成闭环。 点击这里👇🏻获取:100万QPS短链系统、复杂的商城微服务系统、智能翻译助手AI Agent、SaaS点餐系统、刷题吧小程序、商城系统、秒杀系统、AI项目、代码生成神器、苏三demo项目、智能天气播报AI Agent、智能代码审查AI Agent等 10 个项目的:项目源代码、开发教程和技术答疑
