【NLP 32、文本匹配任务 —— 深度学习】

大劫大难以后,人不该失去锐气,不该失去热度,你镇定了却依旧燃烧,你平静了却依旧浩荡,致那个从绝望中走出来的自己,共勉

                                                                                                                             —— 25.1.31

使用深度学习在文本匹配任务上主要有两种方式:① 表示型 ② 交互型

一、实现方式 ① 表示型文本匹配

表示型文本匹配要训练的目标是:得到一个编码器,用来把一句话转化为向量

        实际训练中,通常会共享一个红框内的编码器 / 表示层(可以看作一个完整的模型:输入文本过完embedding嵌入层后过一个网络层,最终输出一句话的向量),在训练时,我们通常输入两句话过同一个模型(参数共享),分别编码这两句话,得到两个向量,经过一个 matching layer 匹配层(相似度计算)得到一个分数用来衡量两向量的相似度,若两句话向量(句子语义)相似,则分数接近为1,若两句话向量不相似(语义不相似),则分数接近为0

        表示层文本匹配主要训练的是文本转换成向量的这一模型部分

        两边的表示层是参数共享的两个模型,分别编码输入的两句话,用来把输入的话转成向量形式


1.matching layer 匹配层 —— 方式 Ⅰ、基于文本相似度

① 计算文本相似度 方式Ⅰ:拼接两个句向量过网络层

        输入两个文本过Embedding层得到两个向量,拼接两向量(拼接两向量与两向量之差 u,v,u-v),然后过一个线性层Softmax归一化,映射到 0 或 1 ,然后用 均方差MSE 交叉熵 损失函数计算 Loss,然后根据梯度进行迭代优化,给它提供足够多的句子对,就可以做训练

        对于两个匹配的样本,预期输出分值为 1

        对于两个不匹配的样本,预期输出分值为 0

        本质上相当于二分类任务(两句话相似或不相似)

        编码器部分可以使用 Bert+Pooling,也可以用 LSTM,也可以用 CNN、GatedCNN、RNN 等其他模型结构,得到两句话对应的向量

        两向量的拼接方式也有多种,在实际场景中最好针对不同场景进行训练实验对比获得答案

        只要编码器 Embedding 部分正确, 两向量无论怎么组合,数据都可以得到有效的训练


② 计算文本相似度 方式Ⅱ:余弦相似度计算

        编码层也可以简单点,不使用向量拼接过网络层,而是直接过一个余弦相似度的计算,利用Cosine Embedding Loss做训练

        对于两个相似的样本匹配预期输出分值为 1

        对于两个不相似的样本匹配预期输出分值为 -1

        本质上相当于2分类任务(两句话相似或是不相似)

Cosine Embedding Loss 

        Cosine Embedding Loss:是一种用于衡量两个向量相似性的损失函数,通过余弦相似度结合标签信息(相似/不相似)来优化模型。

  • 相似样本​(标签 y=1):迫使它们的余弦相似度趋近于 ​1​(即方向完全一致)。
  • 不相似样本​(标签 y=-1):迫使它们的余弦相似度低于某个阈值 ​margin​(默认为 0)。
  • margin:惩罚项/正则项(实际训练中,一般会存储为一个默认值:例如0.1),作用:① 允许不相似样本的余弦相似度在不超过 margin 的情况下被忽略,避免过度优化。② 较大的 margin 迫使模型更严格地区分不相似样本(例如,设 margin=0.5 时,不相似样本间的相似度必须低于 0.5)

公式:

        其中,cos(x_1, x_2) 表示两个向量的余弦相似度

        训练两个文本向量的网络是同一个网络,权重参数共享,又被称为 孪生网络


2.matching layer 匹配层 —— 方式 Ⅱ、Triplet Loss

① 训练目标:

        1.使具有相同标签的样本在 embedding空间 尽量接近

        2.使具有不同标签的样本在 embedding空间 尽量远离

        3.通过调整模型的权重来改变三者在向量空间的位置关系

例: 

设计一个Loss函数,目标:衡量三个向量在空间中位置的相互关系

Anchor:某句话对应的向量在embedding空间中对应的位置

Positive:与这句话语义相同的向量

Negative:与这句话语义不同的向量


② Triplet Loss:

输入是一个三元组<a,p,n>

a:anchor 原点(任意一个样本)

p:positive 与a同一类别的样本

n:negative 与a不同类别的样本

Embedding 空间中,三元组损失函数为:L = max(d(a, p) - d(a, n) + margin, 0)

公式:

训练目标:使 相似文本 a,p 之间的距离 小于 不相似文本 a,n 之间的距离 

d:表示两向量间的一个距离函数 

d(a, p):任意一个样本 a  与其同一类别的样本 p 两个文本转成的向量间的向量距离

d(a, n):任意一个样本 a  与其不同类别的样本 n 两个文本转成的向量间的向量距离

margin:惩罚项/正则项(实际训练中,一般会存储为一个默认值:例如0.1),作用:① 即使两距离相等,还是会产生 margin 大小的Loss,只有当相似文本间的距离 小于 不相似文本间的距离 + margin 时,Loss才会为 0  较大的 margin 迫使模型更严格地区分不相似样本(例如:设 margin=0.5 时,相似样本间距离差 和 不相似样本间距离差必须低于0.5,才会停止迭代优化)

Triplet Loss中,margin的作用

1. ​控制正负样本的最小间隔

        Margin 定义了正样本对(anchor 与 positive)和负样本对(anchor 与 negative)之间的最小距离差。其核心目标是确保在嵌入空间中,正样本与锚点的距离不仅要小于负样本与锚点的距离,还要至少保持一个固定的间隔(即 margin)

  • 公式作用 d(a, p) - d(a, n) + margin < 0 ,损失为 0,此时模型已满足“正样本距离更近,负样本距离更远”的要求。反之,若未满足,则通过损失函数强制优化
  • 物理意义:Margin 类似于支持向量机(SVM)中的间隔概念,通过设定一个“安全区域”,避免不同类别的样本在嵌入空间中过于接近

2. ​防止模型学习到退化解

        如果没有 Margin(或 Margin=0),模型可能将所有样本映射到同一个点(即 d(a, p) = d(a, n) = 0,此时损失恒为 0,但模型完全失去区分能力。Margin 的引入迫使模型必须拉开正负样本的距离差异,从而学习到更有判别性的特征表示


3. ​平衡训练难度

Margin 的大小直接影响训练的难易程度:

        过大:模型需要更大的距离差才能满足条件,可能导致训练困难(损失长期不收敛)或过拟合

        过小:模型容易满足条件,但学到的特征区分度不足实践中

Margin 的大小通常通过实验调整,例如在人脸识别任务中常用 0.2~1.0 的范围


4. ​动态筛选有效三元组

        Margin 帮助模型自动忽略“简单样本”(如负样本已足够远离锚点),而专注于优化“困难样本”(如负样本距离较近或与正样本距离差异不足)

例如:

  • 当 d(a, n) > d(a, p) + margin损失为 0,无需优化。
  • 当 d(a, n) < d(a, p) + margin损失为正,需通过梯度下降调整参数

5.  ​与距离度量的关系

        Margin 的作用依赖于具体使用的距离函数:

​        欧氏距离:Margin 是绝对距离差阈值。

        余弦相似度:Margin 转化为相似度差异(如要求负样本的相似度比正样本低至少一个 Margin)

        在代码实现中,通常会对嵌入向量进行归一化(如 L2 归一化),以确保不同距离度量下 Margin 的物理意义一致

最小化L: d(a, p) —> 0d(a, n) —> margin,则L最小化

Triplet Loss同样适用于人脸识别模型(人脸匹配)的训练

同一个人不同角度可以照出多张图片,同一个意图也可以用多个文本(问法)表示

人脸识别是CV领域的文本匹配,文本匹配是NLP领域的人脸识别


3.表示型文本匹配 —— 代码示例 🚀

Ⅰ、配置文件 config.py

① 路径相关参数

model_path:指定训练后模型的保存路径。若加载预训练模型,需提供模型文件的具体存储位置

schema_path:定义数据结构的配置文件路径,通常用于数据预处理或验证输入格式(如JSON/YAML文件)

train_data_path:指向训练集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)

valid_data_path:指向验证集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)

vocab_path:词表文件路径,用于自然语言处理任务中定义词汇的映射关系(如将单词转为ID)

② 模型结构参数 

max_length:序列数据的最大长度(如文本的单词数)。超过此长度的序列会被截断或填充

hidden_size:神经网络隐藏层的维度大小,影响模型的表达能力。例如,LSTM或Transformer中每层的神经元数量

③ 训练控制参数

epoch:训练过程中遍历整个数据集的次数。适当增加轮次可提升模型性能,但可能过拟合

batch_size:每次输入模型的样本数量。较大的批次可加速训练,但需更多显存

epoch_data_size:每轮训练中采样的数量

positive_sample_rate:正样本在批次中的占比,常用于不平衡数据任务(如分类)。需结合负样本比例调整

④ 优化和学习参数

optimizer:选择优化算法(如SGD、Adam、AdamW),影响参数更新策略

learning_rate:控制参数更新的步长。学习率过高可能导致震荡,过低则收敛缓慢

# -*- coding: utf-8 -*-

"""
配置参数信息
"""

Config = {
    "model_path": "model_output",
    "schema_path": "../data/schema.json",
    "train_data_path": "../data/train.json",
    "valid_data_path": "../data/valid.json",
    "vocab_path":"../chars.txt",
    "max_length": 20,
    "hidden_size": 128,
    "epoch": 10,
    "batch_size": 32,
    "epoch_data_size": 200,     #每轮训练中采样数量
    "positive_sample_rate":0.5,  #正样本比例
    "optimizer": "adam",
    "learning_rate": 1e-3,
}

Ⅱ、数据加载 loader.py

① 初始化 def __init__()
属性类型描述
config字典存储传入的配置字典。
path字符串数据存储的路径。
vocab列表或字典从 config["vocab_path"] 加载的词汇表。
config["vocab_size"]整数词汇表的大小,通过 len(self.vocab) 计算得到。
schema字典或其他类型从 config["schema_path"] 加载的模式(schema)。
train_data_size整数每个 epoch 的采样数量,用于控制随机采样的数据量。
data_type字符串标识当前加载的数据类型,可以是 "train" 或 "test"

load_vocab():加载字词表文件

load_schema():加载schema文件

load():在文件中加载数据

len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。

参数名类型描述
obj可迭代对象需要计算长度的对象,如字符串、列表、元组、字典、集合等。
class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.schema = load_schema(config["schema_path"])
        self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
        self.data_type = None  #用来标识加载的是训练集还是测试集 "train" or "test"
        self.load()

② 数据集加载(训练 / 测试)def load()

初始化:测试数据集 self.data 和 标准问题与该标签所有问题编码结果的字典 self.knwb

打开文件并逐行读取:使用with open语句打开指定路径的数据文件,编码格式为utf8,确保文件内容可以正确读取

对每一行进行 JSON 解析:从解析后的JSON数据中提取tagtitle字段,并将tag转换为对应的索引label(通过self.label_to_index字典映射)。

判断数据类型并加载数据:根据每一行的数据格式判断是训练集还是测试集。        

  • 如果读取的行是一个字典(即训练集),首先设置self.data_type"train"
  • 然后提取该行的"questions"键对应的值(这是一个问题列表)以及"target"键对应的值(这是一个标签)。
  • 对于问题列表中的每一个问题,先调用self.encode_sentence(question)方法对其进行词汇编码,然后再将其转换为torch.LongTensor格式,以适应后续的模型输入。
  • 最后,将编码后的问题添加到self.knwb字典中对应标签的列表里。
  • 如果读取的行是一个列表(即测试集),首先设置self.data_type"test"
  • 然后使用assert语句确保该行确实是一个列表,并从中提取问题和标签。
  • 对问题进行词汇编码,并转换为torch.LongTensor格式。
  • 同样对标签进行处理,根据self.schema字典将标签转换为索引,并也转换为torch.LongTensor格式。
  • 最后,将编码后的问题和标签索引作为一个样本添加到self.data列表中。

isinstance():Python 内置函数,用于检查一个对象是否属于某个类型或其子类的实例。

参数名类型描述
object任意对象需要检查的对象。
classinfo类型或元组可以是一个类型(如 intstr 等)或一个类型元组。如果 classinfo 是元组,isinstance() 会检查对象是否属于元组中任意一个类型的实例。

assert:Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,assert 会抛出 AssertionError 异常。

torch.LongTensor():创建一个包含整数的 PyTorch 张量(Tensor),元素类型为 torch.int64

参数名类型描述
datalist 或 array包含整数的列表或数组,用于初始化张量。

列表.append():在列表的末尾添加一个元素。

参数名类型描述
elementany要添加到列表末尾的元素,可以是任意类型(如整数、字符串、列表等)。
    def load(self):
        self.data = []
        self.knwb = defaultdict(list)
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                #加载训练集
                if isinstance(line, dict):
                    self.data_type = "train"
                    questions = line["questions"]
                    label = line["target"]
                    for question in questions:
                        input_id = self.encode_sentence(question)
                        input_id = torch.LongTensor(input_id)
                        self.knwb[self.schema[label]].append(input_id)
                #加载测试集
                else:
                    self.data_type = "test"
                    assert isinstance(line, list)
                    question, label = line
                    input_id = self.encode_sentence(question)
                    input_id = torch.LongTensor(input_id)
                    label_index = torch.LongTensor([self.schema[label]])
                    self.data.append([input_id, label_index])
        return

③ 文本编码 def encode_sentence()

初始化空列表 input_id:用于存储编码后的结果

判断使用词表文件还是字符表:如果是此表文件,则使用jieba对输入文本text进行分词,如果指向的是字符表文件,则直接对每个字符进行编码

对输入序列进行填充或截断:将编码后的序列input_id传递给padding方法。这个方法会确保每个序列的长度都等于config["max_length"]

字典.get():dict.get() 是 Python 字典的内置方法,用于安全地获取字典中指定键的值。如果键存在,则返回对应的值;如果键不存在,则返回默认值(如果未提供默认值,则返回 None)。它的主要作用是避免直接使用 dict[key] 时可能引发的 KeyError 异常。

参数名类型描述
key任意类型需要查找的键。
default任意类型可选,如果键不存在时返回的默认值。默认为 None

列表.append():  Python 列表(list)对象的一个方法,用于在列表的末尾添加一个元素。它会直接修改原列表,而不是返回一个新的列表。

参数名类型描述
element任意类型要添加到列表末尾的元素。可以是整数、字符串、列表、字典、元组等
    def encode_sentence(self, text):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        input_id = self.padding(input_id)
        return input_id

④ 数据规范 def padding()

截取 input_id 的前 max_length 个元素

如果 input_id 长度不足 max_length,则用 0 填充至 max_length

返回处理后的 input_id

len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。

参数名类型描述
obj可迭代对象需要计算长度的对象,如字符串、列表、元组、字典、集合等。
    #补齐或截断输入的序列,使其可以在一个batch内运算
    def padding(self, input_id):
        input_id = input_id[:self.config["max_length"]]
        input_id += [0] * (self.config["max_length"] - len(input_id))
        return input_id

⑤ 返回数据集的长度 def __len__()

该函数的作用是返回对象的“长度”,具体行为取决于对象的 data_type 属性。

如果 data_type 为 "train",则返回 self.config["epoch_data_size"] 的值,这通常表示训练数据的规模。

如果 data_type 为 "test",则返回 self.data 的长度,即测试数据的元素个数。

如果 data_type 不是 "train" 或 "test",则会触发 AssertionError,并提示 self.data_type 的值。

assert:Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,assert 会抛出 AssertionError 异常。 

len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。

参数名类型描述
obj可迭代对象需要计算长度的对象,如字符串、列表、元组、字典、集合等。
    def __len__(self):
        if self.data_type == "train":
            return self.config["epoch_data_size"]
        else:
            assert self.data_type == "test", self.data_type
            return len(self.data)

⑥ 通过索引访问数据集中的样本 def __getitem__()

判断数据类型: 如果是训练数据集,则调用self.random_train_sample()方法来随机生成一个训练样本。这是因为训练数据集中的样本是通过随机采样生成的,而不是固定的数据。

如果不是训练数据集,则认为是测试数据集:直接返回self.data列表中索引为index的样本。

    def __getitem__(self, index):
        if self.data_type == "train":
            return self.random_train_sample() #随机生成一个训练样本
        else:
            return self.data[index]

⑦ ⭐随机生成训练样本 

两两为输入,基于文本相似度的训练方式 

        如果决定生成正样本:则从 standard_question_index 中随机选择一个类别 p。接着,检查该类别下的问题数量是否大于等于2。

        如果该类别下的问题数量不足2个,则重新调用 random_train_sample() 方法,即重新随机生成一个样本。否则,从类别 p 中随机选取两个问题 s1 和 s2,并返回这两个问题及其标签 [s1, s2, torch.LongTensor([1])]。这里的标签 1 表示正样本,即这两个问题是属于同一类别的。

        如果决定生成负样本:则从 standard_question_index 中随机选择两个不同的类别 p 和 n。接着,从每个类别中随机选取一个问题,分别为 s1 和 s2,并返回这两个问题及其标签 [s1, s2, torch.LongTensor([-1])]。这里的标签 -1 表示负样本,即这两个问题不属于同一类别

list():将可迭代对象(如字符串、元组、集合等)转换为列表。

参数名类型描述
iterable可迭代对象需要转换为列表的可迭代对象,如字符串、元组、集合等。

字典.keys():返回字典中所有键的视图对象。

random.random():生成一个范围在 [0.0,1.0) 之间的随机浮点数。

random.sample(): 从序列中随机选择指定数量的唯一元素,返回一个新列表。

参数名类型描述
population序列需要从中选择的序列,如列表、元组等。
k整数需要选择的元素数量。

random.choice():从非空序列中随机选择一个元素。

参数名类型描述
seq序列需要从中选择的非空序列,如列表、元组等。

torch.LongTensor():创建一个包含整数的张量,元素类型为 torch.int64

参数名类型描述
data列表或数组包含整数的数据,用于创建张量。
    #依照一定概率生成负样本或正样本
    #负样本从随机两个不同的标准问题中各随机选取一个
    #正样本从随机一个标准问题中随机选取两个
    def random_train_sample(self):
        standard_question_index = list(self.knwb.keys())
        #随机正样本
        if random.random() <= self.config["positive_sample_rate"]:
            p = random.choice(standard_question_index)
            #如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
            if len(self.knwb[p]) < 2:
                return self.random_train_sample()
            else:
                s1, s2 = random.sample(self.knwb[p], 2)
                return [s1, s2, torch.LongTensor([1])]
        #随机负样本
        else:
            p, n = random.sample(standard_question_index, 2)
            s1 = random.choice(self.knwb[p])
            s2 = random.choice(self.knwb[n])
            return [s1, s2, torch.LongTensor([-1])]

⑧ 数据处理

        加载词汇表:从指定路径的文件中读取字表或词表,并将其转化为一个字典,其中字或词作为键,对应的索引作为值。该函数确保索引从 1 开始,因为索引 0 被保留用于填充(padding)操作。这个字典在后续的数据编码过程中会被频繁使用,能够将文本中的每个字或词映射为唯一的数字索引,便于计算机处理。

open():用于打开文件并返回文件对象,支持读取、写入等文件操作。

参数名类型描述
file字符串文件路径(包括文件名),可以是绝对路径或相对路径。
mode字符串文件打开模式,默认为 'r'(只读)。常见模式包括 'r''w''a' 等。
buffering整数缓冲设置,默认为 -1(系统默认)。
encoding字符串文件编码方式,默认为 None
errors字符串编码错误的处理方式,默认为 None
newline字符串换行符设置,默认为 None
closefd布尔值控制 file 参数的传入值类型,默认为 True

enumerate():为可迭代对象(如列表、元组等)添加索引,返回一个枚举对象,生成 (index, value) 对。

参数名类型描述
iterable可迭代对象需要枚举的对象,如列表、元组等。
start整数索引的起始值,默认为 0

strip():去除字符串开头和结尾的指定字符(默认去除空白字符,如空格、换行符等)。

参数名类型描述
chars字符串可选,指定要删除的字符。默认为去除空白字符。
#加载字表或词表
def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  #0留给padding位置,所以从1开始
    return token_dict

加载schema:打开schema_path路径下的文件,使用utf8编码读取文件内容;使用json.loads将读取的内容解析为Python对象并返回

open():用于打开文件并返回文件对象,支持读取、写入等文件操作

参数名类型描述
file字符串文件路径(包括文件名),可以是绝对路径或相对路径。
mode字符串文件打开模式,默认为 'r'(只读)。常见模式包括 'r''w''a' 等。
buffering整数缓冲设置,默认为 -1(系统默认)。
encoding字符串文件编码方式,默认为 None
errors字符串编码错误的处理方式,默认为 None
newline字符串换行符设置,默认为 None
closefd布尔值控制 file 参数的传入值类型,默认为 True

json.loads():将 JSON 格式的字符串解析为 Python 对象(如字典、列表等)。

参数名类型描述
s字符串要解析的 JSON 字符串。
encoding字符串字符编码(Python 3 中已弃用)。
cls自定义解码类,默认为 None
object_hook函数可选函数,允许自定义将 JSON 对象转换为其他类型的 Python 对象。
parse_float函数自定义将 JSON 中的浮点数转换为特定类型。
parse_int函数自定义将 JSON 中的整数转换为特定类型。
object_pairs_hook函数用于处理 JSON 对象中的键值对,默认返回字典。

文件对象.read():从文件中读取指定数量的字节或整个文件内容。

参数名类型描述
size整数可选,指定要读取的字节数。如果未指定或为负数,则读取整个文件内容。
#加载schema
def load_schema(schema_path):
    with open(schema_path, encoding="utf8") as f:
        return json.loads(f.read())

封装数据加载器:根据给定的配置,将数据文件加载并编码成模型可识别的格式,然后通过DataLoader类封装成一个数据加载器。

DataLoader():PyTorch 中的一个标准工具,用于高效地加载和处理数据。它支持批量加载、数据打乱、多线程加载等功能,是深度学习训练中常用的数据加载器。

参数名类型描述
dataset数据集对象要加载的数据集,必须实现 __len__ 和 __getitem__ 方法。
batch_size整数每个批次的大小,默认为 1
shuffle布尔值是否在每个 epoch 开始时打乱数据顺序,默认为 False
sampler采样器对象自定义采样器,用于控制数据的采样方式,默认为 None
batch_sampler批次采样器对象自定义批次采样器,用于控制批次的采样方式,默认为 None
num_workers整数用于数据加载的子进程数量,默认为 0(主进程加载)。
collate_fn函数用于将一个批次的数据合并成一个张量或元组,默认为 None
pin_memory布尔值是否将数据存储在 pin memory 中(用于 GPU 加速),默认为 False
drop_last布尔值如果数据不能完全分成批次,是否删除最后一个不完整的批次,默认为 False
timeout整数数据加载的最大等待时间(秒),默认为 0(无限制)。
worker_init_fn函数用于初始化每个数据加载器子进程的函数,默认为 None
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl

⑨ 加载数据文件
# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
数据加载
"""


class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.schema = load_schema(config["schema_path"])
        self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
        self.data_type = None  #用来标识加载的是训练集还是测试集 "train" or "test"
        self.load()

    def load(self):
        self.data = []
        self.knwb = defaultdict(list)
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                #加载训练集
                if isinstance(line, dict):
                    self.data_type = "train"
                    questions = line["questions"]
                    label = line["target"]
                    for question in questions:
                        input_id = self.encode_sentence(question)
                        input_id = torch.LongTensor(input_id)
                        self.knwb[self.schema[label]].append(input_id)
                #加载测试集
                else:
                    self.data_type = "test"
                    assert isinstance(line, list)
                    question, label = line
                    input_id = self.encode_sentence(question)
                    input_id = torch.LongTensor(input_id)
                    label_index = torch.LongTensor([self.schema[label]])
                    self.data.append([input_id, label_index])
        return

    def encode_sentence(self, text):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        input_id = self.padding(input_id)
        return input_id

    #补齐或截断输入的序列,使其可以在一个batch内运算
    def padding(self, input_id):
        input_id = input_id[:self.config["max_length"]]
        input_id += [0] * (self.config["max_length"] - len(input_id))
        return input_id

    def __len__(self):
        if self.data_type == "train":
            return self.config["epoch_data_size"]
        else:
            assert self.data_type == "test", self.data_type
            return len(self.data)

    def __getitem__(self, index):
        if self.data_type == "train":
            return self.random_train_sample() #随机生成一个训练样本
        else:
            return self.data[index]

    #依照一定概率生成负样本或正样本
    #负样本从随机两个不同的标准问题中各随机选取一个
    #正样本从随机一个标准问题中随机选取两个
    def random_train_sample(self):
        standard_question_index = list(self.knwb.keys())
        #随机正样本
        if random.random() <= self.config["positive_sample_rate"]:
            p = random.choice(standard_question_index)
            #如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
            if len(self.knwb[p]) < 2:
                return self.random_train_sample()
            else:
                s1, s2 = random.sample(self.knwb[p], 2)
                return [s1, s2, torch.LongTensor([1])]
        #随机负样本
        else:
            p, n = random.sample(standard_question_index, 2)
            s1 = random.choice(self.knwb[p])
            s2 = random.choice(self.knwb[n])
            return [s1, s2, torch.LongTensor([-1])]



#加载字表或词表
def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  #0留给padding位置,所以从1开始
    return token_dict

#加载schema
def load_schema(schema_path):
    with open(schema_path, encoding="utf8") as f:
        return json.loads(f.read())

#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl



if __name__ == "__main__":
    from config import Config
    dg = DataGenerator(r"F:\人工智能NLP\NLP\Day8_文本匹配问题\data\train.json", Config)
    print(dg[1])


Ⅲ、模型定义 model.py

① 生成句子的嵌入表示(文本转向量)

模型初始化

  • hidden_size隐藏层的大小,决定了 Embedding 和线性层的输出维度。
  • vocab_size词汇表的大小,加 1 是为了处理填充索引(padding_idx=0)。
  • max_length句子的最大长度(虽然代码中未直接使用,但可能是为后续处理预留的)。
  • self.embeddingnn.Embedding :将词索引映射为 hidden_size 维的向量。
  • self.layernn.Linear 层:对 Embedding 层的输出进行线性变换。
  • self.dropoutnn.Dropout 层,设置丢弃率为 0.5。

nn.Embedding():用于将离散的类别索引(如单词索引)映射到连续的稠密向量空间中。它常用于自然语言处理(NLP)任务中,将单词或其他类别信息转换为向量表示。

参数名类型描述
num_embeddings整数嵌入字典的大小,即不同类别的总数(如词汇表大小)。
embedding_dim整数每个嵌入向量的维度。
padding_idx整数(可选)指定一个索引,用于填充(padding),该索引对应的嵌入向量不会被更新。
max_norm浮点数(可选)如果设置了该值,嵌入向量的范数会被裁剪到不超过 max_norm
norm_type浮点数(可选)用于裁剪范数的类型,默认为 2(L2 范数)。
scale_grad_by_freq布尔值(可选)如果为 True,梯度会根据单词在 mini-batch 中出现的频率进行缩放。
sparse布尔值(可选)如果为 True,梯度将变为稀疏张量,适用于大型词汇表以节省内存。

nn.Linear():是一个全连接层,用于对输入数据进行线性变换。它将输入特征与权重矩阵相乘,并加上偏置项,输出变换后的结果。

参数名类型描述
in_features整数输入特征的数量。
out_features整数输出特征的数量。
bias布尔值(可选)是否使用偏置项,默认为 True

nn.Dropout(): 一种正则化技术,用于防止模型过拟合。它在训练过程中随机将一部分神经元的输出设置为 0,从而减少神经元之间的依赖性。

参数名类型描述
p浮点数神经元被丢弃的概率,默认为 0.5。
inplace布尔值(可选)是否就地操作,默认为 False
class SentenceEncoder(nn.Module):
    def __init__(self, config):
        super(SentenceEncoder, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        max_length = config["max_length"]
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        # self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        self.layer = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.5)

前向传播

transpose():用于将数组或矩阵的行和列进行转置。例如,将行数据转换为列数据,或将列数据转换为行数据。

参数名类型描述
array数组或矩阵需要进行转置的数组或矩阵。
axes元组(可选)指定转置的轴顺序,默认为 None,表示反转所有轴。

squeeze(): 用于移除数组中维度为 1 的维度,从而简化数组的形状。

参数名类型描述
a数组输入的数组。
axis整数或元组(可选)指定要移除的维度,默认为 None,表示移除所有维度为 1 的轴。

shape(): 用于获取数组或矩阵的形状,返回一个表示各维度大小的元组。

参数名类型描述
array数组或矩阵需要获取形状的数组或矩阵。
    #输入为问题字符编码
    def forward(self, x):
        x = self.embedding(x)
        #使用lstm
        # x, _ = self.lstm(x)
        #使用线性层
        x = self.layer(x)
        x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()
        return x

② 计算句子间相似度

模型初始化

  • super(SiameseNetwork, self).__init__()调用父类 nn.Module 的初始化方法。
  • self.sentence_encoder = SentenceEncoder(config)初始化一个 SentenceEncoder 实例,用于对句子进行编码。
  • self.loss = nn.CosineEmbeddingLoss()初始化余弦嵌入损失函数,用于计算两个句子向量之间的相似度损失。

nn.CosineEmbeddingLoss(): PyTorch 中的一个损失函数,用于衡量两个输入向量的相似性。它通过计算两个输入向量的余弦相似度来评估它们的相似性,并根据标签值(1 或 -1)调整损失。

  • 如果 y=1,表示两个输入向量应相似,损失为 1−cos(x1​,x2​)。
  • 如果 y=−1,表示两个输入向量应不相似,损失为 max(0,cos(x1​,x2​))。
参数名类型描述
margin浮点数(可选)用于调整不相似样本的损失,默认为 0.0
reduction字符串(可选)指定损失的聚合方式,可选值为 'none''mean' 或 'sum',默认为 'mean'
class SiameseNetwork(nn.Module):
    def __init__(self, config):
        super(SiameseNetwork, self).__init__()
        self.sentence_encoder = SentenceEncoder(config)
        self.loss = nn.CosineEmbeddingLoss()

计算余弦距离:计算两个输入张量之间的余弦距离。它首先对输入的张量进行归一化处理,确保每个向量的长度为 1,然后计算归一化张量之间的点积,最后通过 1 - 点积 转化为余弦距离。

torch.nn.functional.normalize():用于对输入张量在指定维度上进行 Lp​ 范数归一化。默认情况下,它使用 L2​ 范数(欧几里得范数)对向量进行归一化。

公式:

  • v 是输入张量的某一维度上的向量。
  • ||v||_p​ 是 Lp​ 范数。
  • ϵ 是一个小值,用于避免除以零。
参数名类型描述
inputTensor输入张量。
pfloat范数的指数值,默认为 2(L2​ 范数)。
dimint 或 tuple归一化的维度,默认为 1
epsfloat用于避免除以零的小值,默认为 1e-12
outTensor(可选)输出张量。如果指定,操作不可微分。

torch.sum():计算输入张量在指定维度上的元素和。如果不指定维度,则计算所有元素的和。

参数名类型描述
inputTensor输入张量。
dimint 或 tuple求和的维度,默认为 None(计算所有元素的和)。
keepdimbool是否保留求和后的维度,默认为 False
dtypedtype(可选)输出张量的数据类型,默认为 None

torch.mul():对两个张量进行逐元素乘法。如果输入张量的形状不同,会进行广播操作。

参数名类型描述
inputTensor第一个输入张量。
otherTensor 或 float第二个输入张量或标量。
outTensor(可选)输出张量。
    # 计算余弦距离  1-cos(a,b)
    # cos=1时两个向量相同,余弦距离为0;cos=0时,两个向量正交,余弦距离为1
    def cosine_distance(self, tensor1, tensor2):
        tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
        tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
        cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)
        return 1 - cosine

计算Triplet_Loss三元组损失:三元组损失可以用于训练神经网络以学习特征向量之间的相对距离,确保对于锚点而言,正样本更接近(较小的余弦距离),而负样本更远(较大的余弦距离)。通过这种方式,可以有效地训练分类器或聚类模型,使得同一类的样本点之间的距离小于不同类的样本点之间的距离。这里的cosine_triplet_loss的具体实现可以通过余弦距离来衡量样本之间的相似性,并通过margin参数来控制正负样本之间距离的差距。

squeeze(): 用于移除数组中维度为 1 的维度,从而简化数组的形状。

参数名类型描述
a数组输入的数组。
axis整数或元组(可选)指定要移除的维度,默认为 None,表示移除所有维度为 1 的轴。

torch.mean(): PyTorch 中的一个函数,用于计算张量(Tensor)的平均值。它可以计算整个张量所有元素的平均值,或者指定某个维度上的平均值。该函数在数据预处理、统计分析以及神经网络训练过程中计算损失等场景中非常有用

参数名类型描述
inputTensor需要计算平均值的输入张量。
dimint 或 tuple of ints指定在哪个维度上计算平均值。例如,dim=0 表示按行计算,dim=1 表示按列计算。默认为 None,计算所有元素的平均值。
keepdimbool是否保持输出张量的维度数量与输入张量一致。默认为 False,即减少维度数量。
outTensor (可选)指定输出的张量。如果提供,结果将存储在该张量中。
    def cosine_triplet_loss(self, a, p, n, margin=None):
        ap = self.cosine_distance(a, p)
        an = self.cosine_distance(a, n)
        if margin is None:
            diff = ap - an + 0.1
        else:
            diff = ap - an + margin.squeeze()
        return torch.mean(diff[diff.gt(0)]) #greater than

前向传播:处理单个或两个句子的输入。当输入两个句子时,模型会计算它们的向量表示,并根据是否提供标签计算损失或余弦距离;当只提供一个句子时,模型返回该句子的向量表示。这种

squeeze(): 用于移除数组中维度为 1 的维度,从而简化数组的形状。

参数名类型描述
a数组输入的数组。
axis整数或元组(可选)指定要移除的维度,默认为 None,表示移除所有维度为 1 的轴。
    #sentence : (batch_size, max_length)
    def forward(self, sentence1, sentence2=None, target=None):
        #同时传入两个句子
        if sentence2 is not None:
            vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)
            vector2 = self.sentence_encoder(sentence2)
            #如果有标签,则计算loss
            if target is not None:
                return self.loss(vector1, vector2, target.squeeze())
            #如果无标签,计算余弦距离
            else:
                return self.cosine_distance(vector1, vector2)
        #单独传入一个句子时,认为正在使用向量化能力
        else:
            return self.sentence_encoder(sentence1)

③ 根据配置选择优化器

        根据用户在配置字典中指定的优化器类型("adam" 或 "sgd")来选择并初始化相应的优化器,并设置学习率。

model.parameters():PyTorch 中 torch.nn.Module 类的一个方法,用于获取模型中所有可训练参数的迭代器。这些参数通常是模型的权重和偏置,它们会在训练过程中通过优化器进行更新。

def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)

④ 模型验证

        设置配置参数、构造一个Siamese网络模型,并在此模型上运行两个句子的相似度计算。通过输入一对句子的编码和相应的标签,模型可以评估这两个句子的相似性,同时计算损失。最终的输出将是损失值或相似度度量。 

torch.LongTensor(): PyTorch 中的一个函数,用于创建一个包含整数(64 位整型)的张量。它是一种特定的张量类型,通常用于存储索引、标签或其他整数值数据。

参数名类型描述
data列表、元组或数组输入数据,用于创建张量。
dtypetorch.dtype张量的数据类型,默认为 torch.int64(64 位整数)。
devicetorch.device张量存储的设备(如 CPU 或 GPU),默认为 None(使用默认设备)。
requires_gradbool是否需要计算梯度,默认为 False
# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
"""
建立网络模型结构
"""

class SentenceEncoder(nn.Module):
    def __init__(self, config):
        super(SentenceEncoder, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        max_length = config["max_length"]
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        # self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        self.layer = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.5)

    #输入为问题字符编码
    def forward(self, x):
        x = self.embedding(x)
        #使用lstm
        # x, _ = self.lstm(x)
        #使用线性层
        x = self.layer(x)
        x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()
        return x


class SiameseNetwork(nn.Module):
    def __init__(self, config):
        super(SiameseNetwork, self).__init__()
        self.sentence_encoder = SentenceEncoder(config)
        self.loss = nn.CosineEmbeddingLoss()

    # 计算余弦距离  1-cos(a,b)
    # cos=1时两个向量相同,余弦距离为0;cos=0时,两个向量正交,余弦距离为1
    def cosine_distance(self, tensor1, tensor2):
        tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
        tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
        cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)
        return 1 - cosine

    def cosine_triplet_loss(self, a, p, n, margin=None):
        ap = self.cosine_distance(a, p)
        an = self.cosine_distance(a, n)
        if margin is None:
            diff = ap - an + 0.1
        else:
            diff = ap - an + margin.squeeze()
        return torch.mean(diff[diff.gt(0)]) #greater than

    #sentence : (batch_size, max_length)
    def forward(self, sentence1, sentence2=None, target=None):
        #同时传入两个句子
        if sentence2 is not None:
            vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)
            vector2 = self.sentence_encoder(sentence2)
            #如果有标签,则计算loss
            if target is not None:
                return self.loss(vector1, vector2, target.squeeze())
            #如果无标签,计算余弦距离
            else:
                return self.cosine_distance(vector1, vector2)
        #单独传入一个句子时,认为正在使用向量化能力
        else:
            return self.sentence_encoder(sentence1)


def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


if __name__ == "__main__":
    from config import Config
    Config["vocab_size"] = 10
    Config["max_length"] = 4
    model = SiameseNetwork(Config)
    s1 = torch.LongTensor([[1,2,3,0], [2,2,0,0]])
    s2 = torch.LongTensor([[1,2,3,4], [3,2,3,4]])
    l = torch.LongTensor([[1],[0]])
    y = model(s1, s2, l)
    print(y)
    # print(model.state_dict())

Ⅳ、模型效果测试 evaluate.py

① 模型初始化

        初始化Evaluator类,设置配置、模型和日志记录器。加载验证数据集和训练数据集(用于效果测试),并初始化统计字典以记录正确和错误的数量。

class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        # 由于效果测试需要训练集当做知识库,再次加载训练集。
        # 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
        self.train_data = load_data(config["train_data_path"], config)
        self.stats_dict = {"correct":0, "wrong":0}  #用于存储测试结果

② 将问题转换为向量表示 ⭐

        将训练数据集中的问题转换为向量形式,并记录每个问题编号到标准问题编号的映射关系。这样做的目的是为了在模型评估过程中,通过计算输入问题与知识库中所有问题向量的相似度,来判断模型的预测结果是否正确。归一化处理确保了向量之间的相似度计算更加准确可靠。 

初始化字典和列表

  • 创建了一个空字典 question_index_to_standard_question_index,用于存储问题编号到标准问题编号的映射。
  • 创建了一个空列表 question_ids,用于存储所有训练问题的ID。

遍历训练数据集中的问题

  • 使用 for 循环遍历训练数据集中的每个标准问题及其对应的问题列表。
  • 对于每个问题ID,将其添加到 question_ids 列表中,并在 question_index_to_standard_question_index 中记录问题编号到标准问题编号的映射关系。

将问题ID转换为向量

  • 使用 torch.no_grad() 上下文管理器,确保在执行这段代码时不会计算梯度,因为这里不需要进行反向传播。
  • 使用 torch.stack 将 question_ids 列表中的所有问题ID堆叠在一起,形成一个二维的 question_matrixs 张量,其中每一行对应一个问题ID。
  • 检查CUDA是否可用,如果可用,则将 question_matrixs 移动到GPU上。
  • 将 question_matrixs 输入到模型 self.model 中,得到问题的向量化表示 knwb_vectors
  • 对 knwb_vectors 中的所有向量进行归一化处理,即将每个向量除以其自身的范数,这样可以使得不同向量之间的相似度计算更加合理。

items():Python 字典的方法,返回字典中所有键值对的视图,每个键值对以元组形式返回。

append():Python 列表的方法,用于在列表末尾添加一个元素。

参数名类型描述
element任意类型要添加到列表末尾的元素。

torch.no_grad():PyTorch 的上下文管理器,用于禁用梯度计算,通常在模型推理或评估时使用,以减少内存消耗并加速计算。

torch.stack():用于将多个张量沿着新的维度进行堆叠,所有输入张量的形状必须相同。

参数名类型描述
tensors张量序列要堆叠的张量序列。
dimint指定堆叠的维度,默认为 0

torch.cuda.is_available():用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)。

cuda():PyTorch 张量或模型的方法,用于将张量或模型移动到 GPU 上。

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)。

torch.nn.functional.normalize():用于对输入张量在指定维度上进行 Lp​ 范数(p 是范数的指数值)归一化,默认使用 L2​ 范数。

参数名类型描述
inputTensor输入张量。
pfloat范数的指数值,默认为 2(L2​ 范数)。
dimint归一化的维度,默认为 1
epsfloat用于避免除以零的小值,默认为 1e-12
    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.question_ids = []
        for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
            for question_id in question_ids:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
                self.question_ids.append(question_id)
        with torch.no_grad():
            question_matrixs = torch.stack(self.question_ids, dim=0)
            if torch.cuda.is_available():
                question_matrixs = question_matrixs.cuda()
            self.knwb_vectors = self.model(question_matrixs)
            #将所有向量都作归一化 v / |v|
            self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
        return

③ 评估模型表现

1.记录日志并初始化统计字典。

2.将模型设置为评估模式,并将知识库转换为向量表示。

3.遍历验证数据集,处理每一批次的数据:

        如果有GPU可用,将数据移动到GPU。

        使用模型进行预测,不计算梯度。

        更新统计信息。

4.显示最终的统计结果。

logger.info():Python 的 logging 模块中用于记录信息级别(info level)日志的函数。它通常用于记录程序运行时的关键信息,如状态、进度等,而不是调试信息或错误。

参数名类型描述
msgstr要记录的日志信息。
*args任意类型用于格式化日志信息的参数。
**kwargsdict可选参数,如 exc_infostack_info 等,用于附加异常或堆栈信息。

model.eval():PyTorch 中用于将模型设置为评估模式的方法。在评估模式下,模型会禁用 Dropout 和 Batch Normalization 等训练时的特定行为,以确保测试结果的稳定性。

enumerate():Python 的内置函数,用于在遍历可迭代对象(如列表、元组、字符串)时同时获取索引和值。

参数名类型描述
iterable可迭代对象要遍历的对象(如列表、元组、字符串)。
startint索引的起始值,默认为 0

torch.cuda.is_available():PyTorch 中用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)的函数。

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)。

cuda():PyTorch 中用于将张量或模型移动到 GPU 上的方法。

torch.no_grad():PyTorch 的上下文管理器,用于禁用梯度计算,通常在模型推理或评估时使用,以减少内存消耗并加速计算。

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"correct":0, "wrong":0}  #清空前一轮的测试结果
        self.model.eval()
        self.knwb_to_vector()
        for index, batch_data in enumerate(self.valid_data):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            with torch.no_grad():
                test_question_vectors = self.model(input_id) #不输入labels,使用模型当前参数进行预测
            self.write_stats(test_question_vectors, labels)
        self.show_stats()
        return

④ 计算输入问题向量与知识库问题向量的相似度

断言检查:首先通过 assert 确保 labels 和 test_question_vectors 的长度一致。

矩阵乘法:对于每个测试问题向量 test_question_vector,通过 torch.mm 函数计算其与知识库中所有问题向量 self.knwb_vectors 的相似度。

    torch.mm 是 PyTorch 中的矩阵乘法函数,用于计算两个矩阵的乘积

    test_question_vector.unsqueeze(0)将 test_question_vector 的形状从 [vec_size] 扩展为 [1, vec_size],以便与 self.knwb_vectors.T 进行矩阵乘法。

   self.knwb_vectors.T知识库问题向量的转置,形状为 [vec_size, n],其中 n 是知识库中问题的数量。

    res矩阵乘法的结果,形状为 [1, n],表示测试问题与知识库中每个问题的相似度。

命中问题标号:通过 torch.argmax 找到相似度最高的索引 hit_index,即命中问题的标号。

标准问编号转换:将 hit_index 转换为标准问编号self.question_index_to_standard_question_index[hit_index]。​

统计更新:如果命中问题的标准问编号与标签 label 一致,则增加 

self.stats_dict["correct"] 的计数;否则增加 self.stats_dict["wrong"] 的计数。

assert:是 Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,程序会抛出 AssertionError 并终止执行。

参数名类型描述
conditionbool要检查的条件表达式。如果为假,则抛出 AssertionError
messagestr可选参数,用于在断言失败时显示的错误信息。

zip():Python 的内置函数,用于将多个可迭代对象(如列表、元组)的元素按索引配对,返回一个 zip 对象。

参数名类型描述
iterables可迭代对象要配对的多个可迭代对象。

unsqueeze(): 是 PyTorch 中的函数,用于在指定维度上插入一个大小为 1 的维度,从而改变张量的形状。

参数名类型描述
inputTensor输入张量。
dimint要插入新维度的位置,范围是 [-input.dim()-1, input.dim()]

.T:PyTorch 和 NumPy 中的属性,用于返回张量或矩阵的转置。

torch.mm(): PyTorch 中的函数,用于计算两个二维张量(矩阵)的矩阵乘法。

参数名类型描述
inputTensor第一个矩阵,形状为 (m, n)
mat2Tensor第二个矩阵,形状为 (n, p)

torch.argmax():PyTorch 中的函数,用于返回张量中最大值所在的索引。

参数名类型描述
inputTensor输入张量。
dimint指定沿哪个维度计算最大值索引。
keepdimbool是否保持输出张量的维度,默认为 False

squeeze():是 PyTorch 和 NumPy 中的函数,用于移除张量或数组中大小为 1 的维度。

参数名类型描述
inputTensor输入张量。
dimint可选参数,指定要移除的维度。
    def write_stats(self, test_question_vectors, labels):
        assert len(labels) == len(test_question_vectors)
        for test_question_vector, label in zip(test_question_vectors, labels):
            #通过一次矩阵乘法,计算输入问题和知识库中所有问题的相似度
            #test_question_vector shape [vec_size]   knwb_vectors shape = [n, vec_size]
            res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
            hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
            if int(hit_index) == int(label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

⑤ 展示模型预测的统计信息
  • 统计信息提取
    • correct = self.stats_dict["correct"]:从 self.stats_dict 中获取预测正确的条目数。
    • wrong = self.stats_dict["wrong"]:从 self.stats_dict 中获取预测错误的条目数。
  • 日志输出
    • self.logger.info("预测集合条目总量:%d" % (correct + wrong)):输出预测集合的总条目数,即正确条目数与错误条目数之和。
    • self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong)):输出预测正确的条目数和预测错误的条目数。
    • self.logger.info("预测准确率:%f" % (correct / (correct + wrong))):输出预测的准确率,即正确条目数占总条目数的比例。
    • self.logger.info("--------------------"):输出分隔线,用于区分不同的日志信息。

logger.info(): 是 Python 的 logging 模块中用于记录信息级别(info level)日志的函数。它通常用于记录程序运行时的关键信息,如状态、进度等,而不是调试信息或错误。

参数名类型描述
msgstr要记录的日志信息。
*args任意类型用于格式化日志信息的参数。
**kwargsdict可选参数,如 exc_infostack_info 等,用于附加异常或堆栈信息。
# -*- coding: utf-8 -*-
import torch
from loader import load_data

"""
模型效果测试
"""

class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        # 由于效果测试需要训练集当做知识库,再次加载训练集。
        # 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
        self.train_data = load_data(config["train_data_path"], config)
        self.stats_dict = {"correct":0, "wrong":0}  #用于存储测试结果

    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.question_ids = []
        for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
            for question_id in question_ids:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
                self.question_ids.append(question_id)
        with torch.no_grad():
            question_matrixs = torch.stack(self.question_ids, dim=0)
            if torch.cuda.is_available():
                question_matrixs = question_matrixs.cuda()
            self.knwb_vectors = self.model(question_matrixs)
            #将所有向量都作归一化 v / |v|
            self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
        return

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"correct":0, "wrong":0}  #清空前一轮的测试结果
        self.model.eval()
        self.knwb_to_vector()
        for index, batch_data in enumerate(self.valid_data):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            with torch.no_grad():
                test_question_vectors = self.model(input_id) #不输入labels,使用模型当前参数进行预测
            self.write_stats(test_question_vectors, labels)
        self.show_stats()
        return

    def write_stats(self, test_question_vectors, labels):
        assert len(labels) == len(test_question_vectors)
        for test_question_vector, label in zip(test_question_vectors, labels):
            #通过一次矩阵乘法,计算输入问题和知识库中所有问题的相似度
            #test_question_vector shape [vec_size]   knwb_vectors shape = [n, vec_size]
            res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
            hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
            if int(hit_index) == int(label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

    def show_stats(self):
        correct = self.stats_dict["correct"]
        wrong = self.stats_dict["wrong"]
        self.logger.info("预测集合条目总量:%d" % (correct +wrong))
        self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
        self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
        self.logger.info("--------------------")
        return

Ⅴ、训练主流程 main.py

① 配置日志记录

logging.basicConfig():用于配置日志系统的基本设置,包括日志级别、输出格式、输出目标(如控制台或文件)等。它通常在程序初始化时调用一次。 

参数名类型描述
levelint设置日志级别,如 logging.DEBUGlogging.INFO 等。
formatstr设置日志输出格式,如 '%(asctime)s - %(levelname)s - %(message)s'
filenamestr设置日志输出到文件,指定文件名。
filemodestr设置文件打开模式,默认为 'a'(追加模式)。
handlerslist设置自定义的日志处理器。

logging.getLogger(): 用于获取一个日志记录器(Logger)对象。每个记录器都有一个名称(name),可以用来区分不同的日志记录器。

参数名类型描述
namestr日志记录器的名称。如果未提供或为 None,则返回根记录器(root logger)。
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

② 创建保存模型的目录

         检查指定的模型保存路径是否存在,如果不存在,则创建该路径作为目录。

os.path.isdir(): Python 中 os.path 模块的函数,用于检查指定的路径是否为一个存在的目录。如果路径存在且是一个目录,则返回 True,否则返回 False

参数名类型描述
pathstr表示文件系统路径的类路径对象(可以是相对路径或绝对路径)。

os.mkdir():Python 中 os 模块的函数,用于创建一个新的目录。如果目录已经存在或路径无效,会抛出 FileExistsError 或 OSError

参数名类型描述
pathstr要创建的目录路径。
modeint可选参数,设置目录的权限(八进制模式),默认为 0o777
    #创建保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])

③ 加载文件

torch.cuda.is_available():PyTorch 中用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)的函数。

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)。

cuda():PyTorch 张量或模型的方法,用于将张量或模型移动到 GPU 上。

    #加载训练数据
    train_data = load_data(config["train_data_path"], config)
    #加载模型
    model = SiameseNetwork(config)
    # 标识是否使用gpu
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    #加载优化器
    optimizer = choose_optimizer(config, model)
    #加载效果测试类
    evaluator = Evaluator(config, model, logger)

④ 训练的核心过程 ⭐
  1. 训练循环

    • for epoch in range(config["epoch"]):遍历每个训练周期(epoch),config["epoch"] 是训练的总周期数。
    • epoch += 1:将当前周期数加1,用于日志记录。
    • model.train():将模型设置为训练模式,启用Dropout和Batch Normalization等训练特定行为。
  2. 数据加载与处理

    • for index, batch_data in enumerate(train_data):遍历训练数据集的每个批次(batch)。
    • optimizer.zero_grad():清空优化器的梯度缓存,避免梯度累积。
    • if cuda_flag: batch_data = [d.cuda() for d in batch_data]:如果启用了CUDA(GPU加速),将数据移动到GPU上。
  3. 模型前向传播与损失计算

    • input_id1, input_id2, labels = batch_data:从批次数据中提取两个输入(input_id1 和 input_id2)以及对应的标签(labels)。
  4. 反向传播与优化

    • train_loss.append(loss.item()):将当前批次的损失值记录下来。
    • loss.backward():执行反向传播,计算梯度。
    • optimizer.step():更新模型参数,优化损失函数。
  5. 日志记录与评估

    • logger.info("epoch average loss: %f" % np.mean(train_loss)):记录当前周期的平均损失值。
    • evaluator.eval(epoch):调用评估器对模型进行评估,通常是在验证集上计算准确率或其他指标。

model.train():将模型设置为训练模式。在训练模式下,模型会启用 Dropout 和 Batch Normalization 等层的行为,以确保模型在训练时能够正常工作

enumerate():Python 的内置函数,用于在遍历可迭代对象时同时获取索引和值

参数名类型描述
iterable可迭代对象要遍历的对象(如列表、元组、字符串)。
startint索引的起始值,默认为 0

optimizer.zero_grad():将优化器中所有参数的梯度清零,避免梯度累积

cuda():将张量或模型移动到 GPU 上,以利用 GPU 的并行计算能力加速计算

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)。

append():在列表末尾添加一个新元素

参数名类型描述
element任意类型要添加到列表末尾的元素。

item():将包含单个元素的张量转换为 Python 标量(如 int 或 float

loss.backward():计算损失函数对模型参数的梯度,用于反向传播

optimizer.step(): 根据计算出的梯度更新模型参数

    #训练
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            optimizer.zero_grad()
            if cuda_flag:
                batch_data = [d.cuda() for d in batch_data]
            input_id1, input_id2, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            loss = model(input_id1, input_id2, labels)
            train_loss.append(loss.item())
            # if index % int(len(train_data) / 2) == 0:
            #     logger.info("batch loss %f" % loss)
            loss.backward()
            optimizer.step()
        logger.info("epoch average loss: %f" % np.mean(train_loss))
        evaluator.eval(epoch)

⑤ 保存模型

os.path.join():用于将多个路径片段拼接成一个完整的路径,并自动根据操作系统选择正确的路径分隔符(如 Windows 使用 \,Linux 和 macOS 使用 /

参数名类型描述
pathstr初始路径片段。
*pathsstr需要拼接的后续路径片段,可以接受任意数量的参数。

torch.save():用于将 PyTorch 对象(如模型、张量、字典等)保存到磁盘文件中,通常用于保存模型的权重或训练状态

参数名类型描述
obj任意对象需要保存的对象,如模型、张量、字典等。
fstr 或文件对象保存的目标文件路径或文件对象。

model.state_dict(): 返回一个包含模型所有可学习参数(如权重和偏置)的有序字典,通常用于保存或加载模型参数

    model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
    torch.save(model.state_dict(), model_path)

模型训练主程序 
# -*- coding: utf-8 -*-

import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SiameseNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""

def main(config):
    #创建保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    #加载训练数据
    train_data = load_data(config["train_data_path"], config)
    #加载模型
    model = SiameseNetwork(config)
    # 标识是否使用gpu
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    #加载优化器
    optimizer = choose_optimizer(config, model)
    #加载效果测试类
    evaluator = Evaluator(config, model, logger)
    #训练
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            optimizer.zero_grad()
            if cuda_flag:
                batch_data = [d.cuda() for d in batch_data]
            input_id1, input_id2, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            loss = model(input_id1, input_id2, labels)
            train_loss.append(loss.item())
            # if index % int(len(train_data) / 2) == 0:
            #     logger.info("batch loss %f" % loss)
            loss.backward()
            optimizer.step()
        logger.info("epoch average loss: %f" % np.mean(train_loss))
        evaluator.eval(epoch)
    model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
    torch.save(model.state_dict(), model_path)
    return

if __name__ == "__main__":
    main(Config)

Ⅵ、预测文件 predict.py

① 初始化预测器对象
  • 模型初始化
    • self.config = config:将传入的配置信息保存到类属性中。
    • self.model = model:将传入的模型保存到类属性中。
    • self.train_data = knwb_data:将知识库数据保存到类属性中。
  • 设备选择
    • if torch.cuda.is_available(): self.model = model.cuda():如果 GPU 可用,则将模型移动到 GPU 上。
    • else: self.model = model.cpu():如果 GPU 不可用,则将模型移动到 CPU 上。
  • 模型模式设置
    • self.model.eval():将模型设置为评估模式,禁用 Dropout 和 Batch Normalization 等训练特定行为。
  • 知识库向量化
    • self.knwb_to_vector():调用 knwb_to_vector 方法,将知识库数据转换为向量形式,通常用于后续的相似度计算或检索。

torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已编译为支持 CUDA。如果返回 True,则表示可以使用 GPU 加速计算;如果返回 False,则表示只能使用 CPU。

model.cuda():将模型从 CPU 移动到 GPU 上,以便利用 GPU 的并行计算能力加速训练和推理。

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)

model.cpu(): 将模型从 GPU 移动回 CPU 上

model.eval():将模型设置为评估模式,禁用 Dropout 和 Batch Normalization 等层的随机行为。

class Predictor:
    def __init__(self, config, model, knwb_data):
        self.config = config
        self.model = model
        self.train_data = knwb_data
        if torch.cuda.is_available():
            self.model = model.cuda()
        else:
            self.model = model.cpu()
        self.model.eval()
        self.knwb_to_vector()

② 将知识库中的问题向量化
  1. 初始化映射和列表

    • self.question_index_to_standard_question_index = {}:创建一个空字典,用于记录问题编号到标准问题编号的映射。
    • self.question_ids = []:创建一个空列表,用于存储问题编号。
  2. 加载词汇表和模式

    • self.vocab = self.train_data.dataset.vocab:从训练数据集中加载词汇表。
    • self.schema = self.train_data.dataset.schema:从训练数据集中加载模式。
    • self.index_to_standard_question = dict((y, x) for x, y in self.schema.items()):创建一个字典,将索引映射到标准问题。
  3. 记录问题编号到标准问题编号的映射

    • 遍历知识库中的每个标准问题及其对应的问题编号。
    • 对于每个问题编号,记录其到标准问题编号的映射,并将其添加到 self.question_ids 列表中。
  4. 问题向量化

    • with torch.no_grad()::禁用梯度计算,因为这是推理阶段,不需要更新模型参数。
    • question_matrixs = torch.stack(self.question_ids, dim=0):将问题编号列表堆叠成一个矩阵。
    • if torch.cuda.is_available(): question_matrixs = question_matrixs.cuda():如果 GPU 可用,则将矩阵移动到 GPU 上。
    • self.knwb_vectors = self.model(question_matrixs):通过模型将问题矩阵转换为向量。
    • self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1):对所有向量进行归一化处理,使得每个向量的长度为 1。

dict():用于创建字典

参数名类型描述
**kwargs关键字参数关键字参数会被视为字典中的键值对。
mapping映射对象映射对象中的元素会被复制到新创建的字典中。
iterable可迭代对象可迭代对象的元素通常是长度为二的元组,每个元组的第一个元素作为键,第二个元素作为值。

items():返回字典中所有键值对的视图

列表.append():在列表末尾添加一个元素

参数名类型描述
item任意对象要添加到列表末尾的元素。

torch.no_grad():禁用梯度计算,适用于模型评估或推理阶段

torch.stack():沿指定维度连接张量序列,所有张量必须具有相同的大小

参数名类型描述
tensors张量序列需要连接的张量序列。
dimint指定连接的维度,必须在 0 和所需连接的张量维度之间。
outTensor输出张量,默认为 None

torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已编译为支持 CUDA。如果返回 True,则表示可以使用 GPU 加速计算;如果返回 False,则表示只能使用 CPU。

cuda():将张量从 CPU 移动到 GPU 上,以便利用 GPU 的并行计算能力加速训练和推理。

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)

torch.nn.functional.normalize(): 对输入张量沿指定维度进行归一化

参数名类型描述
inputTensor输入张量。
pfloat计算范数的类型,默认为 2(L2 范数)。
dimint计算范数的维度。
epsfloat防止分母为 0 的小数,默认为 1e-12
outTensor输出张量,默认为 None
    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.question_ids = []
        self.vocab = self.train_data.dataset.vocab
        self.schema = self.train_data.dataset.schema
        self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
        for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
            for question_id in question_ids:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
                self.question_ids.append(question_id)
        with torch.no_grad():
            question_matrixs = torch.stack(self.question_ids, dim=0)
            if torch.cuda.is_available():
                question_matrixs = question_matrixs.cuda()
            self.knwb_vectors = self.model(question_matrixs)
            #将所有向量都作归一化 v / |v|
            self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
        return

③ 文本编码
  • 根据配置中的 vocab_path 值,选择不同的分词方式:
    • 如果 vocab_path 为 "words.txt",则使用 jieba 对文本进行分词,并将每个分词结果映射到词汇表(self.vocab)中的 ID。
    • 否则,将文本按字符逐个映射到词汇表中的 ID。
  • 如果词汇表中不存在某个词或字符,则使用 "[UNK]"(未知词)的 ID 作为默认值。

jieba.cut():是 jieba 库中的函数,用于对中文文本进行分词。它支持多种分词模式,包括精确模式、全模式和搜索引擎模式。

参数名类型描述
sentencestr需要分词的中文字符串。
cut_allbool是否使用全模式,默认为 False(精确模式)。
HMMbool是否使用隐马尔可夫模型(HMM)进行新词发现,默认为 True

字典.get():Python 字典对象的方法,用于安全地获取字典中指定键的值。如果键不存在,返回默认值(默认为 None),而不会引发 KeyError 异常。

参数名类型描述
key任意类型需要检索的键。
default任意类型如果键不存在时返回的默认值,默认为 None

列表.append():在列表末尾添加一个元素

参数名类型描述
item任意对象要添加到列表末尾的元素。
    def encode_sentence(self, text):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        return input_id

④ 匹配相似的标准问题
  1. 输入处理

    • input_id = self.encode_sentence(sentence):调用 encode_sentence 方法将输入的句子转换为 ID 序列。
    • input_id = torch.LongTensor([input_id]):将 ID 序列转换为 PyTorch 的 LongTensor 类型,并增加一个维度以适配模型输入。
  2. 设备选择

    • if torch.cuda.is_available(): input_id = input_id.cuda():如果 GPU 可用,则将输入数据移动到 GPU 上。
  3. 模型推理

    • with torch.no_grad()::禁用梯度计算,因为这是推理阶段,不需要更新模型参数。
    • test_question_vector = self.model(input_id):将输入数据传递给模型,生成句子的向量表示。
    • res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T):计算输入句子向量与知识库中所有问题向量的相似度(通过矩阵乘法)。
    • hit_index = int(torch.argmax(res.squeeze())):找到相似度最高的知识库问题的索引。
    • hit_index = self.question_index_to_standard_question_index[hit_index]:将命中问题的索引转换为标准问题的索引。

torch.LongTensor(): PyTorch 中的一个函数,用于创建一个包含整数(64 位整型)的张量。它是一种特定的张量类型,通常用于存储索引、标签或其他整数值数据。

参数名类型描述
data列表、元组或数组输入数据,用于创建张量。
dtypetorch.dtype张量的数据类型,默认为 torch.int64(64 位整数)。
devicetorch.device张量存储的设备(如 CPU 或 GPU),默认为 None(使用默认设备)。
requires_gradbool是否需要计算梯度,默认为 False

torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已编译为支持 CUDA。如果返回 True,则表示可以使用 GPU 加速计算;如果返回 False,则表示只能使用 CPU。

cuda():将张量从 CPU 移动到 GPU 上,以便利用 GPU 的并行计算能力加速训练和推理。

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)

torch.no_grad():禁用梯度计算,适用于模型评估或推理阶段 

squeeze():是 PyTorch 和 NumPy 中的函数,用于移除张量或数组中大小为 1 的维度。

参数名类型描述
inputTensor输入张量。
dimint可选参数,指定要移除的维度。

unsqueeze(): 是 PyTorch 中的函数,用于在指定维度上插入一个大小为 1 的维度,从而改变张量的形状。

参数名类型描述
inputTensor输入张量。
dimint要插入新维度的位置,范围是 [-input.dim()-1, input.dim()]

torch.argmax():PyTorch 中的函数,用于返回张量中最大值所在的索引。

参数名类型描述
inputTensor输入张量。
dimint指定沿哪个维度计算最大值索引。
keepdimbool是否保持输出张量的维度,默认为 False
    def predict(self, sentence):
        input_id = self.encode_sentence(sentence)
        input_id = torch.LongTensor([input_id])
        if torch.cuda.is_available():
            input_id = input_id.cuda()
        with torch.no_grad():
            test_question_vector = self.model(input_id) #不输入labels,使用模型当前参数进行预测
            res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
            hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号 
        return  self.index_to_standard_question[hit_index]

⑤ 实现与用户的交互问答

torch.load():PyTorch 中用于加载由 torch.save() 保存的模型、张量或其他对象的函数。它可以将保存在文件中的数据加载到程序中,以便进行推理、继续训练或其他操作

参数名类型/值说明
f类文件对象或字符串类文件对象(必须实现 read()readline()tell() 和 seek()),或包含文件名的字符串
map_location函数、torch.device、字符串或字典指定如何重新映射存储位置。例如,将 GPU 保存的模型加载到 CPU 上
pickle_module模块(默认:pickle用于反序列化的模块,必须与序列化时使用的模块匹配。
**pickle_load_args可选关键字参数

传递给 

pickle_module.load() 和 pickle_module.Unpickler() 

的可选参数

input():Python 内置函数,用于从标准输入(通常是键盘)读取用户输入的数据。它会将用户输入的内容作为字符串返回。

参数名类型说明
prompt字符串可选参数,用于在等待用户输入时显示的提示信息。如果未提供,则不显示任何提示。
# -*- coding: utf-8 -*-
import torch
from loader import load_data
from config import Config
from model import SiameseNetwork, choose_optimizer

"""
模型效果测试
"""

class Predictor:
    def __init__(self, config, model, knwb_data):
        self.config = config
        self.model = model
        self.train_data = knwb_data
        if torch.cuda.is_available():
            self.model = model.cuda()
        else:
            self.model = model.cpu()
        self.model.eval()
        self.knwb_to_vector()

    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.question_ids = []
        self.vocab = self.train_data.dataset.vocab
        self.schema = self.train_data.dataset.schema
        self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
        for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
            for question_id in question_ids:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
                self.question_ids.append(question_id)
        with torch.no_grad():
            question_matrixs = torch.stack(self.question_ids, dim=0)
            if torch.cuda.is_available():
                question_matrixs = question_matrixs.cuda()
            self.knwb_vectors = self.model(question_matrixs)
            #将所有向量都作归一化 v / |v|
            self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
        return

    def encode_sentence(self, text):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        return input_id

    def predict(self, sentence):
        input_id = self.encode_sentence(sentence)
        input_id = torch.LongTensor([input_id])
        if torch.cuda.is_available():
            input_id = input_id.cuda()
        with torch.no_grad():
            test_question_vector = self.model(input_id) #不输入labels,使用模型当前参数进行预测
            res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
            hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号 
        return  self.index_to_standard_question[hit_index]

if __name__ == "__main__":
    knwb_data = load_data(Config["train_data_path"], Config)
    model = SiameseNetwork(Config)
    model.load_state_dict(torch.load("model_output/epoch_10.pth"))
    pd = Predictor(Config, model, knwb_data)

    while True:
        # sentence = "固定宽带服务密码修改"
        sentence = input("请输入问题:")
        res = pd.predict(sentence)
        print(res)


二、实现方式 ② 交互型文本匹配

同时输入两句话,对两句话之间进行分割,然后同时送入这个模型,然后过模型后取出【CLS】token 位置对应的向量送入这个模型,过模型后取出【CLS】token 位置对应的向量做一个二分类

Bert中进行匹配这两句话判断是否是上下文关系,而文本匹配模型中,将数据换为文本匹配的数据,最终匹配两句话的语义相似度

表示型文本匹配:两句话分别单独进入这个模型,两句话分别编码,在编码的过程中互不影响

交互式文本匹配:两句话一同送入这个模型,计算交互时与self-attention计算类似,对比可以看到另一句话的信息,对比发现两句话的主要区别,例:今天下了        今天下

representation layer:表示层输出是否匹配,二分类任务

interaction layer:交互层进行信息融合,常以attention的方式

Embedding layer:嵌入层,权重共享

将两句话同时输入这个模型,让模型看到,让模型进行对比发现文本的重点


1.交互型文本匹配 —— 代码示例🚀

Ⅰ、配置文件 config.py

① 路径相关参数

model_path:指定训练后模型的保存路径。若加载预训练模型,需提供模型文件的具体存储位置

schema_path:定义数据结构的配置文件路径,通常用于数据预处理或验证输入格式(如JSON/YAML文件)

train_data_path:指向训练集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)

valid_data_path:指向验证集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)

vocab_path:词表文件路径,用于自然语言处理任务中定义词汇的映射关系(如将单词转为ID)

② 模型结构参数 

max_length:序列数据的最大长度(如文本的单词数)。超过此长度的序列会被截断或填充

hidden_size:神经网络隐藏层的维度大小,影响模型的表达能力。例如,LSTM或Transformer中每层的神经元数量

③ 训练控制参数

epoch:训练过程中遍历整个数据集的次数。适当增加轮次可提升模型性能,但可能过拟合

batch_size:每次输入模型的样本数量。较大的批次可加速训练,但需更多显存

epoch_data_size:每轮训练中采样的数量

positive_sample_rate:正样本在批次中的占比,常用于不平衡数据任务(如分类)。需结合负样本比例调整

④ 优化和学习参数

optimizer:选择优化算法(如SGD、Adam、AdamW),影响参数更新策略

learning_rate:控制参数更新的步长。学习率过高可能导致震荡,过低则收敛缓慢

# -*- coding: utf-8 -*-

"""
配置参数信息
"""

Config = {
    "model_path": "model_output",
    "schema_path": "../data/schema.json",
    "train_data_path": "../data/train.json",
    "valid_data_path": "../data/valid.json",
    "pretrain_model_path":r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese",
    "vocab_path":r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese\vocab.txt",
    "max_length": 20,
    "hidden_size": 256,
    "epoch": 10,
    "batch_size": 128,
    "epoch_data_size": 10000,     #每轮训练中采样数量
    "positive_sample_rate":0.5,  #正样本比例
    "optimizer": "adam",
    "learning_rate": 1e-3,
}

Ⅱ、数据加载 loader.py

① 设置日志级别

logging.getLogger():Python 中 logging 模块的核心函数,用于获取或创建一个日志记录器(Logger)实例。每个日志记录器都有一个唯一的名称,用于标识和配置日志记录行为。如果未提供名称,则返回根记录器

参数名类型说明
name字符串可选参数,指定日志记录器的名称。如果未提供或为 None,则返回根记录器。

setLevel():Logger 对象的方法,用于设置日志记录器的日志级别。只有等于或高于该级别的日志消息才会被处理,低于该级别的日志消息将被忽略

参数名类型说明
level整数指定日志级别,常用值包括 logging.DEBUGlogging.INFOlogging.WARNINGlogging.ERROR 和 logging.CRITICAL
logging.getLogger("transformers").setLevel(logging.ERROR)

②  初始化 def __init__()

self.config = config将传入的配置字典保存到实例变量中。

self.path = data_path将数据路径保存到实例变量中。

self.tokenizer = load_vocab(config["vocab_path"])加载词汇表文件,初始化分词器。config["vocab_path"] 是词汇表文件的路径。

self.config["vocab_size"] = len(self.tokenizer.vocab)计算词汇表的大小,并将其更新到配置字典中。

self.schema = load_schema(config["schema_path"])加载模式文件(如数据格式定义),config["schema_path"] 是模式文件的路径。

self.train_data_size = config["epoch_data_size"]设置每个 epoch 的采样数据量。由于采用随机采样,需要指定一个固定的采样数量,否则可以一直采样self.max_length = config["max_length"]**:设置输入序列的最大长度。

self.data_type = None用于标识加载的数据类型是训练集还是测试集,初始值为 None,后续可以通过方法设置为 "train" 或 "test"

self.load()调用 load 方法,加载数据或执行其他初始化操作。

    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.tokenizer = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.tokenizer.vocab)
        self.schema = load_schema(config["schema_path"])
        self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
        self.max_length = config["max_length"]
        self.data_type = None  #用来标识加载的是训练集还是测试集 "train" or "test"
        self.load()

③  从指定路径文件加载数据
  • 打开文件(self.path),逐行读取并解析为 JSON 对象。
  • 根据数据的类型(字典或列表)进行分类:
    • 训练集:如果数据是字典类型,提取 questions 和 target,将问题存储到 self.knwb 中,键为标签(通过 self.schema 映射)。
    • 测试集:如果数据是列表类型,提取问题和标签,将标签转换为 torch.LongTensor 并存储到 self.data 中。

defaultdict():是 collections 模块中的一个类,继承自 dict。它允许在访问不存在的键时返回一个默认值,而不是抛出 KeyError 异常。

参数名类型说明
default_factory可调用对象指定默认值的生成函数,如 listintset 等。如果未提供,默认为 None

json.loads():将 JSON 格式的字符串解析为 Python 对象(如字典、列表等)。

参数名类型说明
s字符串要解析的 JSON 字符串。
encoding字符串字符编码(Python 3 中已弃用)。
cls自定义解码类,默认为 None
object_hook函数用于自定义 JSON 对象转换为 Python 对象的函数。
parse_float函数自定义将 JSON 中的浮点数转换为特定类型的函数。
parse_int函数自定义将 JSON 中的整数转换为特定类型的函数。
parse_constant函数自定义将 JSON 中的常量(如 Infinity)转换为特定类型的函数。
object_pairs_hook函数用于处理 JSON 对象中的键值对的函数,默认返回字典。

isinstance():检查一个对象是否是指定类型或其子类的实例。

参数名类型说明
object对象需要检查的对象。
classinfo类型或元组可以是一个类型(如 intstr 等),也可以是这些类型的元组。

append():在列表的末尾添加一个元素。

参数名类型说明
element任意类型要添加到列表末尾的元素。

assert:用于调试,检查一个条件是否为真。如果条件为假,则抛出 AssertionError 异常。

参数名类型说明
condition布尔表达式需要检查的条件。
message字符串可选参数,当条件为假时抛出的错误信息。

torch.LongTensor():创建一个包含整数的张量(Tensor),元素类型为 64 位整数

参数名类型说明
data列表或数组用于初始化张量的数据。
dtype类型指定张量的数据类型,默认为 torch.long
device设备指定张量存储的设备(如 cpu 或 gpu),默认为 cpu
requires_grad布尔值指定是否需要计算张量的梯度,默认为 False
    def load(self):
        self.data = []
        self.knwb = defaultdict(list)
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                #加载训练集
                if isinstance(line, dict):
                    self.data_type = "train"
                    questions = line["questions"]
                    label = line["target"]
                    for question in questions:
                        self.knwb[self.schema[label]].append(question)
                #加载测试集
                else:
                    self.data_type = "test"
                    assert isinstance(line, list)
                    question, label = line
                    label_index = torch.LongTensor([self.schema[label]])
                    self.data.append([question, label_index])
        return

④  拼接两句话转成向量
  1. 拼接与编码

    • 使用 self.tokenizer.encode 方法对 text1 和 text2 进行拼接和编码。
    • 拼接后的文本会被转换为模型可接受的输入格式(如 input_ids)。
  2. 参数说明

    • truncation='longest_first':如果拼接后的文本长度超过 max_length,则从较长的部分开始截断。
    • max_length=self.max_length:指定编码后的最大长度,超过的部分会被截断。
    • padding='max_length':如果拼接后的文本长度不足 max_length,则用填充符(如 [PAD])补齐。
  3. 返回结果

    • 返回编码后的 input_id,通常是一个包含 token ID 的列表或张量。

tokenizer.encode():transformers 库中 Tokenizer 类的一个方法,用于将文本转换为模型可接受的 token ID 序列。它通常用于自然语言处理任务中,将输入文本编码为模型输入格式。

参数名类型说明
textstr 或 List[str]需要编码的文本,可以是单个字符串或字符串列表。
text_pairstr 或 List[str]可选参数,第二个文本(如句子对任务中的第二句话)。
add_special_tokensbool是否添加特殊标记(如 [CLS] 和 [SEP]),默认为 True
max_lengthint指定编码后的最大长度,超过的部分会被截断。
truncationbool 或 str指定截断策略,如 'longest_first''only_first' 或 'only_second'
paddingbool 或 str指定填充策略,如 'max_length' 或 'longest'
return_tensorsstr指定返回的 tensor 类型,如 'pt'(PyTorch)或 'tf'(TensorFlow)。
return_token_type_idsbool是否返回 token_type_ids(用于区分句子对任务中的两个句子)。
return_attention_maskbool是否返回 attention_mask(用于标识有效 token)。
    #每次加载两个文本,输出他们的拼接后编码
    def encode_sentence(self, text1, text2):
        input_id = self.tokenizer.encode(text1, text2,
                                         truncation='longest_first',
                                         max_length=self.max_length,
                                         padding='max_length',
                                         )
        return input_id

⑤ 返回数据集的长度 def __len__()

该函数的作用是返回对象的“长度”,具体行为取决于对象的 data_type 属性。

如果 data_type 为 "train",则返回 self.config["epoch_data_size"] 的值,这通常表示训练数据的规模。

如果 data_type 为 "test",则返回 self.data 的长度,即测试数据的元素个数。

如果 data_type 不是 "train" 或 "test",则会触发 AssertionError,并提示 self.data_type 的值。

assert:Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,assert 会抛出 AssertionError 异常。 

len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。

参数名类型描述
obj可迭代对象需要计算长度的对象,如字符串、列表、元组、字典、集合等。
    def __len__(self):
        if self.data_type == "train":
            return self.config["epoch_data_size"]
        else:
            assert self.data_type == "test", self.data_type
            return len(self.data)

⑥ 通过索引访问数据集中的样本 def __getitem__()

判断数据类型: 如果是训练数据集,则调用self.random_train_sample()方法来随机生成一个训练样本。这是因为训练数据集中的样本是通过随机采样生成的,而不是固定的数据。

如果不是训练数据集,则认为是测试数据集:直接返回self.data列表中索引为index的样本。

    def __getitem__(self, index):
        if self.data_type == "train":
            return self.random_train_sample() #随机生成一个训练样本
        else:
            return self.data[index]

⑦ 随机生成训练样本 ⭐

正样本生成:从所有标准问题中随机选择一个标准问题 p。如果该标准问题下至少有两个问题,则从中随机选取两个问题 s1 和 s2。使用 encode_sentence 方法将 s1 和 s2 编码为模型输入格式,并返回编码结果及标签 1(表示正样本)。如果标准问题下不足两个问题,则重新随机选择。

负样本生成:从所有标准问题中随机选择两个不同的标准问题 p 和 n。分别从 p 和 n 中随机选取一个问题 s1 和 s2。使用 encode_sentence 方法将 s1 和 s2 编码为模型输入格式,并返回编码结果及标签 0(表示负样本)。

概率控制:使用 random.random() 和 self.config["positive_sample_rate"] 控制生成正样本的概率。

list():将可迭代对象(如字符串、元组、集合等)转换为列表。

参数名类型说明
iterable可迭代对象需要转换为列表的对象,如字符串、元组、集合等。如果未提供,则返回空列表。

字典.keys():返回字典中所有键的视图对象(dict_keys),可以转换为列表

random.random():生成一个范围在 [0.0, 1.0) 之间的随机浮点数。

random.choice():从非空序列(如列表、元组、字符串等)中随机选择一个元素。

参数名类型说明
seq序列非空序列,如列表、元组、字符串等。

random.sample():从序列中随机选择指定数量的唯一元素,返回一个新列表。

参数名类型说明
population序列需要从中选择的序列,如列表、元组、字符串等。
k整数需要选择的元素数量。

torch.LongTensor():创建一个包含 64 位整数的张量(Tensor)

参数名类型说明
data列表或数组用于初始化张量的数据,如列表或数组。
    #依照一定概率生成负样本或正样本
    #负样本从随机两个不同的标准问题中各随机选取一个
    #正样本从随机一个标准问题中随机选取两个
    def random_train_sample(self):
        standard_question_index = list(self.knwb.keys())
        #随机正样本
        if random.random() <= self.config["positive_sample_rate"]:
            p = random.choice(standard_question_index)
            #如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
            if len(self.knwb[p]) < 2:
                return self.random_train_sample()
            else:
                s1, s2 = random.sample(self.knwb[p], 2)
                input_ids = self.encode_sentence(s1, s2)
                input_ids = torch.LongTensor(input_ids)
                return [input_ids, torch.LongTensor([1])]
        #随机负样本
        else:
            p, n = random.sample(standard_question_index, 2)
            s1 = random.choice(self.knwb[p])
            s2 = random.choice(self.knwb[n])
            input_ids = self.encode_sentence(s1, s2)
            input_ids = torch.LongTensor(input_ids)
            return [input_ids, torch.LongTensor([0])]

⑧ 数据处理

加载词汇表:

        ​加载词汇表:使用 vocab_path 参数指定词汇表文件的路径。通过 BertTokenizer 类加载词汇表,并初始化分词器。​

        返回分词器:返回初始化好的 BertTokenizer 分词器,用于后续的文本编码和分词操作。

#加载字表或词表
def load_vocab(vocab_path):
    tokenizer = BertTokenizer(vocab_path)
    return tokenizer

加载schema:

        ​加载模式文件:使用 schema_path 参数指定模式文件的路径。打开文件并读取其内容。

        解析 JSON 数据:使用 json.loads 函数将文件内容解析为 Python 对象(通常是字典)。

        返回解析结果:返回解析后的 Python 对象,用于后续的处理或配置。

open():用于打开文件,并返回一个文件对象,以便进行读取或写入操作。

参数名类型说明
file字符串文件路径,可以是相对路径或绝对路径。
mode字符串打开文件的模式,如 'r'(只读)、'w'(只写)、'a'(追加)等。
encoding字符串文件编码格式,如 'utf-8'
errors字符串指定编码错误处理方式,如 'ignore' 或 'strict'
newline字符串控制换行符的行为,如 None(默认)或 '\n'
closefd布尔值是否在关闭文件时关闭文件描述符,默认为 True
opener函数自定义文件打开器。

json.loads():将 JSON 格式的字符串解析为 Python 对象(如字典、列表等)。

参数名类型说明
s字符串要解析的 JSON 字符串。
encoding字符串字符编码(Python 3 中已弃用)。
cls自定义解码类,默认为 None
object_hook函数自定义将 JSON 对象转换为其他类型的 Python 对象。
parse_float函数自定义将 JSON 中的浮点数转换为特定类型。
parse_int函数自定义将 JSON 中的整数转换为特定类型。

文件对象.read(): 从文件中读取指定数量的字节或整个文件内容。

参数名类型说明
size整数要读取的字节数。如果未指定或为负数,则读取整个文件。
#加载schema
def load_schema(schema_path):
    with open(schema_path, encoding="utf8") as f:
        return json.loads(f.read())

封装数据加载器:

        创建数据生成器:使用 DataGenerator 类(假设已定义)从 data_path 和 config 中加载数据,并生成数据集对象 dg。​

        封装为 DataLoader:使用 DataLoader 将 dg 封装为可迭代的数据加载器 dlbatch_size 从 config 中获取,用于指定每个批次的样本数量。shuffle 参数控制是否在每个训练周期开始时打乱数据顺序。        

         返回 DataLoader:返回封装好的 DataLoader 对象 dl,用于后续的训练或验证。

DataLoader():PyTorch 中的一个标准工具,用于高效地加载和处理数据。它支持批量加载、数据打乱、多线程加载等功能,是深度学习训练中常用的数据加载器。

参数名类型描述
dataset数据集对象要加载的数据集,必须实现 __len__ 和 __getitem__ 方法。
batch_size整数每个批次的大小,默认为 1
shuffle布尔值是否在每个 epoch 开始时打乱数据顺序,默认为 False
sampler采样器对象自定义采样器,用于控制数据的采样方式,默认为 None
batch_sampler批次采样器对象自定义批次采样器,用于控制批次的采样方式,默认为 None
num_workers整数用于数据加载的子进程数量,默认为 0(主进程加载)。
collate_fn函数用于将一个批次的数据合并成一个张量或元组,默认为 None
pin_memory布尔值是否将数据存储在 pin memory 中(用于 GPU 加速),默认为 False
drop_last布尔值如果数据不能完全分成批次,是否删除最后一个不完整的批次,默认为 False
timeout整数数据加载的最大等待时间(秒),默认为 0(无限制)。
worker_init_fn函数用于初始化每个数据加载器子进程的函数,默认为 None
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl

⑨ 加载数据文件
# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import random
import logging
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
from transformers import BertTokenizer
"""
数据加载
"""

logging.getLogger("transformers").setLevel(logging.ERROR)

class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.tokenizer = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.tokenizer.vocab)
        self.schema = load_schema(config["schema_path"])
        self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
        self.max_length = config["max_length"]
        self.data_type = None  #用来标识加载的是训练集还是测试集 "train" or "test"
        self.load()

    def load(self):
        self.data = []
        self.knwb = defaultdict(list)
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                #加载训练集
                if isinstance(line, dict):
                    self.data_type = "train"
                    questions = line["questions"]
                    label = line["target"]
                    for question in questions:
                        self.knwb[self.schema[label]].append(question)
                #加载测试集
                else:
                    self.data_type = "test"
                    assert isinstance(line, list)
                    question, label = line
                    label_index = torch.LongTensor([self.schema[label]])
                    self.data.append([question, label_index])
        return

    #每次加载两个文本,输出他们的拼接后编码
    def encode_sentence(self, text1, text2):
        input_id = self.tokenizer.encode(text1, text2,
                                         truncation='longest_first',
                                         max_length=self.max_length,
                                         padding='max_length',
                                         )
        return input_id

    def __len__(self):
        if self.data_type == "train":
            return self.config["epoch_data_size"]
        else:
            assert self.data_type == "test", self.data_type
            return len(self.data)

    def __getitem__(self, index):
        if self.data_type == "train":
            return self.random_train_sample() #随机生成一个训练样本
        else:
            return self.data[index]

    #依照一定概率生成负样本或正样本
    #负样本从随机两个不同的标准问题中各随机选取一个
    #正样本从随机一个标准问题中随机选取两个
    def random_train_sample(self):
        standard_question_index = list(self.knwb.keys())
        #随机正样本
        if random.random() <= self.config["positive_sample_rate"]:
            p = random.choice(standard_question_index)
            #如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
            if len(self.knwb[p]) < 2:
                return self.random_train_sample()
            else:
                s1, s2 = random.sample(self.knwb[p], 2)
                input_ids = self.encode_sentence(s1, s2)
                input_ids = torch.LongTensor(input_ids)
                return [input_ids, torch.LongTensor([1])]
        #随机负样本
        else:
            p, n = random.sample(standard_question_index, 2)
            s1 = random.choice(self.knwb[p])
            s2 = random.choice(self.knwb[n])
            input_ids = self.encode_sentence(s1, s2)
            input_ids = torch.LongTensor(input_ids)
            return [input_ids, torch.LongTensor([0])]



#加载字表或词表
def load_vocab(vocab_path):
    tokenizer = BertTokenizer(vocab_path)
    return tokenizer

#加载schema
def load_schema(schema_path):
    with open(schema_path, encoding="utf8") as f:
        return json.loads(f.read())

#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl



if __name__ == "__main__":
    from config import Config
    dg = DataGenerator("../data/valid.json", Config)

Ⅲ、模型定义 model.py

① 从输入张量提取第一个元素并返回
class GetFirst(nn.Module):
    def __init__(self):
        super(GetFirst, self).__init__()

    def forward(self, x):
        return x[0]

② 文本匹配网络初始化 

输入:输入是一个词索引序列,形状为 (batch_size, max_len),其中 max_len 是序列的最大长度。​

嵌入层:将词索引映射为稠密向量,输出形状为 (batch_size, max_len, hidden_size)

LSTM 编码器:双向 LSTM 编码输入序列,输出形状为 (batch_size, max_len, hidden_size * 2)GetFirst() 提取第一个时间步的隐藏状态,输出形状为 (batch_size, hidden_size * 2)。线性层和 ReLU 激活函数进一步处理,输出形状为 (batch_size, hidden_size)

分类层:将隐藏层的输出映射为 2 维向量,输出形状为 (batch_size, 2)

损失计算:使用交叉熵损失函数计算模型预测值与真实标签之间的损失。

nn.Embedding():将离散的整数(如单词索引)映射到低维的稠密向量空间,常用于自然语言处理任务。

参数名类型说明
num_embeddingsint嵌入字典的大小,即词汇表的大小。
embedding_dimint每个嵌入向量的维度。
padding_idxint, optional指定一个索引,用于填充(padding),在计算梯度时不会被更新。
max_normfloat, optional如果设置,嵌入向量的范数会被裁剪到不超过该值。
norm_typefloat, optional用于裁剪范数的类型,默认为 2(L2 范数)。
scale_grad_by_freqbool, optional如果为 True,梯度会根据单词在 mini-batch 中的频率进行缩放。
sparsebool, optional如果为 True,梯度将会是稀疏张量。

nn.Sequential():按顺序组织多个神经网络层或模块,简化模型定义。

参数名类型说明
*argsModule按顺序传入多个模块(如层或激活函数)。

nn.ReLU():修正线性单元激活函数,将负值置为 0,正值保持不变,用于引入非线性。

参数名类型说明
inplacebool, optional如果为 True,会直接修改输入数据,节省内存。默认为 False。

nn.Linear():全连接层,对输入进行线性变换(矩阵乘法和偏置加法)。

参数名类型说明
in_featuresint输入特征维度。
out_featuresint输出特征维度。
biasbool, optional是否添加偏置项,默认为 True。

nn.CrossEntropyLoss():用于多分类任务的损失函数,结合了nn.LogSoftmax() 和 nn.NLLLoss()

参数名类型说明
weightTensor, optional类别权重,用于处理类别不平衡问题。
ignore_indexint, optional指定一个索引,忽略该类别的损失计算。
reductionstr, optional指定损失计算的方式,如 'mean''sum' 或 'none'。默认为 'mean'
def __init__(self, config):
        super(SentenceMatchNetwork, self).__init__()
        # 可以用bert,参考下面
        # pretrain_model_path = config["pretrain_model_path"]
        # self.bert_encoder = BertModel.from_pretrained(pretrain_model_path)

        # 常规的embedding + layer
        hidden_size = config["hidden_size"]
        #20000应为词表大小,这里借用bert的词表,没有用它精确的数字,因为里面有很多无用词,舍弃一部分,不影响效果
        self.embedding = nn.Embedding(20000, hidden_size)
        #一种多层按顺序执行的写法,具体的层可以换
        #unidirection:batch_size, max_len, hidden_size
        #bidirection:batch_size, max_len, hidden_size * 2
        self.encoder = nn.Sequential(nn.LSTM(hidden_size, hidden_size, bidirectional=True, batch_first=True),
                                     GetFirst(),
                                     nn.ReLU(),
                                     nn.Linear(hidden_size * 2, hidden_size), #batch_size, max_len, hidden_size
                                     nn.ReLU(),
                                     )
        self.classify_layer = nn.Linear(hidden_size, 2)
        self.loss = nn.CrossEntropyLoss()

③  计算句子间相似度

输入:接收 input_ids(两个句子的拼接编码)和可选的 target(标签)。

嵌入层:将 input_ids 转换为稠密向量 x。​

编码器:通过 LSTM、ReLU 和线性层处理 x,得到编码后的输出。

池化:对编码后的输出进行最大池化,提取特征。​

分类层:将池化后的输出映射为 2 维向量,表示匹配和不匹配的得分。

输出:如果有 target,计算交叉熵损失并返回。如果没有 target,返回两句话匹配的概率。

squeeze():移除张量中所有大小为 1 的维度,或根据需要移除特定维度。例如,形状为 (1,3,1,4) 的张量,经过 squeeze() 后将变为 (3,4)。

参数名类型说明
inputTensor输入张量。
dimint, optional指定要移除的维度。如果未指定,则移除所有大小为 1 的维度。

torch.softmax(): 将输入张量的元素转换为概率分布,适用于多分类任务。输出的每个元素值在 (0,1) 之间,且所有元素的和为 1。

参数名类型说明
inputTensor输入张量。
dimint指定在哪个维度上计算 Softmax。例如,dim=1 表示对每一行计算 Softmax。
dtypedtype, optional输出张量的数据类型。
    # 同时传入两个句子的拼接编码
    # 输出一个相似度预测,不匹配的概率
    def forward(self, input_ids, target=None):
        # x = self.bert_encoder(input_ids)[1]
        #input_ids = batch_size, max_length
        x = self.embedding(input_ids) #x:batch_size, max_length, embedding_size
        x = self.encoder(x) #
        #x: batch_size, max_len, hidden_size
        x = nn.MaxPool1d(x.shape[1])(x.transpose(1,2)).squeeze()
        #x: batch_size, hidden_size
        x = self.classify_layer(x)
        #x: batch_size, 2
        #如果有标签,则计算loss
        if target is not None:
            return self.loss(x, target.squeeze())
        #如果无标签,预测相似度
        else:
            return torch.softmax(x, dim=-1)[:, 1] #如果改为x[:,0]则是两句话不匹配的概率

 根据配置选择优化器

        根据用户在配置字典中指定的优化器类型("adam" 或 "sgd")来选择并初始化相应的优化器,并设置学习率。

model.parameters():PyTorch 中 torch.nn.Module 类的一个方法,用于获取模型中所有可训练参数的迭代器。这些参数通常是模型的权重和偏置,它们会在训练过程中通过优化器进行更新。

def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)

⑨ 加载数据文件

torch.LongTensor(): PyTorch 中的一个函数,用于创建一个包含整数(64 位整型)的张量。它主要用于处理整数类型的数据,例如索引、标签或其他整数值。

参数名类型说明
datalist, tuple, numpy array输入数据,可以是列表、元组或 NumPy 数组,包含整数值。
devicetorch.device, optional指定张量存储的设备(如 CPU 或 GPU)。默认为 CPU。
requires_gradbool, optional是否需要对张量计算梯度。默认为 False
# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from transformers import BertModel, BertConfig

"""
建立网络模型结构
"""

class GetFirst(nn.Module):
    def __init__(self):
        super(GetFirst, self).__init__()

    def forward(self, x):
        return x[0]

class SentenceMatchNetwork(nn.Module):
    def __init__(self, config):
        super(SentenceMatchNetwork, self).__init__()
        # 可以用bert,参考下面
        # pretrain_model_path = config["pretrain_model_path"]
        # self.bert_encoder = BertModel.from_pretrained(pretrain_model_path)

        # 常规的embedding + layer
        hidden_size = config["hidden_size"]
        #20000应为词表大小,这里借用bert的词表,没有用它精确的数字,因为里面有很多无用词,舍弃一部分,不影响效果
        self.embedding = nn.Embedding(20000, hidden_size)
        #一种多层按顺序执行的写法,具体的层可以换
        #unidirection:batch_size, max_len, hidden_size
        #bidirection:batch_size, max_len, hidden_size * 2
        self.encoder = nn.Sequential(nn.LSTM(hidden_size, hidden_size, bidirectional=True, batch_first=True),
                                     GetFirst(),
                                     nn.ReLU(),
                                     nn.Linear(hidden_size * 2, hidden_size), #batch_size, max_len, hidden_size
                                     nn.ReLU(),
                                     )
        self.classify_layer = nn.Linear(hidden_size, 2)
        self.loss = nn.CrossEntropyLoss()

    # 同时传入两个句子的拼接编码
    # 输出一个相似度预测,不匹配的概率
    def forward(self, input_ids, target=None):
        # x = self.bert_encoder(input_ids)[1]
        #input_ids = batch_size, max_length
        x = self.embedding(input_ids) #x:batch_size, max_length, embedding_size
        x = self.encoder(x) #
        #x: batch_size, max_len, hidden_size
        x = nn.MaxPool1d(x.shape[1])(x.transpose(1,2)).squeeze()
        #x: batch_size, hidden_size
        x = self.classify_layer(x)
        #x: batch_size, 2
        #如果有标签,则计算loss
        if target is not None:
            return self.loss(x, target.squeeze())
        #如果无标签,预测相似度
        else:
            return torch.softmax(x, dim=-1)[:, 1] #如果改为x[:,0]则是两句话不匹配的概率



def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


if __name__ == "__main__":
    from config import Config
    Config["vocab_size"] = 10
    Config["max_length"] = 4
    model = SentenceMatchNetwork(Config)
    s1 = torch.LongTensor([[1,2,3,0], [2,2,0,0]])
    s2 = torch.LongTensor([[1,2,3,4], [3,2,3,4]])
    l = torch.LongTensor([[1],[0]])
    # y = model(s1, s2, l)
    # print(y)
    # print(model.state_dict())

Ⅳ、模型效果测试 evaluate.py

① 模型初始化

保存配置、模型和日志self.config = config将传入的配置字典保存到实例变量中。self.model = model将传入的模型对象保存到实例变量中。self.logger = logger将传入的日志对象保存到实例变量中。​

加载验证数据self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)加载验证数据,并通过 load_data 函数封装为 DataLoader 对象。shuffle=False 表示不打乱数据顺序。​

加载训练数据self.train_data = load_data(config["train_data_path"], config)加载训练数据,并通过 load_data 函数封装为 DataLoader 对象。这里提到重新加载训练数据是为了将训练集作为知识库进行效果测试。

获取分词器self.tokenizer = self.train_data.dataset.tokenizer从训练数据集中获取分词器对象,用于后续的文本处理。

初始化统计字典self.stats_dict = {"correct":0, "wrong":0}用于存储测试结果的字典,记录正确和错误的样本数量。

    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        # 由于效果测试需要训练集当做知识库,再次加载训练集。
        # 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
        self.train_data = load_data(config["train_data_path"], config)
        self.tokenizer = self.train_data.dataset.tokenizer
        self.stats_dict = {"correct":0, "wrong":0}  #用于存储测试结果

②  将问题转换为向量表示 ⭐

初始化 self.question_index_to_standard_question_index 字典,用于存储问题编号到标准问题编号的映射。

初始化 self.questions 列表,用于存储所有问题。

遍历 self.train_data.dataset.knwb,获取每个标准问题编号及其对应的问题列表。对于每个问题,记录其编号到标准问题编号的映射,并将问题添加到 self.questions 列表中。

items():Python 字典的内置方法,用于返回字典中所有键值对的视图对象。每个键值对以元组形式表示,方便遍历或操作字典数据。

len():是 Python 的内置函数,用于返回序列(如字符串、列表、元组)或集合(如字典、集合)中元素的数量。

参数名类型说明
sequence序列或集合需要计算长度的对象,如字符串、列表、元组、字典等。

列表.append():Python 列表的内置方法,用于在列表末尾添加一个元素。该方法会直接修改原列表,而不是返回一个新列表。

参数名类型说明
element任意类型要添加到列表末尾的元素,可以是字符串、数字、列表、元组等。
    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.questions = []
        for standard_question_index, questions in self.train_data.dataset.knwb.items():
            for question in questions:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.questions)] = standard_question_index
                self.questions.append(question)
        return

③  评估模型表现 
  • 日志记录:使用 self.logger.info 记录当前测试的轮次。​
  • 初始化统计字典self.stats_dict 用于记录正确和错误的预测数量。
  • 模型设置为评估模式self.model.eval() 将模型设置为评估模式,关闭 dropout 和 batch normalization 的随机性。
  • 知识库向量化:调用 self.knwb_to_vector() 将知识库中的问题向量化,为匹配做准备。​
  • 遍历验证数据
    • 对每个批次的数据,提取测试问题和标签;
    • 对每个测试问题,计算其与知识库中所有问题的相似度得分;
    • 使用 torch.no_grad0) 关闭梯度计算,提高效率将得分转换为列表,并找到最高得分的索引(即最匹配的问题)
    • 将预测结果添加到predicts 列表中​
  • 记录统计结果:调用 self.write_stats 记录预测结果。
  • 显示统计结果:调用 self.show_stats 显示测试结果。

logger.info():用于记录信息级别的日志,通常用于记录程序运行时的状态或过程信息。

参数名类型说明
msg字符串要记录的日志信息。
*args可变参数用于格式化日志信息的参数。
**kwargs关键字参数可选参数,如 exc_info=True 用于记录异常信息。

model.eval():将模型设置为评估模式,关闭 Dropout 和 Batch Normalization 的训练模式,确保模型在推理时行为一致。

enumerate():将可迭代对象(如列表、元组、字符串)组合为索引序列,返回一个枚举对象,包含索引和值。

参数名类型说明
iterable可迭代对象需要枚举的对象,如列表、元组、字符串等。
start整数可选参数,指定索引的起始值,默认为 0。

列表.append():在列表末尾添加一个元素。

参数名类型说明
element任意类型要添加到列表末尾的元素。

torch.no_grad():上下文管理器,用于禁用梯度计算,通常在模型评估或推理时使用,以减少内存消耗并加速计算。

torch.LongTensor():创建一个包含 64 位整数的张量。

参数名类型说明
data列表、元组输入数据,用于初始化张量。

torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 GPU)。

cuda():将张量或模型移动到 GPU 上,以利用 GPU 进行计算。

参数名类型说明
device整数或设备可选参数,指定目标 GPU 设备,默认为当前设备。

.detach():从计算图中分离张量,返回一个不需要梯度的新张量

.cpu():将张量或模型移动到 CPU 上。

.tolist():将张量转换为 Python 列表。

np.argmax(): 返回数组中最大值的索引。

参数名类型说明
array数组输入数组。
axis整数可选参数,指定沿哪个轴查找最大值索引,默认为 None(展平数组)。
    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"correct":0, "wrong":0}  #清空前一轮的测试结果
        self.model.eval()
        self.knwb_to_vector()
        for index, batch_data in enumerate(self.valid_data):
            test_questions, labels = batch_data
            predicts = []
            for test_question in test_questions:
                input_ids = []
                for question in self.questions:
                    input_ids.append(self.train_data.dataset.encode_sentence(test_question, question))

                with torch.no_grad():
                    input_ids = torch.LongTensor(input_ids)
                    if torch.cuda.is_available():
                        input_ids = input_ids.cuda()
                    scores = self.model(input_ids).detach().cpu().tolist()
                hit_index = np.argmax(scores)
                # print(hit_index)
                predicts.append(hit_index)
            self.write_stats(predicts, labels)
        self.show_stats()
        return

④ 记录模型预测结果

断言检查assert len(labels) == len(predicts)确保预测结果和标签的数量一致,否则抛出异常。​

遍历预测结果和标签:使用 zip(predicts, labels) 将预测结果和标签一一对应。​

转换预测索引hit_index = self.question_index_to_standard_question_index[hit_index]将预测的问题索引转换为标准问题索引。​

统计正确和错误的预测:如果预测的标准问题索引与标签一致,则 self.stats_dict["correct"] += 1。否则,self.stats_dict["wrong"] += 1

返回:方法执行完毕后返回 None

zip():用于将多个可迭代对象的元素打包成元组,返回一个可迭代对象

参数名类型说明
*iterables可迭代对象一个或多个可迭代对象(如列表、元组、字符串等),用于打包成元组。
strict布尔值可选参数,默认为 False。如果为 True,当可迭代对象长度不一致时会抛出 ValueError

len():用于返回对象的长度或项目数量,支持多种数据类型

参数名类型说明
obj对象需要计算长度的对象,可以是字符串、列表、元组、字典、集合等。

int():用于返回对象的长度或项目数量,支持多种数据类型

参数名类型说明
x字符串、数字需要转换为整数的对象,可以是字符串、类似字节的对象或数字。
base整数可选参数,默认为 10。指定 x 的进制,例如 2(二进制)、16(十六进制)等。
    def write_stats(self, predicts, labels):
        assert len(labels) == len(predicts)
        for hit_index, label in zip(predicts, labels):
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
            if int(hit_index) == int(label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

⑤ 展示模型预测的统计信息 
  • 统计信息提取
    • correct = self.stats_dict["correct"]从 self.stats_dict 中获取预测正确的条目数。
    • wrong = self.stats_dict["wrong"]从 self.stats_dict 中获取预测错误的条目数。
  • 日志输出
    • self.logger.info("预测集合条目总量:%d" % (correct + wrong))输出预测集合的总条目数,即正确条目数与错误条目数之和。
    • self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))输出预测正确的条目数和预测错误的条目数。
    • self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))输出预测的准确率,即正确条目数占总条目数的比例。
    • self.logger.info("--------------------")输出分隔线,用于区分不同的日志信息。

logger.info(): 是 Python 的 logging 模块中用于记录信息级别(info level)日志的函数。它通常用于记录程序运行时的关键信息,如状态、进度等,而不是调试信息或错误。

参数名类型描述
msgstr要记录的日志信息。
*args任意类型用于格式化日志信息的参数。
**kwargsdict可选参数,如 exc_infostack_info 等,用于附加异常或堆栈信息。
# -*- coding: utf-8 -*-
import torch
from loader import load_data
import numpy as np

"""
模型效果测试
"""

class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        # 由于效果测试需要训练集当做知识库,再次加载训练集。
        # 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
        self.train_data = load_data(config["train_data_path"], config)
        self.tokenizer = self.train_data.dataset.tokenizer
        self.stats_dict = {"correct":0, "wrong":0}  #用于存储测试结果

    #将知识库中的问题向量化,为匹配做准备
    #每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
    def knwb_to_vector(self):
        self.question_index_to_standard_question_index = {}
        self.questions = []
        for standard_question_index, questions in self.train_data.dataset.knwb.items():
            for question in questions:
                #记录问题编号到标准问题标号的映射,用来确认答案是否正确
                self.question_index_to_standard_question_index[len(self.questions)] = standard_question_index
                self.questions.append(question)
        return

    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"correct":0, "wrong":0}  #清空前一轮的测试结果
        self.model.eval()
        self.knwb_to_vector()
        for index, batch_data in enumerate(self.valid_data):
            test_questions, labels = batch_data
            predicts = []
            for test_question in test_questions:
                input_ids = []
                for question in self.questions:
                    input_ids.append(self.train_data.dataset.encode_sentence(test_question, question))

                with torch.no_grad():
                    input_ids = torch.LongTensor(input_ids)
                    if torch.cuda.is_available():
                        input_ids = input_ids.cuda()
                    scores = self.model(input_ids).detach().cpu().tolist()
                hit_index = np.argmax(scores)
                # print(hit_index)
                predicts.append(hit_index)
            self.write_stats(predicts, labels)
        self.show_stats()
        return

    def write_stats(self, predicts, labels):
        assert len(labels) == len(predicts)
        for hit_index, label in zip(predicts, labels):
            hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
            if int(hit_index) == int(label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

    def show_stats(self):
        correct = self.stats_dict["correct"]
        wrong = self.stats_dict["wrong"]
        self.logger.info("预测集合条目总量:%d" % (correct +wrong))
        self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
        self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
        self.logger.info("--------------------")
        return

Ⅴ、训练主流程 main.py

① 配置日志记录

logging.basicConfig():用于配置日志系统的基本设置,包括日志级别、输出格式、输出目标(如控制台或文件)等。它通常在程序初始化时调用一次。 

参数名类型描述
levelint设置日志级别,如 logging.DEBUGlogging.INFO 等。
formatstr设置日志输出格式,如 '%(asctime)s - %(levelname)s - %(message)s'
filenamestr设置日志输出到文件,指定文件名。
filemodestr设置文件打开模式,默认为 'a'(追加模式)。
handlerslist设置自定义的日志处理器。

logging.getLogger(): 用于获取一个日志记录器(Logger)对象。每个记录器都有一个名称(name),可以用来区分不同的日志记录器。

参数名类型描述
namestr日志记录器的名称。如果未提供或为 None,则返回根记录器(root logger)。
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

② 创建保存模型的目录

          检查指定的模型保存路径是否存在,如果不存在,则创建该路径作为目录。

os.path.isdir(): Python 中 os.path 模块的函数,用于检查指定的路径是否为一个存在的目录。如果路径存在且是一个目录,则返回 True,否则返回 False

参数名类型描述
pathstr表示文件系统路径的类路径对象(可以是相对路径或绝对路径)。

os.mkdir():Python 中 os 模块的函数,用于创建一个新的目录。如果目录已经存在或路径无效,会抛出 FileExistsError 或 OSError

参数名类型描述
pathstr要创建的目录路径。
modeint可选参数,设置目录的权限(八进制模式),默认为 0o777
    #创建保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])

③ 加载文件

 torch.cuda.is_available():PyTorch 中用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)的函数。

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)。

cuda():PyTorch 张量或模型的方法,用于将张量或模型移动到 GPU 上。

    #加载训练数据
    train_data = load_data(config["train_data_path"], config)
    #加载模型
    model = SiameseNetwork(config)
    # 标识是否使用gpu
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    #加载优化器
    optimizer = choose_optimizer(config, model)
    #加载效果测试类
    evaluator = Evaluator(config, model, logger)

④ 训练的核心过程 ⭐
  1. 训练循环

    • for epoch in range(config["epoch"]):遍历每个训练周期(epoch),config["epoch"] 是训练的总周期数。
    • epoch += 1:将当前周期数加1,用于日志记录。
    • model.train():将模型设置为训练模式,启用Dropout和Batch Normalization等训练特定行为。
  2. 数据加载与处理

    • for index, batch_data in enumerate(train_data):遍历训练数据集的每个批次(batch)。
    • optimizer.zero_grad():清空优化器的梯度缓存,避免梯度累积。
    • if cuda_flag: batch_data = [d.cuda() for d in batch_data]:如果启用了CUDA(GPU加速),将数据移动到GPU上。
  3. 模型前向传播与损失计算

    • input_id1, input_id2, labels = batch_data:从批次数据中提取两个输入(input_id1 和 input_id2)以及对应的标签(labels)。
  4. 反向传播与优化

    • train_loss.append(loss.item()):将当前批次的损失值记录下来。
    • loss.backward():执行反向传播,计算梯度。
    • optimizer.step():更新模型参数,优化损失函数。
  5. 日志记录与评估

    • logger.info("epoch average loss: %f" % np.mean(train_loss)):记录当前周期的平均损失值。
    • evaluator.eval(epoch):调用评估器对模型进行评估,通常是在验证集上计算准确率或其他指标。

model.train():将模型设置为训练模式。在训练模式下,模型会启用 Dropout 和 Batch Normalization 等层的行为,以确保模型在训练时能够正常工作

enumerate():Python 的内置函数,用于在遍历可迭代对象时同时获取索引和值

参数名类型描述
iterable可迭代对象要遍历的对象(如列表、元组、字符串)。
startint索引的起始值,默认为 0

optimizer.zero_grad():将优化器中所有参数的梯度清零,避免梯度累积

cuda():将张量或模型移动到 GPU 上,以利用 GPU 的并行计算能力加速计算

参数名类型描述
deviceint 或 str指定目标 GPU 设备,默认为 None(使用默认 GPU)。

append():在列表末尾添加一个新元素

参数名类型描述
element任意类型要添加到列表末尾的元素。

item():将包含单个元素的张量转换为 Python 标量(如 int 或 float

loss.backward():计算损失函数对模型参数的梯度,用于反向传播

optimizer.step(): 根据计算出的梯度更新模型参数

    #训练
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            optimizer.zero_grad()
            if cuda_flag:  #如果gpu可用则使用gpu加速
                batch_data = [d.cuda() for d in batch_data]
            input_ids, labels = batch_data
            loss = model(input_ids, labels)  #计算loss
            train_loss.append(loss.item())
            #每轮训练一半的时候输出一下loss,观察下降情况
            if index % int(len(train_data) / 2) == 0:
                logger.info("batch loss %f" % loss)
            loss.backward()  #梯度计算
            optimizer.step() #梯度更新
        logger.info("epoch average loss: %f" % np.mean(train_loss))
    evaluator.eval(config["epoch"])

⑤ 保存模型

os.path.join():用于将多个路径片段拼接成一个完整的路径,并自动根据操作系统选择正确的路径分隔符(如 Windows 使用 \,Linux 和 macOS 使用 /

参数名类型描述
pathstr初始路径片段。
*pathsstr需要拼接的后续路径片段,可以接受任意数量的参数。

torch.save():用于将 PyTorch 对象(如模型、张量、字典等)保存到磁盘文件中,通常用于保存模型的权重或训练状态

参数名类型描述
obj任意对象需要保存的对象,如模型、张量、字典等。
fstr 或文件对象保存的目标文件路径或文件对象。

model.state_dict(): 返回一个包含模型所有可学习参数(如权重和偏置)的有序字典,通常用于保存或加载模型参数

    model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
    torch.save(model.state_dict(), model_path)

模型训练主程序
# -*- coding: utf-8 -*-

import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SentenceMatchNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""

def main(config):
    #创建保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    #加载训练数据
    train_data = load_data(config["train_data_path"], config)
    #加载模型
    model = SentenceMatchNetwork(config)
    # 标识是否使用gpu
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    #加载优化器
    optimizer = choose_optimizer(config, model)
    #加载效果测试类
    evaluator = Evaluator(config, model, logger)
    #训练
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            optimizer.zero_grad()
            if cuda_flag:  #如果gpu可用则使用gpu加速
                batch_data = [d.cuda() for d in batch_data]
            input_ids, labels = batch_data
            loss = model(input_ids, labels)  #计算loss
            train_loss.append(loss.item())
            #每轮训练一半的时候输出一下loss,观察下降情况
            if index % int(len(train_data) / 2) == 0:
                logger.info("batch loss %f" % loss)
            loss.backward()  #梯度计算
            optimizer.step() #梯度更新
        logger.info("epoch average loss: %f" % np.mean(train_loss))
    evaluator.eval(config["epoch"])
    model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
    torch.save(model.state_dict(), model_path)
    return

if __name__ == "__main__":
    main(Config)

三、对比:交互型 vs 表示型

1.表示型

Ⅰ、模型结构与交互机制

① 结构特点

        采用双塔(Siamese)架构,两个文本分别通过共享参数的编码器(如MLP、CNN、LSTM、Transformer等)独立生成固定维度的语义向量,再通过余弦相似度、点积等方式计算匹配得分。

② 典型模型

        DSSM、CDSSM、MV-LSTM、ARC-I等。

③ 交互时机

        仅在最终匹配层进行浅层交互(如相似度计算),编码过程中文本间无信息交换。

Ⅱ、表示型模型的优势

① 计算效率高

        可预先计算文本向量,在线匹配仅需向量相似度计算,适用于大规模检索场景(如搜索引擎)

​② 参数共享

        双塔共享编码器参数,减少模型复杂度并提升泛化能力

Ⅲ、表示型模型的局限性

① 语义焦点丢失

        全局向量可能受无关词干扰(如长文本中冗余信息淹没关键短语)

② 词级信息缺失

        无法捕捉词法、句法层面的局部匹配特征(如近义词替换或语序变化)

Ⅳ、应用场景

        适合对响应速度要求高、候选集规模大的场景,如搜索引擎召回、推荐系统粗排


2.交互型

Ⅰ、模型结构与交互机制

① 结构特点

        将两个文本拼接后输入单一编码器(如BERT),通过自注意力机制或交叉注意力直接在词 / 短语级别进行细粒度交互,再通过池化或分类器输出匹配结果

② 典型模型

        BERT-based模型、ESIM、InferSent等

③ 交互时机

        在编码层即引入词级交互,构建交互矩阵或注意力图,捕捉局部语义关联

Ⅱ、交互型模型的优势

① 匹配精度高

        通过词级交互矩阵捕捉细粒度语义关联(如同义词、反义词、指代关系),显著提升复杂语义匹配的准确率

② 上下文建模强

        自注意力机制可动态加权重要词汇,避免语义漂移

Ⅲ、交互型模型的局限性

① 计算成本高

        需实时拼接文本对输入模型,难以支持海量候选集的快速检索

② 交互型模板

        需实时拼接文本对输入模型,难以支持海量候选集的快速检索

Ⅳ、应用场景

        适合对精度要求高、候选集较小的场景,如问答系统精排、复述检测、语义相似度评测


3.总结

表示型:

优点:训练好的模型可以对知识库内的问题计算向量,在实际查找过程中,只对输入文本做一次向量化,训练速度快

缺点:在向量化的过程中不知道文本重点

交互型:

优点:通过对比把握句子重点

缺点:每次计算都需要两个输入,需要等到问题来了,再去知识库中的问题作拼接,需要调入模型n次,比较耗时,模型效率低。

维度表示型模型交互型模型
交互时机后期(匹配层)早期(编码层)
计算效率高(支持预计算)低(需实时计算)
匹配精度一般
适用场景大规模检索、粗排精细化匹配、精排
典型优化方向增强embedding层编码器(如Transformer)轻量化交互(如蒸馏、剪枝)

四、对比学习

主要解决的问题:文本表示 / 图像表示

主要目标:训练一个好的文本 / 图像 编码器(model)

输入一个样本,对样本做一些处理(数据增强),就构造出了一个正样本 / 相似样本,再寻找一些负样本,传入模型让其进行学习,相当于一种无监督的表示型学习方式


五、海量向量查找

假如我们有1亿以上的候选向量

对于一个给定向量,希望查找距离最接近的

如何高效的完成?

快速的向量查找在问答,搜索,推荐等场景下均会使用

1.KD树

原理 —— 空间切割:

事先构造一棵树,步骤:

        ① 先选取计算维度,计算每个点x、y轴集合的方差,方差大的更加均匀

        ② 然后再选取当前维度下数据的中值位置

        ③ 将维度方差大的轴的中值位置作为根节点,轴的两边为二叉树的左右子树

        ④ 然后对于子树重复选取点的步骤,直到将所有点全部加入树中

查询步骤: 

        ① 依照建索引方式(左右子树)找到查询点的空间位置

        ② 向上回退,计算查询点与离其最近的节点距离 和 查询点到各个切割平面的距离

        ③ 如果到切割平面的距离大于到已知点的距离,就没必要跨过平面计算距离,根据情况判断是否需要查找平面另一侧节点

        ④ 回退到根节点为止

优点:减少余弦距离的计算次数,带来了效率的提升


2.KD树 —— 代码实现 🚀

Ⅰ、定义KD树的节点类

① 初始化KD树的结点

self.father:父节点

self.left:左子结点

self.right:右子结点

self.feature:当前结点用于分割数据的特征(维度)的索引

self.split:包含分割点向量和其对应的标签(索引) ,在叶子节点中,split包含了数据点本身及其标签

class Node(object):
    def __init__(self):
        """Node class to build tree leaves.
        """
        self.father = None
        self.left = None
        self.right = None
        self.feature = None
        self.split = None

② 字符串表示方法

        返回了节点分割点向量的字符串表示,方便用户查看单个节点的信息。

self.split:包含分割点向量和其对应的标签(索引) ,在叶子节点中,split包含了数据点本身及其标签

    def __str__(self):
        return str(self.split[0])

③ 获取当前节点的兄弟节点

        如果节点没有父节点(即它本身是根节点),则返回None。否则,检查当前节点是父节点的左子节点还是右子节点,然后返回另一个子节点作为兄弟节点。

@property:@property装饰器是实现面向对象编程中属性管理的重要工具,它通过将方法转换为属性访问的形式,既保持了代码的简洁性,又能实现数据校验、动态计算等高级功能

属性式访问:通过将方法伪装成属性,调用时无需添加括号,直接通过对象.属性名访问

数据校验与控制:通过@属性名.setter定义设置逻辑,拦截非法赋值操作

只读属性与删除控制:若未定义setter方法,则属性为只读;通过@属性名.deleter可自定义删除行为

    @property
    def brother(self):
        """Find the node's brother.
        Returns:
            node -- Brother node.
        """
        if not self.father:
            ret = None
        else:
            if self.father.left is self:
                ret = self.father.right
            else:
                ret = self.father.left
        return ret

Ⅱ、定义KD树类

① 初始化

        初始化KDTree类,创建一个根节点。根节点是KD树的起始点,用于后续的搜索和插入操作。

class KDTree(object):
    def __init__(self):
        """KD Tree class to improve search efficiency in KNN.
        Attributes:
            root: the root node of KDTree.
        """
        self.root = Node()

②  打印树节点信息

        使用广度优先搜索遍历树节点,将节点信息格式化为字符串并返回。

list.pop():移除列表中指定索引位置的元素并返回该元素。若未指定索引,默认移除最后一个元素

参数类型是否可选描述
indexint要移除元素的索引(默认-1,即最后一个元素)。若索引越界会引发IndexError

list.append():在列表末尾添加单个元素,直接修改原列表且无返回值

参数类型是否可选描述
element任意类型要添加的元素(可以是数字、字符串、列表等复合类型)

str.join():用指定字符串连接可迭代对象中的元素,生成新字符串。要求可迭代对象内的元素均为字符串类型

参数类型是否可选描述
iterable可迭代对象需连接的序列(如列表、元组),非字符串元素会引发TypeError
    def __str__(self):
        """Show the relationship of each node in the KD Tree.
        Returns:
            str -- KDTree Nodes information.
        """
        ret = []
        i = 0
        que = [(self.root, -1)]
        while que:
            nd, idx_father = que.pop(0)
            ret.append("%d -> %d: %s" % (idx_father, i, str(nd)))
            if nd.left:
                que.append((nd.left, i))
            if nd.right:
                que.append((nd.right, i))
            i += 1
        return "\n".join(ret)

③ 计算中位数索引

        计算给定二维数组中指定特征列的中位数所对应的行索引。通过从指定的列收集数据、排序并提取中位数的索引,该方法能有效地用于构建 KD 树等数据结构中,以此提高相似度搜索的效率。

map():将指定函数依次作用于可迭代对象的每个元素,返回一个包含结果的迭代器(需转换为列表或元组等容器类型)

参数类型是否可选描述
function可调用对象处理元素的函数(如内置函数、lambda 或自定义函数)
iterable可迭代对象待处理的数据序列(如列表、元组),可传入多个(函数需匹配参数数量)

lambda:创建匿名函数,简化一次性使用的简单逻辑,常用于配合 map()filter() 等高阶函数

参数类型是否可选描述
arguments参数列表函数参数(如 x, y
expression表达式单行表达式,计算结果作为返回值(如 x + y 或 x**2

sorted():对可迭代对象进行排序,返回新列表(原数据不变)

参数类型是否可选描述
iterable可迭代对象待排序的数据(如列表、元组)
key函数排序依据的函数(常用 lambda,如 key=lambda x: x["age"]

8

reverse布尔值是否降序排列(默认 False 升序)

list(): 将可迭代对象(如字符串、元组、集合)转换为列表

参数类型是否可选描述
iterable可迭代对象若省略则创建空列表;若传入则转换元素(如 list("abc")["a","b","c"]
    def _get_median_idx(self, X, idxs, feature):
        """Calculate the median of a column of data.
        Arguments:
            X {list} -- 2d list object with int or float.
            idxs {list} -- 1D list with int.
            feature {int} -- Feature number.
            sorted_idxs_2d {list} -- 2D list with int.
        Returns:
            list -- The row index corresponding to the median of this column.
        """
        n = len(idxs)
        # Ignoring the number of column elements is odd and even.
        k = n // 2
        # Get all the indexes and elements of column j as tuples.
        col = map(lambda i: (i, X[i][feature]), idxs)
        # Sort the tuples by the elements' values
        # and get the corresponding indexes.
        sorted_idxs = map(lambda x: x[0], sorted(col, key=lambda x: x[1]))
        # Search the median value.
        median_idx = list(sorted_idxs)[k]
        return median_idx

④ 计算给定二维列表在指定特征(维度)上的方差

         计算指定特征在给定数据集上的方差,这在构建 KD 树时用于选择最优的分割特征(即具有最大方差的特征),以提高后续最近邻搜索的效率。通过这种方法,可以有效地找到数据集中最具区分度的特征,从而优化 KD 树的构建和查询过程。

    def _get_variance(self, X, idxs, feature):
        """Calculate the variance of a column of data.
        Arguments:
            X {list} -- 2d list object with int or float.
            idxs {list} -- 1D list with int.
            feature {int} -- Feature number.
        Returns:
            float -- variance
        """
        n = len(idxs)
        col_sum = col_sum_sqr = 0
        for idx in idxs:
            xi = X[idx][feature]
            col_sum += xi
            col_sum_sqr += xi ** 2
        # D(X) = E{[X-E(X)]^2} = E(X^2)-[E(X)]^2
        return col_sum_sqr / n - (col_sum / n) ** 2

⑤ 指定特征维度

        在KD树构建过程中,选择具有最大方差的特征作为当前节点的分割特征。这样做可以确保树的每个层上的数据分布尽可能均匀,从而提高后续的搜索效率。

map():将指定函数依次作用于可迭代对象的每个元素,返回一个包含结果的迭代器(需转换为列表或元组等容器类型)

参数类型是否可选描述
function可调用对象处理元素的函数(如内置函数、lambda 或自定义函数)
iterable可迭代对象待处理的数据序列(如列表、元组),可传入多个(函数需匹配参数数量)

lambda:创建匿名函数,简化一次性使用的简单逻辑,常用于配合 map()filter() 等高阶函数

参数类型是否可选描述
arguments参数列表函数参数(如 x, y
expression表达式单行表达式,计算结果作为返回值(如 x + y 或 x**2

max(): Python 内置的高效函数,用于获取可迭代对象多个参数中的最大值。其功能灵活,支持多种数据类型和自定义比较逻辑。

参数类型是否可选描述
iterable可迭代对象如列表、元组、字符串等(需元素可比较)
args多个参数直接传入多个参数进行比较(如 max(3,5,7)
key函数自定义比较规则(如 key=lambda x: x["age"]
default任意类型当可迭代对象为空时返回此默认值,否则引发 ValueError
    def _choose_feature(self, X, idxs):
        """Choose the feature which has maximum variance.
        Arguments:
            X {list} -- 2d list object with int or float.
            idxs {list} -- 1D list with int.
        Returns:
            feature number {int}
        """
        m = len(X[0])
        variances = map(lambda j: (
            j, self._get_variance(X, idxs, j)), range(m))
        return max(variances, key=lambda x: x[1])[0]

⑥  根据给定特征分割数据集 

        根据特征的中位数将给定的索引列表分割成两个子集。这在构建k-d树(k维空间树)时非常重要,有助于在构造树的过程中有效地将数据划分为更小的、更可管理的部分,从而在后续的查找中提高效率。

        通过在指定特征上比较中位数,函数将数据分为小于和大于(或等于)中位数的两部分,这种分割方式是 k-d树构建的关键步骤之一,旨在平衡树的高度,优化查找性能。

list.append(): Python 列表的内置方法,用于在列表末尾添加单个元素。它会直接修改原列表,​不返回新列表​(返回 None

参数类型是否可选说明
object任意类型要添加的元素(支持数字、字符串、列表、元组、字典、None 等所有类型)
    def _split_feature(self, X, idxs, feature, median_idx):
        """Split indexes into two arrays according to split point.
        Arguments:
            X {list} -- 2d list object with int or float.
            idx {list} -- Indexes, 1d list object with int.
            feature {int} -- Feature number.
            median_idx {float} -- Median index of the feature.
        Returns:
            list -- [left idx, right idx]
        """
        idxs_split = [[], []]
        split_val = X[median_idx][feature]
        for idx in idxs:
            # Keep the split point in current node.
            if idx == median_idx:
                continue
            # Split
            xi = X[idx][feature]
            if xi < split_val:    # 根据当前索引 idx 对应的特征值 xi 来判断其相对于 split_val 的位置:
                idxs_split[0].append(idx)
            else:
                idxs_split[1].append(idx)
        return idxs_split

Ⅲ、构建KD树

        根据给定的数据集和标签构建一棵Kd树。通过选择具有最大方差的特征来进行数据点的分割,并将分割点存储在节点中。构建的过程采用广度优先搜索的方式,逐层构建树的结构。这样构建的Kd树可以有效地用于在多维空间中快速查找最近邻点

列表,pop():移除列表中指定索引位置的元素并返回该元素。若未指定索引,默认移除最后一个元素

参数类型是否可选描述
indexint要移除元素的索引(默认 -1,即最后一个元素)。若索引越界会引发 IndexError

str.split():将字符串按指定分隔符拆分为列表,默认按空格分割

参数类型是否可选描述
sepstr分隔符(默认空格)。
maxsplitint最大分割次数(默认全部拆分)

列表.append():在列表末尾添加单个元素,直接修改原列表且无返回值

参数类型是否可选描述
element任意类型要添加的元素(支持数字、字符串、列表等所有类型)
    def build_tree(self, X, y):
        """Build a KD Tree. The data should be scaled so as to calculate variances.
        Arguments:
            X {list} -- 2d list object with int or float.
            y {list} -- 1d list object with int or float.
        """
        # Initialize with node, indexes
        nd = self.root
        idxs = range(len(X))
        que = [(nd, idxs)]
        while que:
            nd, idxs = que.pop(0)
            n = len(idxs)
            # Stop split if there is only one element in this node
            if n == 1:
                nd.split = (X[idxs[0]], y[idxs[0]])
                continue
            # Split
            feature = self._choose_feature(X, idxs)
            median_idx = self._get_median_idx(X, idxs, feature)
            idxs_left, idxs_right = self._split_feature(
                X, idxs, feature, median_idx)
            # Update properties of current node
            nd.feature = feature
            nd.split = (X[median_idx], y[median_idx])
            # Put children of current node in que
            if idxs_left != []:
                nd.left = Node()
                nd.left.father = nd
                que.append((nd.left, idxs_left))
            if idxs_right != []:
                nd.right = Node()
                nd.right.father = nd
                que.append((nd.right, idxs_right))

Ⅳ、最近邻搜索 

① 查找叶子节点

        从KD树的根节点开始,沿着树的路径向下搜索,直到找到包含搜索样本Xi的叶节点,并返回该叶节点。

str.split():将字符串按指定分隔符拆分为列表,默认按空格分割

参数类型是否可选描述
sepstr分隔符(默认空格)。
maxsplitint最大分割次数(默认全部拆分)
    def _search(self, Xi, nd):
        """Search Xi from the KDTree until Xi is at an leafnode.
        Arguments:
            Xi {list} -- 1d list with int or float.
        Returns:
            node -- Leafnode.
        """
        while nd.left or nd.right:
            if not nd.left:
                nd = nd.right
            elif not nd.right:
                nd = nd.left
            else:
                if Xi[nd.feature] < nd.split[0][nd.feature]:
                    nd = nd.left
                else:
                    nd = nd.right
        return nd

② 计算两向量欧几里得距离

        从给定的KD树节点中获取与其分割点向量的欧几里得距离。具体来说,它首先从节点对象中提取出分割点向量X0,然后调用类中的另一个方法get_eu_dist来计算输入向量Xi与分割点向量X0之间的欧几里得距离。

        get_eu_dist:计算两个一维向量之间的欧几里得距离。它通过将两个向量对应位置的元素相减,求平方和,再取平方根来实现。

str.split():将字符串按指定分隔符拆分为列表,默认按空格分割

参数类型是否可选描述
sepstr分隔符(默认空格)。
maxsplitint最大分割次数(默认全部拆分)

sum():对可迭代对象中的数值元素求和,支持自定义起始值

参数类型是否可选默认值描述
iterable可迭代对象必须为数值类型(整数、浮点数等)的可迭代对象(如列表、元组、集合)。
start数值0求和的初始值,会加到最终结果中。

zip():将多个可迭代对象的对应元素打包为元组,返回一个迭代器

参数类型是否可选默认值描述
*iterables多个可迭代对象支持列表、元组、字符串等可迭代对象。
strict布尔值False若设为 True,强制要求所有可迭代对象长度一致(Python 3.10+)。

列表推导式: 通过简洁语法快速生成列表,支持条件筛选和嵌套循环

语法部分类型是否可选描述
expression表达式对当前元素的操作(如 x**2)。
item变量名从可迭代对象中逐个提取元素。
iterable可迭代对象数据来源(如 range(5))。
if condition条件表达式筛选元素的条件(如 x % 2 == 0)。
    def _get_eu_dist(self, Xi, nd):
        """Calculate euclidean distance between Xi and node.
        Arguments:
            Xi {list} -- 1d list with int or float.
            nd {node}
        Returns:
            float -- Euclidean distance.
        """
        X0 = nd.split[0]
        return self.get_eu_dist(Xi, X0)

    def get_eu_dist(self, arr1, arr2):
        """Calculate the Euclidean distance of two vectors.
        Arguments:
            arr1 {list} -- 1d list object with int or float
            arr2 {list} -- 1d list object with int or float
        Returns:
            float -- Euclidean distance
        """
        return sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5

③ 计算点到分割超平面的距离

        计算给定点 Xi 到 KD 树节点 nd 所表示的超平面的欧几里得距离。具体来说,它通过比较点 Xi 在某个特征上的值与节点 nd 的分割点在该特征上的值,来得到点 Xi 到超平面的距离。这个距离用于判断在最近邻搜索过程中是否需要访问兄弟节点。

nd.feature:当前结点用于分割数据的特征(维度)的索引 

str.split():将字符串按指定分隔符分割为列表,支持限制分割次数

参数类型是否可选默认值描述
sep字符串所有空字符分隔符(如 ",""*"),若未指定则按空格、换行符等分割
maxsplit整数不限分割次数指定最大分割次数,剩余未分割部分作为列表最后一个元素

abs():返回数值的绝对值,支持整数、浮点数和复数

参数类型是否可选默认值描述
number数值(含复数)接受整数、浮点数或复数(返回复数的模)
    def _get_hyper_plane_dist(self, Xi, nd):
        """Calculate euclidean distance between Xi and hyper plane.
        Arguments:
            Xi {list} -- 1d list with int or float.
            nd {node}
        Returns:
            float -- Euclidean distance.
        """
        j = nd.feature
        X0 = nd.split[0]
        return abs(Xi[j] - X0[j])

④ 执行最近邻搜索

        基于KD树执行最近邻搜索。首先,通过递归的方式找到待查找向量Xi所属的叶子节点nd_best,然后在此基础上进行回溯,检查是否在兄弟子树中存在距离更近的节点,最终返回距离Xi最近的节点。

float(): 将字符串或数字转换为浮点数。如果未提供参数,则返回 0.0

参数类型是否可选默认值描述
x字符串或数字要转换为浮点数的字符串或数字。若未提供参数,则返回 0.0

列表.pop():移除列表中指定索引处的元素并返回该元素。若未指定索引,则移除并返回最后一个元素。

参数类型是否可选默认值描述
index整数-1要移除元素的索引。若未提供参数,则移除并返回最后一个元素。

列表.append():在列表末尾添加一个元素。

参数类型是否可选默认值描述
obj任意类型要添加到列表末尾的元素。
    def nearest_neighbour_search(self, Xi):
        """Nearest neighbour search and backtracking.
        Arguments:
            Xi {list} -- 1d list with int or float.
        Returns:
            node -- The nearest node to Xi.
        """
        # The leaf node after searching Xi.
        dist_best = float("inf")
        nd_best = self._search(Xi, self.root)
        que = [(self.root, nd_best)]
        while que:
            nd_root, nd_cur = que.pop(0)
            # Calculate distance between Xi and root node
            dist = self._get_eu_dist(Xi, nd_root)
            # Update best node and distance.
            if dist < dist_best:
                dist_best, nd_best = dist, nd_root
            while nd_cur is not nd_root:
                # Calculate distance between Xi and current node
                dist = self._get_eu_dist(Xi, nd_cur)
                # Update best node, distance and visit flag.
                if dist < dist_best:
                    dist_best, nd_best = dist, nd_cur
                # If it's necessary to visit brother node.
                if nd_cur.brother and dist_best > \
                        self._get_hyper_plane_dist(Xi, nd_cur.father):
                    _nd_best = self._search(Xi, nd_cur.brother)
                    que.append((nd_cur.brother, _nd_best))
                # Back track.
                nd_cur = nd_cur.father
        return nd_best

Ⅴ、传统搜索方法

         在一个给定的数据集matrix中找到与目标向量arr1最近的邻居。通过逐个计算目标向量与数据集中每个向量之间的欧几里得距离,并将这些距离与其对应的索引存储在一个列表中,最后对这个列表进行排序以找到距离最近的向量。

enumerate():将可迭代对象组合为一个索引序列,返回枚举对象(包含索引和值)。

参数类型是否可选默认值描述
iterable可迭代对象需要枚举的可迭代对象(如列表、字符串等)。
start整数0索引的起始值。

sum():对可迭代对象中的数值元素求和,支持自定义起始值。

参数类型是否可选默认值描述
iterable可迭代对象包含数值元素的可迭代对象(如列表、元组等)。
start数值0求和的初始值,会加到最终结果中。

zip():将多个可迭代对象的对应元素打包为元组,返回一个迭代器。

参数类型是否可选默认值描述
*iterables多个可迭代对象支持列表、元组、字符串等可迭代对象。
strict布尔值False若设为 True,强制要求所有可迭代对象长度一致(Python 3.10+)。

列表.append():在列表末尾添加一个元素。

参数类型是否可选默认值描述
obj任意类型要添加到列表末尾的元素。

sorted():对所有可迭代对象进行排序,返回一个新的列表。

参数类型是否可选默认值描述
iterable可迭代对象需要排序的可迭代对象(如列表、元组等)。
key函数用于排序的函数(如 lambda x: x[1])。
reverse布尔值False若为 True,则降序排序;若为 False,则升序排序。
#传统方式,逐个计算并排序
def traditional_search(arr1, matrix):
    res = []
    for index, arr2 in enumerate(matrix):
        score = sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5
        res.append([score, index])
    res = sorted(res, key=lambda x:x[0])
    return matrix[res[0][1]]

Ⅵ、性能对比

         构建KD树并使用KD树搜索方法与传统的逐个计算并排序的方法来查找最近邻向量,并比较这两种方法在相同数据集上搜索100次的耗时。

vec_dim:定义向量的维度为 8

matrix:创建了一个1000行、8列的随机数矩阵matrix

np.random.random():生成一个或多个介于 0 和 1 之间的随机浮点数,返回一个 NumPy 数组或单个浮点数。

参数类型是否可选默认值描述
size整数或元组None输出的形状。若为 None,则返回单个浮点数;否则返回指定形状的数组。

list():将可迭代对象(如字符串、元组、集合等)转换为列表。

参数类型是否可选默认值描述
iterable可迭代对象要转换为列表的可迭代对象。若未提供参数,则返回空列表。

range():生成一个不可变的整数序列,通常用于循环控制或生成数字序列。

参数类型是否可选默认值描述
start整数0序列的起始值(包含)。
stop整数序列的结束值(不包含)。
step整数1步长,表示每次递增或递减的值。

len():返回对象(如字符串、列表、字典等)的长度或元素个数。

参数类型是否可选默认值描述
obj可迭代对象要计算长度的对象(如字符串、列表、字典等)。

time.time():返回当前时间的时间戳(从 1970 年 1 月 1 日 00:00:00 UTC 开始到现在的秒数)。

import time
import numpy as np

'''
基于kd树的向量快速查找
'''

class Node(object):
    def __init__(self):
        """Node class to build tree leaves.
        """
        self.father = None
        self.left = None
        self.right = None
        self.feature = None
        self.split = None

    def __str__(self):
        return str(self.split[0])

    @property
    def brother(self):
        """Find the node's brother.
        Returns:
            node -- Brother node.
        """
        if not self.father:
            ret = None
        else:
            if self.father.left is self:
                ret = self.father.right
            else:
                ret = self.father.left
        return ret


class KDTree(object):
    def __init__(self):
        """KD Tree class to improve search efficiency in KNN.
        Attributes:
            root: the root node of KDTree.
        """
        self.root = Node()

    def __str__(self):
        """Show the relationship of each node in the KD Tree.
        Returns:
            str -- KDTree Nodes information.
        """
        ret = []
        i = 0
        que = [(self.root, -1)]
        while que:
            nd, idx_father = que.pop(0)
            ret.append("%d -> %d: %s" % (idx_father, i, str(nd)))
            if nd.left:
                que.append((nd.left, i))
            if nd.right:
                que.append((nd.right, i))
            i += 1
        return "\n".join(ret)

    def _get_median_idx(self, X, idxs, feature):
        """Calculate the median of a column of data.
        Arguments:
            X {list} -- 2d list object with int or float.
            idxs {list} -- 1D list with int.
            feature {int} -- Feature number.
            sorted_idxs_2d {list} -- 2D list with int.
        Returns:
            list -- The row index corresponding to the median of this column.
        """
        n = len(idxs)
        # Ignoring the number of column elements is odd and even.
        k = n // 2
        # Get all the indexes and elements of column j as tuples.
        col = map(lambda i: (i, X[i][feature]), idxs)
        # Sort the tuples by the elements' values
        # and get the corresponding indexes.
        sorted_idxs = map(lambda x: x[0], sorted(col, key=lambda x: x[1]))
        # Search the median value.
        median_idx = list(sorted_idxs)[k]
        return median_idx

    def _get_variance(self, X, idxs, feature):
        """Calculate the variance of a column of data.
        Arguments:
            X {list} -- 2d list object with int or float.
            idxs {list} -- 1D list with int.
            feature {int} -- Feature number.
        Returns:
            float -- variance
        """
        n = len(idxs)
        col_sum = col_sum_sqr = 0
        for idx in idxs:
            xi = X[idx][feature]
            col_sum += xi
            col_sum_sqr += xi ** 2
        # D(X) = E{[X-E(X)]^2} = E(X^2)-[E(X)]^2
        return col_sum_sqr / n - (col_sum / n) ** 2

    def _choose_feature(self, X, idxs):
        """Choose the feature which has maximum variance.
        Arguments:
            X {list} -- 2d list object with int or float.
            idxs {list} -- 1D list with int.
        Returns:
            feature number {int}
        """
        m = len(X[0])
        variances = map(lambda j: (
            j, self._get_variance(X, idxs, j)), range(m))
        return max(variances, key=lambda x: x[1])[0]

    def _split_feature(self, X, idxs, feature, median_idx):
        """Split indexes into two arrays according to split point.
        Arguments:
            X {list} -- 2d list object with int or float.
            idx {list} -- Indexes, 1d list object with int.
            feature {int} -- Feature number.
            median_idx {float} -- Median index of the feature.
        Returns:
            list -- [left idx, right idx]
        """
        idxs_split = [[], []]
        split_val = X[median_idx][feature]
        for idx in idxs:
            # Keep the split point in current node.
            if idx == median_idx:
                continue
            # Split
            xi = X[idx][feature]
            if xi < split_val:
                idxs_split[0].append(idx)
            else:
                idxs_split[1].append(idx)
        return idxs_split

    def build_tree(self, X, y):
        """Build a KD Tree. The data should be scaled so as to calculate variances.
        Arguments:
            X {list} -- 2d list object with int or float.
            y {list} -- 1d list object with int or float.
        """
        # Initialize with node, indexes
        nd = self.root
        idxs = range(len(X))
        que = [(nd, idxs)]
        while que:
            nd, idxs = que.pop(0)
            n = len(idxs)
            # Stop split if there is only one element in this node
            if n == 1:
                nd.split = (X[idxs[0]], y[idxs[0]])
                continue
            # Split
            feature = self._choose_feature(X, idxs)
            median_idx = self._get_median_idx(X, idxs, feature)
            idxs_left, idxs_right = self._split_feature(
                X, idxs, feature, median_idx)
            # Update properties of current node
            nd.feature = feature
            nd.split = (X[median_idx], y[median_idx])
            # Put children of current node in que
            if idxs_left != []:
                nd.left = Node()
                nd.left.father = nd
                que.append((nd.left, idxs_left))
            if idxs_right != []:
                nd.right = Node()
                nd.right.father = nd
                que.append((nd.right, idxs_right))

    def _search(self, Xi, nd):
        """Search Xi from the KDTree until Xi is at an leafnode.
        Arguments:
            Xi {list} -- 1d list with int or float.
        Returns:
            node -- Leafnode.
        """
        while nd.left or nd.right:
            if not nd.left:
                nd = nd.right
            elif not nd.right:
                nd = nd.left
            else:
                if Xi[nd.feature] < nd.split[0][nd.feature]:
                    nd = nd.left
                else:
                    nd = nd.right
        return nd

    def _get_eu_dist(self, Xi, nd):
        """Calculate euclidean distance between Xi and node.
        Arguments:
            Xi {list} -- 1d list with int or float.
            nd {node}
        Returns:
            float -- Euclidean distance.
        """
        X0 = nd.split[0]
        return self.get_eu_dist(Xi, X0)

    def get_eu_dist(self, arr1, arr2):
        """Calculate the Euclidean distance of two vectors.
        Arguments:
            arr1 {list} -- 1d list object with int or float
            arr2 {list} -- 1d list object with int or float
        Returns:
            float -- Euclidean distance
        """
        return sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5

    def _get_hyper_plane_dist(self, Xi, nd):
        """Calculate euclidean distance between Xi and hyper plane.
        Arguments:
            Xi {list} -- 1d list with int or float.
            nd {node}
        Returns:
            float -- Euclidean distance.
        """
        j = nd.feature
        X0 = nd.split[0]
        return abs(Xi[j] - X0[j])

    def nearest_neighbour_search(self, Xi):
        """Nearest neighbour search and backtracking.
        Arguments:
            Xi {list} -- 1d list with int or float.
        Returns:
            node -- The nearest node to Xi.
        """
        # The leaf node after searching Xi.
        dist_best = float("inf")
        nd_best = self._search(Xi, self.root)
        que = [(self.root, nd_best)]
        while que:
            nd_root, nd_cur = que.pop(0)
            # Calculate distance between Xi and root node
            dist = self._get_eu_dist(Xi, nd_root)
            # Update best node and distance.
            if dist < dist_best:
                dist_best, nd_best = dist, nd_root
            while nd_cur is not nd_root:
                # Calculate distance between Xi and current node
                dist = self._get_eu_dist(Xi, nd_cur)
                # Update best node, distance and visit flag.
                if dist < dist_best:
                    dist_best, nd_best = dist, nd_cur
                # If it's necessary to visit brother node.
                if nd_cur.brother and dist_best > \
                        self._get_hyper_plane_dist(Xi, nd_cur.father):
                    _nd_best = self._search(Xi, nd_cur.brother)
                    que.append((nd_cur.brother, _nd_best))
                # Back track.
                nd_cur = nd_cur.father
        return nd_best

#传统方式,逐个计算并排序
def traditional_search(arr1, matrix):
    res = []
    for index, arr2 in enumerate(matrix):
        score = sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5
        res.append([score, index])
    res = sorted(res, key=lambda x:x[0])
    return matrix[res[0][1]]

vec_dim = 8
matrix = np.random.random((1000, vec_dim))

kd_tree = KDTree()
kd_tree.build_tree(matrix, list(range(len(matrix))))

# x = np.random.random((vec_dim))
# print(kd_tree.nearest_neighbour_search(x))
# print(traditional_search(x, matrix))

start_time = time.time()
for i in range(100):
    x = np.random.random((vec_dim))
    best = kd_tree.nearest_neighbour_search(x)
print("kd树搜索耗时:%s"%(time.time() - start_time))

start_time = time.time()
for i in range(100):
    x = np.random.random((vec_dim))
    best = traditional_search(x, matrix)
print("穷举搜索耗时:%s"%(time.time() - start_time))


2.Annoy 

也是依据空间分割的原理来做,空间分割的过程相当于Kmeans聚类

重复分割过程,直到每个空间内的点个数小于设定值

可以同时在多个接近的分支上查找 或 通过不同初始划分,生成多个树

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/983579.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

打造智能聊天体验:前端集成 DeepSeek AI 助你快速上手

DeepSeek AI 聊天助手集成指南 先看完整效果&#xff1a; PixPin_2025-02-19_09-15-59 效果图&#xff1a; 目录 项目概述功能特点环境准备项目结构组件详解 ChatContainerChatInputMessageBubbleTypeWriter 核心代码示例使用指南常见问题 项目概述 基于 Vue 3 TypeScrip…

2008-2024年中国手机基站数据/中国移动通信基站数据

2008-2024年中国手机基站数据/中国移动通信基站数据 1、时间&#xff1a;2008-2024年 2、来源&#xff1a;OpenCelliD 3、指标&#xff1a;网络类型、网络代数、移动国家/地区、移动网络代码、区域代码、小区标识、单元标识、坐标经度、坐标纬度、覆盖范围、测量样本数、坐标…

开源订货系统哪个好 三大订货系统源码推荐

在数字化转型加速的今天&#xff0c;企业对订货系统的需求日益增长。一款优质的订货系统源码不仅能提升供应链效率&#xff0c;还能通过二次开发满足个性化业务需求。这里结合 “标准化、易扩展” 两大核心要求&#xff0c;为您精选三款主流订货系统源码&#xff0c;助您快速搭…

实现插入排序

#include <bits/stdc.h> using namespace std; int a[100005],n; void charu() {for(int i1;i<n;i){int keya[i]; //key表示要排序的数字的数值int ji-1; //j表示前面已排序好的数的个数while(j>0&&key<a[j]){a[j1]a[j]; //把a[j]往后挪j--;}a[j1]…

ROM修改进阶教程------修改安卓机型SELinux宽容的几种方式方法 以及第三方系统中如何关闭SELinux宽容

SELinux是一种强制访问控制安全机制,用于增强Linux系统的安全性。在某些情况下,可能需要对 SELinux 进行宽容设置,以满足特定的应用需求。当SELinux处于宽容模式时,系统允许违反安全策略的行为发生,但不会阻止这些行为,通常会在日志中记录这些违规事件。这与强制模式不同…

MySQL索引数据结构

目录 1 索引常用的数据结构 1.1 二叉树 1.2 平衡二叉树 1.3 红黑树 1.3 Hash表 1.4 B树 1.4 B树 2 MySQL索引的数据结构 2.1 MyISAM存储引擎索引 2.2 InnoDB存储引擎索引 2.2.1 聚集索引 2.2.2 非聚集索引 2.2.3 联合索引数 2.2.4 hash索引 1 索引常用的数据结构 1.1 二叉树 二…

分布式锁—7.Curator的分布式锁一

大纲 1.Curator的可重入锁的源码 2.Curator的非可重入锁的源码 3.Curator的可重入读写锁的源码 4.Curator的MultiLock源码 5.Curator的Semaphore源码 1.Curator的可重入锁的源码 (1)InterProcessMutex获取分布式锁 (2)InterProcessMutex的初始化 (3)InterProcessMutex.…

【高级篇】大疆Pocket 3加ENC编码器实现无线RTMP转HDMI进导播台

【高级篇】大疆Pocket 3加ENC编码器实现无线RTMP转HDMI进导播台 文章目录 准备工作连接设备RTMP概念ENCSHV2推流地址设置大疆Pocket 3直播设置总结 老铁们好&#xff01; 很久没写软文了&#xff0c;今天给大家带了一个干货&#xff0c;如上图&#xff0c;大疆Pocket 3加ENC编…

VS Code连接服务器教程

VS Code是什么 VS Code&#xff08;全称 Visual Studio Code&#xff09;是一款由微软推出的免费、开源、跨平台的代码编辑神器。VS Code 支持 所有主流操作系统&#xff0c;拥有强大的功能和灵活的扩展性。 官网&#xff1a;https://code.visualstudio.com/插件市场&#xff1…

Ubuntu-docker安装mysql

只记录执行步骤。 1 手动下载myql镜像&#xff08;拉去华为云镜像&#xff09; docker pull swr.cn-east-3.myhuaweicloud.com/library/mysql:latest配置并启动mysql 在opt下创建文件夹 命令&#xff1a;cd /opt/ 命令&#xff1a;mkdir mysql_docker 命令&#xff1a;cd m…

【MySQL】事务|概念|如何回滚|基本特性|MySQL事务隔离性具体怎么实现的

目录 1.为啥引入 2.是啥 3.如何回滚&#xff08;日志&#xff09; &#x1f525;4.面试题&#xff1a;谈谈事务的基本特性 &#xff08;1&#xff09;原子性 &#xff08;2&#xff09;一致性&#xff08;收入和支出相匹配&#xff09; &#xff08;3&#xff09;持久性…

C语言中的选择结构:决策的艺术

目录 一、选择结构的概念与意义 二、if语句 1. 基本语法 2. 示例代码 三、if-else语句 1. 基本语法 2. 示例代码 3. 嵌套if-else语句 四、switch语句 1. 基本语法 2. 示例代码 五、选择结构的注意事项 1. 条件表达式的正确性 2. if-else语句的配对问题 3. switch…

【0013】Python数据类型-列表类型详解

如果你觉得我的文章写的不错&#xff0c;请关注我哟&#xff0c;请点赞、评论&#xff0c;收藏此文章&#xff0c;谢谢&#xff01; 本文内容体系结构如下&#xff1a; Python列表&#xff0c;作为编程中的基础数据结构&#xff0c;扮演着至关重要的角色。它不仅能够存储一系…

SwanLab简明教程:从萌新到高手

目录 1. 什么是SwanLab&#xff1f; 1.1 核心特性 2. 安装SwanLab 3. 登录SwanLab账号&#xff08;云端版&#xff09; 4. 5分钟快速上手 更多案例 5. SwanLab功能组件 5.1 图表视图 5.2 表格视图 5.3 硬件监控 5.4 环境记录 5.5 组织协同 6. 训练框架集成 6.1 基…

TCP7680端口是什么服务

WAF上看到有好多tcp7680端口的访问信息 于是上网搜索了一下&#xff0c;确认TCP7680端口是Windows系统更新“传递优化”功能的服务端口&#xff0c;个人理解应该是Windows利用这个TCP7680端口&#xff0c;直接从内网已经具备更新包的主机上共享下载该升级包&#xff0c;无需从微…

【SegRNN 源码理解】【今天不水文系列】编码器部分理解

我来小小的理解一下&#xff1a; 首先&#xff0c;16 batchsize&#xff0c;60sequendcelength&#xff0c;7 个特征的通俗解释 16 个独立的样本&#xff0c;每个样本有 60 个连续的时间步及对应的标签值&#xff0c;每个时间步有 60 个特征 所以就是因为样本是随机从训练集…

【CUDA】Reduce归约求和(下)

目录 前言1. 优化技巧4&#xff1a;展开最后一个warp减少同步2. 优化技巧5&#xff1a;完全展开循环3. 优化技巧6&#xff1a;调节GridSize和BlockSize4. 优化技巧7&#xff1a;使用shuffle指令5. 拓展—CUDA工具链的使用结语下载链接参考 前言 学习 UP 主 比飞鸟贵重的多_HKL …

IDE集成开发环境MyEclipse中安装SVN

打开Myeclipse的help菜单----install from site 点击add弹出对话框 在输入框中输入对应内容 http://subclipse.tigris.org/update_1.10.x 点击OK之后&#xff0c;会刷新出两个选项&#xff0c;需要选中的 点击next&#xff0c;出现许可的时候选中同意&#xff0c;一直结束等…

如何计算两个向量的余弦相似度

参考笔记&#xff1a; https://zhuanlan.zhihu.com/p/677639498 日常学习之&#xff1a;如何计算两个向量或者矩阵的余弦相似度-CSDN博客 1.余弦相似度定理 百度的解释&#xff1a;余弦相似度&#xff0c;又称为余弦相似性&#xff0c;是通过计算两个向量的夹角余弦值来评估…

国产编辑器EverEdit - 宏功能介绍

1 宏 1.1 应用场景 宏是一种重复执行简单工作的利器&#xff0c;可以让用户愉快的从繁琐的工作中解放出来&#xff0c;其本质是对键盘和菜单的操作序列的录制&#xff0c;并不会识别文件的内容&#xff0c;属于无差别无脑执行。 特别是对一些有规律的重复按键动作&#xff0c;…