流式输出 SSE
大约 2 分钟
第十二章:流式输出:WebFlux SSE 实战(取消/断流/心跳)
12.1 为什么流式一定要做“取消与断连”
流式输出提升体验,但如果你不处理取消与断连,会出现:
- 用户已经关页面了,服务端还在继续生成(白白烧钱)
- 代理/网关空闲超时导致断流,用户以为卡死
- 并发流式连接过多,拖垮应用
本章用 Spring WebFlux SSE 做一个可落地的流式接口骨架。
12.2 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai-spring-boot-starter</artifactId>
<version>1.12.2-beta22</version>
</dependency>并在 application.yml 里配置 streaming-chat-model(见第 3 章)。
12.3 Controller:把 TokenStream 转成 SSE
下面的写法核心是:
- 用
Sinks.Many<String>作为桥梁,把 onPartialResponse 推进 Flux - 断开连接时停止继续推送
- 结束时发送一个完成事件
package com.example.langchain4j.api;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.TokenStream;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
@RestController
public class StreamingChatController {
interface StreamingAssistant {
TokenStream chat(String message);
}
private final StreamingAssistant assistant;
public StreamingChatController(StreamingChatModel streamingChatModel) {
this.assistant = AiServices.create(StreamingAssistant.class, streamingChatModel);
}
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> stream(@RequestParam String q) {
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
TokenStream tokenStream = assistant.chat(q);
tokenStream
.onPartialResponse(sink::tryEmitNext)
.onCompleteResponse(resp -> {
sink.tryEmitNext("\n");
sink.tryEmitComplete();
})
.onError(err -> sink.tryEmitError(err))
.start();
return sink.asFlux()
.map(data -> ServerSentEvent.builder(data).event("delta").build())
.doFinally(signal -> tokenStream.cancel());
}
}12.4 心跳:避免网关空闲超时
很多反向代理会在“长时间无数据”时断开连接。你可以在 SSE 流里定时发心跳:
package com.example.langchain4j.api;
import java.time.Duration;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;
public final class SseHeartbeat {
private SseHeartbeat() {
}
public static Flux<ServerSentEvent<String>> heartbeat(Duration interval) {
return Flux.interval(interval)
.map(tick -> ServerSentEvent.<String>builder("ping").event("ping").build());
}
}把它和模型流合并:
return Flux.merge(
sink.asFlux().map(d -> ServerSentEvent.builder(d).event("delta").build()),
SseHeartbeat.heartbeat(Duration.ofSeconds(10))
).doFinally(signal -> tokenStream.cancel());12.5 本章小结
你已经拿到一个可上线的 SSE 骨架:TokenStream → Flux → SSE,并且考虑了取消、断连与心跳。下一章我们把治理三件套一次讲完:可观测性 + 安全 + 性能成本,最后在第 14 章把所有能力组合成实战项目。
