作为 MinIO 专注于 AI 集成的开发人员,我一直在探索如何将我们的工具无缝集成到现代 AI 架构中,以提高效率和可扩展性。在本文中,我们将深入探讨使用 LangChain 将 MinIO 与检索增强生成 (RAG) 管道和 Weaviate 向量存储的集成。我们的目标是创建一个强大的数据处理框架,不仅可以提高工作流程效率,还可以有效地管理整个数据生命周期。
RAG简介
检索增强生成 (RAG) 是一种变革性的 AI 方法,它将从数据库中检索相关信息与生成模型相结合,以生成既具有信息量又与上下文相关的响应。这种方法通过利用外部知识源增强了人工智能生成细微答案的能力,从而显着提高了生成内容的质量。
RAG 通过一个流程运行,该流程最初从数据库或知识库中识别和检索相关信息,然后使用这些信息来通知和生成响应。从密集的向量搜索到基于关键字的方法,其检索策略的灵活性使 RAG 能够适应各种应用程序,确保生成的响应不仅在上下文中准确,而且是最新的。
使用 RAG 的优势包括无需重新训练模型即可动态更新源知识,通过可追溯源提高 AI 的响应质量,并减少不准确。然而,RAG 系统的有效性在很大程度上取决于检索准确性和生成模型的局限性之间的平衡,例如处理大型上下文或避免“幻觉”内容。
为了深入探索RAG在AI生成内容中的机制和应用,arXiv等资源提供了全面的调查和研究。此外,Papers with Code 和 Elastic Search Labs 等平台提供了对 RAG 运营细微差别的实用见解,解决了其部署的挑战和注意事项。
讨论的技术概述
这些是开发概念验证的要素:
-
Weaviate:充当强大的矢量搜索引擎,通过语义搜索功能实现高效的数据检索。此技术有助于 RAG 管道根据查询的上下文快速准确地获取相关数据。
-
MinIO:作为高性能对象存储系统,对于在 AI 和 RAG 管道的各个阶段存储和管理数据至关重要。MinIO 的可扩展性和可靠性确保了大型数据集的高效处理,支持 AI 模型对大量信息的需求。
-
LangChain:提供将 AI 模型与 RAG 管道的其他组件集成的基本框架。它允许精确控制 AI 模型、数据检索过程和存储系统之间的交互,从而促进 AI 解决方案的定制开发。
通过这种方法,我们将探索在RAG管道中将MinIO与Weaviate和LangChain集成的潜力。
为什么 MinIO 在 AI 数据处理中至关重要
MinIO 作为对象存储解决方案表现出色,为管理 AI 运营中的数据提供了关键基础设施。与在层次结构中组织文件的传统数据存储不同,MinIO 将数据视为对象。这种抽象至关重要,因为 AI 不仅将数据作为文件或文档处理,而且将数据作为富含元数据的对象进行处理。这种基于对象的方法简化了交互,允许 AI 系统通过统一的 API 在粒度级别管理和理解数据。
通过存储桶简化数据管理
MinIO 架构的核心是其存储桶级组织,它简化了数据管理的复杂环境。MinIO 中的存储桶有助于分段和管理对象数据以及关联的元数据,同时保持版本控制、对象锁定和自定义保留策略等基本特征。这种细分对于在 AI 应用程序中实施数据治理和运营控制至关重要。
增强 AI 与 Webhook 的集成
在 MinIO 中使用 webhook 为 AI 集成引入了一条强大的途径。Webhook 允许在数据在存储环境中发生变化时对数据执行 lambda 函数,从而提供一种动态方法来触发 AI 驱动的流程。此功能在需要实时数据处理的方案中特别有用,例如在动态 ETL 管道中。我之前的文章“Dynamic ETL Pipeline: Hydrate AI with Web Data for MinIO and Weaviate using Unstructured-IO”中详细介绍了这方面的一个例子,该文章演示了如何使用自定义对象填充 MinIO 和 Weaviate,通过 AI 增强功能将 URL 转换为查询生成的内容。
对象级数据灵活性
在桶的有组织结构中,MinIO 可以存储各种各样的数据类型,从日志和媒体等原始和半结构化数据到代码和文档等结构化数据。这种多功能性类似于在存储库中管理代码的方式;但是,MinIO 通过将每个项目视为具有其生命周期和元数据的对象来进一步扩展这一点。随着人工智能技术的发展,LangChain等框架将利用这些对象数据原则,加强MinIO在现代人工智能生态系统中的相关性。使用 MinIO 存储桶存储 AI 功能和配置的能力是另一个显着优势,特别是对于构建自定义 AI 执行引擎的开发人员而言。
RAG Pipeline 中 MinIO 的关键集成点
在 (RAG) 流水线中,MinIO 可以增强流水线的效率、可扩展性和可靠性的多个关键功能。下面将更深入地探讨 MinIO 在这些复杂框架中可以发挥的关键作用:
-
集中式原始数据管理:首先,MinIO 在集中和扩展原始数据存储方面的作用至关重要。它充当整个管道的骨干,确保原始数据集随时可用于后续处理和分析。这种集中式方法不仅简化了数据可访问性,还巩固了 RAG 管道运行的基础。
-
增强数据准备:数据在管道中的旅程涉及大量的清理和预处理,通过 MinIO 强大的版本控制和易于访问的机制使任务更加高效。此功能可确保数据在清理和准备后以有序方式存储,为下一个转换步骤做好准备,从而加速数据准备阶段。
-
简化中间流程:随着数据经历各种转换和拆分,MinIO 中的中间数据工件存储变得不可或缺。通过管理数据处理的这些阶段,MinIO支持无缝的工作流程,确保每条数据都得到考虑并可供进一步处理。
-
动态提示管理:RAG 管道中查询的动态特性需要灵活的提示管理,MinIO 可以熟练地处理这一功能。通过存储提示或查询模板,MinIO 可以生成既相关又适合当前特定需求的查询,从而增强管道的响应能力。
-
增强数据处理:对于增强数据存储,MinIO 提供了可扩展的解决方案,可满足通过管道扩充的数据工件的存储需求。这确保了对生成细微响应至关重要的增强内容得到有效管理和存储,为内容创建做好准备。
-
存档生成的结果:在 MinIO 中存档生成的结果以进行历史分析和轻松检索是另一个关键集成点。此功能不仅可以保留输出以供将来参考,还可以提供对生成内容演变的见解,从而支持持续改进。
-
模型和配置版本控制:MinIO 对模型和工件版本控制的支持对于促进 RAG 管道中的实验和开发至关重要。此功能允许跟踪迭代和配置,从而更轻松地恢复更改或探索新方向,而不会丢失进度。
-
备份和恢复:在 AI 驱动的系统环境中,备份和恢复超越了灾难预防。对于 AI 操作,尤其是那些涉及 Weaviate 等 vectorstore 数据库的操作,高效保存和恢复数据的能力至关重要。此功能不仅是为了防止数据丢失,还是为了保护人工智能系统的“内存”。
人工智能技术本身并不能够记住它们处理的每一条数据;因此,实施强大的备份和恢复协议对于保持数据驱动型见解的连续性和完整性至关重要。MinIO 通过提供弹性基础设施来备份和恢复这些关键数据结构,在这种情况下发挥着至关重要的作用。
之前关于如何使用 MinIO S3 存储桶备份 Weaviate 的讨论说明了这些概念的实际实现。MinIO 提供的功能不仅可以防止数据丢失,还可以确保 AI 系统能够调用和重新生成其“内存”(矢量存储类),从而保持操作一致性并增强系统的整体弹性。
使用 MinIO 优化 RAG
本指南介绍了如何初始化与 Weaviate 和 MinIO 的连接。访问 MinIO 博客资产存储库;在这里可以找到原始的 LangChain Weaviate-RAG 脚本,以及演示 MinIO Python SDK 的有效性以及如何实现它以完善 Weaviate-RAG 流程的笔记本。
以下代码块是 GitHub 上 LangChain 的“rag-weaviate”示例。从这一点开始,我们可以直接进入源代码,并开始从逻辑上演示如何将 MinIO 集成到这个预先存在的 Weaviate RAG 管道中。
import os
from langchain_community.chat_models import ChatOpenAI
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Weaviate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
if os.environ.get("WEAVIATE_API_KEY", None) is None:
raise Exception("Missing `WEAVIATE_API_KEY` environment variable.")
if os.environ.get("WEAVIATE_ENVIRONMENT", None) is None:
raise Exception("Missing `WEAVIATE_ENVIRONMENT` environment variable.")
WEAVIATE_INDEX_NAME = os.environ.get("WEAVIATE_INDEX", "langchain-test")
### Ingestion via WebBasedLoader
# Load
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
data = loader.load()
### Split text via RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
all_splits = text_splitter.split_documents(data)
### Add to vectorDB
# vectorstore = Weaviate.from_documents(
# documents=all_splits, embedding=OpenAIEmbeddings(), index_name=WEAVIATE_INDEX_NAME
# )
# retriever = vectorstore.as_retriever()
vectorstore = Weaviate.from_existing_index(WEAVIATE_INDEX_NAME, OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
### RAG prompt
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
### RAG pipeline
model = ChatOpenAI()
chain = (
RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
| prompt
| model
| StrOutputParser()
)
# Add typing for input
class Question(BaseModel):
__root__: str
chain = chain.with_types(input_type=Question)
在LangChain-Weaviate RAG流水线中集成MinIO SDK
上面的代码是我们的起点,它为我们提供了一个画布,可以在该画布上传达几个更简化的集成点。
初始化和设置
从设置 MinIO 客户端开始,此设置是基础,因为它支持 RAG 管道内的整个数据管理过程。
from minio import Minio
# Initialize the MinIO client with appropriate access credentials
minio_client = Minio(
"play.min.io:443",
access_key="minioadmin",
secret_key="minioadmin",
secure=True
)
修改数据引入
不要使用默认的 WebBaseLoader 进行数据引入,而是从 MinIO 过渡到直接数据检索方法。这一变化简化了数据访问过程,这对于从一开始就提高管道的效率至关重要。
from io import BytesIO
# Define a function to fetch data directly from a MinIO bucket
def fetch_data_from_s3(bucket_name, object_name):
response = minio_client.get_object(bucket_name, object_name)
data = BytesIO(response.read())
response.close()
return data.getvalue().decode('utf-8')
# Example usage
bucket_name = "hydrate-bucket"
object_name = "urls.txt"
data = fetch_data_from_s3(bucket_name, object_name)
有关 MinIO Python SDK 和 Weaviate 的水合数据的进一步演示…请参阅此处的 blog-assets 存储库中的笔记本。
处理和存储拆分数据
检索原始数据后,使用 LangChain 中的“RecursiveCharacterTextSplitter”将数据分割成可管理的块,为更深入的分析和处理做好准备。
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Split the retrieved data into smaller text chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
all_splits = text_splitter.split_documents(data)
# Store the processed data splits back into MinIO for easy retrieval and further use
def store_data_in_s3(bucket_name, object_name, content):
content_bytes = content.encode('utf-8')
minio_client.put_object(bucket_name, object_name, BytesIO(content_bytes), len(content_bytes))
# Example of storing each split in MinIO
for index, split in enumerate(all_splits):
store_data_in_s3("clean-bucket", f"split_{index}.txt", split)
有关使用 MinIO Python SDK 进行保存拆分的进一步演示,请参阅此处的 blog-assets 存储库中的笔记本。
检索和利用 RAG 提示
从 MinIO 检索必要的 RAG 提示,这对于在 RAG 管道中指导 AI 模型的响应生成至关重要。
# Fetch the RAG prompt stored in a MinIO bucket
# Function to retreive rag-prompt.txt from prompt-bucket
def get_prompt_from_minio(client, bucket_name, object_name):
"""
Retrieve a text file from a specified MinIO bucket and return its content as a string.
:param client: Minio client instance
:param bucket_name: Name of the MinIO bucket
:param object_name: Object name in the bucket (e.g., 'rag-prompt.txt')
:return: Content of the prompt as a string
"""
try:
# Get object data from the specified bucket
response = client.get_object(bucket_name, object_name)
data = response.read() # Read data from the response
prompt_content = data.decode('utf-8') # Decode bytes to string
return prompt_content
except Exception as e:
print(f"Failed to retrieve prompt: {e}")
return None
# Usage
prompt = get_prompt_from_minio(minio_client, "prompt-bucket", "rag-prompt.txt")
if prompt:
print(f"Retrieved prompt content: {prompt}")
else:
print("No content retrieved.")
有关使用 MinIO Python SDK 进行提示检索的进一步演示,请参阅此处的 blog-assets 存储库中的笔记本。
执行 RAG 流程链
所有元素都已就绪后,下面是一个 RetrievalQA 链示例,该链使用配置的 LangChain 框架执行 RAG 流程,利用检索器和语言模型生成上下文相关的答案。
from langchain.chains import RetrievalQA
# Setup the RetrievalQA chain with the language model and the vector store retriever
qa_chain = RetrievalQA.from_chain_type(
llm,
retriever=vectorstore.as_retriever(),
chain_type_kwargs={"prompt": prompt}
)
# Execute the RAG process with a sample query
question = "What are the approaches to Task Decomposition?"
result = qa_chain({"query": question})
print(result["result"])
由于 MinIO 提供的基础设施,定制集成 AI 的开发有无限的可能性,而 MinIO 可以提供。这种方法凸显了 MinIO 在 RAG 管道中的多功能作用及其简化运营的潜力,为管理 AI 驱动的数据处理和内容生成任务的生命周期提供了实用且高效的解决方案。
MinIO 集成 RAG 管道的评估
利用 MinIO、Weaviate 和 LangChain 的精细 RAG 管道从详细而有条不紊的评估过程中受益匪浅。以下是我们简化此方法的方法:
Ragas 框架应用程序
Ragas 是我们评估的核心,为检索和生成提供全面的指标:
-
上下文相关性和召回:确保检索到的信息全面且与查询相关。
-
答案的忠实性和相关性:验证生成的答案是否准确,并直接解决提出的问题。
Ragas LLM以评估为导向的评估提供了一个统一的分数,反映了整体 QA 绩效。它的细致入微的使用LLMs包括根据上下文将答案解析为可验证的陈述,将预测的潜在问题与实际问题的相关性进行比较,并确保所有支持信息都存在于检索到的上下文中。
使用LangSmith进行持续评估
LangSmith 通过提供以下功能来增强 Ragas:
-
用于创建和管理测试数据集以及运行评估的平台。
-
用于可视化和分析结果的工具,使 Ragas 指标透明且可操作。
-
使用真实世界数据增强评估的工具,从生产日志中添加测试示例。
这些工具共同支持迭代和动态评估方法,从而实现 RAG 管道的实时洞察和持续改进。
该评估策略强调了管道的功能性和适应性,旨在维护一个强大而高效的系统。它专注于绩效指标和持续评估,为持续完善和优化集成 RAG 管道提供了一条途径
进一步探索RAG和AI
为了更深入地了解检索增强生成 (RAG) 和相关技术,我整理了以下资源,我发现这些资源对我自己的探索很有帮助:
-
Hugging Face 的 RAG 文档:Hugging Face 提供的有关 RAG 模型的详细文档。
-
使用 Ragas 和 LangSmith 进行 RAG 评估:使用 Ragas 和 LangSmith 评估 RAG 系统的指南。
-
Retrieval-Augmented Generation for Large Language Models: A Survey:一篇关于将检索机制与大型语言模型相结合的进展的调查论文。
-
人工智能生成内容的检索增强生成:一项调查:本文全面研究了 RAG 系统在生成人工智能内容方面的有效性。
-
ARES:检索增强生成系统的自动评估框架:ARES微调语言模型以自主评估RAG系统,从而减少对人类注释的依赖。
-
RaLLe:开发和评估检索增强大型语言模型的框架:RaLLe 提供了用于构建和测试 R-LLMs 的工具,增强了检索和生成过程的透明度和优化。
最后的想法:使用 MinIO 增强 AI 工作流程
当我们探索 MinIO 与检索增强生成 (RAG) 管道的集成以及使用 Langchain 的 Weaviate 向量存储时,很明显,这种组合为 AI 数据处理提供了显着增强。通过利用 MinIO 强大的存储功能,以及 Weaviate 高效的矢量存储和灵活的 Langchain 框架,我们可以简化工作流程并在整个生命周期内更有效地管理数据。
这种集成的影响超越了当前的增强功能,为更广泛的人工智能领域的创新开辟了新的途径。它有望扩展人工智能系统的能力,特别是在生成细致入微的、上下文丰富的响应方面,这些响应可能会变得更加普遍。这种工具的持续发展和完善不仅将简化现有流程,还将为人工智能应用的新突破铺平道路。