项目简介:
小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案,帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践,并应用到自己的日常工作里。
本次介绍的是如何在亚马逊云科技上利用音视频转录服务Transcribe提取视频内容字幕,并利用SageMaker部署开源向量模型GPT-J 6B和大语言模型Falcon-7B,利用向量模型对视频字幕进行向量化并存入FAISS内存向量库,最后利用streamlit框架搭建Web UI集成大模型API实现基于视频内容的问答。本架构设计全部采用了云原生Serverless架构,提供可扩展和安全的AI解决方案。本方案的解决方案架构图如下:
方案所需基础知识
什么是 Amazon SageMaker?
Amazon SageMaker 是亚马逊云科技提供的一站式机器学习服务,帮助开发者和数据科学家轻松构建、训练和部署机器学习模型。SageMaker 提供了全面的工具,从数据准备、模型训练到部署和监控,覆盖了机器学习项目的全生命周期。通过 SageMaker,用户可以加速机器学习模型的开发和上线,并确保模型在生产环境中的稳定性和性能。
什么是 Amazon Transcribe?
Amazon Transcribe 是亚马逊云科技提供的一项自动语音识别(ASR)服务,能够将语音内容准确地转录为文本。Transcribe 支持实时和批量转录,能够处理多种语言和方言。它被广泛应用于客户服务、内容制作、会议记录等场景,帮助企业轻松地将音频和视频内容转化为可搜索、可分析的文本数据。
基于视频转录内容进行问答的现实场景
在现代内容消费和业务场景中,视频正变得越来越重要。通过 Amazon Transcribe,将视频内容转录为文本后,可以进一步应用于问答系统,提升用户体验和业务效率。例如:
在线教育:
在在线课程平台上,学生可以通过输入问题,基于课程视频的转录内容快速找到答案,提升学习效率和效果。
客户支持:
客服团队可以通过视频转录内容建立知识库,客户在观看产品演示视频时,能够实时查询与视频内容相关的常见问题和解答,提升客户满意度。
媒体内容索引:
对媒体公司而言,转录视频内容后,可以建立基于视频内容的问答系统,用户可以快速搜索视频中的关键信息,提升内容的可访问性和互动性。
企业内部培训:
企业可以将培训视频内容转录为文本,并基于这些转录内容提供员工自助问答服务,帮助员工快速获取培训相关的信息,提升内部知识传播效率。
本方案包括的内容
1. 利用亚马逊云科技Transcribe服务对音视频文件进行文字转录
2. 在Amazon SageMaker上部署向量大模型
3. 在Amazon SageMaker上利用Jupyter Notebook处理原始数据构建知识库
4. 开发基于Streamlit框架的视频内容问答的网页服务并部署。
项目搭建具体步骤
1. 进入亚马逊云科技控制台,并打开Amazon Transcribe服务
2. 点击”Create Job“创建一个Transcribe音视频转录服务
3. 为任务取名为”video-transcription“,并选择语言为英文
4. 在S3桶中选择一个我们希望转录的视频文件。
5. 选择一个保存转录文本的S3存储桶,最后点击Create创建
6. 重复上述步骤我们再创建一个新的任务”audio-transcription“,用于转录音频文件。
7. 接下来我们进入到SageMaker服务主页,点击Open Studio打开我们的Studio机器学习开发环境。
8. 我们点击JumpStart中的HuggingFace,用于快速部署AI大模型,无需额外创建和训练。
9. 在搜索框中搜索模型名”GPT-J 6B Embedding FP16“, 并点击
10. 配置模型计算资源类型为”ml.g5.2xlarge“,再点击部署。同时我们使用相同的步骤部署大模型”Falcon-7b“。
11. 接下来我们打开一个Jupyter Notebook,首先创建一个ipynb文件,复制以下代码安装和导入必要依赖,如LangChain和SageMaker等。
!pip install --upgrade sagemaker --quiet
!pip install ipywidgets==7.0.0 --quiet
!pip install langchain==0.0.148 --quiet
!pip install faiss-cpu --quiet
!pip install unstructured==0.8.1 --quiet
import time
import os
import sagemaker, boto3, json
from sagemaker.session import Session
from sagemaker.model import Model
from sagemaker import image_uris, model_uris, script_uris, hyperparameters
from sagemaker.predictor import Predictor
from sagemaker.utils import name_from_base
from typing import Any, Dict, List, Optional
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.llms.sagemaker_endpoint import ContentHandlerBase
12. 接下来我们初始化SageMaker客户端,定义三个函数分别获取大模型回复、处理响应、获取大模型API端点。
sagemaker_session = Session()
aws_role = sagemaker_session.get_caller_identity_arn()
aws_region = boto3.Session().region_name
sess = sagemaker.Session()
client = boto3.client("runtime.sagemaker")
def query_endpoint_with_json_payload(encoded_json, endpoint_name, content_type="application/json"):
response = client.invoke_endpoint(
EndpointName=endpoint_name, ContentType=content_type, Body=encoded_json
)
return response
def parse_response(query_response):
model_predictions = json.loads(query_response["Body"].read())
return model_predictions
def get_model_endpoint_with_prefix(prefix="falcon"):
# Create a SageMaker client
client = boto3.client('sagemaker')
# List all SageMaker endpoints
response = client.list_endpoints(
MaxResults=5,
SortBy='Name'
)
# Filter endpoints that start with 'falcon'
for endpoint in response['Endpoints']:
if endpoint['EndpointName'].startswith(prefix):
return endpoint['EndpointName']
return None
qa_endpoint_name = get_model_endpoint_with_prefix()
13. 我们利用以下代码段与SageMaker端点进行API交互生成关于问题的回复,问题为”什么是Amazon Bedrock“
## Code Cell 3 ##
question = "What is Amazon Bedrock?"
payload = {
"inputs": question,
"max_new_tokens": 50,
"top_k":50,
"num_return_sequences": 1,
"top_p":0.95,
"do_sample":True
}
query_response = query_endpoint_with_json_payload(
json.dumps(payload).encode("utf-8"), endpoint_name=qa_endpoint_name
)
generated_texts = parse_response(query_response)
print(f"{generated_texts[0]}")
14. 接下来我们通过LangChain创建一个SageMaker上的向量模型实例,指定了向量模型端点、文档向量化配置以及API输入输出请求格式。
## Code Cell 4 ##
from langchain.embeddings.sagemaker_endpoint import EmbeddingsContentHandler
from langchain.embeddings import SagemakerEndpointEmbeddings
class SagemakerEndpointEmbeddingsJumpStart(SagemakerEndpointEmbeddings):
def embed_documents(self, texts: List[str], chunk_size: int = 5) -> List[List[float]]:
"""Compute doc embeddings using a SageMaker Inference Endpoint.
Args:
texts: The list of texts to embed.
chunk_size: The chunk size defines how many input texts will
be grouped together as request. If None, will use the
chunk size specified by the class.
Returns:
List of embeddings, one for each text.
"""
results = []
_chunk_size = len(texts) if chunk_size > len(texts) else chunk_size
for i in range(0, len(texts), _chunk_size):
response = self._embedding_func(texts[i : i + _chunk_size])
print
results.extend(response)
return results
class ContentHandler(EmbeddingsContentHandler):
content_type = "application/json"
accepts = "application/json"
def transform_input(self, prompt: str, model_kwargs={}) -> bytes:
input_str = json.dumps({"text_inputs": prompt, **model_kwargs})
return input_str.encode("utf-8")
def transform_output(self, output: bytes) -> str:
response_json = json.loads(output.read().decode("utf-8"))
embeddings = response_json["embedding"]
return embeddings
content_handler = ContentHandler()
embeddings = SagemakerEndpointEmbeddingsJumpStart(
endpoint_name=get_model_endpoint_with_prefix("jumpstart"),
region_name=aws_region,
content_handler=content_handler,
)
15. 利用LangChain初始化SageMaker上的大语言模型,指定模型回复参数、API调用输入输出格式。
## Code Cell 5 ##
from langchain.llms.sagemaker_endpoint import LLMContentHandler, SagemakerEndpoint
parameters = {
"max_length": 200,
"num_return_sequences": 1,
"top_k": 250,
"top_p": 0.95,
"do_sample": False,
"temperature": 1,
}
class ContentHandler(LLMContentHandler):
content_type = "application/json"
accepts = "application/json"
def transform_input(self, prompt: str, model_kwargs={}) -> bytes:
input_str = json.dumps({"inputs": prompt, **model_kwargs})
return input_str.encode("utf-8")
def transform_output(self, output: bytes) -> str:
response_json = json.loads(output.read().decode("utf-8"))
print(response_json)
return response_json[0]["generated_text"]
content_handler = ContentHandler()
sm_llm = SagemakerEndpoint(
endpoint_name=get_model_endpoint_with_prefix("falcon"),
region_name=aws_region,
model_kwargs=parameters,
content_handler=content_handler,
)
16. 从S3桶中提取Transcribe转录的音视频内容,并将其保存到本地的rag_data/路径中,以txt结尾。
## Code Cell 6 ##
s3 = boto3.client('s3')
transcribes_bucket = [bucket['Name'] for bucket in s3.list_buckets()['Buckets'] if bucket['Name'].startswith('transcribe-')].pop()
!mkdir -p rag_data
!aws s3 cp --recursive s3://$transcribes_bucket rag_data
#Converting transcripts from json to text files
directory='rag_data'
for filename in os.listdir(directory):
# Check if the current file is a JSON file
if filename.endswith('.json'):
json_path = os.path.join(directory, filename)
# Open and read the JSON file
with open(json_path) as json_file:
data = json.load(json_file)
transcript = data["results"]["transcripts"][0]["transcript"]
# Create a corresponding text file name
text_filename = filename.replace('.json', '.txt')
text_path = os.path.join(directory, text_filename)
# Open the text file for writing and write the transcript
with open(text_path, 'w') as text_file:
text_file.write(transcript)
17. 下面我们利用LangChain构建一个知识库来对文件内容进行问答,首先导入必要的依赖和将txt文件中的内容导入DirectorLoader加载器中。
## Code Cell 7 ##
import nltk
nltk.download('punkt_tab')
# Import the LangChain doc
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.document_loaders import TextLoader
from langchain.indexes import VectorstoreIndexCreator
from langchain.vectorstores import Chroma, AtlasDB, FAISS
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain import PromptTemplate
from langchain.chains.question_answering import load_qa_chain
from langchain.document_loaders import DirectoryLoader
loader = DirectoryLoader("./rag_data/", glob="*.txt")
documents = loader.load()
18. 接下来我们利用”VectorstoreIndexCreator“方法创建一个FAISS向量存储库,并利用向量模型将我们的文档内容向量化,再利用index.creator创建索引用于高效语义搜索。再使用我们刚才的问题对向量库进行提问。
## Code Cell 8 ##
index_creator = VectorstoreIndexCreator(
vectorstore_cls=FAISS,
embedding=embeddings,
text_splitter=RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=200),
)
index = index_creator.from_loaders([loader])
# Review the question variable content.
print(question)
# Ask the question to the model using the vector store with the transcript embeddings.
index.query(question=question, llm=sm_llm)
19. 同时我们还可以用FAISS直接将文档向量化并进行语义搜索提问得到相似性最高的3个回复。
## Code Cell 9 ##
docsearch = FAISS.from_documents(documents, embeddings)
print(question)
docs = docsearch.similarity_search(question, k=3)
20. 在此处我们利用LangChain定义提示词模板,并利用问答链对向量库中搜索出的内容进行搜索增强回复(RAG)。
## Code Cell 11 ##
prompt_template = """Answer based on context:\n\n{context}\n\n{question}"""
PROMPT = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
chain = load_qa_chain(llm=sm_llm, prompt=PROMPT)
## Code Cell 12 ##
result = chain({"input_documents": docs, "question": question}, return_only_outputs=True)[
"output_text"
]
21. 接下来我们创建一个chatbot.py文件,包含以上所有代码,用于调用知识库生成回复。并且创建一个requirements.txt用于安装必要依赖。
aiohttp==3.8.4
aiosignal==1.3.1
altair==5.0.1
async-timeout==4.0.2
attrs==23.1.0
blinker==1.6.2
boto3==1.26.150
botocore==1.29.150
cachetools==5.3.1
certifi==2023.5.7
charset-normalizer==3.1.0
click==8.1.3
dataclasses-json==0.5.7
decorator==5.1.1
frozenlist==1.3.3
gitdb==4.0.10
GitPython==3.1.31
idna==3.4
importlib-metadata==6.6.0
Jinja2==3.1.2
jmespath==1.0.1
jsonschema==4.17.3
langchain==0.0.195
langchainplus-sdk==0.0.8
markdown-it-py==2.2.0
MarkupSafe==2.1.3
marshmallow==3.19.0
marshmallow-enum==1.5.1
mdurl==0.1.2
multidict==6.0.4
mypy-extensions==1.0.0
numexpr==2.8.4
numpy==1.24.3
openapi-schema-pydantic==1.2.4
packaging==23.1
pandas==2.0.2
Pillow==9.5.0
protobuf==4.23.2
pyarrow==12.0.0
pydantic==1.10.9
pydeck==0.8.1b0
Pygments==2.15.1
Pympler==1.0.1
pyrsistent==0.19.3
python-dateutil==2.8.2
pytz==2023.3
pytz-deprecation-shim==0.1.0.post0
PyYAML==6.0
requests==2.31.0
rich==13.4.1
s3transfer==0.6.1
six==1.16.0
smmap==5.0.0
SQLAlchemy==2.0.15
streamlit==1.23.1
streamlit-chat==0.0.2.2
tenacity==8.2.2
toml==0.10.2
toolz==0.12.0
tornado==6.3.2
typing-inspect==0.9.0
typing_extensions==4.6.3
tzdata==2023.3
tzlocal==4.3
urllib3==1.26.16
yarl==1.9.2
zipp==3.15.0
unstructured==0.8.1
transformers~=4.30.2
faiss-cpu==1.7.4
22. 接下来我们运行以下命令,启动streamlit网页服务器。
streamlit run chatbot.py
23. 将服务器启动返回的URL在浏览器中打开,选择使用知识库内容回复,输入问题”什么是Amazon Bedrock“并点击send发送
24. 就可以得到基于知识库中视频内容的问题回复了
以上就是在亚马逊云科技上利用亚马逊云科技上利用Amazon Sagemaker部署AI大模型和向量模型,并基于Transcribe服务转录的视频字幕实现与用户问答的全部步骤。欢迎大家未来与我一起,未来获取更多国际前沿的生成式AI开发方案。