LLM流式方案解决方案和客户端解决方案

背景

接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。

本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码)

什么是SSE

SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP的服务器到客户端的单向通信技术,用于实现服务器向客户端推送数据的功能。SSE协议标准由HTML5规范定义,并且其定义被包含在HTML Living Standard中。

SSE允许服务器通过HTTP连接向客户端发送数据,而无需客户端发起请求。这使得SSE非常适合于实时通信或推送通知给客户端的应用程序,例如实时股票报价、即时通讯、实时监控等场景。

基本上,SSE由以下要素组成:

  1. 服务器:负责向客户端发送事件流的HTTP服务器。
  2. 客户端:通过浏览器中的EventSource API与服务器建立连接,接收服务器发送的事件。
  3. 事件流(Event Stream):服务器向客户端发送的数据流,格式为纯文本,使用一种特定的格式进行编码,例如MIME类型为"text/event-stream"。

SSE的优点包括简单易用、实现方便、跨浏览器支持良好等。然而,它也有一些限制,例如不能支持双向通信,与WebSocket相比,SSE的实时性稍逊一筹。

Java框架说明

pom 文件引入的核心依赖包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>aip.com</groupId>
    <artifactId>aip-com</artifactId>
    <version>0.0.1</version>
    <name>aip-com</name>
    <description>aip com project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Java后端核心代码

本方法是标准的SSE协议标准


    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    
    /**
     * 会话请求
     *
     * @return String
     */
    @PostMapping(value = "/completions", consumes = MediaType.APPLICATION_JSON_VALUE)
    @Operation(summary = "会话请求")
    public SseEmitter completions(@RequestBody CompletionRequest completionRequest) {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        SseEmitter emitter = new SseEmitter();

        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 向客户端发送事件
                    emitter.send(
                            SseEmitter.event()
                                    .name("message")
                                    .data(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                            .ended(false)
                                             .message(String.valueOf(i))
                                            .build()))
                    );
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public SseEmitter stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        SseEmitter emitter = new SseEmitter();

        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 向客户端发送事件
                    emitter.send(
                            SseEmitter.event()
                                    .name("message")
                                    .data(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                            .ended(false)
                                             .message(String.valueOf(i))
                                            .build()))
                    );
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;

Flux 和 Flowable 对比

Flux 和 Flowable 都是响应式编程库中的数据流类型,用于处理异步和基于事件的流式数据。它们分别来自于不同的库,Flux 是 Reactor 库的一部分,而 Flowable 则是 RxJava 库的一部分。以下是它们之间的一些区别:

  1. 库的来源:

    • Flux 来自于 Reactor 库,是 Reactor 的核心组件之一,React的核心模块用于基于反应式流规范处理数据流。
    • Flowable 来自于 RxJava 库,是 RxJava 的核心类之一,RxJava 是 Java 平台的反应式扩展库,用于处理异步和基于事件的编程。
  2. 背压策略:

    • Flux 默认采用背压策略为 BUFFER,可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法来指定不同的背压策略。
    • Flowable 默认也是支持背压的,但是相比 Flux,Flowable 提供了更多的背压策略,如 BUFFER、DROP、LATEST、ERROR、MISSING。
  3. 反应式规范:

    • Flux 遵循 Reactor 库的反应式流规范,使用 Mono 和 Flux 来表示异步流和单个结果。
    • Flowable 遵循 RxJava 库的反应式流规范,使用 Observable 和 Flowable 来表示异步流和单个结果。
  4. 生态系统:

    • Reactor 生态系统主要用于基于 Reactor 的应用程序。
    • RxJava 生态系统则更广泛,它是 ReactiveX 的一部分,支持多种语言和平台,并有许多衍生项目。

总的来说,Flux 和 Flowable 在概念上很相似,都用于处理异步和基于事件的流式数据,但它们来自于不同的库,并且有一些细微的区别,如背压策略和生态系统支持。您可以根据项目需求选择适合的库和数据流类型。

Java后端Flowable方式

本方法是Flowable方式,非标准流式规则

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public Flowable<String> stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);

        Flowable<String> typingFlow = Flowable.create(emitter -> {
            executorService.execute(() -> {
                try {
                    for (int i = 0; i < 10; i++) {

                        emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                .ended(false)
                                .message(String.valueOf(i))
                                .build()));

                        Thread.sleep(1000);
                    }
                    emitter.onComplete();
                } catch (Exception e) {

                }
            });
        }, BackpressureStrategy.BUFFER);

        return typingFlow;
    }

Java后端Flux方式

本方法是Flux方式,非标准流式规则

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public Flux<String> stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);

        Flux<String> typingFlow = Flux.create(emitter -> {
            executorService.execute(() -> {
                try {
                    for (int i = 0; i < 10; i++) {

                        emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                .ended(false)
                                .message(String.valueOf(i))
                                .build()));

                        Thread.sleep(1000);
                    }
                    emitter.complete();
                } catch (Exception e) {

                }
            });
        }, FluxSink.OverflowStrategy.BUFFER);

        return typingFlow;
    }
}

HTML 客户端接收示例程序

function EventSourceGetRequest() SSE 默认方法,只支持GET请求,适合演示用途以及后端包装好服务

function fetchPostRequest() fetch POST 请求实现SSE,支持所有请求(POST,GET等)以及传递参数

sse.html 内容

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SEE Example</title>
    <script>

        // SSE 默认方法,只支持GET请求
        function EventSourceGetRequest() {
            if(typeof(EventSource)!=="undefined")
            {
                var eventSource = new EventSource('http://127.0.0.1:8090/v1/chat/stream');
                eventSource.onmessage = function(event)
                {
                    document.getElementById('result').insertAdjacentHTML('beforeend', `${event.data}<br/><br/>`);
                    console.log(event)
                };
            }
            else
            {
                document.getElementById("result").innerHTML="抱歉,你的浏览器不支持 server-sent 事件...";
            }
        }

        // fetch POST 请求实现SSE
        function fetchPostRequest() {
            fetch('http://127.0.0.1:8090/v1/chat/completions', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify({}),
            })
            .then(response => {
                // 检查响应是否成功
                if (!response.ok) {
                    throw new Error('Network response was not ok');
                }
                // 返回 ReadableStream 对象
                return response.body;
            })
            .then(stream => {
                // 创建一个新的文本解码器
                const decoder = new TextDecoder();
                
                // 获取一个 reader 对象
                const reader = stream.getReader();
                
                let chunk = ''
                
                // 逐块读取数据
                function read() {
                    reader.read().then(({ done, value }) => {
                        if (done) {
                            document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);
                            console.log('Stream has ended');
                            return;
                        }
                        // 将数据块转换为字符串并显示
                        const tmp = decoder.decode(value, { stream: true });
                        if (tmp.startsWith('event:') && chunk!='') {
                            document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);
                            chunk = tmp
                        }else{
                            chunk = chunk + tmp
                        }
                        // 继续读取下一块数据
                        read();
                    });
                }
                // 开始读取数据
                read();
            })
            .catch(error => {
                // 处理错误
                console.error('There was a problem with the fetch operation:', error);
            });
        }

        // EventSourceGetRequest();
        fetchPostRequest();
    </script>
</head>
<body>
	<h1>SEE result</h1>
    <div id="result"></div>
</body>
</html>
  • 标准SSE示例

标准SSE

  • 扩展SSE

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/468749.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙App开发学习 - TypeScript编程语言全面开发教程(上)

背景 根据鸿蒙官方的说明&#xff1a; ArkTS是HarmonyOS优选的主力应用开发语言。ArkTS围绕应用开发在TypeScript&#xff08;简称TS&#xff09;生态基础上做了进一步扩展&#xff0c;继承了TS的所有特性&#xff0c;是TS的超集。因此&#xff0c;在学习ArkTS语言之前&#…

蓝牙系列十七:BLE安全机制--地址类型与LL层设备过滤

上一篇我们讲了BLE的安全机制&#xff0c;引入白名单和安全地址的感念&#xff0c;使用白名单来过滤安全设备是BLE种最简单的方法。这一篇我们来详细讲一下这些概念。 一、地址类型 学习资料&#xff1a;官方手册 Vol 6: Core System Package [Low Energy Controller volume…

蓝桥杯学习笔记 单词分析

试题 G: 单词分析 时间限制: 1.0s 内存限制: 512.0MB 本题总分:20 分 [问题描述] 小蓝正在学习一门神奇的语言&#xff0c;这门语言中的单词都是由小写英文字母组成&#xff0c;有些单词很长&#xff0c;远远超过正常英文单词的长度。小蓝学了很长时间也记不住一些单词&#xf…

Spring 3升级指导

一&#xff0c;背景 Spring开源多年&#xff0c;已经经过了多次的升级迭代&#xff0c;最新的已经到Spring 6了&#xff0c;但是估计大家最常用的还是Spring 2.x。 最近项目准备升级到Spring 3&#xff0c;下面简单记录一下升级的改动点。 二&#xff0c;官方指导 1&#x…

深度观察2024中国系统架构师大会(SACC)

今年的中国系统架构师大会&#xff08;SACC&#xff09;在我所在的城市广州举办&#xff0c;很荣幸受邀参加。这次能接触到国内最优秀的架构师&#xff0c;学习他们的架构思想和行业经验。对我而言非常有意义。 大会分为上下午共4场&#xff0c;我参加了上午的多云多活架构设计…

SLAM IPC算法

基础知识&#xff1a;方差&#xff0c;协方差&#xff0c;协方差矩阵 方差&#xff1a;描述了一组随机变量的离散程度 方差 每个样本值 与 全部样本的平均值 相差的平方和 再求平均数&#xff0c;记作&#xff1a; 例如&#xff1a;计算数字1-5的方差&#xff0c;如下 去中心化…

【ZooKeeper】1、基本介绍

本文基于 Apache ZooKeeper Release 3.7.0 版本书写 作于 2022年3月6日 14:22:11 转载请声明 1、Zookeeper是什么&#xff1f; 由ZooKeeper的官网介绍可知&#xff1a; ZooKeeper 是Apache原子基金会下一个开源的、用于提供可靠的分布式协同的服务器。 ZooKeeper 可以用来 配置…

Spring MVC入门(4)

请求 获取Cookie/Session 获取Cookie 传统方式: RequestMapping("/m11")public String method11(HttpServletRequest request, HttpServletResponse response) {//获取所有Cookie信息Cookie[] cookies request.getCookies();//打印Cookie信息StringBuilder build…

Soul CEO张璐团队聚焦AIGC,斩获“年度最具成长潜力”奖

近日,由《财经》新媒体及《财经》商业治理研究院联合主办的第六届“新奖”评选活动落下帷幕。 新型社交平台Soul App在CEO张璐的带领下持续发力AIGC,凭借在“AIGC社交”领域的创新探索及所体现出的巨大成长潜力,荣获新科技板块“年度最具成长潜力奖”,再度凸显其在智能社交方面…

两个独立的高增益运算放大器组成D358,应用于音频放大器、工业控制等。采用 DIP8、SOP8、MSOP8 和 TSSOP8 的封装形式。

一、概述 D358 由两个独立的高增益运算放大器组成。可以是单电源工作&#xff0c;也可以是双电源工作&#xff0c;电源低功耗电流与电源电压大小无关。 应用范围包括音频放大器、工业控制、DC 增益部件和所有常规运算放大电路。 D358 采用 DIP8、SOP8、MSOP8 和 TSSOP8 的封装形…

多功能免费实用的 PDF24工具箱 v11.17.0

PDF24 Creator&#xff08;详情请戳 官网&#xff09;是一款完全免费且优秀实用的PDF工具箱软件&#xff0c;PDF24工具箱包含PDF分割/合并、PDF压缩、PDF编辑器、PDF加密/解密、PDF页面/图像提取、PDF比较、PDF转换、添加PDF水印、PDF文本OCR识别等多种功能&#xff0c;PDF24工…

C++初阶:string类的模拟自实现

目录 1. 引子2. 自实现string类功能模块3. string类功能模块的具体实现3.1 默认成员函数3.2 遍历访问相关成员函数3.3 信息插入相关成员函数3.4 信息删除3.5 信息查找3.6 非成员函数3.7 杂项成员函数 4. 补充知识 1. 引子 通过对string类的初步学习&#xff0c;没有对知识进行较…

大数据面试题 —— Zookeeper

目录 ZooKeeper 的定义ZooKeeper 的特点ZooKeeper 的应用场景你觉得Zookeeper比较重要的功能ZooKeeper 的选举机制 ***zookeeper主节点故障&#xff0c;如何重新选举&#xff1f;ZooKeeper 的监听原理 ***zookeeper集群的节点数为什么建议奇数台 ***ZooKeeper 的部署方式有哪几…

牛客题霸-SQL进阶篇(刷题记录一)

本文基于前段时间学习总结的 MySQL 相关的查询语法&#xff0c;在牛客网找了相应的 MySQL 题目进行练习&#xff0c;以便加强对于 MySQL 查询语法的理解和应用。 由于涉及到的数据库表较多&#xff0c;因此本文不再展示&#xff0c;只提供 MySQL 代码与示例输出。 部分题目因…

C语言之我对结构体与联合体的认识

c语言中的小小白-CSDN博客c语言中的小小白关注算法,c,c语言,贪心算法,链表,mysql,动态规划,后端,线性回归,数据结构,排序算法领域.https://blog.csdn.net/bhbcdxb123?spm1001.2014.3001.5343 给大家分享一句我很喜欢我话&#xff1a; 知不足而奋进&#xff0c;望远山而前行&am…

Huggingface 笔记:大模型(Gemma2B,Gemma 7B)部署+基本使用

1 部署 1.1 申请权限 在huggingface的gemma界面&#xff0c;点击“term”以申请gemma访问权限 https://huggingface.co/google/gemma-7b 然后接受条款 1.2 添加hugging对应的token 如果直接用gemma提供的代码&#xff0c;会出现如下问题&#xff1a; from transformers i…

基于Spring Boot的社区垃圾分类管理平台的设计与实现

摘 要 近些年来&#xff0c;随着科技的飞速发展&#xff0c;互联网的普及逐渐延伸到各行各业中&#xff0c;给人们生活带来了十分的便利&#xff0c;社区垃圾分类管理平台利用计算机网络实现信息化管理&#xff0c;使整个社区垃圾分类管理的发展和服务水平有显著提升。 本文拟…

WordPress自动生成原创文章插件

WordPress作为最受欢迎的内容管理系统之一&#xff0c;为博客和网站的搭建提供了便捷的解决方案。而在内容创作方面&#xff0c;自动生成原创文章的插件为WordPress用户提供了更为高效的选项。 什么是WordPress自动生成原创文章插件&#xff1f; WordPress自动生成原创文章插件…

Rust 错误处理入门和进阶

Rust 错误处理入门和进阶 引用 Rust Book 的话&#xff0c;“错误是软件中不可避免的事实”。这篇文章讨论了如何处理它们。 在讨论 可恢复错误和 Result 类型之前&#xff0c;我们首先来谈谈 不可恢复错误 - 又名恐慌(panic)。 不可恢复错误 恐慌(panic)是程序可能抛出的异…

C++第七弹---类与对象(四)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】 目录 1、拷贝构造函数 1.1、概念 1.2、特征 2、运算符重载 2.1、等号运算符重载 总结 1、拷贝构造函数 1.1、概念 在现实生活中&#xff0c;可能…