流式输出
大约 3 分钟
第十一章:流式处理
11.1 流式响应处理
很多 AI 场景并不需要“等整段文本生成完再返回”,更好的体验是边生成边展示:
- 提升交互体验:用户更快看到“第一屏”
- 降低超时概率:长输出不再被单次请求超时影响
- 有利于可取消:用户不想看了可以中止连接
Spring AI 的核心是 ChatClient.stream():把模型的输出变成一个可持续消费的数据流。
11.2 实时内容生成:SSE(可复制运行)
这一节给出一个最小可运行的 SSE 示例。你只需要:
- Spring Boot + WebFlux
- 一个
/stream/chat接口返回Flux<ServerSentEvent<String>> - 前端使用
EventSource监听并渲染
11.2.1 pom.xml(关键是 WebFlux)
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-qwen-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-deepseek-spring-boot-starter</artifactId>
</dependency>
</dependencies>11.2.2 Controller:把 ChatResponse 映射成 SSE token 流
package com.example.streaming.api;
import java.time.Duration;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class StreamingController {
private final ChatClient chatClient;
public StreamingController(ChatClient chatClient) {
this.chatClient = chatClient;
}
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> stream(@RequestParam String message) {
Flux<String> tokens = chatClient.prompt()
.system("你是一个简洁但专业的助手。输出分段清晰,避免一次性输出超长段落。")
.user(message)
.stream()
.flatMapIterable(response -> response.getResults())
.map(result -> result.getOutput().getContent());
Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(10))
.map(i -> ServerSentEvent.<String>builder()
.event("ping")
.data("keep-alive")
.build());
Flux<ServerSentEvent<String>> payload = tokens
.map(token -> ServerSentEvent.builder(token).event("token").build());
return Flux.merge(payload, heartbeat);
}
}11.2.3 前端:EventSource(直接复制到浏览器打开)
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<title>Spring AI SSE 流式输出 Demo</title>
<style>
body { font-family: Arial, sans-serif; margin: 24px; }
textarea { width: 100%; height: 80px; }
pre { white-space: pre-wrap; border: 1px solid #ddd; padding: 12px; min-height: 120px; }
</style>
</head>
<body>
<h1>Spring AI SSE 流式输出 Demo</h1>
<textarea id="q">用小白能懂的方式解释一下什么是 Spring AI,并给一个最小可运行示例。</textarea>
<button id="btn">开始</button>
<button id="stop">停止</button>
<pre id="out"></pre>
<script>
let es;
const out = document.getElementById('out');
document.getElementById('btn').onclick = () => {
out.textContent = '';
const q = encodeURIComponent(document.getElementById('q').value);
es = new EventSource(`/stream/chat?message=${q}`);
es.addEventListener('token', (e) => { out.textContent += e.data; });
es.addEventListener('ping', () => {});
es.onerror = () => { out.textContent += "\n\n[连接中断]"; es.close(); };
};
document.getElementById('stop').onclick = () => {
if (es) es.close();
};
</script>
</body>
</html>11.3 大数据流处理:背压、超时、取消
11.3.1 背压(Backpressure)
当客户端慢、或模型输出很快时,后端要避免积压内存:
- 优先使用响应式链路(WebFlux)
- 对下游加限速(例如按 token 合并为 chunk 再发送)
- 对单连接加最大输出限制(最大 token 或最大字符数)
11.3.2 超时与取消
- 模型调用要有超时:防止单个请求占用连接
- 用户关闭浏览器时要能取消:WebFlux 会传播 cancel 信号,底层客户端应尽快释放资源
11.4 性能优化建议(实战向)
- token 合并:每 20~50 个 token 合并成一段再推送,减少 SSE 事件数
- 心跳:长连接需要 keep-alive,避免代理/网关主动断开
- 降级:当 SSE 不可用时,支持回退到“非流式”接口 点击这里👇🏻获取:100万QPS短链系统、复杂的商城微服务系统、智能翻译助手AI Agent、SaaS点餐系统、刷题吧小程序、商城系统、秒杀系统、AI项目、代码生成神器、苏三demo项目、智能天气播报AI Agent、智能代码审查AI Agent等 10 个项目的:项目源代码、开发教程和技术答疑
