基于 Lambda 实现 Claude3 的流式响应

在如今的大语言模型推理输出场景中,流式响应基本已成为必备的功能之一。一方面符合大语言模型生成方式的本质,另一方面当模型推理效率不是很高时,流式响应比起全部 generate 后再输出、能大幅缩短从开始请求到输出第一个 Token 的时间,极大地提高用户体验。

从端到端的视角,可以将大语言模型的流式响应分为两个部分:模型本身的流式推理、后端程序将模型的流式响应输出到客户端,如下图所示。本文将以 Claude3 为例,阐述如何端到端的实现大语言模型的流式响应。


一、流式推理

当今的主流大语言模型,大都已支撑流式推理。我们以 Python 代码来实现 Claude3 的流式推理为例,给出如下示例。Claude3 采用了 Message API,支持多模态的数据输入。推理返回结果的内容结构,相比于此前 Claude2 的 Completions API 也发生了一些变化。

import json
import sys
import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

# 使用文本提示调用Claude 3
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
prompt = "你好"

body = json.dumps({
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 1024,
    "messages": [
        {
            "role": "user",
            "content": [{"type": "text", "text": prompt}],
        }
    ],
})

response = client.invoke_model_with_response_stream(
    body=body,
    modelId=model_id,
    accept="application/json",
    contentType="application/json",
)

stream = response["body"]
if stream:
    for event in stream:
        chunk = event.get("chunk")
        if chunk:
            chunk_obj = json.loads(chunk.get("bytes").decode())
            if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():
                if chunk_obj['type'] == 'content_block_delta':
                    chunk_obj = chunk_obj['delta']
                    sys.stdout.write(chunk_obj["text"])
                    sys.stdout.flush()  # 正式使用时,这里可以用yield返回

输出效果如下所示:

二、流式响应的技术方案介绍

目前流式响应输出到客户端的技术,主要有长轮询、 WebSocket 与 SSE 。

长轮询

浏览器发出 XMLHttpRequest 请求,服务器端接收到请求后,会阻塞请求直到有数据或者超时才返回,浏览器 JS 在处理请求返回信息(超时或有效数据)后再次发出请求,重新建立连接。 这种方式用于流式响应,一方面增加了不必要的请求-响应的往返时间,还给客户端与服务端带来了额外的负载压力与资源浪费。

WebSocket

WebSocket 是 HTML5 定义的新协议,实现了服务器与客户端之间的全双工通信。WebSocket 连接一旦建立,客户端和服务器端处于平等地位,可以相互发送数据,不存在请求和响应的区别,其原理如下图所示。

WebSocket 通常应用于实时聊天、多人在线游戏等场景。在 AWS 上可以使用 API Gateway 与 Lambda 来实现 Websocket 的服务端。但在 Claude3 的流式响应场景中,采用 Websocket 方案显得有些大材小用,我们希望能有更轻量级的实现方式。

HTTP SSE

HTTP SSE 的全称是 HTTP Server-Sent Events,它提供了一种从服务器实时发送更新事件到客户端的技术。SSE 主要解决了客户端与服务器之间的单向实时通信需求(例如 Claude3 回答的流式响应),相较于 WebSocket(双向实时),它更加轻量级且易于实现。SSE 是基于 HTTP 协议实现的,所以更适用于服务器持续的向客户端发送文本。其原理示意如下图所示。

本文将主要使用 HTTP-SSE 技术来展开介绍,如何将 Claude3 的回答流式推向客户端。

三、使用 Python Flask 搭建 SSE Demo

一提到 SSE,我们首先考虑了使用 Python Flask 框架来实现一个原型(Demo)。Flask 是一个轻量级的 Web 应用框架,它提供了简单易用的工具和技术来构建 Web 服务器。特别是,我们利用了 Flask 的 stream_with_context 功能来实现服务器发送事件(SSE),这使得服务器能够以流的形式实时推送数据到客户端。

将 Claude3 的流式推理结果,通过 SSE 技术推送到客户端的代码如下:

#pip install Flask boto3 CORS
from flask import Flask, Response, stream_with_context, request
from flask_cors import CORS  # 导入CORS模块
import time
import json
import sys
import boto3


app = Flask(__name__)
CORS(app)  # 为app添加跨域支持

def generate_stream(prompt):
    client = boto3.client("bedrock-runtime", region_name="us-east-1")

    # 调用 Claude 3 并提供文本提示
    model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": prompt}],
        }],
    })

    response = client.invoke_model_with_response_stream(
        body=body,
        modelId=model_id,
        accept="application/json",
        contentType="application/json",
    )

    stream = response["body"]
    if stream:
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                chunk_obj = json.loads(chunk.get("bytes").decode())
                if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():
                    if chunk_obj['type'] == 'content_block_delta':
                        chunk_obj = chunk_obj['delta']
                        yield f"data: {json.dumps(chunk_obj['text'], ensure_ascii=False)}\n\n"  # 修改为适合HTTP SSE的格式


@app.route('/stream')
def stream():
    prompt = request.args.get('prompt', '你好')  # 从HTTP请求中提取prompt变量的值,默认值为"你好"
    response = Response(stream_with_context(generate_stream(prompt)), content_type='text/event-stream')
    response.headers['Access-Control-Allow-Origin'] = '*'  # 允许所有域名跨域访问
    return response

if __name__ == '__main__':
    app.run(debug=True, port=9000, host='0.0.0.0')

web 端代码如下:

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Stream Demo</title>
</head>
<body>
    <h2>输入提示词:</h2>
    <input type="text" id="promptInput" placeholder="请输入提示词">
    <button onclick="startStream()">开始流式响应</button>
    <div id="output" style="margin-top: 20px;"></div>

    <script>
        function startStream() {
            const prompt = document.getElementById('promptInput').value;
            const outputDiv = document.getElementById('output');
            outputDiv.innerHTML = ''; // 清空之前的输出

            // 创建一个新的EventSource实例,连接到服务器端的/stream端点
            const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);

            eventSource.onmessage = function(event) {
                // 当接收到新的数据时,将其添加到页面上

                console.log('Received data:', event.data);
                
                // 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串
                var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');
                
                outputDiv.innerHTML += decodedMessage; 
            };

            eventSource.onerror = function() {
                // 如果发生错误,关闭连接
                eventSource.close();
                outputDiv.innerHTML += '<p>流已结束</p>';
            };
        }
    </script>
</body>
</html>

效果如下所示:

尽管使用 Flask API 来实现这个 Demo 相对简单直接,但在生产环境中,直接使用 Flask API 作为 API 服务器存在一些劣势,特别是当考虑到可扩展性、管理和成本效率时。例如,Flask 应用通常需要部署在一台或多台服务器上,这意味着你需要管理这些服务器的维护、监控和扩展。此外,对于流量波动较大的应用,服务器可能会在低峰时期闲置,造成资源浪费,或在高峰时期过载,影响服务质量。

为了克服这些劣势,我们转向了 AWS Lambda 来实现 streaming response。

四、基于 AWS  Lambda  来实现  SSE Demo

AWS Lambda 是一个无服务器计算服务,它允许你运行代码而无需预置或管理服务器。Lambda 只在代码执行时才收费,这使得它在成本效率上对于不定时或间歇性的工作负载非常有吸引力。通过 Lambda,我们可以实现一个更加弹性的架构,自动扩展以应对请求量的变化,同时减少了管理服务器的负担。

在客户端你可以基于 SSE 协议,调用 Lambda 函数 URL 来实时获取响应结果。

创建 Lambda 函数

AWS Lambda 默认支持使用 Node.js Runtime 支持流式响应。对于其他语言,你可以使用使用带有自定义 Runtime 方式来实现,或者使用 Lambda Web 适配器。

然后将如下代码粘贴到 Lambda 函数中:

import util from 'util';
import stream from 'stream';
import {
  BedrockRuntimeClient,
  InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";

import querystring from 'querystring';

const finished = util.promisify(stream.finished); 

// Create a new Bedrock Runtime client instance.
const client = new BedrockRuntimeClient({ region: "us-east-1" });
const modelId = "anthropic.claude-3-sonnet-20240229-v1:0";
let prompt = "您好";


export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {
  
  console.log(event);

  responseStream.setContentType("text/event-stream");
  
   // 假设event.rawQueryString存在并包含查询字符串
  const rawQueryString = event.rawQueryString;
    
  // 使用querystring模块解析查询字符串
  const params = querystring.parse(rawQueryString);
  
  prompt=params['prompt'];
    
  
  // Prepare the payload for the model.
  const payload = {
    anthropic_version: "bedrock-2023-05-31",
    max_tokens: 1000,
    messages: [
      {
        role: "user",
        content: [{ type: "text", text: prompt }],
      },
    ],
  };
  try {
    // 使用payload调用Claude并等待API响应
    const command = new InvokeModelWithResponseStreamCommand({
      contentType: "application/json",
      body: JSON.stringify(payload),
      modelId,
    });
    const apiResponse = await client.send(command);

    // 解码并处理响应流
    for await (const item of apiResponse.body) {
      const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));
      const chunk_type = chunk.type;

      if (chunk_type === "content_block_delta") {
        const text = chunk.delta.text;
        responseStream.write(`data: ${JSON.stringify(text)}\n\n`);
      }
    }
  } catch (error) {
    console.error("处理API响应时发生错误:", error);
    responseStream.write(`data: ${JSON.stringify({ error: "处理请求时发生错误" })}\n\n`);
  }
  
  responseStream.end();
  
  
});
配置权限

在创建完 Lambda 后,会生成一个具备基础权限的默认角色。需要在该角色中增加 Bedrock 的调用权限。点击下图中的角色名称,打开 IAM 服务中的角色管理页面。

在该 IAM 角色中添加一个内联权限,权限内容如下所示:

配置 Lambda 响应超时时间

在流式响应场景中,Lambda 的整体响应时间范围可能从几秒中到几分钟,具体取决于输入/输出 Token 的大小。所以需要提前修改 Lambda 默认的超时时间。

设置 Lambda 函数 URL

在函数 URL 配置页面,建议 Auth Type 选择 NONE。因为如果选择 AWS_IAM 方式,则需要在客户端存储 AWS 身份认证信息(如 AK/SK),有较大的安全隐患。所以,在面向最终用户的场景中,建议 Auth Type 设置为 NONE,然后在 Lambda 的 Header 中自行实现一些认证方式(如 API_KEY)。

此前,Invoke mode 需要选择 RESPONSE_STREAM,并配置 CORS,允许跨域访问。

Web 端示例代码:
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Stream Demo</title>
</head>
<body>
    <h2>输入提示词:</h2>
    <input type="text" id="promptInput" placeholder="请输入提示词">
    <button onclick="startStream()">开始流式响应(Lambda)</button>
    <div id="output" style="margin-top: 20px;"></div>

    <script>
        function startStream() {
            const prompt = document.getElementById('promptInput').value;
            const outputDiv = document.getElementById('output');
            outputDiv.innerHTML = ''; // 清空之前的输出

            // 创建一个新的EventSource实例,连接到服务器端的/stream端点
            const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);

            eventSource.onmessage = function(event) {
                // 当接收到新的数据时,将其添加到页面上

                console.log('Received data:', event.data);
                
                // 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串
                var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');
                
                outputDiv.innerHTML += decodedMessage; 
            };

            eventSource.onerror = function() {
                // 如果发生错误,关闭连接
                eventSource.close();
                outputDiv.innerHTML += '<p>流已结束</p>';
            };
        }
    </script>
</body>
</html>

效果如下:

Lambda 流式响应的限制
  • 单次调用的流式响应有默认为 20MB 的大小限制,但可以向后台申请提升限制。
  • 单次调用的流式响应的前 6MB 拥有不受限制的带宽。之后将以最大 2MBps 的速率进行流式响应。这在大语言推理场景,应该是完全足够了。
  • 通过 API Gateway 的 LAMBDA_PROXY 集成不支持流式响应。你可以在 API Gateway 和 Lambda 函数 URL 之间使用 HTTP_PROXY 集成,但你将受到 API Gateway 的 10MB 响应负载限制。此外,API Gateway 不支持分块传输编码,因此将无法实现流式响应的预期效果。

五、总结&综述

本文从端到端的视角,介绍了 Claude3 的流式推理,以及服务端流式响应的技术选型。通过比较分析,建议基于 Http-SSE 这种轻量级方式,来实现流式响应。最后,以 Claude3 为例,基于 Http-SSE 技术,分别介绍了使用 Python Flask、 AWS 云原生服务 Lambda 实现流式响应的实践。

本篇作者

张盼富

AWS 解决方案架构师,从业十三年,先后经过历云计算、供应链金融、电商等多个行业,担任过高级开发、架构师、产品经理、开发总监等多种角色,有丰富的大数据应用与数据治理经验。加入亚马逊云科技后,致力于通过大数据+AI 技术,帮助企业加速数字化转型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/527665.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

访问网站显示不安全是什么原因?怎么解决?

访问网站时显示“不安全”&#xff0c;主要原因以及解决办法&#xff1a; 1.没用HTTPS加密&#xff1a;网站还在用老的HTTP协议&#xff0c;数据传输没加密&#xff0c;容易被人偷看或篡改。解决办法是网站管理员启用HTTPS&#xff0c;也就是给网站装个“SSL证书”。这个是最常…

5.6 mybatis之RowBounds分页用法

文章目录 mybatis 中&#xff0c;使用 RowBounds 进行分页&#xff0c;非常方便&#xff0c;不需要在 sql 语句中写 limit&#xff0c;即可完成分页功能。但是由于它是在 sql 查询出所有结果的基础上截取数据的&#xff0c;所以在数据量大的sql中并不适用&#xff0c;它更适合在…

深度学习学习日记4.8(下午)

1.softmax 函数的得出的结果是样本被预测到每个类别的概率&#xff0c;所有类别的概率相加总和等于1。使用 softmax 进行数据归一化&#xff0c;将数字转换成概率。 2.熵&#xff0c;不确定性&#xff0c;越低越好 3.KL 散度交叉熵-信息熵 预测越准&#xff0c;交叉熵越小&am…

【大数据】大数据概论与Hadoop

目录 1.大数据概述 1.1.大数据的概念 1.2.大数据的应用场景 1.3.大数据的关键技术 1.4.大数据的计算模式 1.5.大数据和云计算的关系 1.6.物联网 2.Hadoop 2.1.核心架构 2.2.版本演进 2.3.生态圈的全量结构 1.大数据概述 1.1.大数据的概念 大数据即字面意思&#x…

什么是人工智能?人工智能、机器学习、深度学习三者之间有什么关系吗?

深度学习是机器学习的一个分支。深度学习是机器学习的一部分&#xff0c;与机器学习的其他分支学科&#xff0c;以及统计学、人工智能等学科都有着紧密的联系。深度学习、机器学习、人工智能、统计学之间的关系如图1-4所示。 图1-4 深度学习、机器学习、人工智能、统计学之间的…

CNAS认可和CMA认可对比辨析

性质对比 CNAS&#xff08;China National Accreditation Service for Conformity Assessment&#xff0c;中国合格评定国家认可委员会&#xff09;。该委员会是国家授权的单位&#xff0c;是专门来做认可和监督工作的。认可的对象是认证机构、检验机构和实验室等。确认其是否有…

php站长在线工具箱源码优化版

环境要求 PHP > 7.4MySQL > 5.6fileinfo扩展使用Redis缓存需安装Redis扩展 源码下载地址&#xff1a;php站长在线工具箱源码优化版.zip

Android图形显示架构概览

图形显示系统作为Android系统核心的子系统&#xff0c;掌握它对于理解Android系统很有帮助&#xff0c;下面从整体上简单介绍图形显示系统的架构&#xff0c;如下图所示。 这个框架只包含了用户空间的图形组件&#xff0c;不涉及底层的显示驱动。框架主要包括以下4个图形组件。…

SRIO学习(3)使用SRIO IP核进行设计

文章目录 前言一、设计框图二、模块介绍三、上板验证 前言 本文将通过使用SRIO IP核实现数据通信&#xff0c;重点在于打通数据链路&#xff0c;具体的协议内容设计并非重点&#xff0c;打通了链路大家自己根据设计需求来即可。 一、设计框图 看了前面高速接口的一些设计&am…

【uniapp】开发微信小程序 — 注意事项

底部导航栏 (tabBar) 图标的正确做法&#xff1a; 1、图片的标准尺寸为 81px * 81px&#xff0c;该尺寸在官方的文档中有明确的说明&#xff0c;可以参考微信小程序全局配置文档中对 iconPath 属性的说明。 2、为了保持良好的间距&#xff0c;图片的内容区域设置 60px* 比较好&…

docker最简单教程(使用dockerfile构建环境)

一 手里有的东西 安装好的dockerdockerfile 二 操作 只需要在你的dockerfile文件下执行命令 docker build -t"xianhu/centos:gitdir" . 将用户名、操作系统和tag进行修改就可以了&#xff0c;这就相当于在你本地安装了一个docker环境&#xff0c;然后执行 docker…

【c 语言】结构体的概念

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;C语言 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步&…

JAVAEE——网络初始

文章目录 网络发展史独立模式网络模式局域网LAN路由器的诞生 网络通信的基础IP地址端口号 认识协议OSI七层模型TCP/IP五层模型 网络发展史 独立模式 在原始的年代中电脑间想要通信需要两台电脑连接一根网线&#xff0c;但是一台电脑基本上只有一个接口。因此想要链接更多的电…

函数重载和引用【C++】

文章目录 函数重载什么是函数重载&#xff1f;函数重载的作用使用函数重载的注意点为什么C可以函数重载&#xff0c;C语言不行&#xff1f; 引用什么是引用&#xff1f;引用的语法引用的特点引用的使用场景引用的底层实现传参时传引用和传值的效率引用和指针的区别 函数重载 什…

全国火情预报卫星遥感应用方案

一、引言 火情预报应急卫星遥感解决方案是一种利用卫星遥感技术进行火灾预警和应急响应的方法。该方案通过实时监测和分析森林、草原等地区的火情信息&#xff0c;为火灾预警和应急响应提供科学依据&#xff0c;有效减少火灾造成的损失。本技术文档将介绍火情预报应急卫…

Android-NDK的linux交叉编译环境

NDK工具包下载 NDK 下载 | Android NDK | Android Developers https://github.com/android/ndk/wiki/Unsupported-Downloads 以android-ndk-r26c下载为例&#xff0c;下载后将压缩包解压至/usr目录下 CMakeLists编译选项设置 编译平台变量判断条件中增加一下android条件…

7款公司电脑监控软件

7款公司电脑监控软件 研究证明&#xff0c;人们在家办公的效率比在办公室办公的效率低一半&#xff0c;其中原因是缺少监督&#xff0c;即便在公司办公&#xff0c;还存在员工偷闲的时刻&#xff0c;比如聊天、浏览无关网站、看剧、炒股等&#xff0c;企业想提高员工的工作效率…

TCP 三次握手与四次挥手面试题(计算机网络)

TCP 基本认识 TCP 头格式有哪些&#xff1f; 序列号&#xff1a;在建立连接时由计算机生成的随机数作为其初始值&#xff0c;通过 SYN 包传给接收端主机&#xff0c;每发送一次数据&#xff0c;就「累加」一次该「数据字节数」的大小。用来解决网络包乱序问题。 确认应答号&a…

STC89C52学习笔记(二)

STC89C52学习笔记&#xff08;二&#xff09; 综述&#xff1a;本文简要介绍了51单片机以及示例了如何成功创建并运行一个程序&#xff08;点亮一个LED&#xff09;。 一、单片机介绍 单片机简称MCU&#xff0c;MCUCPURAMROM定时器中断系统通讯协议等单片机任务是信息采集、…

T-Mamba:用于牙齿 3D CBCT 分割的频率增强门控长程依赖性

T-Mamba&#xff1a;用于牙齿 3D CBCT 分割的频率增强门控长程依赖性 摘要Introduction方法T-Mamba architectureTim block T-Mamba: Frequency-Enhanced Gated Long-Range Dependendcy for Tooth 3D CBCT Segmentation 摘要 三维成像中的高效牙齿分割对于正畸诊断至关重要&am…