大劫大难以后,人不该失去锐气,不该失去热度,你镇定了却依旧燃烧,你平静了却依旧浩荡,致那个从绝望中走出来的自己,共勉
—— 25.1.31
使用深度学习在文本匹配任务上主要有两种方式:① 表示型 ② 交互型
一、实现方式 ① 表示型文本匹配
表示型文本匹配要训练的目标是:得到一个编码器,用来把一句话转化为向量
实际训练中,通常会共享一个红框内的编码器 / 表示层(可以看作一个完整的模型:输入文本过完embedding嵌入层后过一个网络层,最终输出一句话的向量),在训练时,我们通常输入两句话过同一个模型(参数共享),分别编码这两句话,得到两个向量,经过一个 matching layer 匹配层(相似度计算)得到一个分数用来衡量两向量的相似度,若两句话向量(句子语义)相似,则分数接近为1,若两句话向量不相似(语义不相似),则分数接近为0
表示层文本匹配主要训练的是文本转换成向量的这一模型部分
两边的表示层是参数共享的两个模型,分别编码输入的两句话,用来把输入的话转成向量形式
1.matching layer 匹配层 —— 方式 Ⅰ、基于文本相似度
① 计算文本相似度 方式Ⅰ:拼接两个句向量过网络层
输入两个文本过Embedding层得到两个向量,拼接两向量(拼接两向量与两向量之差 u,v,u-v),然后过一个线性层Softmax归一化,映射到 0 或 1 ,然后用 均方差MSE 或 交叉熵 损失函数计算 Loss,然后根据梯度进行迭代优化,给它提供足够多的句子对,就可以做训练
对于两个匹配的样本,预期输出分值为 1,
对于两个不匹配的样本,预期输出分值为 0,
本质上相当于二分类任务(两句话相似或不相似)
编码器部分可以使用 Bert+Pooling,也可以用 LSTM,也可以用 CNN、GatedCNN、RNN 等其他模型结构,得到两句话对应的向量
两向量的拼接方式也有多种,在实际场景中最好针对不同场景进行训练实验对比获得答案
只要编码器 Embedding 部分正确, 两向量无论怎么组合,数据都可以得到有效的训练
② 计算文本相似度 方式Ⅱ:余弦相似度计算
编码层也可以简单点,不使用向量拼接过网络层,而是直接过一个余弦相似度的计算,利用Cosine Embedding Loss做训练
对于两个相似的样本匹配预期输出分值为 1
对于两个不相似的样本匹配预期输出分值为 -1
本质上相当于2分类任务(两句话相似或是不相似)
Cosine Embedding Loss
Cosine Embedding Loss:是一种用于衡量两个向量相似性的损失函数,通过余弦相似度结合标签信息(相似/不相似)来优化模型。
- 相似样本(标签
y=1
):迫使它们的余弦相似度趋近于 1(即方向完全一致)。 - 不相似样本(标签
y=-1
):迫使它们的余弦相似度低于某个阈值 margin(默认为 0)。 - margin:惩罚项/正则项(实际训练中,一般会存储为一个默认值:例如0.1),作用:① 允许不相似样本的余弦相似度在不超过
margin
的情况下被忽略,避免过度优化。② 较大的margin
迫使模型更严格地区分不相似样本(例如,设margin=0.5
时,不相似样本间的相似度必须低于 0.5)
公式:
其中,cos(x_1, x_2) 表示两个向量的余弦相似度
训练两个文本向量的网络是同一个网络,权重参数共享,又被称为 孪生网络
2.matching layer 匹配层 —— 方式 Ⅱ、Triplet Loss
① 训练目标:
1.使具有相同标签的样本在 embedding空间 尽量接近
2.使具有不同标签的样本在 embedding空间 尽量远离
3.通过调整模型的权重来改变三者在向量空间的位置关系
例:
设计一个Loss函数,目标:衡量三个向量在空间中位置的相互关系
Anchor:某句话对应的向量在embedding空间中对应的位置
Positive:与这句话语义相同的向量
Negative:与这句话语义不同的向量
② Triplet Loss:
输入是一个三元组<a,p,n>
a:anchor 原点(任意一个样本)
p:positive 与a同一类别的样本
n:negative 与a不同类别的样本
在 Embedding 空间中,三元组损失函数为:L = max(d(a, p) - d(a, n) + margin, 0)
公式:
训练目标:使 相似文本 a,p 之间的距离 小于 不相似文本 a,n 之间的距离
d:表示两向量间的一个距离函数
d(a, p):任意一个样本 a 和 与其同一类别的样本 p 两个文本转成的向量间的向量距离
d(a, n):任意一个样本 a 和 与其不同类别的样本 n 两个文本转成的向量间的向量距离
margin:惩罚项/正则项(实际训练中,一般会存储为一个默认值:例如0.1),作用:① 即使两距离相等,还是会产生 margin 大小的Loss,只有当相似文本间的距离 小于 不相似文本间的距离 + margin 时,Loss才会为 0 ② 较大的 margin
迫使模型更严格地区分不相似样本(例如:设 margin=0.5
时,相似样本间距离差 和 不相似样本间距离差必须低于0.5,才会停止迭代优化)
Triplet Loss中,margin的作用
1. 控制正负样本的最小间隔
Margin 定义了正样本对(anchor 与 positive)和负样本对(anchor 与 negative)之间的最小距离差。其核心目标是确保在嵌入空间中,正样本与锚点的距离不仅要小于负样本与锚点的距离,还要至少保持一个固定的间隔(即 margin)
- 公式作用:当
d(a, p) - d(a, n) + margin < 0
时,损失为 0,此时模型已满足“正样本距离更近,负样本距离更远”的要求。反之,若未满足,则通过损失函数强制优化- 物理意义:Margin 类似于支持向量机(SVM)中的间隔概念,通过设定一个“安全区域”,避免不同类别的样本在嵌入空间中过于接近
2. 防止模型学习到退化解
如果没有 Margin(或 Margin=0),模型可能将所有样本映射到同一个点(即
d(a, p) = d(a, n) = 0
),此时损失恒为 0,但模型完全失去区分能力。Margin 的引入迫使模型必须拉开正负样本的距离差异,从而学习到更有判别性的特征表示
3. 平衡训练难度
Margin 的大小直接影响训练的难易程度:
过大:模型需要更大的距离差才能满足条件,可能导致训练困难(损失长期不收敛)或过拟合
过小:模型容易满足条件,但学到的特征区分度不足实践中
Margin 的大小通常通过实验调整,例如在人脸识别任务中常用 0.2~1.0 的范围
4. 动态筛选有效三元组
Margin 帮助模型自动忽略“简单样本”(如负样本已足够远离锚点),而专注于优化“困难样本”(如负样本距离较近或与正样本距离差异不足)
例如:
- 当
d(a, n) > d(a, p) + margin
:损失为 0,无需优化。- 当
d(a, n) < d(a, p) + margin
:损失为正,需通过梯度下降调整参数
5. 与距离度量的关系
Margin 的作用依赖于具体使用的距离函数:
欧氏距离:Margin 是绝对距离差阈值。
余弦相似度:Margin 转化为相似度差异(如要求负样本的相似度比正样本低至少一个 Margin)
在代码实现中,通常会对嵌入向量进行归一化(如 L2 归一化),以确保不同距离度量下 Margin 的物理意义一致
最小化L: d(a, p) —> 0,d(a, n) —> margin,则L最小化
Triplet Loss同样适用于人脸识别模型(人脸匹配)的训练
同一个人不同角度可以照出多张图片,同一个意图也可以用多个文本(问法)表示
人脸识别是CV领域的文本匹配,文本匹配是NLP领域的人脸识别
3.表示型文本匹配 —— 代码示例 🚀
Ⅰ、配置文件 config.py
① 路径相关参数
model_path:指定训练后模型的保存路径。若加载预训练模型,需提供模型文件的具体存储位置
schema_path:定义数据结构的配置文件路径,通常用于数据预处理或验证输入格式(如JSON/YAML文件)
train_data_path:指向训练集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)
valid_data_path:指向验证集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)
vocab_path:词表文件路径,用于自然语言处理任务中定义词汇的映射关系(如将单词转为ID)
② 模型结构参数
max_length:序列数据的最大长度(如文本的单词数)。超过此长度的序列会被截断或填充
hidden_size:神经网络隐藏层的维度大小,影响模型的表达能力。例如,LSTM或Transformer中每层的神经元数量
③ 训练控制参数
epoch:训练过程中遍历整个数据集的次数。适当增加轮次可提升模型性能,但可能过拟合
batch_size:每次输入模型的样本数量。较大的批次可加速训练,但需更多显存
epoch_data_size:每轮训练中采样的数量
positive_sample_rate:正样本在批次中的占比,常用于不平衡数据任务(如分类)。需结合负样本比例调整
④ 优化和学习参数
optimizer:选择优化算法(如SGD、Adam、AdamW),影响参数更新策略
learning_rate:控制参数更新的步长。学习率过高可能导致震荡,过低则收敛缓慢
# -*- coding: utf-8 -*-
"""
配置参数信息
"""
Config = {
"model_path": "model_output",
"schema_path": "../data/schema.json",
"train_data_path": "../data/train.json",
"valid_data_path": "../data/valid.json",
"vocab_path":"../chars.txt",
"max_length": 20,
"hidden_size": 128,
"epoch": 10,
"batch_size": 32,
"epoch_data_size": 200, #每轮训练中采样数量
"positive_sample_rate":0.5, #正样本比例
"optimizer": "adam",
"learning_rate": 1e-3,
}
Ⅱ、数据加载 loader.py
① 初始化 def __init__()
属性 | 类型 | 描述 |
---|---|---|
config | 字典 | 存储传入的配置字典。 |
path | 字符串 | 数据存储的路径。 |
vocab | 列表或字典 | 从 config["vocab_path"] 加载的词汇表。 |
config["vocab_size"] | 整数 | 词汇表的大小,通过 len(self.vocab) 计算得到。 |
schema | 字典或其他类型 | 从 config["schema_path"] 加载的模式(schema)。 |
train_data_size | 整数 | 每个 epoch 的采样数量,用于控制随机采样的数据量。 |
data_type | 字符串 | 标识当前加载的数据类型,可以是 "train" 或 "test" |
load_vocab():加载字词表文件
load_schema():加载schema文件
load():在文件中加载数据
len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。
参数名 | 类型 | 描述 |
---|---|---|
obj | 可迭代对象 | 需要计算长度的对象,如字符串、列表、元组、字典、集合等。 |
class DataGenerator:
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.vocab = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.vocab)
self.schema = load_schema(config["schema_path"])
self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
self.data_type = None #用来标识加载的是训练集还是测试集 "train" or "test"
self.load()
② 数据集加载(训练 / 测试)def load()
初始化:测试数据集 self.data 和 标准问题与该标签所有问题编码结果的字典 self.knwb
打开文件并逐行读取:使用with open
语句打开指定路径的数据文件,编码格式为utf8
,确保文件内容可以正确读取
对每一行进行 JSON 解析:从解析后的JSON数据中提取tag
和title
字段,并将tag
转换为对应的索引label
(通过self.label_to_index
字典映射)。
判断数据类型并加载数据:根据每一行的数据格式判断是训练集还是测试集。
- 如果读取的行是一个字典(即训练集),首先设置
self.data_type
为"train"
。 - 然后提取该行的
"questions"
键对应的值(这是一个问题列表)以及"target"
键对应的值(这是一个标签)。 - 对于问题列表中的每一个问题,先调用
self.encode_sentence(question)
方法对其进行词汇编码,然后再将其转换为torch.LongTensor
格式,以适应后续的模型输入。 - 最后,将编码后的问题添加到
self.knwb
字典中对应标签的列表里。 - 如果读取的行是一个列表(即测试集),首先设置
self.data_type
为"test"
。 - 然后使用
assert
语句确保该行确实是一个列表,并从中提取问题和标签。 - 对问题进行词汇编码,并转换为
torch.LongTensor
格式。 - 同样对标签进行处理,根据
self.schema
字典将标签转换为索引,并也转换为torch.LongTensor
格式。 - 最后,将编码后的问题和标签索引作为一个样本添加到
self.data
列表中。
isinstance():Python 内置函数,用于检查一个对象是否属于某个类型或其子类的实例。
参数名 | 类型 | 描述 |
---|---|---|
object | 任意对象 | 需要检查的对象。 |
classinfo | 类型或元组 | 可以是一个类型(如 int 、str 等)或一个类型元组。如果 classinfo 是元组,isinstance() 会检查对象是否属于元组中任意一个类型的实例。 |
assert:Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,assert
会抛出 AssertionError
异常。
torch.LongTensor():创建一个包含整数的 PyTorch 张量(Tensor),元素类型为 torch.int64
参数名 | 类型 | 描述 |
---|---|---|
data | list 或 array | 包含整数的列表或数组,用于初始化张量。 |
列表.append():在列表的末尾添加一个元素。
参数名 | 类型 | 描述 |
---|---|---|
element | any | 要添加到列表末尾的元素,可以是任意类型(如整数、字符串、列表等)。 |
def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
#加载训练集
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
self.knwb[self.schema[label]].append(input_id)
#加载测试集
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
label_index = torch.LongTensor([self.schema[label]])
self.data.append([input_id, label_index])
return
③ 文本编码 def encode_sentence()
初始化空列表 input_id:用于存储编码后的结果
判断使用词表文件还是字符表:如果是此表文件,则使用jieba对输入文本text进行分词,如果指向的是字符表文件,则直接对每个字符进行编码
对输入序列进行填充或截断:将编码后的序列input_id
传递给padding
方法。这个方法会确保每个序列的长度都等于config["max_length"]
字典.get():dict.get()
是 Python 字典的内置方法,用于安全地获取字典中指定键的值。如果键存在,则返回对应的值;如果键不存在,则返回默认值(如果未提供默认值,则返回 None
)。它的主要作用是避免直接使用 dict[key]
时可能引发的 KeyError
异常。
参数名 | 类型 | 描述 |
---|---|---|
key | 任意类型 | 需要查找的键。 |
default | 任意类型 | 可选,如果键不存在时返回的默认值。默认为 None 。 |
列表.append(): Python 列表(list
)对象的一个方法,用于在列表的末尾添加一个元素。它会直接修改原列表,而不是返回一个新的列表。
参数名 | 类型 | 描述 |
---|---|---|
element | 任意类型 | 要添加到列表末尾的元素。可以是整数、字符串、列表、字典、元组等 |
def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
input_id = self.padding(input_id)
return input_id
④ 数据规范 def padding()
截取 input_id 的前 max_length 个元素
如果 input_id 长度不足 max_length,则用 0 填充至 max_length
返回处理后的 input_id
len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。
参数名 | 类型 | 描述 |
---|---|---|
obj | 可迭代对象 | 需要计算长度的对象,如字符串、列表、元组、字典、集合等。 |
#补齐或截断输入的序列,使其可以在一个batch内运算
def padding(self, input_id):
input_id = input_id[:self.config["max_length"]]
input_id += [0] * (self.config["max_length"] - len(input_id))
return input_id
⑤ 返回数据集的长度 def __len__()
该函数的作用是返回对象的“长度”,具体行为取决于对象的 data_type
属性。
如果 data_type
为 "train"
,则返回 self.config["epoch_data_size"]
的值,这通常表示训练数据的规模。
如果 data_type
为 "test"
,则返回 self.data
的长度,即测试数据的元素个数。
如果 data_type
不是 "train"
或 "test"
,则会触发 AssertionError
,并提示 self.data_type
的值。
assert:Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,assert
会抛出 AssertionError
异常。
len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。
参数名 | 类型 | 描述 |
---|---|---|
obj | 可迭代对象 | 需要计算长度的对象,如字符串、列表、元组、字典、集合等。 |
def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)
⑥ 通过索引访问数据集中的样本 def __getitem__()
判断数据类型: 如果是训练数据集,则调用self.random_train_sample()
方法来随机生成一个训练样本。这是因为训练数据集中的样本是通过随机采样生成的,而不是固定的数据。
如果不是训练数据集,则认为是测试数据集:直接返回self.data
列表中索引为index
的样本。
def __getitem__(self, index):
if self.data_type == "train":
return self.random_train_sample() #随机生成一个训练样本
else:
return self.data[index]
⑦ ⭐随机生成训练样本
两两为输入,基于文本相似度的训练方式
如果决定生成正样本:则从 standard_question_index
中随机选择一个类别 p
。接着,检查该类别下的问题数量是否大于等于2。
如果该类别下的问题数量不足2个,则重新调用 random_train_sample()
方法,即重新随机生成一个样本。否则,从类别 p
中随机选取两个问题 s1
和 s2
,并返回这两个问题及其标签 [s1, s2, torch.LongTensor([1])]
。这里的标签 1
表示正样本,即这两个问题是属于同一类别的。
如果决定生成负样本:则从 standard_question_index
中随机选择两个不同的类别 p
和 n
。接着,从每个类别中随机选取一个问题,分别为 s1
和 s2
,并返回这两个问题及其标签 [s1, s2, torch.LongTensor([-1])]
。这里的标签 -1
表示负样本,即这两个问题不属于同一类别
list():将可迭代对象(如字符串、元组、集合等)转换为列表。
参数名 | 类型 | 描述 |
---|---|---|
iterable | 可迭代对象 | 需要转换为列表的可迭代对象,如字符串、元组、集合等。 |
字典.keys():返回字典中所有键的视图对象。
random.random():生成一个范围在 [0.0,1.0) 之间的随机浮点数。
random.sample(): 从序列中随机选择指定数量的唯一元素,返回一个新列表。
参数名 | 类型 | 描述 |
---|---|---|
population | 序列 | 需要从中选择的序列,如列表、元组等。 |
k | 整数 | 需要选择的元素数量。 |
random.choice():从非空序列中随机选择一个元素。
参数名 | 类型 | 描述 |
---|---|---|
seq | 序列 | 需要从中选择的非空序列,如列表、元组等。 |
torch.LongTensor():创建一个包含整数的张量,元素类型为 torch.int64
。
参数名 | 类型 | 描述 |
---|---|---|
data | 列表或数组 | 包含整数的数据,用于创建张量。 |
#依照一定概率生成负样本或正样本
#负样本从随机两个不同的标准问题中各随机选取一个
#正样本从随机一个标准问题中随机选取两个
def random_train_sample(self):
standard_question_index = list(self.knwb.keys())
#随机正样本
if random.random() <= self.config["positive_sample_rate"]:
p = random.choice(standard_question_index)
#如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
if len(self.knwb[p]) < 2:
return self.random_train_sample()
else:
s1, s2 = random.sample(self.knwb[p], 2)
return [s1, s2, torch.LongTensor([1])]
#随机负样本
else:
p, n = random.sample(standard_question_index, 2)
s1 = random.choice(self.knwb[p])
s2 = random.choice(self.knwb[n])
return [s1, s2, torch.LongTensor([-1])]
⑧ 数据处理
加载词汇表:从指定路径的文件中读取字表或词表,并将其转化为一个字典,其中字或词作为键,对应的索引作为值。该函数确保索引从 1 开始,因为索引 0 被保留用于填充(padding)操作。这个字典在后续的数据编码过程中会被频繁使用,能够将文本中的每个字或词映射为唯一的数字索引,便于计算机处理。
open():用于打开文件并返回文件对象,支持读取、写入等文件操作。
参数名 | 类型 | 描述 |
---|---|---|
file | 字符串 | 文件路径(包括文件名),可以是绝对路径或相对路径。 |
mode | 字符串 | 文件打开模式,默认为 'r' (只读)。常见模式包括 'r' 、'w' 、'a' 等。 |
buffering | 整数 | 缓冲设置,默认为 -1 (系统默认)。 |
encoding | 字符串 | 文件编码方式,默认为 None 。 |
errors | 字符串 | 编码错误的处理方式,默认为 None 。 |
newline | 字符串 | 换行符设置,默认为 None 。 |
closefd | 布尔值 | 控制 file 参数的传入值类型,默认为 True 。 |
enumerate():为可迭代对象(如列表、元组等)添加索引,返回一个枚举对象,生成 (index, value)
对。
参数名 | 类型 | 描述 |
---|---|---|
iterable | 可迭代对象 | 需要枚举的对象,如列表、元组等。 |
start | 整数 | 索引的起始值,默认为 0 。 |
strip():去除字符串开头和结尾的指定字符(默认去除空白字符,如空格、换行符等)。
参数名 | 类型 | 描述 |
---|---|---|
chars | 字符串 | 可选,指定要删除的字符。默认为去除空白字符。 |
#加载字表或词表
def load_vocab(vocab_path):
token_dict = {}
with open(vocab_path, encoding="utf8") as f:
for index, line in enumerate(f):
token = line.strip()
token_dict[token] = index + 1 #0留给padding位置,所以从1开始
return token_dict
加载schema:打开schema_path路径下的文件,使用utf8编码读取文件内容;使用json.loads将读取的内容解析为Python对象并返回
open():用于打开文件并返回文件对象,支持读取、写入等文件操作
参数名 | 类型 | 描述 |
---|---|---|
file | 字符串 | 文件路径(包括文件名),可以是绝对路径或相对路径。 |
mode | 字符串 | 文件打开模式,默认为 'r' (只读)。常见模式包括 'r' 、'w' 、'a' 等。 |
buffering | 整数 | 缓冲设置,默认为 -1 (系统默认)。 |
encoding | 字符串 | 文件编码方式,默认为 None 。 |
errors | 字符串 | 编码错误的处理方式,默认为 None 。 |
newline | 字符串 | 换行符设置,默认为 None 。 |
closefd | 布尔值 | 控制 file 参数的传入值类型,默认为 True 。 |
json.loads():将 JSON 格式的字符串解析为 Python 对象(如字典、列表等)。
参数名 | 类型 | 描述 |
---|---|---|
s | 字符串 | 要解析的 JSON 字符串。 |
encoding | 字符串 | 字符编码(Python 3 中已弃用)。 |
cls | 类 | 自定义解码类,默认为 None 。 |
object_hook | 函数 | 可选函数,允许自定义将 JSON 对象转换为其他类型的 Python 对象。 |
parse_float | 函数 | 自定义将 JSON 中的浮点数转换为特定类型。 |
parse_int | 函数 | 自定义将 JSON 中的整数转换为特定类型。 |
object_pairs_hook | 函数 | 用于处理 JSON 对象中的键值对,默认返回字典。 |
文件对象.read():从文件中读取指定数量的字节或整个文件内容。
参数名 | 类型 | 描述 |
---|---|---|
size | 整数 | 可选,指定要读取的字节数。如果未指定或为负数,则读取整个文件内容。 |
#加载schema
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())
封装数据加载器:根据给定的配置,将数据文件加载并编码成模型可识别的格式,然后通过DataLoader
类封装成一个数据加载器。
DataLoader():PyTorch 中的一个标准工具,用于高效地加载和处理数据。它支持批量加载、数据打乱、多线程加载等功能,是深度学习训练中常用的数据加载器。
参数名 | 类型 | 描述 |
---|---|---|
dataset | 数据集对象 | 要加载的数据集,必须实现 __len__ 和 __getitem__ 方法。 |
batch_size | 整数 | 每个批次的大小,默认为 1 。 |
shuffle | 布尔值 | 是否在每个 epoch 开始时打乱数据顺序,默认为 False 。 |
sampler | 采样器对象 | 自定义采样器,用于控制数据的采样方式,默认为 None 。 |
batch_sampler | 批次采样器对象 | 自定义批次采样器,用于控制批次的采样方式,默认为 None 。 |
num_workers | 整数 | 用于数据加载的子进程数量,默认为 0 (主进程加载)。 |
collate_fn | 函数 | 用于将一个批次的数据合并成一个张量或元组,默认为 None 。 |
pin_memory | 布尔值 | 是否将数据存储在 pin memory 中(用于 GPU 加速),默认为 False 。 |
drop_last | 布尔值 | 如果数据不能完全分成批次,是否删除最后一个不完整的批次,默认为 False 。 |
timeout | 整数 | 数据加载的最大等待时间(秒),默认为 0 (无限制)。 |
worker_init_fn | 函数 | 用于初始化每个数据加载器子进程的函数,默认为 None 。 |
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
⑨ 加载数据文件
# -*- coding: utf-8 -*-
import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
数据加载
"""
class DataGenerator:
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.vocab = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.vocab)
self.schema = load_schema(config["schema_path"])
self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
self.data_type = None #用来标识加载的是训练集还是测试集 "train" or "test"
self.load()
def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
#加载训练集
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
self.knwb[self.schema[label]].append(input_id)
#加载测试集
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
label_index = torch.LongTensor([self.schema[label]])
self.data.append([input_id, label_index])
return
def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
input_id = self.padding(input_id)
return input_id
#补齐或截断输入的序列,使其可以在一个batch内运算
def padding(self, input_id):
input_id = input_id[:self.config["max_length"]]
input_id += [0] * (self.config["max_length"] - len(input_id))
return input_id
def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)
def __getitem__(self, index):
if self.data_type == "train":
return self.random_train_sample() #随机生成一个训练样本
else:
return self.data[index]
#依照一定概率生成负样本或正样本
#负样本从随机两个不同的标准问题中各随机选取一个
#正样本从随机一个标准问题中随机选取两个
def random_train_sample(self):
standard_question_index = list(self.knwb.keys())
#随机正样本
if random.random() <= self.config["positive_sample_rate"]:
p = random.choice(standard_question_index)
#如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
if len(self.knwb[p]) < 2:
return self.random_train_sample()
else:
s1, s2 = random.sample(self.knwb[p], 2)
return [s1, s2, torch.LongTensor([1])]
#随机负样本
else:
p, n = random.sample(standard_question_index, 2)
s1 = random.choice(self.knwb[p])
s2 = random.choice(self.knwb[n])
return [s1, s2, torch.LongTensor([-1])]
#加载字表或词表
def load_vocab(vocab_path):
token_dict = {}
with open(vocab_path, encoding="utf8") as f:
for index, line in enumerate(f):
token = line.strip()
token_dict[token] = index + 1 #0留给padding位置,所以从1开始
return token_dict
#加载schema
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
if __name__ == "__main__":
from config import Config
dg = DataGenerator(r"F:\人工智能NLP\NLP\Day8_文本匹配问题\data\train.json", Config)
print(dg[1])
Ⅲ、模型定义 model.py
① 生成句子的嵌入表示(文本转向量)
模型初始化
hidden_size
:隐藏层的大小,决定了 Embedding 和线性层的输出维度。vocab_size
:词汇表的大小,加 1 是为了处理填充索引(padding_idx=0
)。max_length
:句子的最大长度(虽然代码中未直接使用,但可能是为后续处理预留的)。self.embedding
:nn.Embedding
层:将词索引映射为hidden_size
维的向量。self.layer
:nn.Linear
层:对 Embedding 层的输出进行线性变换。self.dropout
:nn.Dropout
层,设置丢弃率为 0.5。
nn.Embedding():用于将离散的类别索引(如单词索引)映射到连续的稠密向量空间中。它常用于自然语言处理(NLP)任务中,将单词或其他类别信息转换为向量表示。
参数名 | 类型 | 描述 |
---|---|---|
num_embeddings | 整数 | 嵌入字典的大小,即不同类别的总数(如词汇表大小)。 |
embedding_dim | 整数 | 每个嵌入向量的维度。 |
padding_idx | 整数(可选) | 指定一个索引,用于填充(padding),该索引对应的嵌入向量不会被更新。 |
max_norm | 浮点数(可选) | 如果设置了该值,嵌入向量的范数会被裁剪到不超过 max_norm 。 |
norm_type | 浮点数(可选) | 用于裁剪范数的类型,默认为 2(L2 范数)。 |
scale_grad_by_freq | 布尔值(可选) | 如果为 True ,梯度会根据单词在 mini-batch 中出现的频率进行缩放。 |
sparse | 布尔值(可选) | 如果为 True ,梯度将变为稀疏张量,适用于大型词汇表以节省内存。 |
nn.Linear():是一个全连接层,用于对输入数据进行线性变换。它将输入特征与权重矩阵相乘,并加上偏置项,输出变换后的结果。
参数名 | 类型 | 描述 |
---|---|---|
in_features | 整数 | 输入特征的数量。 |
out_features | 整数 | 输出特征的数量。 |
bias | 布尔值(可选) | 是否使用偏置项,默认为 True 。 |
nn.Dropout(): 一种正则化技术,用于防止模型过拟合。它在训练过程中随机将一部分神经元的输出设置为 0,从而减少神经元之间的依赖性。
参数名 | 类型 | 描述 |
---|---|---|
p | 浮点数 | 神经元被丢弃的概率,默认为 0.5。 |
inplace | 布尔值(可选) | 是否就地操作,默认为 False 。 |
class SentenceEncoder(nn.Module):
def __init__(self, config):
super(SentenceEncoder, self).__init__()
hidden_size = config["hidden_size"]
vocab_size = config["vocab_size"] + 1
max_length = config["max_length"]
self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
# self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
self.layer = nn.Linear(hidden_size, hidden_size)
self.dropout = nn.Dropout(0.5)
前向传播
transpose():用于将数组或矩阵的行和列进行转置。例如,将行数据转换为列数据,或将列数据转换为行数据。
参数名 | 类型 | 描述 |
---|---|---|
array | 数组或矩阵 | 需要进行转置的数组或矩阵。 |
axes | 元组(可选) | 指定转置的轴顺序,默认为 None ,表示反转所有轴。 |
squeeze(): 用于移除数组中维度为 1 的维度,从而简化数组的形状。
参数名 | 类型 | 描述 |
---|---|---|
a | 数组 | 输入的数组。 |
axis | 整数或元组(可选) | 指定要移除的维度,默认为 None ,表示移除所有维度为 1 的轴。 |
shape(): 用于获取数组或矩阵的形状,返回一个表示各维度大小的元组。
参数名 | 类型 | 描述 |
---|---|---|
array | 数组或矩阵 | 需要获取形状的数组或矩阵。 |
#输入为问题字符编码
def forward(self, x):
x = self.embedding(x)
#使用lstm
# x, _ = self.lstm(x)
#使用线性层
x = self.layer(x)
x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()
return x
② 计算句子间相似度
模型初始化
super(SiameseNetwork, self).__init__()
:调用父类nn.Module
的初始化方法。self.sentence_encoder = SentenceEncoder(config)
:初始化一个SentenceEncoder
实例,用于对句子进行编码。self.loss = nn.CosineEmbeddingLoss()
:初始化余弦嵌入损失函数,用于计算两个句子向量之间的相似度损失。
nn.CosineEmbeddingLoss(): PyTorch 中的一个损失函数,用于衡量两个输入向量的相似性。它通过计算两个输入向量的余弦相似度来评估它们的相似性,并根据标签值(1 或 -1)调整损失。
- 如果 y=1,表示两个输入向量应相似,损失为 1−cos(x1,x2)。
- 如果 y=−1,表示两个输入向量应不相似,损失为 max(0,cos(x1,x2))。
参数名 | 类型 | 描述 |
---|---|---|
margin | 浮点数(可选) | 用于调整不相似样本的损失,默认为 0.0 。 |
reduction | 字符串(可选) | 指定损失的聚合方式,可选值为 'none' 、'mean' 或 'sum' ,默认为 'mean' 。 |
class SiameseNetwork(nn.Module):
def __init__(self, config):
super(SiameseNetwork, self).__init__()
self.sentence_encoder = SentenceEncoder(config)
self.loss = nn.CosineEmbeddingLoss()
计算余弦距离:计算两个输入张量之间的余弦距离。它首先对输入的张量进行归一化处理,确保每个向量的长度为 1,然后计算归一化张量之间的点积,最后通过 1 - 点积
转化为余弦距离。
torch.nn.functional.normalize():用于对输入张量在指定维度上进行 Lp 范数归一化。默认情况下,它使用 L2 范数(欧几里得范数)对向量进行归一化。
公式:
- v 是输入张量的某一维度上的向量。
- ||v||_p 是 Lp 范数。
- ϵ 是一个小值,用于避免除以零。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
p | float | 范数的指数值,默认为 2 (L2 范数)。 |
dim | int 或 tuple | 归一化的维度,默认为 1 。 |
eps | float | 用于避免除以零的小值,默认为 1e-12 。 |
out | Tensor(可选) | 输出张量。如果指定,操作不可微分。 |
torch.sum():计算输入张量在指定维度上的元素和。如果不指定维度,则计算所有元素的和。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int 或 tuple | 求和的维度,默认为 None (计算所有元素的和)。 |
keepdim | bool | 是否保留求和后的维度,默认为 False 。 |
dtype | dtype(可选) | 输出张量的数据类型,默认为 None 。 |
torch.mul():对两个张量进行逐元素乘法。如果输入张量的形状不同,会进行广播操作。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 第一个输入张量。 |
other | Tensor 或 float | 第二个输入张量或标量。 |
out | Tensor(可选) | 输出张量。 |
# 计算余弦距离 1-cos(a,b)
# cos=1时两个向量相同,余弦距离为0;cos=0时,两个向量正交,余弦距离为1
def cosine_distance(self, tensor1, tensor2):
tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)
return 1 - cosine
计算Triplet_Loss三元组损失:三元组损失可以用于训练神经网络以学习特征向量之间的相对距离,确保对于锚点而言,正样本更接近(较小的余弦距离),而负样本更远(较大的余弦距离)。通过这种方式,可以有效地训练分类器或聚类模型,使得同一类的样本点之间的距离小于不同类的样本点之间的距离。这里的cosine_triplet_loss
的具体实现可以通过余弦距离来衡量样本之间的相似性,并通过margin
参数来控制正负样本之间距离的差距。
squeeze(): 用于移除数组中维度为 1 的维度,从而简化数组的形状。
参数名 | 类型 | 描述 |
---|---|---|
a | 数组 | 输入的数组。 |
axis | 整数或元组(可选) | 指定要移除的维度,默认为 None ,表示移除所有维度为 1 的轴。 |
torch.mean(): PyTorch 中的一个函数,用于计算张量(Tensor)的平均值。它可以计算整个张量所有元素的平均值,或者指定某个维度上的平均值。该函数在数据预处理、统计分析以及神经网络训练过程中计算损失等场景中非常有用
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 需要计算平均值的输入张量。 |
dim | int 或 tuple of ints | 指定在哪个维度上计算平均值。例如,dim=0 表示按行计算,dim=1 表示按列计算。默认为 None ,计算所有元素的平均值。 |
keepdim | bool | 是否保持输出张量的维度数量与输入张量一致。默认为 False ,即减少维度数量。 |
out | Tensor (可选) | 指定输出的张量。如果提供,结果将存储在该张量中。 |
def cosine_triplet_loss(self, a, p, n, margin=None):
ap = self.cosine_distance(a, p)
an = self.cosine_distance(a, n)
if margin is None:
diff = ap - an + 0.1
else:
diff = ap - an + margin.squeeze()
return torch.mean(diff[diff.gt(0)]) #greater than
前向传播:处理单个或两个句子的输入。当输入两个句子时,模型会计算它们的向量表示,并根据是否提供标签计算损失或余弦距离;当只提供一个句子时,模型返回该句子的向量表示。这种
squeeze(): 用于移除数组中维度为 1 的维度,从而简化数组的形状。
参数名 | 类型 | 描述 |
---|---|---|
a | 数组 | 输入的数组。 |
axis | 整数或元组(可选) | 指定要移除的维度,默认为 None ,表示移除所有维度为 1 的轴。 |
#sentence : (batch_size, max_length)
def forward(self, sentence1, sentence2=None, target=None):
#同时传入两个句子
if sentence2 is not None:
vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)
vector2 = self.sentence_encoder(sentence2)
#如果有标签,则计算loss
if target is not None:
return self.loss(vector1, vector2, target.squeeze())
#如果无标签,计算余弦距离
else:
return self.cosine_distance(vector1, vector2)
#单独传入一个句子时,认为正在使用向量化能力
else:
return self.sentence_encoder(sentence1)
③ 根据配置选择优化器
根据用户在配置字典中指定的优化器类型("adam" 或 "sgd")来选择并初始化相应的优化器,并设置学习率。
model.parameters():PyTorch 中 torch.nn.Module
类的一个方法,用于获取模型中所有可训练参数的迭代器。这些参数通常是模型的权重和偏置,它们会在训练过程中通过优化器进行更新。
def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)
④ 模型验证
设置配置参数、构造一个Siamese网络模型,并在此模型上运行两个句子的相似度计算。通过输入一对句子的编码和相应的标签,模型可以评估这两个句子的相似性,同时计算损失。最终的输出将是损失值或相似度度量。
torch.LongTensor(): PyTorch 中的一个函数,用于创建一个包含整数(64 位整型)的张量。它是一种特定的张量类型,通常用于存储索引、标签或其他整数值数据。
参数名 | 类型 | 描述 |
---|---|---|
data | 列表、元组或数组 | 输入数据,用于创建张量。 |
dtype | torch.dtype | 张量的数据类型,默认为 torch.int64 (64 位整数)。 |
device | torch.device | 张量存储的设备(如 CPU 或 GPU),默认为 None (使用默认设备)。 |
requires_grad | bool | 是否需要计算梯度,默认为 False 。 |
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
"""
建立网络模型结构
"""
class SentenceEncoder(nn.Module):
def __init__(self, config):
super(SentenceEncoder, self).__init__()
hidden_size = config["hidden_size"]
vocab_size = config["vocab_size"] + 1
max_length = config["max_length"]
self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
# self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
self.layer = nn.Linear(hidden_size, hidden_size)
self.dropout = nn.Dropout(0.5)
#输入为问题字符编码
def forward(self, x):
x = self.embedding(x)
#使用lstm
# x, _ = self.lstm(x)
#使用线性层
x = self.layer(x)
x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()
return x
class SiameseNetwork(nn.Module):
def __init__(self, config):
super(SiameseNetwork, self).__init__()
self.sentence_encoder = SentenceEncoder(config)
self.loss = nn.CosineEmbeddingLoss()
# 计算余弦距离 1-cos(a,b)
# cos=1时两个向量相同,余弦距离为0;cos=0时,两个向量正交,余弦距离为1
def cosine_distance(self, tensor1, tensor2):
tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)
return 1 - cosine
def cosine_triplet_loss(self, a, p, n, margin=None):
ap = self.cosine_distance(a, p)
an = self.cosine_distance(a, n)
if margin is None:
diff = ap - an + 0.1
else:
diff = ap - an + margin.squeeze()
return torch.mean(diff[diff.gt(0)]) #greater than
#sentence : (batch_size, max_length)
def forward(self, sentence1, sentence2=None, target=None):
#同时传入两个句子
if sentence2 is not None:
vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)
vector2 = self.sentence_encoder(sentence2)
#如果有标签,则计算loss
if target is not None:
return self.loss(vector1, vector2, target.squeeze())
#如果无标签,计算余弦距离
else:
return self.cosine_distance(vector1, vector2)
#单独传入一个句子时,认为正在使用向量化能力
else:
return self.sentence_encoder(sentence1)
def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)
if __name__ == "__main__":
from config import Config
Config["vocab_size"] = 10
Config["max_length"] = 4
model = SiameseNetwork(Config)
s1 = torch.LongTensor([[1,2,3,0], [2,2,0,0]])
s2 = torch.LongTensor([[1,2,3,4], [3,2,3,4]])
l = torch.LongTensor([[1],[0]])
y = model(s1, s2, l)
print(y)
# print(model.state_dict())
Ⅳ、模型效果测试 evaluate.py
① 模型初始化
初始化Evaluator类,设置配置、模型和日志记录器。加载验证数据集和训练数据集(用于效果测试),并初始化统计字典以记录正确和错误的数量。
class Evaluator:
def __init__(self, config, model, logger):
self.config = config
self.model = model
self.logger = logger
self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
# 由于效果测试需要训练集当做知识库,再次加载训练集。
# 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
self.train_data = load_data(config["train_data_path"], config)
self.stats_dict = {"correct":0, "wrong":0} #用于存储测试结果
② 将问题转换为向量表示 ⭐
将训练数据集中的问题转换为向量形式,并记录每个问题编号到标准问题编号的映射关系。这样做的目的是为了在模型评估过程中,通过计算输入问题与知识库中所有问题向量的相似度,来判断模型的预测结果是否正确。归一化处理确保了向量之间的相似度计算更加准确可靠。
初始化字典和列表
- 创建了一个空字典
question_index_to_standard_question_index
,用于存储问题编号到标准问题编号的映射。 - 创建了一个空列表
question_ids
,用于存储所有训练问题的ID。
遍历训练数据集中的问题
- 使用
for
循环遍历训练数据集中的每个标准问题及其对应的问题列表。 - 对于每个问题ID,将其添加到
question_ids
列表中,并在question_index_to_standard_question_index
中记录问题编号到标准问题编号的映射关系。
将问题ID转换为向量
- 使用
torch.no_grad()
上下文管理器,确保在执行这段代码时不会计算梯度,因为这里不需要进行反向传播。 - 使用
torch.stack
将question_ids
列表中的所有问题ID堆叠在一起,形成一个二维的question_matrixs
张量,其中每一行对应一个问题ID。 - 检查CUDA是否可用,如果可用,则将
question_matrixs
移动到GPU上。 - 将
question_matrixs
输入到模型self.model
中,得到问题的向量化表示knwb_vectors
。 - 对
knwb_vectors
中的所有向量进行归一化处理,即将每个向量除以其自身的范数,这样可以使得不同向量之间的相似度计算更加合理。
items():Python 字典的方法,返回字典中所有键值对的视图,每个键值对以元组形式返回。
append():Python 列表的方法,用于在列表末尾添加一个元素。
参数名 | 类型 | 描述 |
---|---|---|
element | 任意类型 | 要添加到列表末尾的元素。 |
torch.no_grad():PyTorch 的上下文管理器,用于禁用梯度计算,通常在模型推理或评估时使用,以减少内存消耗并加速计算。
torch.stack():用于将多个张量沿着新的维度进行堆叠,所有输入张量的形状必须相同。
参数名 | 类型 | 描述 |
---|---|---|
tensors | 张量序列 | 要堆叠的张量序列。 |
dim | int | 指定堆叠的维度,默认为 0 。 |
torch.cuda.is_available():用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)。
cuda():PyTorch 张量或模型的方法,用于将张量或模型移动到 GPU 上。
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU)。 |
torch.nn.functional.normalize():用于对输入张量在指定维度上进行 Lp 范数(p 是范数的指数值)归一化,默认使用 L2 范数。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
p | float | 范数的指数值,默认为 2 (L2 范数)。 |
dim | int | 归一化的维度,默认为 1 。 |
eps | float | 用于避免除以零的小值,默认为 1e-12 。 |
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.question_ids = []
for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
for question_id in question_ids:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
self.question_ids.append(question_id)
with torch.no_grad():
question_matrixs = torch.stack(self.question_ids, dim=0)
if torch.cuda.is_available():
question_matrixs = question_matrixs.cuda()
self.knwb_vectors = self.model(question_matrixs)
#将所有向量都作归一化 v / |v|
self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
return
③ 评估模型表现
1.记录日志并初始化统计字典。
2.将模型设置为评估模式,并将知识库转换为向量表示。
3.遍历验证数据集,处理每一批次的数据:
如果有GPU可用,将数据移动到GPU。
使用模型进行预测,不计算梯度。
更新统计信息。
4.显示最终的统计结果。
logger.info():Python 的 logging
模块中用于记录信息级别(info level)日志的函数。它通常用于记录程序运行时的关键信息,如状态、进度等,而不是调试信息或错误。
参数名 | 类型 | 描述 |
---|---|---|
msg | str | 要记录的日志信息。 |
*args | 任意类型 | 用于格式化日志信息的参数。 |
**kwargs | dict | 可选参数,如 exc_info 、stack_info 等,用于附加异常或堆栈信息。 |
model.eval():PyTorch 中用于将模型设置为评估模式的方法。在评估模式下,模型会禁用 Dropout
和 Batch Normalization
等训练时的特定行为,以确保测试结果的稳定性。
enumerate():Python 的内置函数,用于在遍历可迭代对象(如列表、元组、字符串)时同时获取索引和值。
参数名 | 类型 | 描述 |
---|---|---|
iterable | 可迭代对象 | 要遍历的对象(如列表、元组、字符串)。 |
start | int | 索引的起始值,默认为 0 。 |
torch.cuda.is_available():PyTorch 中用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)的函数。
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU)。 |
cuda():PyTorch 中用于将张量或模型移动到 GPU 上的方法。
torch.no_grad():PyTorch 的上下文管理器,用于禁用梯度计算,通常在模型推理或评估时使用,以减少内存消耗并加速计算。
def eval(self, epoch):
self.logger.info("开始测试第%d轮模型效果:" % epoch)
self.stats_dict = {"correct":0, "wrong":0} #清空前一轮的测试结果
self.model.eval()
self.knwb_to_vector()
for index, batch_data in enumerate(self.valid_data):
if torch.cuda.is_available():
batch_data = [d.cuda() for d in batch_data]
input_id, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
with torch.no_grad():
test_question_vectors = self.model(input_id) #不输入labels,使用模型当前参数进行预测
self.write_stats(test_question_vectors, labels)
self.show_stats()
return
④ 计算输入问题向量与知识库问题向量的相似度
断言检查:首先通过 assert
确保 labels
和 test_question_vectors
的长度一致。
矩阵乘法:对于每个测试问题向量 test_question_vector
,通过 torch.mm
函数计算其与知识库中所有问题向量 self.knwb_vectors
的相似度。
torch.mm
是 PyTorch 中的矩阵乘法函数,用于计算两个矩阵的乘积
test_question_vector.unsqueeze(0)
:将 test_question_vector
的形状从 [vec_size]
扩展为 [1, vec_size]
,以便与 self.knwb_vectors.T
进行矩阵乘法。
self.knwb_vectors.T
:知识库问题向量的转置,形状为 [vec_size, n]
,其中 n
是知识库中问题的数量。
res
:矩阵乘法的结果,形状为 [1, n]
,表示测试问题与知识库中每个问题的相似度。
命中问题标号:通过 torch.argmax
找到相似度最高的索引 hit_index
,即命中问题的标号。
标准问编号转换:将 hit_index
转换为标准问编号self.question_index_to_standard_question_index[hit_index]
。
统计更新:如果命中问题的标准问编号与标签 label
一致,则增加
self.stats_dict["correct"]
的计数;否则增加 self.stats_dict["wrong"]
的计数。
assert:是 Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,程序会抛出 AssertionError
并终止执行。
参数名 | 类型 | 描述 |
---|---|---|
condition | bool | 要检查的条件表达式。如果为假,则抛出 AssertionError 。 |
message | str | 可选参数,用于在断言失败时显示的错误信息。 |
zip():Python 的内置函数,用于将多个可迭代对象(如列表、元组)的元素按索引配对,返回一个 zip 对象。
参数名 | 类型 | 描述 |
---|---|---|
iterables | 可迭代对象 | 要配对的多个可迭代对象。 |
unsqueeze(): 是 PyTorch 中的函数,用于在指定维度上插入一个大小为 1 的维度,从而改变张量的形状。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int | 要插入新维度的位置,范围是 [-input.dim()-1, input.dim()] 。 |
.T:PyTorch 和 NumPy 中的属性,用于返回张量或矩阵的转置。
torch.mm(): PyTorch 中的函数,用于计算两个二维张量(矩阵)的矩阵乘法。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 第一个矩阵,形状为 (m, n) 。 |
mat2 | Tensor | 第二个矩阵,形状为 (n, p) 。 |
torch.argmax():PyTorch 中的函数,用于返回张量中最大值所在的索引。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int | 指定沿哪个维度计算最大值索引。 |
keepdim | bool | 是否保持输出张量的维度,默认为 False 。 |
squeeze():是 PyTorch 和 NumPy 中的函数,用于移除张量或数组中大小为 1 的维度。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int | 可选参数,指定要移除的维度。 |
def write_stats(self, test_question_vectors, labels):
assert len(labels) == len(test_question_vectors)
for test_question_vector, label in zip(test_question_vectors, labels):
#通过一次矩阵乘法,计算输入问题和知识库中所有问题的相似度
#test_question_vector shape [vec_size] knwb_vectors shape = [n, vec_size]
res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
if int(hit_index) == int(label):
self.stats_dict["correct"] += 1
else:
self.stats_dict["wrong"] += 1
return
⑤ 展示模型预测的统计信息
- 统计信息提取:
correct = self.stats_dict["correct"]
:从self.stats_dict
中获取预测正确的条目数。wrong = self.stats_dict["wrong"]
:从self.stats_dict
中获取预测错误的条目数。
- 日志输出:
self.logger.info("预测集合条目总量:%d" % (correct + wrong))
:输出预测集合的总条目数,即正确条目数与错误条目数之和。self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
:输出预测正确的条目数和预测错误的条目数。self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
:输出预测的准确率,即正确条目数占总条目数的比例。self.logger.info("--------------------")
:输出分隔线,用于区分不同的日志信息。
logger.info(): 是 Python 的 logging
模块中用于记录信息级别(info level)日志的函数。它通常用于记录程序运行时的关键信息,如状态、进度等,而不是调试信息或错误。
参数名 | 类型 | 描述 |
---|---|---|
msg | str | 要记录的日志信息。 |
*args | 任意类型 | 用于格式化日志信息的参数。 |
**kwargs | dict | 可选参数,如 exc_info 、stack_info 等,用于附加异常或堆栈信息。 |
# -*- coding: utf-8 -*-
import torch
from loader import load_data
"""
模型效果测试
"""
class Evaluator:
def __init__(self, config, model, logger):
self.config = config
self.model = model
self.logger = logger
self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
# 由于效果测试需要训练集当做知识库,再次加载训练集。
# 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
self.train_data = load_data(config["train_data_path"], config)
self.stats_dict = {"correct":0, "wrong":0} #用于存储测试结果
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.question_ids = []
for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
for question_id in question_ids:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
self.question_ids.append(question_id)
with torch.no_grad():
question_matrixs = torch.stack(self.question_ids, dim=0)
if torch.cuda.is_available():
question_matrixs = question_matrixs.cuda()
self.knwb_vectors = self.model(question_matrixs)
#将所有向量都作归一化 v / |v|
self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
return
def eval(self, epoch):
self.logger.info("开始测试第%d轮模型效果:" % epoch)
self.stats_dict = {"correct":0, "wrong":0} #清空前一轮的测试结果
self.model.eval()
self.knwb_to_vector()
for index, batch_data in enumerate(self.valid_data):
if torch.cuda.is_available():
batch_data = [d.cuda() for d in batch_data]
input_id, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
with torch.no_grad():
test_question_vectors = self.model(input_id) #不输入labels,使用模型当前参数进行预测
self.write_stats(test_question_vectors, labels)
self.show_stats()
return
def write_stats(self, test_question_vectors, labels):
assert len(labels) == len(test_question_vectors)
for test_question_vector, label in zip(test_question_vectors, labels):
#通过一次矩阵乘法,计算输入问题和知识库中所有问题的相似度
#test_question_vector shape [vec_size] knwb_vectors shape = [n, vec_size]
res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
if int(hit_index) == int(label):
self.stats_dict["correct"] += 1
else:
self.stats_dict["wrong"] += 1
return
def show_stats(self):
correct = self.stats_dict["correct"]
wrong = self.stats_dict["wrong"]
self.logger.info("预测集合条目总量:%d" % (correct +wrong))
self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
self.logger.info("--------------------")
return
Ⅴ、训练主流程 main.py
① 配置日志记录
logging.basicConfig():用于配置日志系统的基本设置,包括日志级别、输出格式、输出目标(如控制台或文件)等。它通常在程序初始化时调用一次。
参数名 | 类型 | 描述 |
---|---|---|
level | int | 设置日志级别,如 logging.DEBUG 、logging.INFO 等。 |
format | str | 设置日志输出格式,如 '%(asctime)s - %(levelname)s - %(message)s' 。 |
filename | str | 设置日志输出到文件,指定文件名。 |
filemode | str | 设置文件打开模式,默认为 'a' (追加模式)。 |
handlers | list | 设置自定义的日志处理器。 |
logging.getLogger(): 用于获取一个日志记录器(Logger)对象。每个记录器都有一个名称(name),可以用来区分不同的日志记录器。
参数名 | 类型 | 描述 |
---|---|---|
name | str | 日志记录器的名称。如果未提供或为 None ,则返回根记录器(root logger)。 |
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
② 创建保存模型的目录
检查指定的模型保存路径是否存在,如果不存在,则创建该路径作为目录。
os.path.isdir(): Python 中 os.path
模块的函数,用于检查指定的路径是否为一个存在的目录。如果路径存在且是一个目录,则返回 True
,否则返回 False
。
参数名 | 类型 | 描述 |
---|---|---|
path | str | 表示文件系统路径的类路径对象(可以是相对路径或绝对路径)。 |
os.mkdir():Python 中 os
模块的函数,用于创建一个新的目录。如果目录已经存在或路径无效,会抛出 FileExistsError
或 OSError
参数名 | 类型 | 描述 |
---|---|---|
path | str | 要创建的目录路径。 |
mode | int | 可选参数,设置目录的权限(八进制模式),默认为 0o777 。 |
#创建保存模型的目录
if not os.path.isdir(config["model_path"]):
os.mkdir(config["model_path"])
③ 加载文件
torch.cuda.is_available():PyTorch 中用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)的函数。
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU)。 |
cuda():PyTorch 张量或模型的方法,用于将张量或模型移动到 GPU 上。
#加载训练数据
train_data = load_data(config["train_data_path"], config)
#加载模型
model = SiameseNetwork(config)
# 标识是否使用gpu
cuda_flag = torch.cuda.is_available()
if cuda_flag:
logger.info("gpu可以使用,迁移模型至gpu")
model = model.cuda()
#加载优化器
optimizer = choose_optimizer(config, model)
#加载效果测试类
evaluator = Evaluator(config, model, logger)
④ 训练的核心过程 ⭐
-
训练循环:
for epoch in range(config["epoch"])
:遍历每个训练周期(epoch),config["epoch"]
是训练的总周期数。epoch += 1
:将当前周期数加1,用于日志记录。model.train()
:将模型设置为训练模式,启用Dropout和Batch Normalization等训练特定行为。
-
数据加载与处理:
for index, batch_data in enumerate(train_data)
:遍历训练数据集的每个批次(batch)。optimizer.zero_grad()
:清空优化器的梯度缓存,避免梯度累积。if cuda_flag: batch_data = [d.cuda() for d in batch_data]
:如果启用了CUDA(GPU加速),将数据移动到GPU上。
-
模型前向传播与损失计算:
input_id1, input_id2, labels = batch_data
:从批次数据中提取两个输入(input_id1
和input_id2
)以及对应的标签(labels
)。
-
反向传播与优化:
train_loss.append(loss.item())
:将当前批次的损失值记录下来。loss.backward()
:执行反向传播,计算梯度。optimizer.step()
:更新模型参数,优化损失函数。
-
日志记录与评估:
logger.info("epoch average loss: %f" % np.mean(train_loss))
:记录当前周期的平均损失值。evaluator.eval(epoch)
:调用评估器对模型进行评估,通常是在验证集上计算准确率或其他指标。
model.train():将模型设置为训练模式。在训练模式下,模型会启用 Dropout
和 Batch Normalization
等层的行为,以确保模型在训练时能够正常工作
enumerate():Python 的内置函数,用于在遍历可迭代对象时同时获取索引和值
参数名 | 类型 | 描述 |
---|---|---|
iterable | 可迭代对象 | 要遍历的对象(如列表、元组、字符串)。 |
start | int | 索引的起始值,默认为 0 。 |
optimizer.zero_grad():将优化器中所有参数的梯度清零,避免梯度累积
cuda():将张量或模型移动到 GPU 上,以利用 GPU 的并行计算能力加速计算
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU)。 |
append():在列表末尾添加一个新元素
参数名 | 类型 | 描述 |
---|---|---|
element | 任意类型 | 要添加到列表末尾的元素。 |
item():将包含单个元素的张量转换为 Python 标量(如 int
或 float
)
loss.backward():计算损失函数对模型参数的梯度,用于反向传播
optimizer.step(): 根据计算出的梯度更新模型参数
#训练
for epoch in range(config["epoch"]):
epoch += 1
model.train()
logger.info("epoch %d begin" % epoch)
train_loss = []
for index, batch_data in enumerate(train_data):
optimizer.zero_grad()
if cuda_flag:
batch_data = [d.cuda() for d in batch_data]
input_id1, input_id2, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
loss = model(input_id1, input_id2, labels)
train_loss.append(loss.item())
# if index % int(len(train_data) / 2) == 0:
# logger.info("batch loss %f" % loss)
loss.backward()
optimizer.step()
logger.info("epoch average loss: %f" % np.mean(train_loss))
evaluator.eval(epoch)
⑤ 保存模型
os.path.join():用于将多个路径片段拼接成一个完整的路径,并自动根据操作系统选择正确的路径分隔符(如 Windows 使用 \
,Linux 和 macOS 使用 /
)
参数名 | 类型 | 描述 |
---|---|---|
path | str | 初始路径片段。 |
*paths | str | 需要拼接的后续路径片段,可以接受任意数量的参数。 |
torch.save():用于将 PyTorch 对象(如模型、张量、字典等)保存到磁盘文件中,通常用于保存模型的权重或训练状态
参数名 | 类型 | 描述 |
---|---|---|
obj | 任意对象 | 需要保存的对象,如模型、张量、字典等。 |
f | str 或文件对象 | 保存的目标文件路径或文件对象。 |
model.state_dict(): 返回一个包含模型所有可学习参数(如权重和偏置)的有序字典,通常用于保存或加载模型参数
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
torch.save(model.state_dict(), model_path)
模型训练主程序
# -*- coding: utf-8 -*-
import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SiameseNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
"""
模型训练主程序
"""
def main(config):
#创建保存模型的目录
if not os.path.isdir(config["model_path"]):
os.mkdir(config["model_path"])
#加载训练数据
train_data = load_data(config["train_data_path"], config)
#加载模型
model = SiameseNetwork(config)
# 标识是否使用gpu
cuda_flag = torch.cuda.is_available()
if cuda_flag:
logger.info("gpu可以使用,迁移模型至gpu")
model = model.cuda()
#加载优化器
optimizer = choose_optimizer(config, model)
#加载效果测试类
evaluator = Evaluator(config, model, logger)
#训练
for epoch in range(config["epoch"]):
epoch += 1
model.train()
logger.info("epoch %d begin" % epoch)
train_loss = []
for index, batch_data in enumerate(train_data):
optimizer.zero_grad()
if cuda_flag:
batch_data = [d.cuda() for d in batch_data]
input_id1, input_id2, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
loss = model(input_id1, input_id2, labels)
train_loss.append(loss.item())
# if index % int(len(train_data) / 2) == 0:
# logger.info("batch loss %f" % loss)
loss.backward()
optimizer.step()
logger.info("epoch average loss: %f" % np.mean(train_loss))
evaluator.eval(epoch)
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
torch.save(model.state_dict(), model_path)
return
if __name__ == "__main__":
main(Config)
Ⅵ、预测文件 predict.py
① 初始化预测器对象
- 模型初始化:
self.config = config
:将传入的配置信息保存到类属性中。self.model = model
:将传入的模型保存到类属性中。self.train_data = knwb_data
:将知识库数据保存到类属性中。
- 设备选择:
if torch.cuda.is_available(): self.model = model.cuda()
:如果 GPU 可用,则将模型移动到 GPU 上。else: self.model = model.cpu()
:如果 GPU 不可用,则将模型移动到 CPU 上。
- 模型模式设置:
self.model.eval()
:将模型设置为评估模式,禁用 Dropout 和 Batch Normalization 等训练特定行为。
- 知识库向量化:
self.knwb_to_vector()
:调用knwb_to_vector
方法,将知识库数据转换为向量形式,通常用于后续的相似度计算或检索。
torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已编译为支持 CUDA。如果返回 True
,则表示可以使用 GPU 加速计算;如果返回 False
,则表示只能使用 CPU。
model.cuda():将模型从 CPU 移动到 GPU 上,以便利用 GPU 的并行计算能力加速训练和推理。
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU) |
model.cpu(): 将模型从 GPU 移动回 CPU 上
model.eval():将模型设置为评估模式,禁用 Dropout
和 Batch Normalization
等层的随机行为。
class Predictor:
def __init__(self, config, model, knwb_data):
self.config = config
self.model = model
self.train_data = knwb_data
if torch.cuda.is_available():
self.model = model.cuda()
else:
self.model = model.cpu()
self.model.eval()
self.knwb_to_vector()
② 将知识库中的问题向量化
-
初始化映射和列表:
self.question_index_to_standard_question_index = {}
:创建一个空字典,用于记录问题编号到标准问题编号的映射。self.question_ids = []
:创建一个空列表,用于存储问题编号。
-
加载词汇表和模式:
self.vocab = self.train_data.dataset.vocab
:从训练数据集中加载词汇表。self.schema = self.train_data.dataset.schema
:从训练数据集中加载模式。self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
:创建一个字典,将索引映射到标准问题。
-
记录问题编号到标准问题编号的映射:
- 遍历知识库中的每个标准问题及其对应的问题编号。
- 对于每个问题编号,记录其到标准问题编号的映射,并将其添加到
self.question_ids
列表中。
-
问题向量化:
with torch.no_grad():
:禁用梯度计算,因为这是推理阶段,不需要更新模型参数。question_matrixs = torch.stack(self.question_ids, dim=0)
:将问题编号列表堆叠成一个矩阵。if torch.cuda.is_available(): question_matrixs = question_matrixs.cuda()
:如果 GPU 可用,则将矩阵移动到 GPU 上。self.knwb_vectors = self.model(question_matrixs)
:通过模型将问题矩阵转换为向量。self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
:对所有向量进行归一化处理,使得每个向量的长度为 1。
dict():用于创建字典
参数名 | 类型 | 描述 |
---|---|---|
**kwargs | 关键字参数 | 关键字参数会被视为字典中的键值对。 |
mapping | 映射对象 | 映射对象中的元素会被复制到新创建的字典中。 |
iterable | 可迭代对象 | 可迭代对象的元素通常是长度为二的元组,每个元组的第一个元素作为键,第二个元素作为值。 |
items():返回字典中所有键值对的视图
列表.append():在列表末尾添加一个元素
参数名 | 类型 | 描述 |
---|---|---|
item | 任意对象 | 要添加到列表末尾的元素。 |
torch.no_grad():禁用梯度计算,适用于模型评估或推理阶段
torch.stack():沿指定维度连接张量序列,所有张量必须具有相同的大小
参数名 | 类型 | 描述 |
---|---|---|
tensors | 张量序列 | 需要连接的张量序列。 |
dim | int | 指定连接的维度,必须在 0 和所需连接的张量维度之间。 |
out | Tensor | 输出张量,默认为 None 。 |
torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已编译为支持 CUDA。如果返回 True
,则表示可以使用 GPU 加速计算;如果返回 False
,则表示只能使用 CPU。
cuda():将张量从 CPU 移动到 GPU 上,以便利用 GPU 的并行计算能力加速训练和推理。
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU) |
torch.nn.functional.normalize(): 对输入张量沿指定维度进行归一化
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
p | float | 计算范数的类型,默认为 2(L2 范数)。 |
dim | int | 计算范数的维度。 |
eps | float | 防止分母为 0 的小数,默认为 1e-12 。 |
out | Tensor | 输出张量,默认为 None 。 |
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.question_ids = []
self.vocab = self.train_data.dataset.vocab
self.schema = self.train_data.dataset.schema
self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
for question_id in question_ids:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
self.question_ids.append(question_id)
with torch.no_grad():
question_matrixs = torch.stack(self.question_ids, dim=0)
if torch.cuda.is_available():
question_matrixs = question_matrixs.cuda()
self.knwb_vectors = self.model(question_matrixs)
#将所有向量都作归一化 v / |v|
self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
return
③ 文本编码
- 根据配置中的
vocab_path
值,选择不同的分词方式:- 如果
vocab_path
为"words.txt"
,则使用jieba
对文本进行分词,并将每个分词结果映射到词汇表(self.vocab
)中的 ID。 - 否则,将文本按字符逐个映射到词汇表中的 ID。
- 如果
- 如果词汇表中不存在某个词或字符,则使用
"[UNK]"
(未知词)的 ID 作为默认值。
jieba.cut():是 jieba
库中的函数,用于对中文文本进行分词。它支持多种分词模式,包括精确模式、全模式和搜索引擎模式。
参数名 | 类型 | 描述 |
---|---|---|
sentence | str | 需要分词的中文字符串。 |
cut_all | bool | 是否使用全模式,默认为 False (精确模式)。 |
HMM | bool | 是否使用隐马尔可夫模型(HMM)进行新词发现,默认为 True |
字典.get():Python 字典对象的方法,用于安全地获取字典中指定键的值。如果键不存在,返回默认值(默认为 None
),而不会引发 KeyError
异常。
参数名 | 类型 | 描述 |
---|---|---|
key | 任意类型 | 需要检索的键。 |
default | 任意类型 | 如果键不存在时返回的默认值,默认为 None 。 |
列表.append():在列表末尾添加一个元素
参数名 | 类型 | 描述 |
---|---|---|
item | 任意对象 | 要添加到列表末尾的元素。 |
def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
return input_id
④ 匹配相似的标准问题
-
输入处理:
input_id = self.encode_sentence(sentence)
:调用encode_sentence
方法将输入的句子转换为 ID 序列。input_id = torch.LongTensor([input_id])
:将 ID 序列转换为 PyTorch 的LongTensor
类型,并增加一个维度以适配模型输入。
-
设备选择:
if torch.cuda.is_available(): input_id = input_id.cuda()
:如果 GPU 可用,则将输入数据移动到 GPU 上。
-
模型推理:
with torch.no_grad():
:禁用梯度计算,因为这是推理阶段,不需要更新模型参数。test_question_vector = self.model(input_id)
:将输入数据传递给模型,生成句子的向量表示。res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
:计算输入句子向量与知识库中所有问题向量的相似度(通过矩阵乘法)。hit_index = int(torch.argmax(res.squeeze()))
:找到相似度最高的知识库问题的索引。hit_index = self.question_index_to_standard_question_index[hit_index]
:将命中问题的索引转换为标准问题的索引。
torch.LongTensor(): PyTorch 中的一个函数,用于创建一个包含整数(64 位整型)的张量。它是一种特定的张量类型,通常用于存储索引、标签或其他整数值数据。
参数名 | 类型 | 描述 |
---|---|---|
data | 列表、元组或数组 | 输入数据,用于创建张量。 |
dtype | torch.dtype | 张量的数据类型,默认为 torch.int64 (64 位整数)。 |
device | torch.device | 张量存储的设备(如 CPU 或 GPU),默认为 None (使用默认设备)。 |
requires_grad | bool | 是否需要计算梯度,默认为 False 。 |
torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已编译为支持 CUDA。如果返回 True
,则表示可以使用 GPU 加速计算;如果返回 False
,则表示只能使用 CPU。
cuda():将张量从 CPU 移动到 GPU 上,以便利用 GPU 的并行计算能力加速训练和推理。
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU) |
torch.no_grad():禁用梯度计算,适用于模型评估或推理阶段
squeeze():是 PyTorch 和 NumPy 中的函数,用于移除张量或数组中大小为 1 的维度。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int | 可选参数,指定要移除的维度。 |
unsqueeze(): 是 PyTorch 中的函数,用于在指定维度上插入一个大小为 1 的维度,从而改变张量的形状。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int | 要插入新维度的位置,范围是 [-input.dim()-1, input.dim()] 。 |
torch.argmax():PyTorch 中的函数,用于返回张量中最大值所在的索引。
参数名 | 类型 | 描述 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int | 指定沿哪个维度计算最大值索引。 |
keepdim | bool | 是否保持输出张量的维度,默认为 False 。 |
def predict(self, sentence):
input_id = self.encode_sentence(sentence)
input_id = torch.LongTensor([input_id])
if torch.cuda.is_available():
input_id = input_id.cuda()
with torch.no_grad():
test_question_vector = self.model(input_id) #不输入labels,使用模型当前参数进行预测
res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
return self.index_to_standard_question[hit_index]
⑤ 实现与用户的交互问答
torch.load():PyTorch 中用于加载由 torch.save()
保存的模型、张量或其他对象的函数。它可以将保存在文件中的数据加载到程序中,以便进行推理、继续训练或其他操作
参数名 | 类型/值 | 说明 |
---|---|---|
f | 类文件对象或字符串 | 类文件对象(必须实现 read() 、readline() 、tell() 和 seek() ),或包含文件名的字符串 |
map_location | 函数、torch.device 、字符串或字典 | 指定如何重新映射存储位置。例如,将 GPU 保存的模型加载到 CPU 上 |
pickle_module | 模块(默认:pickle ) | 用于反序列化的模块,必须与序列化时使用的模块匹配。 |
**pickle_load_args | 可选关键字参数 | 传递给
的可选参数 |
input():Python 内置函数,用于从标准输入(通常是键盘)读取用户输入的数据。它会将用户输入的内容作为字符串返回。
参数名 | 类型 | 说明 |
---|---|---|
prompt | 字符串 | 可选参数,用于在等待用户输入时显示的提示信息。如果未提供,则不显示任何提示。 |
# -*- coding: utf-8 -*-
import torch
from loader import load_data
from config import Config
from model import SiameseNetwork, choose_optimizer
"""
模型效果测试
"""
class Predictor:
def __init__(self, config, model, knwb_data):
self.config = config
self.model = model
self.train_data = knwb_data
if torch.cuda.is_available():
self.model = model.cuda()
else:
self.model = model.cpu()
self.model.eval()
self.knwb_to_vector()
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.question_ids = []
self.vocab = self.train_data.dataset.vocab
self.schema = self.train_data.dataset.schema
self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
for question_id in question_ids:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
self.question_ids.append(question_id)
with torch.no_grad():
question_matrixs = torch.stack(self.question_ids, dim=0)
if torch.cuda.is_available():
question_matrixs = question_matrixs.cuda()
self.knwb_vectors = self.model(question_matrixs)
#将所有向量都作归一化 v / |v|
self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
return
def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
return input_id
def predict(self, sentence):
input_id = self.encode_sentence(sentence)
input_id = torch.LongTensor([input_id])
if torch.cuda.is_available():
input_id = input_id.cuda()
with torch.no_grad():
test_question_vector = self.model(input_id) #不输入labels,使用模型当前参数进行预测
res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
hit_index = int(torch.argmax(res.squeeze())) #命中问题标号
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
return self.index_to_standard_question[hit_index]
if __name__ == "__main__":
knwb_data = load_data(Config["train_data_path"], Config)
model = SiameseNetwork(Config)
model.load_state_dict(torch.load("model_output/epoch_10.pth"))
pd = Predictor(Config, model, knwb_data)
while True:
# sentence = "固定宽带服务密码修改"
sentence = input("请输入问题:")
res = pd.predict(sentence)
print(res)
二、实现方式 ② 交互型文本匹配
同时输入两句话,对两句话之间进行分割,然后同时送入这个模型,然后过模型后取出【CLS】token 位置对应的向量送入这个模型,过模型后取出【CLS】token 位置对应的向量做一个二分类
Bert中进行匹配这两句话判断是否是上下文关系,而文本匹配模型中,将数据换为文本匹配的数据,最终匹配两句话的语义相似度
表示型文本匹配:两句话分别单独进入这个模型,两句话分别编码,在编码的过程中互不影响
交互式文本匹配:两句话一同送入这个模型,计算交互时与self-attention计算类似,对比可以看到另一句话的信息,对比发现两句话的主要区别,例:今天下雨了 今天下雪了
representation layer:表示层输出是否匹配,二分类任务
interaction layer:交互层进行信息融合,常以attention的方式
Embedding layer:嵌入层,权重共享
将两句话同时输入这个模型,让模型看到,让模型进行对比发现文本的重点
1.交互型文本匹配 —— 代码示例🚀
Ⅰ、配置文件 config.py
① 路径相关参数
model_path:指定训练后模型的保存路径。若加载预训练模型,需提供模型文件的具体存储位置
schema_path:定义数据结构的配置文件路径,通常用于数据预处理或验证输入格式(如JSON/YAML文件)
train_data_path:指向训练集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)
valid_data_path:指向验证集的数据文件或目录。若为目录,需包含多个训练文件(如文本、图像等)
vocab_path:词表文件路径,用于自然语言处理任务中定义词汇的映射关系(如将单词转为ID)
② 模型结构参数
max_length:序列数据的最大长度(如文本的单词数)。超过此长度的序列会被截断或填充
hidden_size:神经网络隐藏层的维度大小,影响模型的表达能力。例如,LSTM或Transformer中每层的神经元数量
③ 训练控制参数
epoch:训练过程中遍历整个数据集的次数。适当增加轮次可提升模型性能,但可能过拟合
batch_size:每次输入模型的样本数量。较大的批次可加速训练,但需更多显存
epoch_data_size:每轮训练中采样的数量
positive_sample_rate:正样本在批次中的占比,常用于不平衡数据任务(如分类)。需结合负样本比例调整
④ 优化和学习参数
optimizer:选择优化算法(如SGD、Adam、AdamW),影响参数更新策略
learning_rate:控制参数更新的步长。学习率过高可能导致震荡,过低则收敛缓慢
# -*- coding: utf-8 -*-
"""
配置参数信息
"""
Config = {
"model_path": "model_output",
"schema_path": "../data/schema.json",
"train_data_path": "../data/train.json",
"valid_data_path": "../data/valid.json",
"pretrain_model_path":r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese",
"vocab_path":r"F:\人工智能NLP\NLP资料\week6 语言模型\bert-base-chinese\vocab.txt",
"max_length": 20,
"hidden_size": 256,
"epoch": 10,
"batch_size": 128,
"epoch_data_size": 10000, #每轮训练中采样数量
"positive_sample_rate":0.5, #正样本比例
"optimizer": "adam",
"learning_rate": 1e-3,
}
Ⅱ、数据加载 loader.py
① 设置日志级别
logging.getLogger():Python 中 logging
模块的核心函数,用于获取或创建一个日志记录器(Logger)实例。每个日志记录器都有一个唯一的名称,用于标识和配置日志记录行为。如果未提供名称,则返回根记录器
参数名 | 类型 | 说明 |
---|---|---|
name | 字符串 | 可选参数,指定日志记录器的名称。如果未提供或为 None ,则返回根记录器。 |
setLevel():Logger
对象的方法,用于设置日志记录器的日志级别。只有等于或高于该级别的日志消息才会被处理,低于该级别的日志消息将被忽略
参数名 | 类型 | 说明 |
---|---|---|
level | 整数 | 指定日志级别,常用值包括 logging.DEBUG 、logging.INFO 、logging.WARNING 、logging.ERROR 和 logging.CRITICAL |
logging.getLogger("transformers").setLevel(logging.ERROR)
② 初始化 def __init__()
self.config = config
:将传入的配置字典保存到实例变量中。
self.path = data_path
:将数据路径保存到实例变量中。
self.tokenizer = load_vocab(config["vocab_path"])
:加载词汇表文件,初始化分词器。config["vocab_path"]
是词汇表文件的路径。
self.config["vocab_size"] = len(self.tokenizer.vocab)
:计算词汇表的大小,并将其更新到配置字典中。
self.schema = load_schema(config["schema_path"])
:加载模式文件(如数据格式定义),config["schema_path"]
是模式文件的路径。
self.train_data_size = config["epoch_data_size"]
:设置每个 epoch 的采样数据量。由于采用随机采样,需要指定一个固定的采样数量,否则可以一直采样self.max_length = config["max_length"]
**:设置输入序列的最大长度。
self.data_type = None
:用于标识加载的数据类型是训练集还是测试集,初始值为 None
,后续可以通过方法设置为 "train"
或 "test"
。
self.load()
:调用 load
方法,加载数据或执行其他初始化操作。
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.tokenizer = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.tokenizer.vocab)
self.schema = load_schema(config["schema_path"])
self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
self.max_length = config["max_length"]
self.data_type = None #用来标识加载的是训练集还是测试集 "train" or "test"
self.load()
③ 从指定路径文件加载数据
- 打开文件(
self.path
),逐行读取并解析为 JSON 对象。 - 根据数据的类型(字典或列表)进行分类:
- 训练集:如果数据是字典类型,提取
questions
和target
,将问题存储到self.knwb
中,键为标签(通过self.schema
映射)。 - 测试集:如果数据是列表类型,提取问题和标签,将标签转换为
torch.LongTensor
并存储到self.data
中。
- 训练集:如果数据是字典类型,提取
defaultdict():是 collections
模块中的一个类,继承自 dict
。它允许在访问不存在的键时返回一个默认值,而不是抛出 KeyError
异常。
参数名 | 类型 | 说明 |
---|---|---|
default_factory | 可调用对象 | 指定默认值的生成函数,如 list 、int 、set 等。如果未提供,默认为 None 。 |
json.loads():将 JSON 格式的字符串解析为 Python 对象(如字典、列表等)。
参数名 | 类型 | 说明 |
---|---|---|
s | 字符串 | 要解析的 JSON 字符串。 |
encoding | 字符串 | 字符编码(Python 3 中已弃用)。 |
cls | 类 | 自定义解码类,默认为 None 。 |
object_hook | 函数 | 用于自定义 JSON 对象转换为 Python 对象的函数。 |
parse_float | 函数 | 自定义将 JSON 中的浮点数转换为特定类型的函数。 |
parse_int | 函数 | 自定义将 JSON 中的整数转换为特定类型的函数。 |
parse_constant | 函数 | 自定义将 JSON 中的常量(如 Infinity )转换为特定类型的函数。 |
object_pairs_hook | 函数 | 用于处理 JSON 对象中的键值对的函数,默认返回字典。 |
isinstance():检查一个对象是否是指定类型或其子类的实例。
参数名 | 类型 | 说明 |
---|---|---|
object | 对象 | 需要检查的对象。 |
classinfo | 类型或元组 | 可以是一个类型(如 int 、str 等),也可以是这些类型的元组。 |
append():在列表的末尾添加一个元素。
参数名 | 类型 | 说明 |
---|---|---|
element | 任意类型 | 要添加到列表末尾的元素。 |
assert:用于调试,检查一个条件是否为真。如果条件为假,则抛出 AssertionError
异常。
参数名 | 类型 | 说明 |
---|---|---|
condition | 布尔表达式 | 需要检查的条件。 |
message | 字符串 | 可选参数,当条件为假时抛出的错误信息。 |
torch.LongTensor():创建一个包含整数的张量(Tensor),元素类型为 64 位整数
参数名 | 类型 | 说明 |
---|---|---|
data | 列表或数组 | 用于初始化张量的数据。 |
dtype | 类型 | 指定张量的数据类型,默认为 torch.long 。 |
device | 设备 | 指定张量存储的设备(如 cpu 或 gpu ),默认为 cpu 。 |
requires_grad | 布尔值 | 指定是否需要计算张量的梯度,默认为 False 。 |
def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
#加载训练集
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
self.knwb[self.schema[label]].append(question)
#加载测试集
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
label_index = torch.LongTensor([self.schema[label]])
self.data.append([question, label_index])
return
④ 拼接两句话转成向量
-
拼接与编码:
- 使用
self.tokenizer.encode
方法对text1
和text2
进行拼接和编码。 - 拼接后的文本会被转换为模型可接受的输入格式(如
input_ids
)。
- 使用
-
参数说明:
truncation='longest_first'
:如果拼接后的文本长度超过max_length
,则从较长的部分开始截断。max_length=self.max_length
:指定编码后的最大长度,超过的部分会被截断。padding='max_length'
:如果拼接后的文本长度不足max_length
,则用填充符(如[PAD]
)补齐。
-
返回结果:
- 返回编码后的
input_id
,通常是一个包含 token ID 的列表或张量。
- 返回编码后的
tokenizer.encode():transformers
库中 Tokenizer
类的一个方法,用于将文本转换为模型可接受的 token ID 序列。它通常用于自然语言处理任务中,将输入文本编码为模型输入格式。
参数名 | 类型 | 说明 |
---|---|---|
text | str 或 List[str] | 需要编码的文本,可以是单个字符串或字符串列表。 |
text_pair | str 或 List[str] | 可选参数,第二个文本(如句子对任务中的第二句话)。 |
add_special_tokens | bool | 是否添加特殊标记(如 [CLS] 和 [SEP] ),默认为 True 。 |
max_length | int | 指定编码后的最大长度,超过的部分会被截断。 |
truncation | bool 或 str | 指定截断策略,如 'longest_first' 、'only_first' 或 'only_second' 。 |
padding | bool 或 str | 指定填充策略,如 'max_length' 或 'longest' 。 |
return_tensors | str | 指定返回的 tensor 类型,如 'pt' (PyTorch)或 'tf' (TensorFlow)。 |
return_token_type_ids | bool | 是否返回 token_type_ids (用于区分句子对任务中的两个句子)。 |
return_attention_mask | bool | 是否返回 attention_mask (用于标识有效 token)。 |
#每次加载两个文本,输出他们的拼接后编码
def encode_sentence(self, text1, text2):
input_id = self.tokenizer.encode(text1, text2,
truncation='longest_first',
max_length=self.max_length,
padding='max_length',
)
return input_id
⑤ 返回数据集的长度 def __len__()
该函数的作用是返回对象的“长度”,具体行为取决于对象的 data_type
属性。
如果 data_type
为 "train"
,则返回 self.config["epoch_data_size"]
的值,这通常表示训练数据的规模。
如果 data_type
为 "test"
,则返回 self.data
的长度,即测试数据的元素个数。
如果 data_type
不是 "train"
或 "test"
,则会触发 AssertionError
,并提示 self.data_type
的值。
assert:Python 中的调试工具,用于检查某个条件是否为真。如果条件为假,assert
会抛出 AssertionError
异常。
len(): Python 内置函数,用于返回对象的长度或元素数量。它适用于多种数据类型,包括字符串、列表、元组、字典、集合等。
参数名 | 类型 | 描述 |
---|---|---|
obj | 可迭代对象 | 需要计算长度的对象,如字符串、列表、元组、字典、集合等。 |
def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)
⑥ 通过索引访问数据集中的样本 def __getitem__()
判断数据类型: 如果是训练数据集,则调用self.random_train_sample()
方法来随机生成一个训练样本。这是因为训练数据集中的样本是通过随机采样生成的,而不是固定的数据。
如果不是训练数据集,则认为是测试数据集:直接返回self.data
列表中索引为index
的样本。
def __getitem__(self, index):
if self.data_type == "train":
return self.random_train_sample() #随机生成一个训练样本
else:
return self.data[index]
⑦ 随机生成训练样本 ⭐
正样本生成:从所有标准问题中随机选择一个标准问题 p
。如果该标准问题下至少有两个问题,则从中随机选取两个问题 s1
和 s2
。使用 encode_sentence
方法将 s1
和 s2
编码为模型输入格式,并返回编码结果及标签 1
(表示正样本)。如果标准问题下不足两个问题,则重新随机选择。
负样本生成:从所有标准问题中随机选择两个不同的标准问题 p
和 n
。分别从 p
和 n
中随机选取一个问题 s1
和 s2
。使用 encode_sentence
方法将 s1
和 s2
编码为模型输入格式,并返回编码结果及标签 0
(表示负样本)。
概率控制:使用 random.random()
和 self.config["positive_sample_rate"]
控制生成正样本的概率。
list():将可迭代对象(如字符串、元组、集合等)转换为列表。
参数名 | 类型 | 说明 |
---|---|---|
iterable | 可迭代对象 | 需要转换为列表的对象,如字符串、元组、集合等。如果未提供,则返回空列表。 |
字典.keys():返回字典中所有键的视图对象(dict_keys
),可以转换为列表
random.random():生成一个范围在 [0.0, 1.0)
之间的随机浮点数。
random.choice():从非空序列(如列表、元组、字符串等)中随机选择一个元素。
参数名 | 类型 | 说明 |
---|---|---|
seq | 序列 | 非空序列,如列表、元组、字符串等。 |
random.sample():从序列中随机选择指定数量的唯一元素,返回一个新列表。
参数名 | 类型 | 说明 |
---|---|---|
population | 序列 | 需要从中选择的序列,如列表、元组、字符串等。 |
k | 整数 | 需要选择的元素数量。 |
torch.LongTensor():创建一个包含 64 位整数的张量(Tensor)
参数名 | 类型 | 说明 |
---|---|---|
data | 列表或数组 | 用于初始化张量的数据,如列表或数组。 |
#依照一定概率生成负样本或正样本
#负样本从随机两个不同的标准问题中各随机选取一个
#正样本从随机一个标准问题中随机选取两个
def random_train_sample(self):
standard_question_index = list(self.knwb.keys())
#随机正样本
if random.random() <= self.config["positive_sample_rate"]:
p = random.choice(standard_question_index)
#如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
if len(self.knwb[p]) < 2:
return self.random_train_sample()
else:
s1, s2 = random.sample(self.knwb[p], 2)
input_ids = self.encode_sentence(s1, s2)
input_ids = torch.LongTensor(input_ids)
return [input_ids, torch.LongTensor([1])]
#随机负样本
else:
p, n = random.sample(standard_question_index, 2)
s1 = random.choice(self.knwb[p])
s2 = random.choice(self.knwb[n])
input_ids = self.encode_sentence(s1, s2)
input_ids = torch.LongTensor(input_ids)
return [input_ids, torch.LongTensor([0])]
⑧ 数据处理
加载词汇表:
加载词汇表:使用 vocab_path
参数指定词汇表文件的路径。通过 BertTokenizer
类加载词汇表,并初始化分词器。
返回分词器:返回初始化好的 BertTokenizer
分词器,用于后续的文本编码和分词操作。
#加载字表或词表
def load_vocab(vocab_path):
tokenizer = BertTokenizer(vocab_path)
return tokenizer
加载schema:
加载模式文件:使用 schema_path
参数指定模式文件的路径。打开文件并读取其内容。
解析 JSON 数据:使用 json.loads
函数将文件内容解析为 Python 对象(通常是字典)。
返回解析结果:返回解析后的 Python 对象,用于后续的处理或配置。
open():用于打开文件,并返回一个文件对象,以便进行读取或写入操作。
参数名 | 类型 | 说明 |
---|---|---|
file | 字符串 | 文件路径,可以是相对路径或绝对路径。 |
mode | 字符串 | 打开文件的模式,如 'r' (只读)、'w' (只写)、'a' (追加)等。 |
encoding | 字符串 | 文件编码格式,如 'utf-8' 。 |
errors | 字符串 | 指定编码错误处理方式,如 'ignore' 或 'strict' 。 |
newline | 字符串 | 控制换行符的行为,如 None (默认)或 '\n' 。 |
closefd | 布尔值 | 是否在关闭文件时关闭文件描述符,默认为 True 。 |
opener | 函数 | 自定义文件打开器。 |
json.loads():将 JSON 格式的字符串解析为 Python 对象(如字典、列表等)。
参数名 | 类型 | 说明 |
---|---|---|
s | 字符串 | 要解析的 JSON 字符串。 |
encoding | 字符串 | 字符编码(Python 3 中已弃用)。 |
cls | 类 | 自定义解码类,默认为 None 。 |
object_hook | 函数 | 自定义将 JSON 对象转换为其他类型的 Python 对象。 |
parse_float | 函数 | 自定义将 JSON 中的浮点数转换为特定类型。 |
parse_int | 函数 | 自定义将 JSON 中的整数转换为特定类型。 |
文件对象.read(): 从文件中读取指定数量的字节或整个文件内容。
参数名 | 类型 | 说明 |
---|---|---|
size | 整数 | 要读取的字节数。如果未指定或为负数,则读取整个文件。 |
#加载schema
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())
封装数据加载器:
创建数据生成器:使用 DataGenerator
类(假设已定义)从 data_path
和 config
中加载数据,并生成数据集对象 dg
。
封装为 DataLoader:使用 DataLoader
将 dg
封装为可迭代的数据加载器 dl
。batch_size
从 config
中获取,用于指定每个批次的样本数量。shuffle
参数控制是否在每个训练周期开始时打乱数据顺序。
返回 DataLoader:返回封装好的 DataLoader
对象 dl
,用于后续的训练或验证。
DataLoader():PyTorch 中的一个标准工具,用于高效地加载和处理数据。它支持批量加载、数据打乱、多线程加载等功能,是深度学习训练中常用的数据加载器。
参数名 | 类型 | 描述 |
---|---|---|
dataset | 数据集对象 | 要加载的数据集,必须实现 __len__ 和 __getitem__ 方法。 |
batch_size | 整数 | 每个批次的大小,默认为 1 。 |
shuffle | 布尔值 | 是否在每个 epoch 开始时打乱数据顺序,默认为 False 。 |
sampler | 采样器对象 | 自定义采样器,用于控制数据的采样方式,默认为 None 。 |
batch_sampler | 批次采样器对象 | 自定义批次采样器,用于控制批次的采样方式,默认为 None 。 |
num_workers | 整数 | 用于数据加载的子进程数量,默认为 0 (主进程加载)。 |
collate_fn | 函数 | 用于将一个批次的数据合并成一个张量或元组,默认为 None 。 |
pin_memory | 布尔值 | 是否将数据存储在 pin memory 中(用于 GPU 加速),默认为 False 。 |
drop_last | 布尔值 | 如果数据不能完全分成批次,是否删除最后一个不完整的批次,默认为 False 。 |
timeout | 整数 | 数据加载的最大等待时间(秒),默认为 0 (无限制)。 |
worker_init_fn | 函数 | 用于初始化每个数据加载器子进程的函数,默认为 None 。 |
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
⑨ 加载数据文件
# -*- coding: utf-8 -*-
import json
import re
import os
import torch
import random
import logging
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
from transformers import BertTokenizer
"""
数据加载
"""
logging.getLogger("transformers").setLevel(logging.ERROR)
class DataGenerator:
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.tokenizer = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.tokenizer.vocab)
self.schema = load_schema(config["schema_path"])
self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
self.max_length = config["max_length"]
self.data_type = None #用来标识加载的是训练集还是测试集 "train" or "test"
self.load()
def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
#加载训练集
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
self.knwb[self.schema[label]].append(question)
#加载测试集
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
label_index = torch.LongTensor([self.schema[label]])
self.data.append([question, label_index])
return
#每次加载两个文本,输出他们的拼接后编码
def encode_sentence(self, text1, text2):
input_id = self.tokenizer.encode(text1, text2,
truncation='longest_first',
max_length=self.max_length,
padding='max_length',
)
return input_id
def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)
def __getitem__(self, index):
if self.data_type == "train":
return self.random_train_sample() #随机生成一个训练样本
else:
return self.data[index]
#依照一定概率生成负样本或正样本
#负样本从随机两个不同的标准问题中各随机选取一个
#正样本从随机一个标准问题中随机选取两个
def random_train_sample(self):
standard_question_index = list(self.knwb.keys())
#随机正样本
if random.random() <= self.config["positive_sample_rate"]:
p = random.choice(standard_question_index)
#如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
if len(self.knwb[p]) < 2:
return self.random_train_sample()
else:
s1, s2 = random.sample(self.knwb[p], 2)
input_ids = self.encode_sentence(s1, s2)
input_ids = torch.LongTensor(input_ids)
return [input_ids, torch.LongTensor([1])]
#随机负样本
else:
p, n = random.sample(standard_question_index, 2)
s1 = random.choice(self.knwb[p])
s2 = random.choice(self.knwb[n])
input_ids = self.encode_sentence(s1, s2)
input_ids = torch.LongTensor(input_ids)
return [input_ids, torch.LongTensor([0])]
#加载字表或词表
def load_vocab(vocab_path):
tokenizer = BertTokenizer(vocab_path)
return tokenizer
#加载schema
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
if __name__ == "__main__":
from config import Config
dg = DataGenerator("../data/valid.json", Config)
Ⅲ、模型定义 model.py
① 从输入张量提取第一个元素并返回
class GetFirst(nn.Module):
def __init__(self):
super(GetFirst, self).__init__()
def forward(self, x):
return x[0]
② 文本匹配网络初始化
输入:输入是一个词索引序列,形状为 (batch_size, max_len)
,其中 max_len
是序列的最大长度。
嵌入层:将词索引映射为稠密向量,输出形状为 (batch_size, max_len, hidden_size)
。
LSTM 编码器:双向 LSTM 编码输入序列,输出形状为 (batch_size, max_len, hidden_size * 2)
。GetFirst()
提取第一个时间步的隐藏状态,输出形状为 (batch_size, hidden_size * 2)
。线性层和 ReLU 激活函数进一步处理,输出形状为 (batch_size, hidden_size)
。
分类层:将隐藏层的输出映射为 2 维向量,输出形状为 (batch_size, 2)
。
损失计算:使用交叉熵损失函数计算模型预测值与真实标签之间的损失。
nn.Embedding():将离散的整数(如单词索引)映射到低维的稠密向量空间,常用于自然语言处理任务。
参数名 | 类型 | 说明 |
---|---|---|
num_embeddings | int | 嵌入字典的大小,即词汇表的大小。 |
embedding_dim | int | 每个嵌入向量的维度。 |
padding_idx | int, optional | 指定一个索引,用于填充(padding),在计算梯度时不会被更新。 |
max_norm | float, optional | 如果设置,嵌入向量的范数会被裁剪到不超过该值。 |
norm_type | float, optional | 用于裁剪范数的类型,默认为 2(L2 范数)。 |
scale_grad_by_freq | bool, optional | 如果为 True,梯度会根据单词在 mini-batch 中的频率进行缩放。 |
sparse | bool, optional | 如果为 True,梯度将会是稀疏张量。 |
nn.Sequential():按顺序组织多个神经网络层或模块,简化模型定义。
参数名 | 类型 | 说明 |
---|---|---|
*args | Module | 按顺序传入多个模块(如层或激活函数)。 |
nn.ReLU():修正线性单元激活函数,将负值置为 0,正值保持不变,用于引入非线性。
参数名 | 类型 | 说明 |
---|---|---|
inplace | bool, optional | 如果为 True,会直接修改输入数据,节省内存。默认为 False。 |
nn.Linear():全连接层,对输入进行线性变换(矩阵乘法和偏置加法)。
参数名 | 类型 | 说明 |
---|---|---|
in_features | int | 输入特征维度。 |
out_features | int | 输出特征维度。 |
bias | bool, optional | 是否添加偏置项,默认为 True。 |
nn.CrossEntropyLoss():用于多分类任务的损失函数,结合了nn.LogSoftmax()
和 nn.NLLLoss()
参数名 | 类型 | 说明 |
---|---|---|
weight | Tensor, optional | 类别权重,用于处理类别不平衡问题。 |
ignore_index | int, optional | 指定一个索引,忽略该类别的损失计算。 |
reduction | str, optional | 指定损失计算的方式,如 'mean' 、'sum' 或 'none' 。默认为 'mean' 。 |
def __init__(self, config):
super(SentenceMatchNetwork, self).__init__()
# 可以用bert,参考下面
# pretrain_model_path = config["pretrain_model_path"]
# self.bert_encoder = BertModel.from_pretrained(pretrain_model_path)
# 常规的embedding + layer
hidden_size = config["hidden_size"]
#20000应为词表大小,这里借用bert的词表,没有用它精确的数字,因为里面有很多无用词,舍弃一部分,不影响效果
self.embedding = nn.Embedding(20000, hidden_size)
#一种多层按顺序执行的写法,具体的层可以换
#unidirection:batch_size, max_len, hidden_size
#bidirection:batch_size, max_len, hidden_size * 2
self.encoder = nn.Sequential(nn.LSTM(hidden_size, hidden_size, bidirectional=True, batch_first=True),
GetFirst(),
nn.ReLU(),
nn.Linear(hidden_size * 2, hidden_size), #batch_size, max_len, hidden_size
nn.ReLU(),
)
self.classify_layer = nn.Linear(hidden_size, 2)
self.loss = nn.CrossEntropyLoss()
③ 计算句子间相似度
输入:接收 input_ids
(两个句子的拼接编码)和可选的 target
(标签)。
嵌入层:将 input_ids
转换为稠密向量 x
。
编码器:通过 LSTM、ReLU 和线性层处理 x
,得到编码后的输出。
池化:对编码后的输出进行最大池化,提取特征。
分类层:将池化后的输出映射为 2 维向量,表示匹配和不匹配的得分。
输出:如果有 target
,计算交叉熵损失并返回。如果没有 target
,返回两句话匹配的概率。
squeeze():移除张量中所有大小为 1 的维度,或根据需要移除特定维度。例如,形状为 (1,3,1,4) 的张量,经过 squeeze()
后将变为 (3,4)。
参数名 | 类型 | 说明 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int, optional | 指定要移除的维度。如果未指定,则移除所有大小为 1 的维度。 |
torch.softmax(): 将输入张量的元素转换为概率分布,适用于多分类任务。输出的每个元素值在 (0,1) 之间,且所有元素的和为 1。
参数名 | 类型 | 说明 |
---|---|---|
input | Tensor | 输入张量。 |
dim | int | 指定在哪个维度上计算 Softmax。例如,dim=1 表示对每一行计算 Softmax。 |
dtype | dtype, optional | 输出张量的数据类型。 |
# 同时传入两个句子的拼接编码
# 输出一个相似度预测,不匹配的概率
def forward(self, input_ids, target=None):
# x = self.bert_encoder(input_ids)[1]
#input_ids = batch_size, max_length
x = self.embedding(input_ids) #x:batch_size, max_length, embedding_size
x = self.encoder(x) #
#x: batch_size, max_len, hidden_size
x = nn.MaxPool1d(x.shape[1])(x.transpose(1,2)).squeeze()
#x: batch_size, hidden_size
x = self.classify_layer(x)
#x: batch_size, 2
#如果有标签,则计算loss
if target is not None:
return self.loss(x, target.squeeze())
#如果无标签,预测相似度
else:
return torch.softmax(x, dim=-1)[:, 1] #如果改为x[:,0]则是两句话不匹配的概率
④ 根据配置选择优化器
根据用户在配置字典中指定的优化器类型("adam" 或 "sgd")来选择并初始化相应的优化器,并设置学习率。
model.parameters():PyTorch 中 torch.nn.Module
类的一个方法,用于获取模型中所有可训练参数的迭代器。这些参数通常是模型的权重和偏置,它们会在训练过程中通过优化器进行更新。
def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)
⑨ 加载数据文件
torch.LongTensor(): PyTorch 中的一个函数,用于创建一个包含整数(64 位整型)的张量。它主要用于处理整数类型的数据,例如索引、标签或其他整数值。
参数名 | 类型 | 说明 |
---|---|---|
data | list, tuple, numpy array | 输入数据,可以是列表、元组或 NumPy 数组,包含整数值。 |
device | torch.device, optional | 指定张量存储的设备(如 CPU 或 GPU)。默认为 CPU。 |
requires_grad | bool, optional | 是否需要对张量计算梯度。默认为 False 。 |
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from transformers import BertModel, BertConfig
"""
建立网络模型结构
"""
class GetFirst(nn.Module):
def __init__(self):
super(GetFirst, self).__init__()
def forward(self, x):
return x[0]
class SentenceMatchNetwork(nn.Module):
def __init__(self, config):
super(SentenceMatchNetwork, self).__init__()
# 可以用bert,参考下面
# pretrain_model_path = config["pretrain_model_path"]
# self.bert_encoder = BertModel.from_pretrained(pretrain_model_path)
# 常规的embedding + layer
hidden_size = config["hidden_size"]
#20000应为词表大小,这里借用bert的词表,没有用它精确的数字,因为里面有很多无用词,舍弃一部分,不影响效果
self.embedding = nn.Embedding(20000, hidden_size)
#一种多层按顺序执行的写法,具体的层可以换
#unidirection:batch_size, max_len, hidden_size
#bidirection:batch_size, max_len, hidden_size * 2
self.encoder = nn.Sequential(nn.LSTM(hidden_size, hidden_size, bidirectional=True, batch_first=True),
GetFirst(),
nn.ReLU(),
nn.Linear(hidden_size * 2, hidden_size), #batch_size, max_len, hidden_size
nn.ReLU(),
)
self.classify_layer = nn.Linear(hidden_size, 2)
self.loss = nn.CrossEntropyLoss()
# 同时传入两个句子的拼接编码
# 输出一个相似度预测,不匹配的概率
def forward(self, input_ids, target=None):
# x = self.bert_encoder(input_ids)[1]
#input_ids = batch_size, max_length
x = self.embedding(input_ids) #x:batch_size, max_length, embedding_size
x = self.encoder(x) #
#x: batch_size, max_len, hidden_size
x = nn.MaxPool1d(x.shape[1])(x.transpose(1,2)).squeeze()
#x: batch_size, hidden_size
x = self.classify_layer(x)
#x: batch_size, 2
#如果有标签,则计算loss
if target is not None:
return self.loss(x, target.squeeze())
#如果无标签,预测相似度
else:
return torch.softmax(x, dim=-1)[:, 1] #如果改为x[:,0]则是两句话不匹配的概率
def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)
if __name__ == "__main__":
from config import Config
Config["vocab_size"] = 10
Config["max_length"] = 4
model = SentenceMatchNetwork(Config)
s1 = torch.LongTensor([[1,2,3,0], [2,2,0,0]])
s2 = torch.LongTensor([[1,2,3,4], [3,2,3,4]])
l = torch.LongTensor([[1],[0]])
# y = model(s1, s2, l)
# print(y)
# print(model.state_dict())
Ⅳ、模型效果测试 evaluate.py
① 模型初始化
保存配置、模型和日志:self.config = config
:将传入的配置字典保存到实例变量中。self.model = model
:将传入的模型对象保存到实例变量中。self.logger = logger
:将传入的日志对象保存到实例变量中。
加载验证数据:self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
:加载验证数据,并通过 load_data
函数封装为 DataLoader
对象。shuffle=False
表示不打乱数据顺序。
加载训练数据:self.train_data = load_data(config["train_data_path"], config)
:加载训练数据,并通过 load_data
函数封装为 DataLoader
对象。这里提到重新加载训练数据是为了将训练集作为知识库进行效果测试。
获取分词器:self.tokenizer = self.train_data.dataset.tokenizer
:从训练数据集中获取分词器对象,用于后续的文本处理。
初始化统计字典:self.stats_dict = {"correct":0, "wrong":0}
:用于存储测试结果的字典,记录正确和错误的样本数量。
def __init__(self, config, model, logger):
self.config = config
self.model = model
self.logger = logger
self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
# 由于效果测试需要训练集当做知识库,再次加载训练集。
# 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
self.train_data = load_data(config["train_data_path"], config)
self.tokenizer = self.train_data.dataset.tokenizer
self.stats_dict = {"correct":0, "wrong":0} #用于存储测试结果
② 将问题转换为向量表示 ⭐
初始化 self.question_index_to_standard_question_index
字典,用于存储问题编号到标准问题编号的映射。
初始化 self.questions
列表,用于存储所有问题。
遍历 self.train_data.dataset.knwb
,获取每个标准问题编号及其对应的问题列表。对于每个问题,记录其编号到标准问题编号的映射,并将问题添加到 self.questions
列表中。
items():Python 字典的内置方法,用于返回字典中所有键值对的视图对象。每个键值对以元组形式表示,方便遍历或操作字典数据。
len():是 Python 的内置函数,用于返回序列(如字符串、列表、元组)或集合(如字典、集合)中元素的数量。
参数名 | 类型 | 说明 |
---|---|---|
sequence | 序列或集合 | 需要计算长度的对象,如字符串、列表、元组、字典等。 |
列表.append():Python 列表的内置方法,用于在列表末尾添加一个元素。该方法会直接修改原列表,而不是返回一个新列表。
参数名 | 类型 | 说明 |
---|---|---|
element | 任意类型 | 要添加到列表末尾的元素,可以是字符串、数字、列表、元组等。 |
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.questions = []
for standard_question_index, questions in self.train_data.dataset.knwb.items():
for question in questions:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.questions)] = standard_question_index
self.questions.append(question)
return
③ 评估模型表现
- 日志记录:使用
self.logger.info
记录当前测试的轮次。 - 初始化统计字典:
self.stats_dict
用于记录正确和错误的预测数量。 - 模型设置为评估模式:
self.model.eval()
将模型设置为评估模式,关闭 dropout 和 batch normalization 的随机性。 - 知识库向量化:调用
self.knwb_to_vector()
将知识库中的问题向量化,为匹配做准备。 - 遍历验证数据:
- 对每个批次的数据,提取测试问题和标签;
- 对每个测试问题,计算其与知识库中所有问题的相似度得分;
- 使用 torch.no_grad0) 关闭梯度计算,提高效率将得分转换为列表,并找到最高得分的索引(即最匹配的问题)
- 将预测结果添加到predicts 列表中
- 记录统计结果:调用
self.write_stats
记录预测结果。 - 显示统计结果:调用
self.show_stats
显示测试结果。
logger.info():用于记录信息级别的日志,通常用于记录程序运行时的状态或过程信息。
参数名 | 类型 | 说明 |
---|---|---|
msg | 字符串 | 要记录的日志信息。 |
*args | 可变参数 | 用于格式化日志信息的参数。 |
**kwargs | 关键字参数 | 可选参数,如 exc_info=True 用于记录异常信息。 |
model.eval():将模型设置为评估模式,关闭 Dropout 和 Batch Normalization 的训练模式,确保模型在推理时行为一致。
enumerate():将可迭代对象(如列表、元组、字符串)组合为索引序列,返回一个枚举对象,包含索引和值。
参数名 | 类型 | 说明 |
---|---|---|
iterable | 可迭代对象 | 需要枚举的对象,如列表、元组、字符串等。 |
start | 整数 | 可选参数,指定索引的起始值,默认为 0。 |
列表.append():在列表末尾添加一个元素。
参数名 | 类型 | 说明 |
---|---|---|
element | 任意类型 | 要添加到列表末尾的元素。 |
torch.no_grad():上下文管理器,用于禁用梯度计算,通常在模型评估或推理时使用,以减少内存消耗并加速计算。
torch.LongTensor():创建一个包含 64 位整数的张量。
参数名 | 类型 | 说明 |
---|---|---|
data | 列表、元组 | 输入数据,用于初始化张量。 |
torch.cuda.is_available():检查当前系统是否支持 CUDA(即是否有可用的 GPU)。
cuda():将张量或模型移动到 GPU 上,以利用 GPU 进行计算。
参数名 | 类型 | 说明 |
---|---|---|
device | 整数或设备 | 可选参数,指定目标 GPU 设备,默认为当前设备。 |
.detach():从计算图中分离张量,返回一个不需要梯度的新张量
.cpu():将张量或模型移动到 CPU 上。
.tolist():将张量转换为 Python 列表。
np.argmax(): 返回数组中最大值的索引。
参数名 | 类型 | 说明 |
---|---|---|
array | 数组 | 输入数组。 |
axis | 整数 | 可选参数,指定沿哪个轴查找最大值索引,默认为 None(展平数组)。 |
def eval(self, epoch):
self.logger.info("开始测试第%d轮模型效果:" % epoch)
self.stats_dict = {"correct":0, "wrong":0} #清空前一轮的测试结果
self.model.eval()
self.knwb_to_vector()
for index, batch_data in enumerate(self.valid_data):
test_questions, labels = batch_data
predicts = []
for test_question in test_questions:
input_ids = []
for question in self.questions:
input_ids.append(self.train_data.dataset.encode_sentence(test_question, question))
with torch.no_grad():
input_ids = torch.LongTensor(input_ids)
if torch.cuda.is_available():
input_ids = input_ids.cuda()
scores = self.model(input_ids).detach().cpu().tolist()
hit_index = np.argmax(scores)
# print(hit_index)
predicts.append(hit_index)
self.write_stats(predicts, labels)
self.show_stats()
return
④ 记录模型预测结果
断言检查:assert len(labels) == len(predicts)
:确保预测结果和标签的数量一致,否则抛出异常。
遍历预测结果和标签:使用 zip(predicts, labels)
将预测结果和标签一一对应。
转换预测索引:hit_index = self.question_index_to_standard_question_index[hit_index]
:将预测的问题索引转换为标准问题索引。
统计正确和错误的预测:如果预测的标准问题索引与标签一致,则 self.stats_dict["correct"] += 1
。否则,self.stats_dict["wrong"] += 1
。
返回:方法执行完毕后返回 None
。
zip():用于将多个可迭代对象的元素打包成元组,返回一个可迭代对象
参数名 | 类型 | 说明 |
---|---|---|
*iterables | 可迭代对象 | 一个或多个可迭代对象(如列表、元组、字符串等),用于打包成元组。 |
strict | 布尔值 | 可选参数,默认为 False 。如果为 True ,当可迭代对象长度不一致时会抛出 ValueError 。 |
len():用于返回对象的长度或项目数量,支持多种数据类型
参数名 | 类型 | 说明 |
---|---|---|
obj | 对象 | 需要计算长度的对象,可以是字符串、列表、元组、字典、集合等。 |
int():用于返回对象的长度或项目数量,支持多种数据类型
参数名 | 类型 | 说明 |
---|---|---|
x | 字符串、数字 | 需要转换为整数的对象,可以是字符串、类似字节的对象或数字。 |
base | 整数 | 可选参数,默认为 10。指定 x 的进制,例如 2(二进制)、16(十六进制)等。 |
def write_stats(self, predicts, labels):
assert len(labels) == len(predicts)
for hit_index, label in zip(predicts, labels):
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
if int(hit_index) == int(label):
self.stats_dict["correct"] += 1
else:
self.stats_dict["wrong"] += 1
return
⑤ 展示模型预测的统计信息
- 统计信息提取:
correct = self.stats_dict["correct"]
:从self.stats_dict
中获取预测正确的条目数。wrong = self.stats_dict["wrong"]
:从self.stats_dict
中获取预测错误的条目数。
- 日志输出:
self.logger.info("预测集合条目总量:%d" % (correct + wrong))
:输出预测集合的总条目数,即正确条目数与错误条目数之和。self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
:输出预测正确的条目数和预测错误的条目数。self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
:输出预测的准确率,即正确条目数占总条目数的比例。self.logger.info("--------------------")
:输出分隔线,用于区分不同的日志信息。
logger.info(): 是 Python 的 logging
模块中用于记录信息级别(info level)日志的函数。它通常用于记录程序运行时的关键信息,如状态、进度等,而不是调试信息或错误。
参数名 | 类型 | 描述 |
---|---|---|
msg | str | 要记录的日志信息。 |
*args | 任意类型 | 用于格式化日志信息的参数。 |
**kwargs | dict | 可选参数,如 exc_info 、stack_info 等,用于附加异常或堆栈信息。 |
# -*- coding: utf-8 -*-
import torch
from loader import load_data
import numpy as np
"""
模型效果测试
"""
class Evaluator:
def __init__(self, config, model, logger):
self.config = config
self.model = model
self.logger = logger
self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
# 由于效果测试需要训练集当做知识库,再次加载训练集。
# 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
self.train_data = load_data(config["train_data_path"], config)
self.tokenizer = self.train_data.dataset.tokenizer
self.stats_dict = {"correct":0, "wrong":0} #用于存储测试结果
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.questions = []
for standard_question_index, questions in self.train_data.dataset.knwb.items():
for question in questions:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.questions)] = standard_question_index
self.questions.append(question)
return
def eval(self, epoch):
self.logger.info("开始测试第%d轮模型效果:" % epoch)
self.stats_dict = {"correct":0, "wrong":0} #清空前一轮的测试结果
self.model.eval()
self.knwb_to_vector()
for index, batch_data in enumerate(self.valid_data):
test_questions, labels = batch_data
predicts = []
for test_question in test_questions:
input_ids = []
for question in self.questions:
input_ids.append(self.train_data.dataset.encode_sentence(test_question, question))
with torch.no_grad():
input_ids = torch.LongTensor(input_ids)
if torch.cuda.is_available():
input_ids = input_ids.cuda()
scores = self.model(input_ids).detach().cpu().tolist()
hit_index = np.argmax(scores)
# print(hit_index)
predicts.append(hit_index)
self.write_stats(predicts, labels)
self.show_stats()
return
def write_stats(self, predicts, labels):
assert len(labels) == len(predicts)
for hit_index, label in zip(predicts, labels):
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
if int(hit_index) == int(label):
self.stats_dict["correct"] += 1
else:
self.stats_dict["wrong"] += 1
return
def show_stats(self):
correct = self.stats_dict["correct"]
wrong = self.stats_dict["wrong"]
self.logger.info("预测集合条目总量:%d" % (correct +wrong))
self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
self.logger.info("--------------------")
return
Ⅴ、训练主流程 main.py
① 配置日志记录
logging.basicConfig():用于配置日志系统的基本设置,包括日志级别、输出格式、输出目标(如控制台或文件)等。它通常在程序初始化时调用一次。
参数名 | 类型 | 描述 |
---|---|---|
level | int | 设置日志级别,如 logging.DEBUG 、logging.INFO 等。 |
format | str | 设置日志输出格式,如 '%(asctime)s - %(levelname)s - %(message)s' 。 |
filename | str | 设置日志输出到文件,指定文件名。 |
filemode | str | 设置文件打开模式,默认为 'a' (追加模式)。 |
handlers | list | 设置自定义的日志处理器。 |
logging.getLogger(): 用于获取一个日志记录器(Logger)对象。每个记录器都有一个名称(name),可以用来区分不同的日志记录器。
参数名 | 类型 | 描述 |
---|---|---|
name | str | 日志记录器的名称。如果未提供或为 None ,则返回根记录器(root logger)。 |
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
② 创建保存模型的目录
检查指定的模型保存路径是否存在,如果不存在,则创建该路径作为目录。
os.path.isdir(): Python 中 os.path
模块的函数,用于检查指定的路径是否为一个存在的目录。如果路径存在且是一个目录,则返回 True
,否则返回 False
。
参数名 | 类型 | 描述 |
---|---|---|
path | str | 表示文件系统路径的类路径对象(可以是相对路径或绝对路径)。 |
os.mkdir():Python 中 os
模块的函数,用于创建一个新的目录。如果目录已经存在或路径无效,会抛出 FileExistsError
或 OSError
参数名 | 类型 | 描述 |
---|---|---|
path | str | 要创建的目录路径。 |
mode | int | 可选参数,设置目录的权限(八进制模式),默认为 0o777 。 |
#创建保存模型的目录
if not os.path.isdir(config["model_path"]):
os.mkdir(config["model_path"])
③ 加载文件
torch.cuda.is_available():PyTorch 中用于检查当前系统是否支持 CUDA(即是否有可用的 GPU)的函数。
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU)。 |
cuda():PyTorch 张量或模型的方法,用于将张量或模型移动到 GPU 上。
#加载训练数据
train_data = load_data(config["train_data_path"], config)
#加载模型
model = SiameseNetwork(config)
# 标识是否使用gpu
cuda_flag = torch.cuda.is_available()
if cuda_flag:
logger.info("gpu可以使用,迁移模型至gpu")
model = model.cuda()
#加载优化器
optimizer = choose_optimizer(config, model)
#加载效果测试类
evaluator = Evaluator(config, model, logger)
④ 训练的核心过程 ⭐
-
训练循环:
for epoch in range(config["epoch"])
:遍历每个训练周期(epoch),config["epoch"]
是训练的总周期数。epoch += 1
:将当前周期数加1,用于日志记录。model.train()
:将模型设置为训练模式,启用Dropout和Batch Normalization等训练特定行为。
-
数据加载与处理:
for index, batch_data in enumerate(train_data)
:遍历训练数据集的每个批次(batch)。optimizer.zero_grad()
:清空优化器的梯度缓存,避免梯度累积。if cuda_flag: batch_data = [d.cuda() for d in batch_data]
:如果启用了CUDA(GPU加速),将数据移动到GPU上。
-
模型前向传播与损失计算:
input_id1, input_id2, labels = batch_data
:从批次数据中提取两个输入(input_id1
和input_id2
)以及对应的标签(labels
)。
-
反向传播与优化:
train_loss.append(loss.item())
:将当前批次的损失值记录下来。loss.backward()
:执行反向传播,计算梯度。optimizer.step()
:更新模型参数,优化损失函数。
-
日志记录与评估:
logger.info("epoch average loss: %f" % np.mean(train_loss))
:记录当前周期的平均损失值。evaluator.eval(epoch)
:调用评估器对模型进行评估,通常是在验证集上计算准确率或其他指标。
model.train():将模型设置为训练模式。在训练模式下,模型会启用 Dropout
和 Batch Normalization
等层的行为,以确保模型在训练时能够正常工作
enumerate():Python 的内置函数,用于在遍历可迭代对象时同时获取索引和值
参数名 | 类型 | 描述 |
---|---|---|
iterable | 可迭代对象 | 要遍历的对象(如列表、元组、字符串)。 |
start | int | 索引的起始值,默认为 0 。 |
optimizer.zero_grad():将优化器中所有参数的梯度清零,避免梯度累积
cuda():将张量或模型移动到 GPU 上,以利用 GPU 的并行计算能力加速计算
参数名 | 类型 | 描述 |
---|---|---|
device | int 或 str | 指定目标 GPU 设备,默认为 None (使用默认 GPU)。 |
append():在列表末尾添加一个新元素
参数名 | 类型 | 描述 |
---|---|---|
element | 任意类型 | 要添加到列表末尾的元素。 |
item():将包含单个元素的张量转换为 Python 标量(如 int
或 float
)
loss.backward():计算损失函数对模型参数的梯度,用于反向传播
optimizer.step(): 根据计算出的梯度更新模型参数
#训练
for epoch in range(config["epoch"]):
epoch += 1
model.train()
logger.info("epoch %d begin" % epoch)
train_loss = []
for index, batch_data in enumerate(train_data):
optimizer.zero_grad()
if cuda_flag: #如果gpu可用则使用gpu加速
batch_data = [d.cuda() for d in batch_data]
input_ids, labels = batch_data
loss = model(input_ids, labels) #计算loss
train_loss.append(loss.item())
#每轮训练一半的时候输出一下loss,观察下降情况
if index % int(len(train_data) / 2) == 0:
logger.info("batch loss %f" % loss)
loss.backward() #梯度计算
optimizer.step() #梯度更新
logger.info("epoch average loss: %f" % np.mean(train_loss))
evaluator.eval(config["epoch"])
⑤ 保存模型
os.path.join():用于将多个路径片段拼接成一个完整的路径,并自动根据操作系统选择正确的路径分隔符(如 Windows 使用 \
,Linux 和 macOS 使用 /
)
参数名 | 类型 | 描述 |
---|---|---|
path | str | 初始路径片段。 |
*paths | str | 需要拼接的后续路径片段,可以接受任意数量的参数。 |
torch.save():用于将 PyTorch 对象(如模型、张量、字典等)保存到磁盘文件中,通常用于保存模型的权重或训练状态
参数名 | 类型 | 描述 |
---|---|---|
obj | 任意对象 | 需要保存的对象,如模型、张量、字典等。 |
f | str 或文件对象 | 保存的目标文件路径或文件对象。 |
model.state_dict(): 返回一个包含模型所有可学习参数(如权重和偏置)的有序字典,通常用于保存或加载模型参数
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
torch.save(model.state_dict(), model_path)
模型训练主程序
# -*- coding: utf-8 -*-
import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SentenceMatchNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
"""
模型训练主程序
"""
def main(config):
#创建保存模型的目录
if not os.path.isdir(config["model_path"]):
os.mkdir(config["model_path"])
#加载训练数据
train_data = load_data(config["train_data_path"], config)
#加载模型
model = SentenceMatchNetwork(config)
# 标识是否使用gpu
cuda_flag = torch.cuda.is_available()
if cuda_flag:
logger.info("gpu可以使用,迁移模型至gpu")
model = model.cuda()
#加载优化器
optimizer = choose_optimizer(config, model)
#加载效果测试类
evaluator = Evaluator(config, model, logger)
#训练
for epoch in range(config["epoch"]):
epoch += 1
model.train()
logger.info("epoch %d begin" % epoch)
train_loss = []
for index, batch_data in enumerate(train_data):
optimizer.zero_grad()
if cuda_flag: #如果gpu可用则使用gpu加速
batch_data = [d.cuda() for d in batch_data]
input_ids, labels = batch_data
loss = model(input_ids, labels) #计算loss
train_loss.append(loss.item())
#每轮训练一半的时候输出一下loss,观察下降情况
if index % int(len(train_data) / 2) == 0:
logger.info("batch loss %f" % loss)
loss.backward() #梯度计算
optimizer.step() #梯度更新
logger.info("epoch average loss: %f" % np.mean(train_loss))
evaluator.eval(config["epoch"])
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
torch.save(model.state_dict(), model_path)
return
if __name__ == "__main__":
main(Config)
三、对比:交互型 vs 表示型
1.表示型
Ⅰ、模型结构与交互机制
① 结构特点
采用双塔(Siamese)架构,两个文本分别通过共享参数的编码器(如MLP、CNN、LSTM、Transformer等)独立生成固定维度的语义向量,再通过余弦相似度、点积等方式计算匹配得分。
② 典型模型
DSSM、CDSSM、MV-LSTM、ARC-I等。
③ 交互时机
仅在最终匹配层进行浅层交互(如相似度计算),编码过程中文本间无信息交换。
Ⅱ、表示型模型的优势
① 计算效率高
可预先计算文本向量,在线匹配仅需向量相似度计算,适用于大规模检索场景(如搜索引擎)
② 参数共享
双塔共享编码器参数,减少模型复杂度并提升泛化能力
Ⅲ、表示型模型的局限性
① 语义焦点丢失
全局向量可能受无关词干扰(如长文本中冗余信息淹没关键短语)
② 词级信息缺失
无法捕捉词法、句法层面的局部匹配特征(如近义词替换或语序变化)
Ⅳ、应用场景
适合对响应速度要求高、候选集规模大的场景,如搜索引擎召回、推荐系统粗排
2.交互型
Ⅰ、模型结构与交互机制
① 结构特点
将两个文本拼接后输入单一编码器(如BERT),通过自注意力机制或交叉注意力直接在词 / 短语级别进行细粒度交互,再通过池化或分类器输出匹配结果
② 典型模型
BERT-based模型、ESIM、InferSent等
③ 交互时机
在编码层即引入词级交互,构建交互矩阵或注意力图,捕捉局部语义关联
Ⅱ、交互型模型的优势
① 匹配精度高
通过词级交互矩阵捕捉细粒度语义关联(如同义词、反义词、指代关系),显著提升复杂语义匹配的准确率
② 上下文建模强
自注意力机制可动态加权重要词汇,避免语义漂移
Ⅲ、交互型模型的局限性
① 计算成本高
需实时拼接文本对输入模型,难以支持海量候选集的快速检索
② 交互型模板
需实时拼接文本对输入模型,难以支持海量候选集的快速检索
Ⅳ、应用场景
适合对精度要求高、候选集较小的场景,如问答系统精排、复述检测、语义相似度评测
3.总结
表示型:
优点:训练好的模型可以对知识库内的问题计算向量,在实际查找过程中,只对输入文本做一次向量化,训练速度快
缺点:在向量化的过程中不知道文本重点
交互型:
优点:通过对比把握句子重点
缺点:每次计算都需要两个输入,需要等到问题来了,再去知识库中的问题作拼接,需要调入模型n次,比较耗时,模型效率低。
维度 | 表示型模型 | 交互型模型 |
---|---|---|
交互时机 | 后期(匹配层) | 早期(编码层) |
计算效率 | 高(支持预计算) | 低(需实时计算) |
匹配精度 | 一般 | 高 |
适用场景 | 大规模检索、粗排 | 精细化匹配、精排 |
典型优化方向 | 增强embedding层编码器(如Transformer) | 轻量化交互(如蒸馏、剪枝) |
四、对比学习
主要解决的问题:文本表示 / 图像表示
主要目标:训练一个好的文本 / 图像 编码器(model)
输入一个样本,对样本做一些处理(数据增强),就构造出了一个正样本 / 相似样本,再寻找一些负样本,传入模型让其进行学习,相当于一种无监督的表示型学习方式
五、海量向量查找
假如我们有1亿以上的候选向量
对于一个给定向量,希望查找距离最接近的
如何高效的完成?
快速的向量查找在问答,搜索,推荐等场景下均会使用
1.KD树
原理 —— 空间切割:
事先构造一棵树,步骤:
① 先选取计算维度,计算每个点x、y轴集合的方差,方差大的更加均匀
② 然后再选取当前维度下数据的中值位置
③ 将维度方差大的轴的中值位置作为根节点,轴的两边列为二叉树的左右子树
④ 然后对于子树重复选取点的步骤,直到将所有点全部加入树中
查询步骤:
① 依照建索引方式(左右子树)找到查询点的空间位置
② 向上回退,计算查询点与离其最近的节点距离 和 查询点到各个切割平面的距离
③ 如果到切割平面的距离大于到已知点的距离,就没必要跨过平面计算距离,根据情况判断是否需要查找平面另一侧节点
④ 回退到根节点为止
优点:减少余弦距离的计算次数,带来了效率的提升
2.KD树 —— 代码实现 🚀
Ⅰ、定义KD树的节点类
① 初始化KD树的结点
self.father:父节点
self.left:左子结点
self.right:右子结点
self.feature:当前结点用于分割数据的特征(维度)的索引
self.split:包含分割点向量和其对应的标签(索引) ,在叶子节点中,split
包含了数据点本身及其标签
class Node(object):
def __init__(self):
"""Node class to build tree leaves.
"""
self.father = None
self.left = None
self.right = None
self.feature = None
self.split = None
② 字符串表示方法
返回了节点分割点向量的字符串表示,方便用户查看单个节点的信息。
self.split:包含分割点向量和其对应的标签(索引) ,在叶子节点中,split
包含了数据点本身及其标签
def __str__(self):
return str(self.split[0])
③ 获取当前节点的兄弟节点
如果节点没有父节点(即它本身是根节点),则返回None
。否则,检查当前节点是父节点的左子节点还是右子节点,然后返回另一个子节点作为兄弟节点。
@property:@property
装饰器是实现面向对象编程中属性管理的重要工具,它通过将方法转换为属性访问的形式,既保持了代码的简洁性,又能实现数据校验、动态计算等高级功能
属性式访问:通过将方法伪装成属性,调用时无需添加括号,直接通过对象.属性名
访问
数据校验与控制:通过@属性名.setter
定义设置逻辑,拦截非法赋值操作
只读属性与删除控制:若未定义setter
方法,则属性为只读;通过@属性名.deleter
可自定义删除行为
@property
def brother(self):
"""Find the node's brother.
Returns:
node -- Brother node.
"""
if not self.father:
ret = None
else:
if self.father.left is self:
ret = self.father.right
else:
ret = self.father.left
return ret
Ⅱ、定义KD树类
① 初始化
初始化KDTree类,创建一个根节点。根节点是KD树的起始点,用于后续的搜索和插入操作。
class KDTree(object):
def __init__(self):
"""KD Tree class to improve search efficiency in KNN.
Attributes:
root: the root node of KDTree.
"""
self.root = Node()
② 打印树节点信息
使用广度优先搜索遍历树节点,将节点信息格式化为字符串并返回。
list.pop():移除列表中指定索引位置的元素并返回该元素。若未指定索引,默认移除最后一个元素
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
index | int | 是 | 要移除元素的索引(默认-1 ,即最后一个元素)。若索引越界会引发IndexError |
list.append():在列表末尾添加单个元素,直接修改原列表且无返回值
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
element | 任意类型 | 否 | 要添加的元素(可以是数字、字符串、列表等复合类型) |
str.join():用指定字符串连接可迭代对象中的元素,生成新字符串。要求可迭代对象内的元素均为字符串类型
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
iterable | 可迭代对象 | 否 | 需连接的序列(如列表、元组),非字符串元素会引发TypeError |
def __str__(self):
"""Show the relationship of each node in the KD Tree.
Returns:
str -- KDTree Nodes information.
"""
ret = []
i = 0
que = [(self.root, -1)]
while que:
nd, idx_father = que.pop(0)
ret.append("%d -> %d: %s" % (idx_father, i, str(nd)))
if nd.left:
que.append((nd.left, i))
if nd.right:
que.append((nd.right, i))
i += 1
return "\n".join(ret)
③ 计算中位数索引
计算给定二维数组中指定特征列的中位数所对应的行索引。通过从指定的列收集数据、排序并提取中位数的索引,该方法能有效地用于构建 KD 树等数据结构中,以此提高相似度搜索的效率。
map():将指定函数依次作用于可迭代对象的每个元素,返回一个包含结果的迭代器(需转换为列表或元组等容器类型)
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
function | 可调用对象 | 否 | 处理元素的函数(如内置函数、lambda 或自定义函数) |
iterable | 可迭代对象 | 否 | 待处理的数据序列(如列表、元组),可传入多个(函数需匹配参数数量) |
lambda:创建匿名函数,简化一次性使用的简单逻辑,常用于配合 map()
、filter()
等高阶函数
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
arguments | 参数列表 | 否 | 函数参数(如 x, y ) |
expression | 表达式 | 否 | 单行表达式,计算结果作为返回值(如 x + y 或 x**2 ) |
sorted():对可迭代对象进行排序,返回新列表(原数据不变)
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
iterable | 可迭代对象 | 否 | 待排序的数据(如列表、元组) |
key | 函数 | 是 | 排序依据的函数(常用 lambda ,如 key=lambda x: x["age"] ) 8 |
reverse | 布尔值 | 是 | 是否降序排列(默认 False 升序) |
list(): 将可迭代对象(如字符串、元组、集合)转换为列表
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
iterable | 可迭代对象 | 是 | 若省略则创建空列表;若传入则转换元素(如 list("abc") →["a","b","c"] ) |
def _get_median_idx(self, X, idxs, feature):
"""Calculate the median of a column of data.
Arguments:
X {list} -- 2d list object with int or float.
idxs {list} -- 1D list with int.
feature {int} -- Feature number.
sorted_idxs_2d {list} -- 2D list with int.
Returns:
list -- The row index corresponding to the median of this column.
"""
n = len(idxs)
# Ignoring the number of column elements is odd and even.
k = n // 2
# Get all the indexes and elements of column j as tuples.
col = map(lambda i: (i, X[i][feature]), idxs)
# Sort the tuples by the elements' values
# and get the corresponding indexes.
sorted_idxs = map(lambda x: x[0], sorted(col, key=lambda x: x[1]))
# Search the median value.
median_idx = list(sorted_idxs)[k]
return median_idx
④ 计算给定二维列表在指定特征(维度)上的方差
计算指定特征在给定数据集上的方差,这在构建 KD 树时用于选择最优的分割特征(即具有最大方差的特征),以提高后续最近邻搜索的效率。通过这种方法,可以有效地找到数据集中最具区分度的特征,从而优化 KD 树的构建和查询过程。
def _get_variance(self, X, idxs, feature):
"""Calculate the variance of a column of data.
Arguments:
X {list} -- 2d list object with int or float.
idxs {list} -- 1D list with int.
feature {int} -- Feature number.
Returns:
float -- variance
"""
n = len(idxs)
col_sum = col_sum_sqr = 0
for idx in idxs:
xi = X[idx][feature]
col_sum += xi
col_sum_sqr += xi ** 2
# D(X) = E{[X-E(X)]^2} = E(X^2)-[E(X)]^2
return col_sum_sqr / n - (col_sum / n) ** 2
⑤ 指定特征维度
在KD树构建过程中,选择具有最大方差的特征作为当前节点的分割特征。这样做可以确保树的每个层上的数据分布尽可能均匀,从而提高后续的搜索效率。
map():将指定函数依次作用于可迭代对象的每个元素,返回一个包含结果的迭代器(需转换为列表或元组等容器类型)
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
function | 可调用对象 | 否 | 处理元素的函数(如内置函数、lambda 或自定义函数) |
iterable | 可迭代对象 | 否 | 待处理的数据序列(如列表、元组),可传入多个(函数需匹配参数数量) |
lambda:创建匿名函数,简化一次性使用的简单逻辑,常用于配合 map()
、filter()
等高阶函数
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
arguments | 参数列表 | 否 | 函数参数(如 x, y ) |
expression | 表达式 | 否 | 单行表达式,计算结果作为返回值(如 x + y 或 x**2 ) |
max(): Python 内置的高效函数,用于获取可迭代对象或多个参数中的最大值。其功能灵活,支持多种数据类型和自定义比较逻辑。
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
iterable | 可迭代对象 | 是 | 如列表、元组、字符串等(需元素可比较) |
args | 多个参数 | 是 | 直接传入多个参数进行比较(如 max(3,5,7) ) |
key | 函数 | 是 | 自定义比较规则(如 key=lambda x: x["age"] ) |
default | 任意类型 | 是 | 当可迭代对象为空时返回此默认值,否则引发 ValueError |
def _choose_feature(self, X, idxs):
"""Choose the feature which has maximum variance.
Arguments:
X {list} -- 2d list object with int or float.
idxs {list} -- 1D list with int.
Returns:
feature number {int}
"""
m = len(X[0])
variances = map(lambda j: (
j, self._get_variance(X, idxs, j)), range(m))
return max(variances, key=lambda x: x[1])[0]
⑥ 根据给定特征分割数据集
根据特征的中位数将给定的索引列表分割成两个子集。这在构建k-d树(k维空间树)时非常重要,有助于在构造树的过程中有效地将数据划分为更小的、更可管理的部分,从而在后续的查找中提高效率。
通过在指定特征上比较中位数,函数将数据分为小于和大于(或等于)中位数的两部分,这种分割方式是 k-d树构建的关键步骤之一,旨在平衡树的高度,优化查找性能。
list.append(): Python 列表的内置方法,用于在列表末尾添加单个元素。它会直接修改原列表,不返回新列表(返回 None
)
参数 | 类型 | 是否可选 | 说明 |
---|---|---|---|
object | 任意类型 | 否 | 要添加的元素(支持数字、字符串、列表、元组、字典、None 等所有类型) |
def _split_feature(self, X, idxs, feature, median_idx):
"""Split indexes into two arrays according to split point.
Arguments:
X {list} -- 2d list object with int or float.
idx {list} -- Indexes, 1d list object with int.
feature {int} -- Feature number.
median_idx {float} -- Median index of the feature.
Returns:
list -- [left idx, right idx]
"""
idxs_split = [[], []]
split_val = X[median_idx][feature]
for idx in idxs:
# Keep the split point in current node.
if idx == median_idx:
continue
# Split
xi = X[idx][feature]
if xi < split_val: # 根据当前索引 idx 对应的特征值 xi 来判断其相对于 split_val 的位置:
idxs_split[0].append(idx)
else:
idxs_split[1].append(idx)
return idxs_split
Ⅲ、构建KD树
根据给定的数据集和标签构建一棵Kd树。通过选择具有最大方差的特征来进行数据点的分割,并将分割点存储在节点中。构建的过程采用广度优先搜索的方式,逐层构建树的结构。这样构建的Kd树可以有效地用于在多维空间中快速查找最近邻点
列表,pop():移除列表中指定索引位置的元素并返回该元素。若未指定索引,默认移除最后一个元素
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
index | int | 是 | 要移除元素的索引(默认 -1 ,即最后一个元素)。若索引越界会引发 IndexError |
str.split():将字符串按指定分隔符拆分为列表,默认按空格分割
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
sep | str | 是 | 分隔符(默认空格)。 |
maxsplit | int | 是 | 最大分割次数(默认全部拆分) |
列表.append():在列表末尾添加单个元素,直接修改原列表且无返回值
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
element | 任意类型 | 否 | 要添加的元素(支持数字、字符串、列表等所有类型) |
def build_tree(self, X, y):
"""Build a KD Tree. The data should be scaled so as to calculate variances.
Arguments:
X {list} -- 2d list object with int or float.
y {list} -- 1d list object with int or float.
"""
# Initialize with node, indexes
nd = self.root
idxs = range(len(X))
que = [(nd, idxs)]
while que:
nd, idxs = que.pop(0)
n = len(idxs)
# Stop split if there is only one element in this node
if n == 1:
nd.split = (X[idxs[0]], y[idxs[0]])
continue
# Split
feature = self._choose_feature(X, idxs)
median_idx = self._get_median_idx(X, idxs, feature)
idxs_left, idxs_right = self._split_feature(
X, idxs, feature, median_idx)
# Update properties of current node
nd.feature = feature
nd.split = (X[median_idx], y[median_idx])
# Put children of current node in que
if idxs_left != []:
nd.left = Node()
nd.left.father = nd
que.append((nd.left, idxs_left))
if idxs_right != []:
nd.right = Node()
nd.right.father = nd
que.append((nd.right, idxs_right))
Ⅳ、最近邻搜索
① 查找叶子节点
从KD树的根节点开始,沿着树的路径向下搜索,直到找到包含搜索样本Xi
的叶节点,并返回该叶节点。
str.split():将字符串按指定分隔符拆分为列表,默认按空格分割
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
sep | str | 是 | 分隔符(默认空格)。 |
maxsplit | int | 是 | 最大分割次数(默认全部拆分) |
def _search(self, Xi, nd):
"""Search Xi from the KDTree until Xi is at an leafnode.
Arguments:
Xi {list} -- 1d list with int or float.
Returns:
node -- Leafnode.
"""
while nd.left or nd.right:
if not nd.left:
nd = nd.right
elif not nd.right:
nd = nd.left
else:
if Xi[nd.feature] < nd.split[0][nd.feature]:
nd = nd.left
else:
nd = nd.right
return nd
② 计算两向量欧几里得距离
从给定的KD树节点中获取与其分割点向量的欧几里得距离。具体来说,它首先从节点对象中提取出分割点向量X0
,然后调用类中的另一个方法get_eu_dist
来计算输入向量Xi
与分割点向量X0
之间的欧几里得距离。
get_eu_dist:
计算两个一维向量之间的欧几里得距离。它通过将两个向量对应位置的元素相减,求平方和,再取平方根来实现。
str.split():将字符串按指定分隔符拆分为列表,默认按空格分割
参数 | 类型 | 是否可选 | 描述 |
---|---|---|---|
sep | str | 是 | 分隔符(默认空格)。 |
maxsplit | int | 是 | 最大分割次数(默认全部拆分) |
sum():对可迭代对象中的数值元素求和,支持自定义起始值
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
iterable | 可迭代对象 | 否 | 无 | 必须为数值类型(整数、浮点数等)的可迭代对象(如列表、元组、集合)。 |
start | 数值 | 是 | 0 | 求和的初始值,会加到最终结果中。 |
zip():将多个可迭代对象的对应元素打包为元组,返回一个迭代器
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
*iterables | 多个可迭代对象 | 否 | 无 | 支持列表、元组、字符串等可迭代对象。 |
strict | 布尔值 | 是 | False | 若设为 True ,强制要求所有可迭代对象长度一致(Python 3.10+)。 |
列表推导式: 通过简洁语法快速生成列表,支持条件筛选和嵌套循环
语法部分 | 类型 | 是否可选 | 描述 |
---|---|---|---|
expression | 表达式 | 否 | 对当前元素的操作(如 x**2 )。 |
item | 变量名 | 否 | 从可迭代对象中逐个提取元素。 |
iterable | 可迭代对象 | 否 | 数据来源(如 range(5) )。 |
if condition | 条件表达式 | 是 | 筛选元素的条件(如 x % 2 == 0 )。 |
def _get_eu_dist(self, Xi, nd):
"""Calculate euclidean distance between Xi and node.
Arguments:
Xi {list} -- 1d list with int or float.
nd {node}
Returns:
float -- Euclidean distance.
"""
X0 = nd.split[0]
return self.get_eu_dist(Xi, X0)
def get_eu_dist(self, arr1, arr2):
"""Calculate the Euclidean distance of two vectors.
Arguments:
arr1 {list} -- 1d list object with int or float
arr2 {list} -- 1d list object with int or float
Returns:
float -- Euclidean distance
"""
return sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5
③ 计算点到分割超平面的距离
计算给定点 Xi
到 KD 树节点 nd
所表示的超平面的欧几里得距离。具体来说,它通过比较点 Xi
在某个特征上的值与节点 nd
的分割点在该特征上的值,来得到点 Xi
到超平面的距离。这个距离用于判断在最近邻搜索过程中是否需要访问兄弟节点。
nd.feature:当前结点用于分割数据的特征(维度)的索引
str.split():将字符串按指定分隔符分割为列表,支持限制分割次数
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
sep | 字符串 | 是 | 所有空字符 | 分隔符(如 "," 、"*" ),若未指定则按空格、换行符等分割 |
maxsplit | 整数 | 是 | 不限分割次数 | 指定最大分割次数,剩余未分割部分作为列表最后一个元素 |
abs():返回数值的绝对值,支持整数、浮点数和复数
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
number | 数值(含复数) | 否 | 无 | 接受整数、浮点数或复数(返回复数的模) |
def _get_hyper_plane_dist(self, Xi, nd):
"""Calculate euclidean distance between Xi and hyper plane.
Arguments:
Xi {list} -- 1d list with int or float.
nd {node}
Returns:
float -- Euclidean distance.
"""
j = nd.feature
X0 = nd.split[0]
return abs(Xi[j] - X0[j])
④ 执行最近邻搜索
基于KD树执行最近邻搜索。首先,通过递归的方式找到待查找向量Xi
所属的叶子节点nd_best
,然后在此基础上进行回溯,检查是否在兄弟子树中存在距离更近的节点,最终返回距离Xi
最近的节点。
float(): 将字符串或数字转换为浮点数。如果未提供参数,则返回 0.0
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
x | 字符串或数字 | 是 | 无 | 要转换为浮点数的字符串或数字。若未提供参数,则返回 0.0 。 |
列表.pop():移除列表中指定索引处的元素并返回该元素。若未指定索引,则移除并返回最后一个元素。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
index | 整数 | 是 | -1 | 要移除元素的索引。若未提供参数,则移除并返回最后一个元素。 |
列表.append():在列表末尾添加一个元素。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
obj | 任意类型 | 否 | 无 | 要添加到列表末尾的元素。 |
def nearest_neighbour_search(self, Xi):
"""Nearest neighbour search and backtracking.
Arguments:
Xi {list} -- 1d list with int or float.
Returns:
node -- The nearest node to Xi.
"""
# The leaf node after searching Xi.
dist_best = float("inf")
nd_best = self._search(Xi, self.root)
que = [(self.root, nd_best)]
while que:
nd_root, nd_cur = que.pop(0)
# Calculate distance between Xi and root node
dist = self._get_eu_dist(Xi, nd_root)
# Update best node and distance.
if dist < dist_best:
dist_best, nd_best = dist, nd_root
while nd_cur is not nd_root:
# Calculate distance between Xi and current node
dist = self._get_eu_dist(Xi, nd_cur)
# Update best node, distance and visit flag.
if dist < dist_best:
dist_best, nd_best = dist, nd_cur
# If it's necessary to visit brother node.
if nd_cur.brother and dist_best > \
self._get_hyper_plane_dist(Xi, nd_cur.father):
_nd_best = self._search(Xi, nd_cur.brother)
que.append((nd_cur.brother, _nd_best))
# Back track.
nd_cur = nd_cur.father
return nd_best
Ⅴ、传统搜索方法
在一个给定的数据集matrix
中找到与目标向量arr1
最近的邻居。通过逐个计算目标向量与数据集中每个向量之间的欧几里得距离,并将这些距离与其对应的索引存储在一个列表中,最后对这个列表进行排序以找到距离最近的向量。
enumerate():将可迭代对象组合为一个索引序列,返回枚举对象(包含索引和值)。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
iterable | 可迭代对象 | 否 | 无 | 需要枚举的可迭代对象(如列表、字符串等)。 |
start | 整数 | 是 | 0 | 索引的起始值。 |
sum():对可迭代对象中的数值元素求和,支持自定义起始值。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
iterable | 可迭代对象 | 否 | 无 | 包含数值元素的可迭代对象(如列表、元组等)。 |
start | 数值 | 是 | 0 | 求和的初始值,会加到最终结果中。 |
zip():将多个可迭代对象的对应元素打包为元组,返回一个迭代器。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
*iterables | 多个可迭代对象 | 否 | 无 | 支持列表、元组、字符串等可迭代对象。 |
strict | 布尔值 | 是 | False | 若设为 True ,强制要求所有可迭代对象长度一致(Python 3.10+)。 |
列表.append():在列表末尾添加一个元素。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
obj | 任意类型 | 否 | 无 | 要添加到列表末尾的元素。 |
sorted():对所有可迭代对象进行排序,返回一个新的列表。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
iterable | 可迭代对象 | 否 | 无 | 需要排序的可迭代对象(如列表、元组等)。 |
key | 函数 | 是 | 无 | 用于排序的函数(如 lambda x: x[1] )。 |
reverse | 布尔值 | 是 | False | 若为 True ,则降序排序;若为 False ,则升序排序。 |
#传统方式,逐个计算并排序
def traditional_search(arr1, matrix):
res = []
for index, arr2 in enumerate(matrix):
score = sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5
res.append([score, index])
res = sorted(res, key=lambda x:x[0])
return matrix[res[0][1]]
Ⅵ、性能对比
构建KD树并使用KD树搜索方法与传统的逐个计算并排序的方法来查找最近邻向量,并比较这两种方法在相同数据集上搜索100次的耗时。
vec_dim:定义向量的维度为 8
matrix:创建了一个1000行、8列的随机数矩阵matrix
np.random.random():生成一个或多个介于 0 和 1 之间的随机浮点数,返回一个 NumPy 数组或单个浮点数。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
size | 整数或元组 | 是 | None | 输出的形状。若为 None ,则返回单个浮点数;否则返回指定形状的数组。 |
list():将可迭代对象(如字符串、元组、集合等)转换为列表。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
iterable | 可迭代对象 | 是 | 无 | 要转换为列表的可迭代对象。若未提供参数,则返回空列表。 |
range():生成一个不可变的整数序列,通常用于循环控制或生成数字序列。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
start | 整数 | 是 | 0 | 序列的起始值(包含)。 |
stop | 整数 | 否 | 无 | 序列的结束值(不包含)。 |
step | 整数 | 是 | 1 | 步长,表示每次递增或递减的值。 |
len():返回对象(如字符串、列表、字典等)的长度或元素个数。
参数 | 类型 | 是否可选 | 默认值 | 描述 |
---|---|---|---|---|
obj | 可迭代对象 | 否 | 无 | 要计算长度的对象(如字符串、列表、字典等)。 |
time.time():返回当前时间的时间戳(从 1970 年 1 月 1 日 00:00:00 UTC 开始到现在的秒数)。
import time
import numpy as np
'''
基于kd树的向量快速查找
'''
class Node(object):
def __init__(self):
"""Node class to build tree leaves.
"""
self.father = None
self.left = None
self.right = None
self.feature = None
self.split = None
def __str__(self):
return str(self.split[0])
@property
def brother(self):
"""Find the node's brother.
Returns:
node -- Brother node.
"""
if not self.father:
ret = None
else:
if self.father.left is self:
ret = self.father.right
else:
ret = self.father.left
return ret
class KDTree(object):
def __init__(self):
"""KD Tree class to improve search efficiency in KNN.
Attributes:
root: the root node of KDTree.
"""
self.root = Node()
def __str__(self):
"""Show the relationship of each node in the KD Tree.
Returns:
str -- KDTree Nodes information.
"""
ret = []
i = 0
que = [(self.root, -1)]
while que:
nd, idx_father = que.pop(0)
ret.append("%d -> %d: %s" % (idx_father, i, str(nd)))
if nd.left:
que.append((nd.left, i))
if nd.right:
que.append((nd.right, i))
i += 1
return "\n".join(ret)
def _get_median_idx(self, X, idxs, feature):
"""Calculate the median of a column of data.
Arguments:
X {list} -- 2d list object with int or float.
idxs {list} -- 1D list with int.
feature {int} -- Feature number.
sorted_idxs_2d {list} -- 2D list with int.
Returns:
list -- The row index corresponding to the median of this column.
"""
n = len(idxs)
# Ignoring the number of column elements is odd and even.
k = n // 2
# Get all the indexes and elements of column j as tuples.
col = map(lambda i: (i, X[i][feature]), idxs)
# Sort the tuples by the elements' values
# and get the corresponding indexes.
sorted_idxs = map(lambda x: x[0], sorted(col, key=lambda x: x[1]))
# Search the median value.
median_idx = list(sorted_idxs)[k]
return median_idx
def _get_variance(self, X, idxs, feature):
"""Calculate the variance of a column of data.
Arguments:
X {list} -- 2d list object with int or float.
idxs {list} -- 1D list with int.
feature {int} -- Feature number.
Returns:
float -- variance
"""
n = len(idxs)
col_sum = col_sum_sqr = 0
for idx in idxs:
xi = X[idx][feature]
col_sum += xi
col_sum_sqr += xi ** 2
# D(X) = E{[X-E(X)]^2} = E(X^2)-[E(X)]^2
return col_sum_sqr / n - (col_sum / n) ** 2
def _choose_feature(self, X, idxs):
"""Choose the feature which has maximum variance.
Arguments:
X {list} -- 2d list object with int or float.
idxs {list} -- 1D list with int.
Returns:
feature number {int}
"""
m = len(X[0])
variances = map(lambda j: (
j, self._get_variance(X, idxs, j)), range(m))
return max(variances, key=lambda x: x[1])[0]
def _split_feature(self, X, idxs, feature, median_idx):
"""Split indexes into two arrays according to split point.
Arguments:
X {list} -- 2d list object with int or float.
idx {list} -- Indexes, 1d list object with int.
feature {int} -- Feature number.
median_idx {float} -- Median index of the feature.
Returns:
list -- [left idx, right idx]
"""
idxs_split = [[], []]
split_val = X[median_idx][feature]
for idx in idxs:
# Keep the split point in current node.
if idx == median_idx:
continue
# Split
xi = X[idx][feature]
if xi < split_val:
idxs_split[0].append(idx)
else:
idxs_split[1].append(idx)
return idxs_split
def build_tree(self, X, y):
"""Build a KD Tree. The data should be scaled so as to calculate variances.
Arguments:
X {list} -- 2d list object with int or float.
y {list} -- 1d list object with int or float.
"""
# Initialize with node, indexes
nd = self.root
idxs = range(len(X))
que = [(nd, idxs)]
while que:
nd, idxs = que.pop(0)
n = len(idxs)
# Stop split if there is only one element in this node
if n == 1:
nd.split = (X[idxs[0]], y[idxs[0]])
continue
# Split
feature = self._choose_feature(X, idxs)
median_idx = self._get_median_idx(X, idxs, feature)
idxs_left, idxs_right = self._split_feature(
X, idxs, feature, median_idx)
# Update properties of current node
nd.feature = feature
nd.split = (X[median_idx], y[median_idx])
# Put children of current node in que
if idxs_left != []:
nd.left = Node()
nd.left.father = nd
que.append((nd.left, idxs_left))
if idxs_right != []:
nd.right = Node()
nd.right.father = nd
que.append((nd.right, idxs_right))
def _search(self, Xi, nd):
"""Search Xi from the KDTree until Xi is at an leafnode.
Arguments:
Xi {list} -- 1d list with int or float.
Returns:
node -- Leafnode.
"""
while nd.left or nd.right:
if not nd.left:
nd = nd.right
elif not nd.right:
nd = nd.left
else:
if Xi[nd.feature] < nd.split[0][nd.feature]:
nd = nd.left
else:
nd = nd.right
return nd
def _get_eu_dist(self, Xi, nd):
"""Calculate euclidean distance between Xi and node.
Arguments:
Xi {list} -- 1d list with int or float.
nd {node}
Returns:
float -- Euclidean distance.
"""
X0 = nd.split[0]
return self.get_eu_dist(Xi, X0)
def get_eu_dist(self, arr1, arr2):
"""Calculate the Euclidean distance of two vectors.
Arguments:
arr1 {list} -- 1d list object with int or float
arr2 {list} -- 1d list object with int or float
Returns:
float -- Euclidean distance
"""
return sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5
def _get_hyper_plane_dist(self, Xi, nd):
"""Calculate euclidean distance between Xi and hyper plane.
Arguments:
Xi {list} -- 1d list with int or float.
nd {node}
Returns:
float -- Euclidean distance.
"""
j = nd.feature
X0 = nd.split[0]
return abs(Xi[j] - X0[j])
def nearest_neighbour_search(self, Xi):
"""Nearest neighbour search and backtracking.
Arguments:
Xi {list} -- 1d list with int or float.
Returns:
node -- The nearest node to Xi.
"""
# The leaf node after searching Xi.
dist_best = float("inf")
nd_best = self._search(Xi, self.root)
que = [(self.root, nd_best)]
while que:
nd_root, nd_cur = que.pop(0)
# Calculate distance between Xi and root node
dist = self._get_eu_dist(Xi, nd_root)
# Update best node and distance.
if dist < dist_best:
dist_best, nd_best = dist, nd_root
while nd_cur is not nd_root:
# Calculate distance between Xi and current node
dist = self._get_eu_dist(Xi, nd_cur)
# Update best node, distance and visit flag.
if dist < dist_best:
dist_best, nd_best = dist, nd_cur
# If it's necessary to visit brother node.
if nd_cur.brother and dist_best > \
self._get_hyper_plane_dist(Xi, nd_cur.father):
_nd_best = self._search(Xi, nd_cur.brother)
que.append((nd_cur.brother, _nd_best))
# Back track.
nd_cur = nd_cur.father
return nd_best
#传统方式,逐个计算并排序
def traditional_search(arr1, matrix):
res = []
for index, arr2 in enumerate(matrix):
score = sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5
res.append([score, index])
res = sorted(res, key=lambda x:x[0])
return matrix[res[0][1]]
vec_dim = 8
matrix = np.random.random((1000, vec_dim))
kd_tree = KDTree()
kd_tree.build_tree(matrix, list(range(len(matrix))))
# x = np.random.random((vec_dim))
# print(kd_tree.nearest_neighbour_search(x))
# print(traditional_search(x, matrix))
start_time = time.time()
for i in range(100):
x = np.random.random((vec_dim))
best = kd_tree.nearest_neighbour_search(x)
print("kd树搜索耗时:%s"%(time.time() - start_time))
start_time = time.time()
for i in range(100):
x = np.random.random((vec_dim))
best = traditional_search(x, matrix)
print("穷举搜索耗时:%s"%(time.time() - start_time))
2.Annoy
也是依据空间分割的原理来做,空间分割的过程相当于Kmeans聚类
重复分割过程,直到每个空间内的点个数小于设定值
可以同时在多个接近的分支上查找 或 通过不同初始划分,生成多个树