什么是RAG?
RAG(Retrieval-Augmented Generation,检索增强生成) 是一种结合检索(Retrieval)和生成(Generation)的技术,旨在提升大语言模型(LLM)生成内容的准确性、相关性和时效性。
-
基本思想:通过外部知识库动态检索与用户查询相关的信息,并将检索结果作为上下文输入生成模型,辅助生成更可靠的回答。
-
与传统LLM的区别:传统LLM仅依赖预训练参数中的静态知识,而RAG能实时利用外部数据,解决模型幻觉(编造信息)和知识过时的问题。
RAG分为两阶段流程:
-
检索阶段(Retrieval):
-
将用户查询转换为向量或关键词,从外部知识库(如文档、数据库、网页)中检索相关内容。
-
常用技术:向量数据库(如FAISS)、BM25算法、语义相似度匹配。
-
-
生成阶段(Generation):
-
将检索到的信息与用户输入拼接,输入生成模型(如GPT、LLaMA)生成最终回答。
-
模型结合检索内容与自身知识,生成更精准、可解释的响应。
-
为什么需要RAG?
一、突破静态知识的限制
-
知识过时:
模型训练完成后无法自动更新知识(如GPT-3的数据截止到2021年)。
示例:若用户询问“2023年诺贝尔奖得主是谁”,传统模型无法回答,而RAG可通过检索最新新闻或数据库提供正确答案。 -
覆盖范围有限:
模型对长尾领域(如企业内部文档、专业论文)或小众问题(如特定设备故障代码)可能缺乏知识。
二、减少模型“幻觉”风险
大模型生成内容时可能编造看似合理但错误的信息(称为“幻觉”),这在关键场景(如医疗、法律)中风险极高。
三、无需重新训练,低成本扩展能力
传统方法需通过微调(Fine-tuning) 让模型适配新任务,但存在痛点:
-
数据需求高:需大量标注数据,成本高昂。
-
难以频繁更新:每次更新知识都需重新训练模型,效率低下
RAG的优势:
- 动态扩展知识:仅需更新外部知识库(如上传最新文档),无需修改模型参数。
- 灵活适配场景:同一模型可连接不同知识库,服务医疗、金融等多个领域。
RAG的工作流程
一、构建知识库
1. 文本分块:将长篇文档切分为多个文本块,一边更加高效地处理和检索信息。
2. 向量化:使用各种Embedding模型来将每个文本块转换为高纬向量表示,使得文本之间可以在高维空间中进行相似度比较和检索。
二、用户检索
1. 向量化:将用户提出的问题,经过同样的向量化处理,得到问题的向量表示。
2. 检索:通过向量数据库进行相似性检验(如余弦相似度),从知识库中找到与用户提问相关的文档切片。
3. 响应:将检索到的文本块作为额外的上下文,加上用户的问题,生成更加规范正确的答案。
基于Mindspore的RAG实现
首先,需要在ModelArts平台创建:mindspore == 2.3.0, cann == 8.0.0的notebook。
随后,克隆github项目:
克隆MindNLP并更新
git clone https://github.com/mindspore-lab/mindnlp.git
cd mindnlp
git checkout ef64a3b83097c9578bb0d5326f905beeb5b50e1d
bash scripts/build_and_reinstall.sh
RAG项目
git clone https://github.com/ResDream/MindTinyRAG.git
安装所需依赖
cd MindTinyRAG
pip install -r requirements.txt
一、 读取文件与处理
需求分析:
- 文件格式不同,需要判断文件类型再进行内容提取。
- 基于最大token长度和覆盖内容的逻辑分割长文本,确保段落间的语义连续性。
分块策略
1. 固定长度分块(Fixed-Size Chunks)
将文档按固定长度(如每段100字或500字符)均匀切割。
优点:实现简单,计算高效。适合结构化文本(如代码、表格)。
缺点:可能破坏句子或段落的语义完整性。对长距离依赖的上下文不敏感。
示例:
原文:人工智能(AI)是... [100字] → 分块1:人工智能(AI)是...;分块2:在医疗领域...;
2. 基于内容的分块(Content-Based Chunking)
根据文档结构(段落、标题、分隔符)进行分块。
常见方法:
- 段落分块:以自然段落为单位。
- 标题分块:按标题层级切分(如H1、H2)。
- 分隔符分块:根据标点(如句号)、空行或Markdown符号(
##
)分割。
优点:保留语义单元,减少上下文碎片化。适合非结构化文本(如文章、报告)。
缺点:依赖文档格式的规范性(如无明确分隔符时失效)。
示例:
# 第一章 引言
人工智能是... → 分块1(标题+段落)
## 1.1 发展历史
20世纪50年代... → 分块2
3. 滑动窗口分块(Sliding Window)
在固定长度分块基础上,通过重叠窗口(如50%重叠)连接上下文。
优点:缓解固定分块导致的上下文断裂问题。提升相邻块之间的语义连贯性。
缺点:增加存储和计算成本(块数量翻倍)。
示例:
窗口大小=200字符,重叠=50%
分块1:字符1-200
分块2:字符100-300
分块3:字符200-400
4. 语义分块(Semantic Chunking)
利用NLP模型(如句子嵌入、文本分割模型)识别语义边界。
常用技术:
- 句子分割:按完整句子分块。
- 主题分割:检测文本主题变化(如使用
TextTiling
算法)。 - 嵌入聚类:通过向量相似度合并相关段落。
优点:保持语义完整性,适合长文本(如论文、书籍)。提升检索相关性。
缺点:计算复杂度高,依赖模型性能。
示例
原文:深度学习在CV中的应用...(技术细节)。另一方面,NLP领域... → 分块1(CV)、分块2(NLP)
5. 动态分块(Dynamic Chunking)
据查询需求动态调整分块粒度。
实现方式:
- 多粒度索引:同时存储不同粒度的块(如段落、章节、全文)。
- 检索时合并:检索到多个相关小粒度块后合并为上下文。
优点:平衡召回率与噪声控制。适应多样化的查询需求。
缺点:系统设计复杂,需多层索引支持。
分块策略选择建议
-
平衡长度与语义:
-
一般场景:优先语义分块(如段落)或滑动窗口(重叠20-30%)。
-
长文本(如书籍):语义分块+动态合并。
-
-
考虑下游模型限制:
-
块长度需适配生成模型的输入容量(如GPT-4最大支持128k tokens)。
-
-
领域适配:
-
技术文档:按函数/API分块;法律文本:按条款分块。
-
-
实验调优:
-
通过检索准确率、生成质量等指标测试最佳分块大小。
-
项目使用滑动窗口分块策略:
- 分块大小(chunk size):定义每个分块的最大长度,通常以token为单位
- 滑动步长(stride size):定义分块的起始位置之间的距离,从而实现分块之间的重叠。
- 重叠内容(overlap):由 | chunk size - stride size | 决定,用于保留上下文
1. 确定每个文本块的最大长度(分块大小)
2. 确定滑动的步长,计算分块之间的重叠区域
3. 从文档头开始,提取长度为窗口大小的内容作为分块
4. 从前一个分块的起始位置向前滑动步长,提取下一段内容
5. 重复步骤4直至处理完整个文档。
定义ReadFile类:
# 定义读取文件的类
class ReadFiles:
'''
class for reading files with suffixes
'''
def __init__(self, path: str) -> None:
'''
path: 知识库的文件路径
file_list:文件对象的列表
'''
self._path = path
self.file_list = self.get_files()
@classmethod
def read_pdf(cls, file_path: str) -> str:
'''
使用PyPDF读取pdf文件,并返回文件中文本的方法
param:
file_path: 文件路径
return:
text: PDF中的所有文本
'''
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ''
for page_num in range(len(reader.pages)):
text += reder.pages[page_num].extract_text()
return text
@classmethod
def read_markdown(cls, file_path) -> str:
'''
使用bs4读取md文件,并返回文件中文本的方法
param:
file_path: 文件路径
return:
text: PDF中的所有文本
'''
with open(file_path, 'r', encoding='utf-8') as file:
md_text = file.read()
html_text = markdown.markdown(md_text)
soup = BeautifulSoup(html_text, 'html.parser')
plain_text = soup.get_text()
# 使用正则表达式移除网址链接
text = re.sub(r'http\S+', '', plain_text)
return text
@classmethod
def read_txt(cls, file_path: str) -> str:
'''
返回文件中文本的方法
param:
file_path: 文件路径
return:
text: 文件文本
'''
with open(file_path, 'r', encoding='utf-8') as file:
text = file.read()
return text
def get_files(self) -> list[str]:
'''
获取指定路径下的所有文件
return:
file_list:可以进行解析的所有文件的文件路径
'''
file_list =[]
for filepath, dirnames, filenames in os.walk(self._path):
# os.walk 函数将递归遍历指定文件夹
for filename in filenames:
# 通过后缀名判断是否能进行解析
if filename.endswith('.md'):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith('.txt'):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".pdf"):
file_list.append(os.path.join(filepath, filename))
return file_list
def get_content(self, max_token_len: int = 600, cover_content: int = 150) -> list[str]:
'''
获取知识库中所有文件的文件内容
param:
max_token_len: 分块大小
cover_conten: 重叠内容大小
return:
docs: 文件内容
'''
docs = []
for file in self.file_list:
# 获取文件内容
content = self.read_file_content(file)
# 将文件内容分块
chunk_content = self.get_chunk(
content, max_token_len=max_token_len, cover_content=cover_content)
docs.extend(chunk_content)
return docs
@classmethod
def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
chunk_text = []
# 当前块长度
curr_len = 0
# 当前块内容
curr_chunk = ''
# 滑动窗口的滑动步长
token_len = max_token_len - cover_content
# 假设以换行符对文本进行分割
lines = text.splitlines()
for line in lines:
# 去除文本中的空格
line = line.replace(' ', '')
# enc.encode: 将文本转换为 token ID 列表
line_len = len(enc.encode(line))
# 如果文本较长,对文本进行分块(处理超长行)
if line_len > max_token_len:
# 计算一共要切成多少个文本块
num_chunks = (line_len + token_len - 1) // token_len
for i in range(num_chunks):
start = i * token_len
end = start + token_len
# 确保每个块的结尾不会截断一个完整的单词,从而保持语义的连贯性。
# 通过循环逐步右移切割点,直到块末尾落在空白字符处:
# 如果末尾不是空白,将 start 和 end 右移一位,直到满足条件或越界。
while not line[start: end].rstrip().isspace():
start += 1
end += 1
if start >= line_len:
break
# 前一个块的末尾 cover_content 字符与当前切割内容拼接:
curr_chunk = curr_chunk[-cover_content: ] + line[start: end]
chunk_text.append(curr_chunk)
# 处理最后一个块
# 循环内的 end 可能超出文本实际长度,需要单独处理最后一个块以修正范围。
start = (num_chunks - 1) * token_len
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
# 如果不是超长文本,直接处理
# 如果当前文本还没有到达最长长度,先拼接,不添加
if curr_len + line_len <= token_len:
curr_chunk += line
curr_chunk += '\n'
curr_len += line_len
curr_len += 1
# 如果达到最长长度,直接添加
else:
chunk_text.append(curr_chunk)
curr_chunk = curr_chunk[-cover_content: ] + line
curr_len = line_len + cover_content
if curr_chunk:
chunk_text.append(curr_chunk)
return chunk_text
@classmethod
def read_file_content(cls, file_path: str):
# 根据文件扩展名选择读取方法
if file_path.endswith('.pdf'):
return cls.read_pdf(file_path)
elif file_path.endswith('.md'):
return cls.read_markdown(file_path)
elif file_path.endswith('.txt'):
return cls.read_text(file_path)
else:
raise ValueError("Unsupported file type")
重点:get_chunk算法
该分块算法的核心实现思路可总结为以下流程:
首先基于换行符将原始文本分割为行级单位,假设“行”是天然语义边界;随后逐行处理时,针对两种场景采取不同策略。
普通行通过动态累加至当前块(curr_chunk
),若累计 token 长度(curr_len
)超出步长限制(token_len = max_token_len - cover_content
),则将当前块存入结果并基于重叠机制(保留末尾 cover_content
字符)初始化新块
超长行则按步长预切割后,通过逐字符右移切割点使块末尾落于空格处,避免单词截断,同时每个新块强制拼接前一块的重叠内容以维持上下文连贯性。
算法通过隐式覆盖(非显式重置)管理状态变量,最终保证所有块长度不过 max_token_len
且相邻块间存在可控重叠。其优势在于平衡了切割效率与语义完整性,但需注意空格删除对语义的破坏风险及冗余块处理问题,可通过反向空格查找、精确换行符 token 计算和状态隔离进一步优化。
二、定义Embedding类
# Embedding设计
class BaseEmbeddings:
'''
embedding的基类
'''
def __init__(self, path: str, is_api: bool) -> None:
self.path = path
self.is_api = is_api
def get_embedding(self, text: str, model: str) -> list[float]:
'''
待子类实现此方法
'''
raise NotImplementedErrort
@classmethod
def consine_similarity(cls, vector1: list[float], vector2: list[float]) -> float:
'''
计算两个向量的余弦相似度
'''
dot = np.dot(vector1, vector2)
magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
if not magnitude:
return 0
return dot / magnitude
class MindNLPEmbedding(BaseEmbeddings):
'''
MindNLP使用的Embedding类,继承了BaseEmbedding
'''
def __init__(self, path='BAAI/bge-base-zh-v1.5', is_api=False):
super().__init__(path, is_api)
self._model = self.load_model(path)
def get_embedding(self, text: str):
'''
使用定义的Embedding模型,获取输入句子的Embedding
'''
sentence_embedding = self._model.encode([text], normalize_embeddings=True)
return sentence_embedding
def load_model(self, path: str):
'''
通过MindNLP提供的类,加载Embedding模型并返回
'''
from mindnlp.sentence import SentenceTransformer
model = SentenceTransformer(path)
return model
@classmethod
def consine_similarity(cls, sentence_embedding_1, sentence_embedding_2):
'''
重写父类的相似度方法
子类在调用 self._model.encode 时设置了 normalize_embeddings=True,
这会强制将输出向量归一化为单位长度(模长为 1)。
所以相似度只需要计算点积结果即可表示。
'''
similarity = sentence_embedding_1 @ sentence_embedding_2.T
return similarity
三、知识库设计
向量数据库(Vector Database)是一种专门用于存储、管理和高效检索高维向量数据的数据库系统。它的核心目标是解决传统数据库难以处理的大规模高维数据相似性搜索问题,广泛应用于人工智能、机器学习和大数据领域。
常用的向量数据库有Meta AI团队提供的FAISS或华为的GaussDB for Vector
本案例将自己实现一个简单的向量数据库。
需求分析:
- Embedding计算
- 数据持久化存储
- 从文件中加载数据
- 相似文档查询
class VectorStore:
def __init__(self, document: list[str] = ['']):
'''
初始化向量库
param:
document: 所有已经分块好的文本块
'''
self.document = document
def get_vector(self, EmbeddingModel: BaseEmbeddings):
'''
获取所有文本块的向量
'''
self.vetors = []
for doc in tqdm(self.document, desc='Calculating Embeddings'):
# 通过Embedding类中定义好的方法,获取每个文本块的向量
self.vetors.append(EmbeddingModel.get_embedding(doc))
return self.vetors
def persits(self, path: str = 'storage'):
'''
将向量数据保存到本地
'''
if not os.path.exists(path):
os.makedirs(path)
# 持久化存储文档
with open(f"{path}/document.json", 'w', encoding='utf-8') as f:
json.dump(self.document, f ,ensure_ascii=False)
# 持久化存储向量
if self.vetors:
vectors_list = [vector.tolist() for vector in self.vetors]
with open(f"{path}/vectors.json", 'w', encoding='utf-8') as f:
json.dump(vectors_list, f)
def load_vector(self, EmbeddingModel: BaseEmbeddings, path: str = 'storage'):
with open(f"{path}/document.json", 'r', encoding='utf-8') as f:
self.document = json.load(f)
with open(f"{path}/vectors.json", 'r', encoding='utf-8') as f:
vectors_list = json.load(f)
# 判断EmbeddingModel的类型
if isinstance(EmbeddingModel, MindNLPEmbedding):
self.vetors = [np.array(vector) for vector in vectors_list]
else:
self.vetors = vectors_list
def get_similarity(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1):
# 获取query的向量
query_vector = EmbeddingModel.get_embedding(query)
# 计算查询向量和所有文本块的相似度
similaraties = [self.get_similarity(query_vector, vector) for vector in self.vetors]
# 将相似度、向量和文档存储在一个列表中
results = []
for similarity, vector, document in zip(similaraties, self.vetors, self.document):
results.append({
'similarity': similarity,
'vector': vector,
'document': document
})
# 将结果按照相似度降序排序
results.sort(key=lambda x: x['similarity'], reverse=True)
# 选取相似度最高的top_k个结果
top_k_documents = [result['document'] for result in results[:k]]
return top_k_documents
四、定义模型
# 定义语言模型
class BaseModel:
def __init__(self, path: str = ''):
self.path = path
def chat(self, prompt: str, history: list[dict], content: str) -> str:
pass
def load_model(self):
pass
# 定义prompt模板
PROMPT_TEMPLATE = dict(
RAG_PROMPT_TEMPALTE="""使用以上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
有用的回答:""",
MindNLP_PROMPT_TEMPALTE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
有用的回答:"""
)
class MindNLPChat(BaseModel):
def __init__(self, path: str = ''):
super().__init__(path)
self.load_model()
def chat(self, prompt: str, history: list = [], content: str = '') -> str:
"""
生成对话回复。
Args:
prompt (str): 用户输入的提示文本。
history (list[dict]): 对话历史,每个元素为包含'user'和'assistant'键的字典。
content (str): 上下文内容,用于增强生成的相关性。
Returns:
str: 模型生成的回复文本。
"""
prompt = PROMPT_TEMPLATE['MindNLP_PROMPT_TEMPALTE'].format(question=prompt, context=content)
resp, history = self.model.chat(self.tokenizer, prompt, history=history, max_length=1024)
return resp
def load_model(self):
import mindspore
from mindnlp.transformers import AutoTokenizer, AutoModelForCausalLM
self.tokenizer = AutoTokenizer.from_pretrained(self.path, mirror="huggingface")
self.model = AutoModelForCausalLM.from_pretrained(self.path, ms_dtype=mindspore.float16, mirror="huggingface")
五、重排序ReRank
Rerank(重排序)技术是优化检索结果的关键环节。它的核心目标是从初步检索得到的候选文档中,筛选出最相关、高质量的文档作为生成模型的输入,从而提升最终答案的准确性和相关性。
在RAG的典型流程中,Rerank位于检索(Retrieval)和生成(Generation)之间:
-
检索阶段:使用快速但粗粒度的检索模型(如BM25、双编码器Bi-Encoder)从海量文档中召回Top-K候选(例如K=100)。
-
Rerank阶段:对Top-K候选进行精细排序,选出最相关的Top-N(例如N=5)文档。
-
生成阶段:基于Top-N文档生成最终答案。
class MindNLPReranker(BaseReranker):
def __init__(self, path: str = 'BAAI/bge-reranker-base'):
super().__init__(path)
self._model = self.load_model(path)
def load_model(self, path: str):
from mindnlp.sentence import SentenceTransformer
model = SentenceTransformer(path)
return model
def rerank(self, text, content, k):
query_embedding = self._model.encode(text, normalize_embeddings=True)
sentences_embedding = self._model.encode(content, normalize_embeddings=True)
similarity = query_embedding @ sentences_embedding.T
# 根据相似度降序排序
reanked_index = np.argsort(similarity)[::-1]
# 选择top k个结果
top_k = [content[i] for i in reanked_index[:k]]
return top_k
# 创建RerankerModel
reranker = MindNLPReranker('BAAI/bge-reranker-base')
# 从向量数据库中查询出最相似的3个文档
content = vector.query(question, EmbeddingModel=embedding, k=3)
print('first query', content)
# 从一阶段查询结果中用Reranker再次筛选出最相似的2个文档
rerank_content = reranker.rerank(question, content, k=2)
print('reranked', rerank_content)
# 最后选择最相似的文档, 交给LLM作为可参考上下文
best_content = rerank_content[0]
print(chat.chat(question, [], best_content))
运行代码:
embedding = MindNLPEmbedding("BAAI/bge-base-zh-v1.5")
vector = VectorStore(text)
vector.get_vector(EmbeddingModel=embedding)
vector.persists(path='storage') # 将向量和文档内容保存到storage目录下,下次再用就可以直接加载本地的数据库
vector.load_vector(EmbeddingModel=embedding, path='./storage') # 加载本地的数据库
# 创建RerankerModel
reranker = MindNLPReranker('BAAI/bge-reranker-base')
# 从向量数据库中查询出最相似的3个文档
content = vector.query(question, EmbeddingModel=embedding, k=3)
print('first query', content)
# 从一阶段查询结果中用Reranker再次筛选出最相似的2个文档
rerank_content = reranker.rerank(question, content, k=2)
print('reranked', rerank_content)
# 最后选择最相似的文档, 交给LLM作为可参考上下文
best_content = rerank_content[0]
print(chat.chat(question, [], best_content))