Dify中的经济索引模式实现过程

当索引模式为经济时,使用离线的向量引擎、关键词索引等方式,降低了准确度但无需花费 Token。

一.提取函数**_extract**

根据不同文档类型进行内容的提取:

def _extract(self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict) \
        -> list[Document]:  # 提取
    # load file
    if dataset_document.data_source_type not in ["upload_file", "notion_import"]:  # 数据源类型
        return []

    data_source_info = dataset_document.data_source_info_dict  # 数据源信息
    text_docs = []  # 文本文档
    if dataset_document.data_source_type == 'upload_file':
        if not data_source_info or 'upload_file_id' not in data_source_info:
            raise ValueError("no upload file found")

        file_detail = db.session.query(UploadFile). \
            filter(UploadFile.id == data_source_info['upload_file_id']). \
            one_or_none()

        if file_detail:
            extract_setting = ExtractSetting(
                datasource_type="upload_file",
                upload_file=file_detail,
                document_model=dataset_document.doc_form
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
    elif dataset_document.data_source_type == 'notion_import':
        if (not data_source_info or 'notion_workspace_id' not in data_source_info
                or 'notion_page_id' not in data_source_info):
            raise ValueError("no notion import info found")
        extract_setting = ExtractSetting(
            datasource_type="notion_import",
            notion_info={
                "notion_workspace_id": data_source_info['notion_workspace_id'],
                "notion_obj_id": data_source_info['notion_page_id'],
                "notion_page_type": data_source_info['type'],
                "document": dataset_document,
                "tenant_id": dataset_document.tenant_id
            },
            document_model=dataset_document.doc_form
        )
        text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
    # update document status to splitting
    self._update_document_index_status(
        document_id=dataset_document.id,
        after_indexing_status="splitting",
        extra_update_params={
            DatasetDocument.word_count: sum([len(text_doc.page_content) for text_doc in text_docs]),
            DatasetDocument.parsing_completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        }
    )  # 更新文档状态为拆分

    # replace doc id to document model id
    text_docs = cast(list[Document], text_docs)
    for text_doc in text_docs:
        text_doc.metadata['document_id'] = dataset_document.id
        text_doc.metadata['dataset_id'] = dataset_document.dataset_id

    return text_docs

直接调用的是core.indexing_runner.IndexingRunner._extract()方法:

得到上传文件的详细信息:

接下来调用提取内容函数extract()方法:

根据不同的IndexType类型返回不同的索引处理器:

因为这里index_processor类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor,实际调用的是ParagraphIndexProcessor类中的extract函数:

ExtractProcessor.extract()类方法根据文件类型进行解析:

因为文件类型为txt,所以执行TextExtractor()

TextExtractor()实际执行的位置为dify\api\core\rag\extractor\text_extractor.py中的extract()方法:

最终得到text_docs内容:

二.转换函数_transform

def _transform(self, index_processor: BaseIndexProcessor, dataset: Dataset,
               text_docs: list[Document], doc_language: str, process_rule: dict) -> list[Document]:  # 转换
    # get embedding model instance
    embedding_model_instance = None
    if dataset.indexing_technique == 'high_quality':
        if dataset.embedding_model_provider:
            embedding_model_instance = self.model_manager.get_model_instance(
                tenant_id=dataset.tenant_id,
                provider=dataset.embedding_model_provider,
                model_type=ModelType.TEXT_EMBEDDING,
                model=dataset.embedding_model
            )  # 获取嵌入模型实例
        else:
            embedding_model_instance = self.model_manager.get_default_model_instance(
                tenant_id=dataset.tenant_id,
                model_type=ModelType.TEXT_EMBEDDING,
            )  # 获取默认嵌入模型实例

    documents = index_processor.transform(text_docs, embedding_model_instance=embedding_model_instance,
                                          process_rule=process_rule, tenant_id=dataset.tenant_id,
                                          doc_language=doc_language)  # 转换文档

    return documents

实际调用的是core.indexing_runner.IndexingRunner._transform

因为设置dataset.indexing_technique'economy'

因为这里index_processor类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor,实际调用的是ParagraphIndexProcessor类中的transform函数:

这里splitter类型为core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter

core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter类:

调用文档清理CleanProcessor.clean()类方法:

实际调用的是dify\api\core\rag\cleaner\clean_processor.py中的CleanProcessor.clean()类方法:

clean方法的处理过程如下:

(1)默认清理:首先,方法会执行一些默认的清理操作,包括删除无效的符号。这些操作主要是通过正则表达式来实现的,例如,替换<\|<,替换\|>>,以及删除一些特定的ASCII和Unicode字符。

(2)规则应用:接下来方法会根据传入的process_rule字典中定义的规则来进一步清理文本。process_rule字典包含了一系列的清理规则,这些规则在rules键下的pre_processing_rules列表中定义。

(3)删除额外的空格:如果启用了remove_extra_spaces规则,方法会删除文本中的额外空格。这包括将三个或更多连续的换行符替换为两个换行符,以及将两个或更多连续的空格(包括特定的Unicode空格字符)替换为单个空格。

(4)删除URL和电子邮件地址:如果启用了remove_urls_emails规则,方法会从文本中删除URL和电子邮件地址。这是通过匹配特定的正则表达式模式来实现的,分别用于识别和删除电子邮件地址和URL。

通过splitter.split_documents([document])分割文档为文档节点:

实际调用的是dify\api\core\splitter\fixed_text_splitter.py中的FixedRecursiveCharacterTextSplitter类的split_text()方法。split_text方法的目的是将传入的文本分割成多个块,并返回这些块组成的列表。这个方法的处理过程可以分为以下几个步骤:

(1)检查固定分隔符:首先,方法检查是否存在一个固定的分隔符(_fixed_separator)。如果存在,它将使用这个分隔符来直接分割文本。这意味着文本将在每个出现固定分隔符的地方被分割。

(2)分割文本:使用固定分隔符分割文本后,会得到一个初步的块列表。然后,对这些初步的块进行进一步的处理,以确保每个块的长度不超过设定的大小(_chunk_size)。

(3)递归分割:对于每个初步的块,如果其长度超过了设定的大小,将使用recursive_split_text方法递归地进行进一步分割。这个递归过程会继续,直到所有的块都不超过设定的大小。

(4)返回最终块列表:最后,将所有处理后符合长度要求的块组成一个列表返回。

通过固定分隔符'\n'分割:

document_nodes = splitter.split_documents([document])实际返回文档节点数量为12:

dify\api\core\rag\index_processor\processor\paragraph_index_processor.py中,transform()方法主要是对文档进行预处理和分割,以便后续的索引或其它处理步骤可以更有效地处理文档的各个部分。主要执行步骤如下:

(1)获取分割器:根据传入的处理规则(process_rule)和嵌入模型实例(embedding_model_instance),获取文档分割器(splitter)。

(2)遍历文档:对于每个传入的文档(documents列表中的每个Document对象),执行以下子步骤:

  • 文档清理:使用CleanProcessor.clean方法清理文档的内容(document.page_content),移除不需要的字符或格式。

  • 文档分割:使用步骤1中获取的分割器(splitter)将清理后的文档内容分割成多个节点(document_nodes),每个节点代表文档的一部分。

  • 节点处理:对于每个分割后的节点,如果节点内容非空,则生成一个唯一的文档ID(doc_id)和文本哈希值(hash),并将这些信息添加到节点的元数据中。如果节点内容以特定字符(如.)开头,则移除这些字符。

  • 收集文档节点:将处理后的节点添加到一个列表中(split_documents),以便进一步处理。

(3)返回结果:将所有处理后的文档节点合并到一个列表中(all_documents),并返回该列表作为transform方法的结果。

三.保存函数_load_segments

接下调用self._load_segments(dataset, dataset_document, documents)方法:

实际调用的是_load_segments(self, dataset, dataset_document, documents)方法:

_load_segments 方法的主要目的是将处理后的文档段(documents)保存到数据库中,并更新相关文档的状态。这个过程可以分为以下几个步骤:

(1)初始化数据集文档存储:创建一个 DatasetDocumentStore 实例,这个实例与特定的数据集、创建者和文档ID相关联。

(2)添加文档段:使用 DatasetDocumentStore.add_documents(documents) 方法将处理后的文档段添加到数据库中。这里的 documents 是一个包含多个文档段的列表,每个文档段都是一个 Document 实例。具体doc_store.add_documents()self.get_document_segment()方法就不逐行调试,感兴趣可自行调试。

(3)更新文档状态为索引中:在所有文档段都保存到数据库之后,更新原始文档的状态为“索引中”(indexing)。这是通过调用 _update_document_index_status 方法实现的,该方法还会更新文档的清理完成时间和拆分完成时间。self._update_document_index_status()方法就不逐行调试,感兴趣可自行调试。

(4)更新段状态为索引中:最后,更新所有相关文档段的状态为"索引中"。这是通过调用 _update_segments_by_document 方法实现的,该方法会更新所有相关文档段的状态和索引时间。self._update_segments_by_document()方法就不逐行调试,感兴趣可自行调试。

这个过程确保了文档段的正确保存和状态更新,为后续的索引和检索操作做好准备。

四.加载函数_load

接下来调用self._load()方法:

实际调用的是def _load(self, index_processor: BaseIndexProcessor, dataset: Dataset, dataset_document: DatasetDocument, documents: list[Document]) -> None:_load 方法的主要目的是将处理后的文档数据加载到索引中,并更新相关文档和文档段的状态为完成,以支持后续的搜索和检索操作。这个过程可以分为以下几个步骤:

(1)检查索引技术:根据数据集的索引技术(例如,economy),如果是high_quality,那么需要获取嵌入模型实例来进行高质量的索引处理。

(2)分块处理:如果使用嵌入模型实例,将文档数据分块处理。这通常涉及到并发执行,以提高处理效率。

(3)创建关键字索引:在一个独立的线程中,对文档数据进行关键字索引的创建,以便于后续的搜索和检索。

(4)更新文档状态:在所有文档数据被成功加载到索引后,更新相关文档的状态为completed,表示索引过程已完成。

(5)异常处理:在处理过程中,如果遇到任何异常(如文档暂停、提供者令牌未初始化错误等),将会更新文档的索引状态为error,并记录错误信息。

重点解释的是创建关键字线程这部分:

# create keyword index  # 创建关键字索引
create_keyword_thread = threading.Thread(target=self._process_keyword_index,
                                         args=(current_app._get_current_object(),
                                               dataset.id, dataset_document.id, documents))
create_keyword_thread.start() # 启动线程
create_keyword_thread.join()  # 等待线程结束

create_keyword_thread是一个 Thread 对象,之前通过 threading.Thread 创建并启动。它代表了一个独立的线程,用于执行某些任务,这里假设是处理关键字索引的任务。

.join(): 这是 Thread 类的一个方法。调用这个方法会使得调用它的线程(如主线程)等待,直到 create_keyword_thread 线程完成执行。如果 create_keyword_thread 已经完成,join() 会立即返回。

create_keyword_thread.join() 的作用是阻塞调用它的线程(通常是主线程),直到 create_keyword_thread 线程执行完成。这样做的目的是确保 create_keyword_thread 线程中的任务完全执行完毕后,主线程才继续执行后面的代码。

这种做法在需要确保某个线程中的任务完全完成后才进行下一步操作时非常有用,比如在处理完所有数据后才关闭数据库连接,或者在继续执行依赖于线程任务结果的代码之前确保线程任务已完成。

这里面执行的任务是target=self._process_keyword_index。这个方法主要用于在后台线程中处理关键字索引的创建和相关文档段状态的更新,确保这些操作在正确的应用上下文中执行:

def _process_keyword_index(self, flask_app, dataset_id, document_id, documents):  # 处理关键字索引
    with flask_app.app_context():
        dataset = Dataset.query.filter_by(id=dataset_id).first()
        if not dataset:
            raise ValueError("no dataset found")
        keyword = Keyword(dataset)
        keyword.create(documents)
        if dataset.indexing_technique != 'high_quality':
            document_ids = [document.metadata['doc_id'] for document in documents]
            db.session.query(DocumentSegment).filter(
                DocumentSegment.document_id == document_id,
                DocumentSegment.index_node_id.in_(document_ids),
                DocumentSegment.status == "indexing"
            ).update({
                DocumentSegment.status: "completed",
                DocumentSegment.enabled: True,
                DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            })

            db.session.commit()

_process_keyword_index 方法的目的是在 Flask 应用的上下文中处理关键字索引。这个方法接收四个参数:flask_app(Flask 应用实例),dataset_id(数据集的 ID),document_id(文档的 ID),以及documents(文档对象列表)。方法的执行流程如下:

(1)使用 Flask 应用的上下文:这是必要的步骤,因为在 Flask 应用之外的线程中执行数据库操作或者访问 Flask 应用的配置时,需要手动创建应用上下文。

(2)查询数据集:通过 dataset_id 从数据库中查询对应的数据集对象。如果没有找到对应的数据集,抛出一个值错误异常。

(3)创建关键字索引:使用查询到的数据集对象初始化 Keyword 类的实例,并调用其 create 方法,传入文档对象列表来创建关键字索引。

重点分析keyword.create(documents)的过程,由于这个过程比较长,用另外一篇文档进行详细分析,具体参考文献 [1]。

(4)更新文档段状态:如果数据集的索引技术不是 'high_quality',则获取所有文档对象中的 doc_id,并更新数据库中对应 document_idindex_node_id 的文档段对象的状态为 "completed",同时设置其为启用状态,并记录完成时间。最后,提交数据库会话以保存更改。

参考文献

[1] Dify中Jieba类的create()方法执行过程:https://z0yrmerhgi8.feishu.cn/wiki/RKIewMrY2iaC1wks20FcMqhcnae

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/798687.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

pico+unity预设配置

picosdk中有很多预设的配置、使用预设配置的方法有 1、创建 XR Origin、展开 XR Origin > Camera Offset&#xff0c;选中 LeftHand Controller。点击 XR Controller (Action-Based) 面板右上角的 预设 按钮 2、打开Assets\Samples\XR Interaction Toolkit\2.5.2\Starter A…

《人工智能 从小白到大神》:一本让你彻底掌握AI的书

在当今这个快速发展的时代&#xff0c;人工智能&#xff08;AI&#xff09;已经成为改变世界的关键力量。你是否曾想过&#xff0c;如何从一个对AI一无所知的小白&#xff0c;成长为一名真正的AI大神&#xff1f;今天&#xff0c;我要向大家推荐一本能够帮助你实现这一目标的书…

51单片机11(蜂鸣器硬件设计和软件设计)

一、蜂鸣器硬件设计 1、 2、上面两张图&#xff0c;是针对不同产品的电路图。像左边这一块&#xff0c;是我们的A2&#xff0c;A3&#xff0c;A4的一个产品对应的一个封闭器的硬件电路。而右边的这一块是对应的A5到A7的一个硬件电路。因为A5到A7的一个产品&#xff0c;它的各…

Python和C++全球导航卫星系统和机器人姿态触觉感知二分图算法

&#x1f3af;要点 &#x1f3af;马尔可夫随机场网格推理学习 | &#x1f3af;二维伊辛模型四连网格模型推理 | &#x1f3af;统计物理学模型扰动与最大乘积二值反卷积 | &#x1f3af;受限玻尔兹曼机扰动和最大乘积采样 | &#x1f3af;视觉概率生成模型测试图像 &#x1f3…

关于文档理解相关工作的一些总结

过去四年时间&#xff0c;都在处理结构化数据的存储优化相关的工作。最近一段时间在做RAG相关的工作。非结构数据的存储与检索&#xff0c;接触的也越来越多。这篇文章聊聊最近一段时间关于文档理解方面的一些心得。 文档理解 文档理解旨在从非结构化文档中提取信息并将其转化…

推荐一款uniapp拖动验证码插件

插件地址&#xff1a;易盾验证码 - DCloud 插件市场 具体使用方式访问插件地址自行获取

JVM:SpringBoot TomcatEmbeddedWebappClassLoader

文章目录 一、介绍二、SpringBoot中TomcatEmbeddedWebappClassLoader与LaunchedURLClassLoader的关系 一、介绍 TomcatEmbeddedWebappClassLoader 是 Spring Boot 在其内嵌 Tomcat 容器中使用的一个类加载器&#xff08;ClassLoader&#xff09;。在 Spring Boot 应用中&#…

【学习笔记】无人机(UAV)在3GPP系统中的增强支持(八)-通过无人机进行无线接入

引言 本文是3GPP TR 22.829 V17.1.0技术报告&#xff0c;专注于无人机&#xff08;UAV&#xff09;在3GPP系统中的增强支持。文章提出了多个无人机应用场景&#xff0c;分析了相应的能力要求&#xff0c;并建议了新的服务级别要求和关键性能指标&#xff08;KPIs&#xff09;。…

实现多层感知机

目录 多层感知机&#xff1a; 介绍&#xff1a; 代码实现&#xff1a; 运行结果&#xff1a; 问题答疑&#xff1a; 线性变换与非线性变换 参数含义 为什么清除梯度&#xff1f; 反向传播的作用 为什么更新权重&#xff1f; 多层感知机&#xff1a; 介绍&#xff1a;…

LabVIEW红外热波图像缺陷检

开发使用LabVIEW开发的红外热波图像缺陷检测系统。该系统结合红外热像仪、工业相机和高效的数据采集硬件&#xff0c;实现对工件表面缺陷的自动检测和分析。通过LabVIEW的强大功能&#xff0c;系统能够实时采集、处理和显示红外热波图像&#xff0c;有效提高了检测的精度和效率…

【Playwright+Python】系列 Pytest 插件在Playwright中的使用

一、命令行使用详解 使用 Pytest 插件在Playwright 中来编写端到端的测试。 1、命令行执行测试 pytest --browser webkit --headed 2、使用 pytest.ini 文件配置 内容如下&#xff1a; [pytest] # Run firefox with UIaddopts --headed --browser firefox效果&#xff1…

机器人相关工科专业课程体系

机器人相关工科专业课程体系 前言传统工科专业机械工程自动化/控制工程计算机科学与技术 新兴工科专业智能制造人工智能机器人工程 总结Reference: 前言 机器人工程专业是一个多领域交叉的前沿学科&#xff0c;涉及自然科学、工程技术、社会科学、人文科学等相关学科的理论、方…

STM32MP135裸机编程:定时器内核时钟频率计算方法

0 工具准备 STM32MP13xx参考手册 1 定时器内核时钟频率计算方法 1.1 定时器分组 STM32MP135的定时器按照时钟源不同分成了三组&#xff0c;如下&#xff1a; APB1: APB2: APB6&#xff1a; 1.2 定时器内核时钟频率计算方法 APB1DIV是APB1的分频系数&#xff0c;APB2DIV、…

Flink Window 窗口【更新中】

Flink Window 窗口 在Flink流式计算中&#xff0c;最重要的转换就是窗口转换Window&#xff0c;在DataStream转换图中&#xff0c;可以发现处处都可以对DataStream进行窗口Window计算。 窗口&#xff08;window&#xff09;就是从 Streaming 到 Batch 的一个桥梁。窗口将无界流…

制作显卡版docker并配置TensorTR环境

感谢阅读 相关概念docker准备下载一个自己电脑cuda匹配的docker镜像拉取以及启动镜像安装cudaTensorRT部署教程 相关概念 TensorRT是可以在NVIDIA各种GPU硬件平台下运行的一个模型推理框架&#xff0c;支持C和Python推理。即我们利用Pytorch&#xff0c;Tensorflow或者其它框架…

汽车的驱动力,是驱动汽车行驶的力吗?

一、地面对驱动轮的反作用力&#xff1f; 汽车发动机产生的转矩&#xff0c;经传动系传至驱动轮上。此时作用于驱动轮上的转矩Tt产生一个对地面的圆周力F0&#xff0c;地面对驱动轮的反作用力Ft(方向与F0相反)即是驱动汽车的外力&#xff0c;此外力称为汽车的驱动力。 即汽车…

Codeforces Round 957 (Div. 3)(A~D题)

A. Only Pluses 思路: 优先增加最小的数&#xff0c;它们的乘积会是最优,假如只有两个数a和b&#xff0c;b>a&#xff0c;那么a 1&#xff0c;就增加一份b。如果b 1&#xff0c;只能增加1份a。因为 b > a&#xff0c;所以增加小的数是最优的。 代码: #include<bi…

最新PHP自助商城源码,彩虹商城源码

演示效果图 后台效果图 运行环境&#xff1a; Nginx 1.22.1 Mysql5.7 PHP7.4 直接访问域名即可安装 彩虹自助下单系统二次开发 拥有供货商系统 多余模板删除 保留一套商城,两套发卡 源码无后门隐患 已知存在的BUG修复 彩虹商城源码&#xff1a;下载 密码:chsc 免责声明&…

搞定ES6同步与异步机制、async/await的使用以及Promise的使用!

文章目录 同步和异步async/awaitPromisePromise的概念 同步和异步 ​ 同步&#xff1a;代码按照编写顺序逐行执行&#xff0c;后续的代码必须等待当前正在执行的代码完成之后才能执行&#xff0c;当遇到耗时的操作&#xff08;如网络请求等&#xff09;时&#xff0c;主线程会…

解决fidder小黑怪倒出JMeter文件缺失域名、请求头

解决fidder小黑怪倒出JMeter文件缺失域名、请求头 1、目录结构&#xff1a; 2、代码 coding:utf-8 Software:PyCharm Time:2024/7/10 14:02 Author:Dr.zxyimport zipfile import os import xml.etree.ElementTree as ET import re#定义信息头 headers_to_extract [Host, Conn…