专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Java实现SSE流式接口与异步推送全流程实战解析

在提供大型语言模型(LLM)服务时,完整响应内容的生成通常需要较长时间。

如果采用HTTP的一次性输出方式,用户将面临漫长的等待过程,严重影响交互体验。为解决这一问题,各主流LLM服务提供商均实现了流式输出接口,使用户能够实时看到模型生成的内容,而这些流式接口普遍采用SSE(Server-Sent Events)协议实现。

在维基百科中,SSE的定义是这样的:

Server-Sent Events (SSE) 是一种服务器推送技术。它允许客户端通过 HTTP 连接自动接收来自服务器的更新,并说明服务器如何在建立初始客户端连接后向客户端发起数据传输。 SSE 常用于向浏览器客户端发送消息更新或连续数据流,其设计目的是通过一个名为 EventSource 的 JavaScript API 来增强浏览器自身对跨浏览器流传输的支持。通过此 API,客户端请求特定的 URL 即可接收事件流。EventSource API 由 WHATWG 作为 HTML Living Standard[1] 的一部分进行了标准化。SSE 的媒体类型为 text/event-stream

如何在Spring中提供一个SSE服务

与HTTP请求的响应模式不同,SSE(Server-Sent Events)要求服务器建立一个长连接,并通过该连接持续向客户端推送数据。在Spring框架中实现SSE服务需要采用异步处理机制,以确保能够实现真正的流式数据传输。否则请求依旧会一次性返回给用户端,无法达到流式返回的,减少用户等待的效果。

Spring4中就专门设计了一个SseEmitter类来支持SSE协议,使用起来也比较简单。就直接让Controller返回SseEmitter类就好了。

示例代码:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@RestController
public class SSEController {
    @GetMapping("/demo/streaming")
    public SseEmitter sendEvents() {
        SseEmitter emitter = new SseEmitter(5000L);
        CompletableFuture<Void> completableFuture1 = sendMessageAndWaitSeconds(emitter, "hello word 1", 1);
        CompletableFuture<Void> completableFuture2 = sendMessageAndWaitSeconds(emitter, "hello word 2", 2);
        CompletableFuture<Void> completableFuture3 = sendMessageAndWaitSeconds(emitter, "hello word 3", 3);
        CompletableFuture
                .allOf(completableFuture1, completableFuture2, completableFuture3)
                .thenAccept((v) -> emitter.complete());
        return emitter;
    }
    private static CompletableFuture<Void> sendMessageAndWaitSeconds(SseEmitter emitter, String message, int delay) {
        return CompletableFuture.runAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(delay);
                        emitter.send(message);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
        );
    }
}

上面的代码用了很多CompletableFuture对象来异步化代码,原因是这个函数的主要目标是需要把SseEmitter返回给前端进行监听,如果等函数执行完,再返回SseEmitter,其实无异于同步执行完代码再返回结果。

此外当服务推送完成的时候,注意需要使用SseEmittercomplete()方法关闭前端的,否则前端会一直等到超时才会关闭接口,并且会响应异常状态码。

实际在平时的业务Java代码中,我们是不习惯编写异步的代码的,可以利用Spring的@Async注解将我们编写的代码转换成异步的代码。

比如:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Service
public class BizService {
    @Async
    public CompletableFuture<Void> doBiz(SseEmitter emitter, int time, String message) throws InterruptedException, IOException {
        // 模拟业务代码
        TimeUnit.SECONDS.sleep(time);
        // 发送消息给前端
        emitter.send(message);
        return CompletableFuture.completedFuture(null);
    }
}

其实不难发现,这种SSE模式更加合适发布-订阅的模型,即服务端暴露两个接口:

1、 注册接口: 客户端可以订阅注册接口,获取 SseEmitter 对象。
2、 监听消息接口: 客户端监听接收消息接口获取 SseEmitter 对象发送的消息,直到所有的消息推送完毕(这也可以单独拆分一个接口)

但是这样设计,对于客户端的对接开发复杂度就会提升很多,目前这种一个接口干完发布-订阅的模型,对客户端来说是更加友好的模式。

所有的代码已经推送到了github上: https://github.com/whthomas/ai-demo-blog/tree/main/ai-demo-blog-sse

为什么不用Flux?

1、 CompletableFuture是JDK1.8中自带的一个对象,业务函数不只是暴露给Spring使用,如果返回Flux,在其他的接口(比如Dubbo、MQ等等)上势必需要额外的兼容工作。
2、 Flux是一个异步框架,需要各个环境对其进行适配,从数据库操作、文档操作等等函数全部返回Flux对象,一些第三方库和框架可能没有提供响应式API,需要进行适配或阻塞调用,这可能会抵消使用响应式编程的优势。
3、 Flux和响应式编程范式需要开发人员转变思维模式,从命令式编程转为声明式和函数式编程,这种转变对团队来说需要较长时间适应,对于已经习惯传统编程模型的开发而言是较重的心智负担。
4、 Flux响应式代码的调试比传统的命令式代码更复杂,错误栈追踪往往不直观,因为执行链是异步的,在测试编写、问题定位和修复会更加耗时。

总结

随着大模型的能力逐步进入日常的开发工作,SSE已经成了必不可少的技能。SSE让我们能够建立实时通信渠道,为用户提供流畅的交互体验。依托于SSE的简洁性和高效性,我们可以轻松地实现如聊天机器人、实时通知等功能,无需复杂的WebSocket配置。

相关内容:

  • https://github.com/whthomas/ai-demo-blog/tree/main/ai-demo-blog-sse
  • https://en.wikipedia.org/wiki/Server-sent_events
  • https://www.baeldung.com/spring-server-sent-events
  • https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.html
未经允许不得转载:搜云库 » Java实现SSE流式接口与异步推送全流程实战解析

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们