来自工业界的知识库 RAG 服务(二),RagFlow 源码全流程深度解析

背景介绍

前面介绍过 有道 QAnything 源码解析,通过深入了解工业界的知识库 RAG 服务,得到了不少调优 RAG 服务的新想法。

因此本次趁热打铁,额外花费一点时间,深入研究了另一个火热的开源 RAG 服务 RagFlow 的完整实现流程,希望同样有所收获。

项目概述

框架设计

首先依旧可以先从框架图入手,与 常规的 RAG 架构 进行一些比较
请添加图片描述

可以看到右侧知识库被明显放大,同时最右侧详细介绍了文件解析的各种手段,比如 OCRDocument Layout Analyze 等,这些在常规的 RAG 中可能会作为一个不起眼的 Unstructured Loader 包含进去,可以猜到 RagFlow 的一个核心能力在于文件的解析环节。

在 官方文档 中也反复强调 Quality in, quality out, 反映出 RAGFlow 的独到之处在于细粒度文档解析。

另外 介绍文章 中提到其没有使用任何 RAG 中间件,而是完全重新研发了一套智能文档理解系统,并以此为依托构建 RAG 任务编排体系,也可以理解文档的解析是其 RagFlow 的核心亮点。

源码结构

首先可以看到 RagFlow 的源码结构:
请添加图片描述
对应模块的功能如下:

  • api 为后端的 API
  • web 对应的是前端页面
  • conf 为配置信息
  • deepdoc 对应的就是文件解析模块

从代码结构就能看出文件解析 deepdoc 在 RAGFlow 中一等公民角色

另外相关的技术栈如下:

  • Web 服务是基于 Flask 实现,这个在 2024 年来看稍微有一点点过时了
  • 业务数据库使用的是 MySQL
  • 向量数据库使用的是 ElasticSearch ,奇怪的是公司有自己的向量数据库 infinity 竟然默认没有用上
  • 文件存储使用的是 MinIO

正如前面介绍的因为没有使用 RAG 中间件,比如 langchainllamaIndex,因此实现上与常规的 RAG 系统会存在一些差异

源码解析

文件加载的支持

常规的 RAG 服务都是在上传时进行文件的加载和解析,但是 RAGFlow 的上传仅仅包含上传至 MinIO,需要手工点击触发文件的解析。
请添加图片描述
根据实际体验,以及网络上的反馈了解到 RAGFlow 的解析相当慢,估计资源开销也比较大,因此也能理解为什么采取二次手工确认的产品方案了。

实际的文件解析通过接口 /v1/document/run 进行触发的,实际的处理是在 api/db/services/task_service.py 中的 queue_tasks() 中完成的,此方法会根据文件创建一个或多个异步任务,方便异步执行。实现如下所示:

def queue_tasks(doc, bucket, name):
    def new_task():
        nonlocal doc
        return {
            "id": get_uuid(),
            "doc_id": doc["id"]
        }
    tsks = []
    # pdf 文件的解析,根据不同的类型设置单个任务最多处理的页数

    # 默认单个任务处理 12 页 pdf,pager 类型的 pdf 一个任务处理 22 页,其他 pdf 不分页

    if doc["type"] == FileType.PDF.value:
        file_bin = MINIO.get(bucket, name)
        do_layout = doc["parser_config"].get("layout_recognize", True)
        pages = PdfParser.total_page_number(doc["name"], file_bin)
        page_size = doc["parser_config"].get("task_page_size", 12)
        if doc["parser_id"] == "paper":
            page_size = doc["parser_config"].get("task_page_size", 22)
        if doc["parser_id"] == "one":
            page_size = 1000000000
        if not do_layout:
            page_size = 1000000000
        page_ranges = doc["parser_config"].get("pages")
        if not page_ranges:
            page_ranges = [(1, 100000)]
        for s, e in page_ranges:
            s -= 1
            s = max(0, s)
            e = min(e - 1, pages)
            for p in range(s, e, page_size):
                task = new_task()
                task["from_page"] = p
                task["to_page"] = min(p + page_size, e)
                tsks.append(task)

    # 表格数据单个任务处理 3000 行

    elif doc["parser_id"] == "table":
        file_bin = MINIO.get(bucket, name)
        rn = RAGFlowExcelParser.row_number(
            doc["name"], file_bin)
        for i in range(0, rn, 3000):
            task = new_task()
            task["from_page"] = i
            task["to_page"] = min(i + 3000, rn)
            tsks.append(task)
    else:
        tsks.append(new_task())

    bulk_insert_into_db(Task, tsks, True)
    DocumentService.begin2parse(doc["id"])

    # 任务插入 Redis 消息队列,方便异步处理

    for t in tsks:
        assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=t), "Can't access Redis. Please check the Redis' status."

从上面的实现来看,文件的解析是根据内容拆分为多个任务,通过 Redis 消息队列进行暂存,之后就可以离线异步处理。

直接查看对应的消息队列的消费模块,对应在 rag/svr/task_executor.py 中的 main() 方法中。实现简化后如下所示:

def main():
    # 获取任务

    rows = collect()

    for _, r in rows.iterrows():
        embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"])
        # 执行文件解析

        cks = build(r)
        # 执行向量化

        tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)

        init_kb(r)

        # 写入 ES

        es_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))

完整的处理流程如下所示:

  1. 调用 collect() 方法从消息队列中获取任务
  2. 接下来每个任务会依次调用 build() 进行文件的解析
  3. 调用 embedding() 方法进行向量化
  4. 最后调用 ELASTICSEARCH.bulk() 写入 ElasticSearch,从这里就可以看到向量库的技术选型

接下来主要关注 build() 方法深入 RAGFlow 核心的文件解析,具体的实现简化后如下所示:

def build(row):
    # 根据类型选择合适的解析器

    chunker = FACTORY[row["parser_id"].lower()]
    # 执行文档的解析和切片

    cks = chunker.chunk(
        row["name"],
        binary=binary,
        from_page=row["from_page"],
        to_page=row["to_page"],
        lang=row["language"],
        callback=callback,
        kb_id=row["kb_id"],
        parser_config=row["parser_config"],
        tenant_id=row["tenant_id"],
    )

实际是根据 parser_id 去选择合适的解析器组,注意这个应该是从业务层得到的一个类型,每个解析器组中都包含了 pdf, word 等支持格式的文件解析,可以理解为一个使用场景的属性。这个会不会导致后续使用场景较多情况下,出现 N (场景) * N (文件格式) 的组合情况可能值得考虑,后续可能会进行优化。

以默认的 naive 类型为例深入对应的 chunk() 实现,其对应的实现在 rag/app/naive.py 中。此方法中包含了目前主持的 docx, pdf, xlsx, md 等格式的解析,我们以 pdf 为例深入查看对应的实现。

可以看到解析器是继承自 deepdoc/parser/pdf_parser.py 中的 RAGFlowPdfParser 实现,终于进入深度文档解析环节了。

pdf 文件的打开是基于 PyPDF2 实现,并基于 pdfplumber 实现表格数据的提取,这个库相对 PyMuPDF 速度更慢,但是可以处理得更精细。

另外使用的 OCR 模型为 /InfiniFlow/deepdoc,在解析中额外加载了一个 XGB 模型 InfiniFlow/text_concat_xgb_v1.0 用于内容提取。

实际的解析过程写的也很复杂,无怪乎处理速度有点慢。不过预期处理效果相对其他 RAG 项目也会好一些。从实际前端的展示的 Demo 来看,RAGFlow 可以将解析后的文本块与原始文档中的原始位置关联起来,这个效果还是比较惊艳的,目前看起来只有 RagFlow 实现了类似的效果。

请添加图片描述

文件的预处理策略

在 RAGFlow 中的文件中包含了不少了数据的清理操作,比如在 deepdoc/vision/layout_recognizer.py 中的就包含着文档中无用内容的判断,示例如下:

def __is_garbage(b):
    patt = [r"^•+$", r"(版权归©|免责条款|地址[::])", r"\.{3,}", "^[0-9]{1,2} / ?[0-9]{1,2}$",
            r"^[0-9]{1,2} of [0-9]{1,2}$", "^http://[^ ]{12,}",
            "(资料|数据)来源[::]", "[0-9a-z._-]+@[a-z0-9-]+\\.[a-z]{2,3}",
            "\\(cid *: *[0-9]+ *\\)"
            ]
    return any([re.search(p, b["text"]) for p in patt])

文档中版权内容,参考来源信息等内容会被清理。但是这样处理比较分散,而且不同的流程中也充斥着大量的特殊处理,导致从源码很难拆分出明确的预处理逻辑。

从常规的流程来看,RAGFlow 将提取的内容分为普通的文本内容 + 表格,分别对这两部分内容进行 tokenize, 方便进行检索。

文件检索的支持 (包含混合检索)

文件检索的支持可以查看实际的对话处理流程,对话的 API 为 /v1/conversation/completion,实际对话的处理是在 api/db/services/dialog_service.py 中的 chat() 方法中完成

深入跟踪对话处理流程,可以看到文件的检索是在 rag/nlp/search.py 中的 search() 方法中完成。

RAGFlow 的检索目前实现的是混合检索,实现的是文本检索 + 向量检索,混合检索完全依赖 ElasticSearch 实现,具体的实现如下所示:

def search(self, req, idxnm, emb_mdl=None):
    qst = req.get("question", "")
    bqry, keywords = self.qryr.question(qst)
    bqry = add_filters(bqry)
    bqry.boost = 0.05

    # 构造 ElasticSearch 文本查询的请求

    s = Search()
    pg = int(req.get("page", 1)) - 1
    ps = int(req.get("size", 1000))
    topk = int(req.get("topk", 1024))
    src = req.get("fields", ["docnm_kwd", "content_ltks", "kb_id", "img_id", "title_tks", "important_kwd",
                                "image_id", "doc_id", "q_512_vec", "q_768_vec", "position_int",
                                "q_1024_vec", "q_1536_vec", "available_int", "content_with_weight"])

    s = s.query(bqry)[pg * ps:(pg + 1) * ps]
    s = s.highlight("content_ltks")
    s = s.highlight("title_ltks")
    if not qst:
        if not req.get("sort"):
            s = s.sort(
                {"create_time": {"order": "desc", "unmapped_type": "date"}},
                {"create_timestamp_flt": {
                    "order": "desc", "unmapped_type": "float"}}
            )
        else:
            s = s.sort(
                {"page_num_int": {"order": "asc", "unmapped_type": "float",
                                    "mode": "avg", "numeric_type": "double"}},
                {"top_int": {"order": "asc", "unmapped_type": "float",
                                "mode": "avg", "numeric_type": "double"}},
                {"create_time": {"order": "desc", "unmapped_type": "date"}},
                {"create_timestamp_flt": {
                    "order": "desc", "unmapped_type": "float"}}
            )

    if qst:
        s = s.highlight_options(
            fragment_size=120,
            number_of_fragments=5,
            boundary_scanner_locale="zh-CN",
            boundary_scanner="SENTENCE",
            boundary_chars=",./;:\\!(),。?:!……()——、"
        )
    s = s.to_dict()
    # 补充向量查询的信息

    q_vec = []
    if req.get("vector"):
        assert emb_mdl, "No embedding model selected"
        s["knn"] = self._vector(
            qst, emb_mdl, req.get(
                "similarity", 0.1), topk)
        s["knn"]["filter"] = bqry.to_dict()
        if "highlight" in s:
            del s["highlight"]
        q_vec = s["knn"]["query_vector"]

    # 将构造的完整查询提交给 ElasticSearch 进行查询

    res = self.es.search(deepcopy(s), idxnm=idxnm, timeout="600s", src=src)

    kwds = set([])
    for k in keywords:
        kwds.add(k)
        for kk in rag_tokenizer.fine_grained_tokenize(k).split(" "):
            if len(kk) < 2:
                continue
            if kk in kwds:
                continue
            kwds.add(kk)

    aggs = self.getAggregation(res, "docnm_kwd")

    return self.SearchResult(
        total=self.es.getTotal(res),
        ids=self.es.getDocIds(res),
        query_vector=q_vec,
        aggregation=aggs,
        highlight=self.getHighlight(res),
        field=self.getFields(res, src),
        keywords=list(kwds)
    )

可以看到 RAGFlow 将混合检索需求转换为复杂的查询条件,利用 elasticsearch-dsl 进行复杂查询的构造,之后直接提交给 ElasticSearch 即可。

检索结果的重排

文件的重排是在 rag/nlp/search.py 中的 rerank() 中完成的,重排是基于文本匹配得分 + 向量匹配得分混合进行排序,默认文本匹配的权重为 0.3, 向量匹配的权重为 0.7,对应的实现如下所示:

# tkweight 为文本匹配权重,vtweight 为向量匹配权重

def rerank(self, sres, query, tkweight=0.3,
            vtweight=0.7, cfield="content_ltks"):
    # 获取文本关键词

    _, keywords = self.qryr.question(query)

    # 获取文本向量

    ins_embd = [
        Dealer.trans2floats(
            sres.field[i].get("q_%d_vec" % len(sres.query_vector), "\t".join(["0"] * len(sres.query_vector)))) for i in sres.ids]
    if not ins_embd:
        return [], [], []

    for i in sres.ids:
        if isinstance(sres.field[i].get("important_kwd", []), str):
            sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
    ins_tw = []
    for i in sres.ids:
        content_ltks = sres.field[i][cfield].split(" ")
        title_tks = [t for t in sres.field[i].get("title_tks", "").split(" ") if t]
        important_kwd = sres.field[i].get("important_kwd", [])
        tks = content_ltks + title_tks + important_kwd
        ins_tw.append(tks)

    # 获取整体相似分,文本相似分,向量相似分

    sim, tksim, vtsim = self.qryr.hybrid_similarity(sres.query_vector,
                                                    ins_embd,
                                                    keywords,
                                                    ins_tw, tkweight, vtweight)
    return sim, tksim, vtsim

获取混合相似分之后,基于混合的相似分进行过滤和重排,默认混合得分低于 0.2 的会被过滤掉

大模型的处理

在进行上面的检索和重排阶段中,只是进行了必要的过滤,没有限制匹配文档的数量。

实际内容可能会超过大模型的输入 token 数量,因此在调用大模型前会调用 api/db/services/dialog_service.py 文件中 message_fit_in() 根据大模型可用的 token 数量进行过滤。这部分与有道的 QAnything 的实现大同小异,就不额外展开了。

将检索的内容,历史聊天记录以及问题构造为 prompt,即可作为大模型的输入了,默认的英文 prompt 如下所示:

"""
You are an intelligent assistant. Please summarize the content of the knowledge base to answer the question. Please list the data in the knowledge base and answer in detail. When all knowledge base content is irrelevant to the question, your answer must include the sentence "The answer you are looking for is not found in the knowledge base!" Answers need to consider chat history.
      Here is the knowledge base:
      {knowledge}
      The above is the knowledge base.
"""

对应的中文 prompt 如下所示:

"""
你是一个智能助手,请总结知识库的内容来回答问题,请列举知识库中的数据详细回答。当所有知识库内容都与问题无关时,你的回答必须包括“知识库中未找到您要的答案!”这句话。回答需要考虑聊天历史。
    以下是知识库:
    {knowledge}
    以上是知识库
"""

总结

通过上面的介绍,可以对开源的 RagFlow 有了一个大致的了解,与前面的 有道 QAnything 整体流程还是比较类似的。同样支持混合检索,文本检索的方案都是基于 ElasticSearch 实现,在检索后都实现了 Rerank 流程,并在进入大模型之前基于可用最大 token 进行动态过滤。

不过对比来看,来自互联网大厂有道的 QAnything 的代码质量高很多,实现功能封装与命名都相当简洁易懂,工程的鲁棒性也明显高出一大截。如果想要通过源码了解 RAG 服务推荐优先阅读 QAnything。

当然 RagFlow 也有一些独到之处,对于文件的细粒度处理带来更高质量的参考信息,从而更好的提升参考信息,如果对这部分感兴趣可以深入了解下 RagFlow 的相关代码实现细节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/656814.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

上交提出TrustGAIN,提出6G网络中可信AIGC新模式!

月16日至18日&#xff0c;2024全球6G技术大会在南京召开。会上&#xff0c;全球移动通信标准制定组织3GPP&#xff08;第三代合作伙伴计划&#xff09;的3位联席主席分享了3GPP6G标准时间表&#xff1a; 2024年9月&#xff0c;启动6G业务需求研究&#xff1b; 2025年6月&…

FastReport 主子表关系

代码中只需要绑定主表的数据就可以&#xff0c;子表的数据会通过报表中的关连关系自动到数据库中带出。 using CloudSaaS.DB.Handler; using CloudSaaS.Model; using CloudSaaS.DAL; using FastReport; using FastReport.Web; using System; using System.Collections.Generic;…

Hotcoin Research | 市场洞察:2024年5月13日-5月19日

加密货币市场表现 目前&#xff0c;加密货币总市值为1.32万亿&#xff0c;BTC占比54.41%。 本周行情呈现震荡上行的态势&#xff0c;BTC在5月15日-16日&#xff0c;有一波大的拉升&#xff0c;周末为震荡行情。BTC现价为67125美元。 上涨的主要原因&#xff1a;美国4月CPI为3…

Oracle创建用户时提示ORA-65096:公用用户名或角色名无效

Oracle创建用户时提示“ORA-65096&#xff1a;公用用户名或角色名无效” 如下图所示&#xff1a; 解决方法&#xff1a;在新增用户名前面加上C##或者c##就可以解决无效问题&#xff0c;具体什么原因还不清楚&#xff0c;需要再研究一下。

JS 中怎么删除数组元素?有哪几种方法?

正文开始之前推荐一位宝藏博主免费分享的学习教程,学起来! 编号学习链接1Cesium: 保姆级教程+源码示例2openlayers: 保姆级教程+源码示例3Leaflet: 保姆级教程+源码示例4MapboxGL: 保姆级教程+源码示例splice() JavaScript中的splice()方法是一个内置的数组对象函数, 用于…

vr数字成果展在线展示突破用户传统认知

想要轻松搭建一个充满互动与创意的3D数字展厅吗?vr互动数字展厅搭建编辑器将是您的不二之选!华锐视点3D云展平台提供的vr互动数字展厅搭建编辑器将空间重建与互动制作完美结合&#xff0c;让您轻松实现3D空间的搭建与互动营销制作。 在vr互动数字展厅搭建编辑器的帮助下&#…

SpringBoot 返回值 i18n 自动处理

定义基础通用类 首先定义一波错误码&#xff1a;ResultCode Getter AllArgsConstructor public enum ResultCode {SUCCESS(200, "请求成功", "request.success"),Fail(400, "请求失败", "request.failed"),PASSWORD_NOT_MATCH(1000…

独家揭秘!Amazon、lazada、Shopee测评自养号,新手也能秒变高手!

近年来&#xff0c;随着国内卖家涌入跨境电商平台&#xff0c;市场竞争愈加激烈。为了迅速占领市场&#xff0c;测评变得至关重要。然而&#xff0c;真人测评供不应求&#xff0c;服务商账号质量不一&#xff0c;且存在高权重账号稀缺和黑卡下单风险。因此&#xff0c;许多大卖…

为什么选择CleanMyMac软件呢?推荐理由

你是否曾经遇到过这样的问题&#xff1a;电脑运行缓慢&#xff0c;存储空间不足&#xff0c;不知道如何清理垃圾文件&#xff1f;别担心&#xff0c;我们为你找到了解决方案——CleanMyMac软件。这款强大的工具可以帮助你轻松解决这些问题&#xff0c;让你的电脑焕然一新&#…

VirtualBox+Ubuntu22.10+Docker+ROS2

Docker 拉取ros2镜像 docker pull osrf/ros:foxy-desktop 运行 docker run -it --nameros2 -p 50022:22 osrf/ros:foxy-desktop 进入容器安装组件 apt-get update apt-get install vim apt-get install git apt-get install net-tools # 安装ssh apt-get install openssh…

【FPGA】正原子XC7A35T

25_实战篇&#xff1a;时钟IP核MMCM&#xff08;第一讲&#xff1a;时钟资源讲解&#xff09;_哔哩哔哩_bilibili 25时钟IP核MMCM 7系列的时钟资源 bufferG bufferR 下图可视为一个FPGA&#xff08;官方手册&#xff09; 4 MRCC,SRCC 全局时钟&#xff1a;MRCC P 差分时…

Java入门-“第九大数据类型“-字符串

字符串String **字符串(String)**是指多个字符连接起来组合成的字符序列&#xff0c;例如”中国”&#xff0c;“hello world”都为字符串。注意对比字符&#xff0c;字符只能存储一个字符使用单引号’中’&#xff0c;’国’。 字符串底层源码 字符串定义 创建String对象 St…

2024年5月软考成绩什么时候出?附查询方式

2024年5月软考成绩查询时间及查询方式&#xff1a; 查询时间&#xff1a;预计在2024年7月上旬进行。 查询方式&#xff1a; 方式一&#xff1a;登陆中国计算机技术职业资格网&#xff08;www.ruankao.org.cn&#xff09;&#xff0c;点击报名系统&#xff0c;输入注册账号和…

nodejs中使用ffmpeg零基础教程(electron+vue3)

同学们可以私信我加入学习群&#xff01; 正文开始 前言一、多方案对比二、ffmpeg各插件简介三、使用ffmpeg-static插件四、使用fluent-ffmpeg插件五、如果使用ai&#xff0c;可能会踩的坑5.1第一个坑5.2第二个坑5.3第三个坑 总结 前言 最近想要把自己写的一些知识点&#xff…

【NumPy】全面解析NumPy的astype函数:高效数据类型转换指南

&#x1f9d1; 博主简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

VUE-watch和watchEffect的区别

区别简短扼要地说&#xff1a; watch-官方定义&#xff1a;侦听一个或多个响应式数据源&#xff0c;并在数据源变化时调用所给的回调函数。是需要指定监听的数据&#xff0c;并且只有在响应式数据变化的时候去执行 watchEffect-官方定义&#xff1a;立即运行一个函数&#xff0…

mybatis关联查询使用resultMap查询到了多条,结果返回一条。

今天在写代码时候&#xff0c;遇到了一个很让我费解的问题&#xff0c;在使用关联查询的时候&#xff0c;在明明数据库里面&#xff0c;已经查到了两条数据&#xff0c;结果resultMap这个集合里面&#xff0c;就只返回一条数据。 数据库的SQL&#xff1a; mybatis的xml里面的r…

公告:关于博主的重要通知

大家好&#xff0c;我是博主夏目。 本期不分享知识&#xff0c;博主想说明一下博主的一些重要提示。 分享的内容&#xff0c;从不收费&#xff0c;也未向任何人进行收费。 意在分享知识&#xff0c;传播文化&#xff0c;结交更多志同道合的朋友。 截至目前&#xff0c;从未…

多系统集成的项目周期为何普遍较长?

在现代企业的运营中&#xff0c;各种信息系统的集成已成为提升效率和竞争力的关键。然而&#xff0c;当工厂的ERP系统需要与MES、SRM、WMS、CRM等其他系统集成时&#xff0c;项目周期往往长达一年以上&#xff0c;这不仅耗费时间、人力和财力&#xff0c;还可能影响企业的正常运…

【GD32F303红枫派使用手册】第一节 RCU-时钟配置及输出实验

1.1 实验内容 通过本实验主要学习以下内容&#xff1a; RCU时钟原理及配置&#xff1b; RCU时钟输出验证。 1.2 实验原理 1.2.1 RCU时钟树原理 GD32F303系列MCU的时钟树如下图所示&#xff0c;由该图可知&#xff0c;GD32F303系列MCU的时钟树可大致分为三个部分&#xff…