本文目录
- 前述
- 一、环境依赖
- 二、数据准备
- 1. 数据加载
- 程序解析
- word_tokenize()将字符串分割为一个个的单词,并由列表保存。
- 2. 构建单词表
- 程序解析
- (1)将列表里每个子列表的所有单词合并到一个新列表(没有子列表)中。
- (2)Counter()-- 统计列表(迭代对象)各元素出现次数,并按次数从多到少排序。
- (3)获取出现频率最高的前 50000 个元素及其个数,返回一个含有多个元组的列表。
- (4) 将含有多个元组的列表,转为字典。
- (5) 将字典的元素和索引互换位置。
- 3. 将英文、中文单词列表转为单词索引列表
- 程序解析
- (1)将en句子中的各个单词在字典里的索引,组成一个索引列表。
- (2)将句子的索引列表按照句子长度进行排序--从短到长。
- 4. 获取所有语句中的最大长度,如果语句小于该长度则填充。
- 5. 划分batch
- 三、模型搭建
前述
基础请查看:Transformer基础查看地址!
一、环境依赖
nltk==3.5
numpy==1.18.5
seaborn==0.11.1
matplotlib==3.3.2
psyco==1.6
zhtools==0.0.5
#torch==1.12.1 安装torch时使用下面的命令
pip install torch==1.12.1+cu113 torchvision==0.13.1+cu113 torchaudio==0.12.1 --extra-index-url https://download.pytorch.org/whl/cu113 -i https://pypi.tuna.tsinghua.edu.cn/simple
代码导入包 :
import copy
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import seaborn as sns
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import Counter
from langconv import Converter
from nltk import word_tokenize
from torch.autograd import Variable
二、数据准备
数据集可以去网络上下载,下面的是train.txt文件部分内容,前面为英文,后面为繁体中文,中间以'\t'
隔开。其他数据文件也相同。
这里数据集是英文和繁体中文,所以第一步我们需要将繁体中文变为简体中文。
转换代码如下:
def cht_to_chs(sent):
"""" zh-hans" 是一个语言代码,用于指代中文(汉语)的简体字形式。在国际化和本地化领域,语言代码用于标识特定语言或语言变体。
在这里,"zh" 表示汉语(中文),"hans" 表示简体字形式。因此,"zh-hans" 表示简体中文。"""
sent = Converter("zh-hans").convert(sent)
sent = sent.encode("utf-8")
return sent
1. 数据加载
作用:读取数据路径下的完整句子,将每个句子分割为一个一个的单词,并存到子列表中。返回含有子列表的列表,
"""参数参数path 为数据的路径,如下
train_file= 'nmt/en-cn/train.txt' # 训练集
dev_file= "nmt/en-cn/dev.txt" # 验证集
load_data(train_file)
"""
def load_data(path):
"""
读取英文、中文数据
对每条样本分词并构建包含起始符和终止符的单词列表
"""
en = [] #定义英文列表
cn = [] #定义中文列表
with open(path, mode="r", encoding="utf-8") as f: #只读的形式打开文件路径,文件描述符为f。
for line in f.readlines(): #按行读取
sent_en, sent_cn = line.strip().split("\t") #以‘\t’进行分割,前面的赋给sent_en,后面的赋给sent_cn 。
sent_en = sent_en.lower() #将英文转换为小写。
sent_cn = cht_to_chs(sent_cn) #将繁体中文转为简体中文。
sent_en = ["BOS"] + word_tokenize(sent_en) + ["EOS"]
# 中文按字符切分
sent_cn = ["BOS"] + [char for char in sent_cn] + ["EOS"]
en.append(sent_en) #将切割好的英文 存入英文列表。包含['BOS', 'i', 'love', 'you', 'EOS']
cn.append(sent_cn) #将切割好的中文 存入中文列表。
return en, cn #返回两个单词列表
"""
输出列表格式如下:
en = [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
"""
程序解析
word_tokenize()将字符串分割为一个个的单词,并由列表保存。
from nltk import word_tokenize
sent_en="He knows better than to marry her."
print(word_tokenize(sent_en))
# 输出为:['He', 'knows', 'better', 'than', 'to', 'marry', 'her', '.']
2. 构建单词表
"""输入
sentences= [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
"""
PAD = 0 # padding占位符的索引
UNK = 1 # 未登录词标识符的索引
def build_dict(sentences, max_words=5e4):
"""
构造分词后的列表数据
构建单词-索引映射(key为单词,value为id值)
"""
# 统计数据集中单词词频
word_count = Counter([word for sent in sentences for word in sent])
# 按词频保留前max_words个单词构建词典
# 添加UNK和PAD两个单词
ls = word_count.most_common(int(max_words))
total_words = len(ls) + 2
word_dict = {word [0]: index + 2 for index, word in enumerate(ls)}
word_dict['UNK'] = UNK
word_dict['PAD'] = PAD
# 构建id2word映射
index_dict = {v: k for k, v in word_dict.items()}
return word_dict, total_words, index_dict
"""
输出:
word_dict ={'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}
index_dict= {2: 'BOS', 3: 'language', 4: 'processing', 5: '.', 6: 'EOS', 7: 'I', 8: 'love', 9: 'natural', 10: 'Natural', 11: 'is', 12: 'fascinating', 1: 'UNK', 0: 'PAD'}
"""
程序解析
(1)将列表里每个子列表的所有单词合并到一个新列表(没有子列表)中。
将sentences里面每句话的每个单词组合形成一个新的列表。
sentences = [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
word_list = [word for sent in sentences for word in sent]
"""
另一种写法:
word_list = []
for sent in sentences:
for word in sent:
word_list.append(word)
"""
print(word_list )
"""
输出: ['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS', 'BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
"""
(2)Counter()-- 统计列表(迭代对象)各元素出现次数,并按次数从多到少排序。
from collections import Counter
#Python 中的一个内置数据结构
# 定义一个列表
word_list = ['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS', 'BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
# 使用 Counter 统计列表中各元素的出现次数
word_count = Counter(word_list)
print(word_count )
"""
输出: Counter({'BOS': 2, 'language': 2, 'processing': 2, '.': 2, 'EOS': 2, 'I': 1, 'love': 1, 'natural': 1, 'Natural': 1, 'is': 1, 'fascinating': 1})
"""
(3)获取出现频率最高的前 50000 个元素及其个数,返回一个含有多个元组的列表。
如: [ (元素1,频次),(元素2,频次),… ]
from collections import Counter
word_count = Counter({'BOS': 2, 'language': 2, 'processing': 2, '.': 2, 'EOS': 2, 'I': 1, 'love': 1, 'natural': 1, 'Natural': 1, 'is': 1, 'fascinating': 1})
ls = word_count.most_common(int(5e4))#返回列表中频率最高的元素和它们的计数,按照计数从高到低排序。频率最高的前 50000 个元素。
print(ls)
"""
输出:
[('BOS', 2), ('language', 2), ('processing', 2), ('.', 2), ('EOS', 2), ('I', 1), ('love', 1), ('natural', 1), ('Natural', 1), ('is', 1), ('fascinating', 1)]
"""
(4) 将含有多个元组的列表,转为字典。
如:word_dict ={元素1:索引,元素2,索引…}
enumerate(可迭代元素),返回的第一个值为索引,第二个值为元素。
ls = [('BOS', 2), ('language', 2), ('processing', 2), ('.', 2), ('EOS', 2), ('I', 1), ('love', 1), ('natural', 1), ('Natural', 1), ('is', 1), ('fascinating', 1)]
word_dict = {word [0]: index + 2 for index, word in enumerate(ls)}
# word是('BOS',2), word[0]='BOS' , word[1]=2
#index 是该元组的索引,也就是 0
"""另一种写法:
word_dict = {}
for index, word in enumerate(ls):
word_dict[ word[0] ] = index + 2
print(word_dict)
"""
print(word_dict) #存放元素及其索引号
"""
输出: {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12}
"""
word_dict['UNK'] = 1
word_dict['PAD'] = 0
print(word_dict)
"""
输出:{'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}
"""
(5) 将字典的元素和索引互换位置。
word_dict= {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}
index_dict = {v: k for k, v in word_dict.items()}
print(index_dict)
"""
输出:{2: 'BOS', 3: 'language', 4: 'processing', 5: '.', 6: 'EOS', 7: 'I', 8: 'love', 9: 'natural', 10: 'Natural', 11: 'is', 12: 'fascinating', 1: 'UNK', 0: 'PAD'}
"""
3. 将英文、中文单词列表转为单词索引列表
"""
输入:
en = [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
cn= 同上格式相同
en_dict= {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0 }
cn_dict= 同上格式相同
"""
def word2id(en, cn, en_dict, cn_dict, sort=True):
"""
将英文、中文单词列表转为单词索引列表
`sort=True`表示以英文语句长度排序,以便按批次填充时,同批次语句填充尽量少
"""
length = len(en) #计算长度
# 单词映射为索引
out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]
out_cn_ids = [[cn_dict.get(word, UNK) for word in sent] for sent in cn]
# 按照语句长度排序
def len_argsort(seq):
"""
传入一系列语句数据(分好词的列表形式),
按照语句长度排序后,返回排序后原来各语句在数据中的索引下标
"""
return sorted(range(len(seq)), key=lambda x: len(seq[x]))
# 按相同顺序对中文、英文样本排序
if sort:
# 以英文语句长度排序
sorted_index = len_argsort(out_en_ids)
out_en_ids = [out_en_ids[idx] for idx in sorted_index]
out_cn_ids = [out_cn_ids[idx] for idx in sorted_index]
return out_en_ids, out_cn_ids
"""
输出:
out_en_ids= [
[2, 7, 8, 9, 3, 4, 5, 6],
[2, 10, 3, 4, 11, 12, 5, 6]
]
out_cn_ids= 同上格式相同
"""
程序解析
(1)将en句子中的各个单词在字典里的索引,组成一个索引列表。
from nltk import word_tokenize
PAD = 0 # padding占位符的索引
UNK = 1 # 未登录词标识符的索引
en = [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
en_dict= {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0 }
out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]
"""
另一种写法:
out_en_ids = []
for sent in en:
# 将句子中的每个单词转换为对应的单词ID
# 如果单词不在字典中,则使用UNK表示
word_ids = []
for word in sent:
word_id = en_dict.get(word, UNK)
word_ids.append(word_id)
out_en_ids.append(word_ids)
"""
print(out_en_ids)
"""
输出: [
[2, 7, 8, 9, 3, 4, 5, 6],
[2, 10, 3, 4, 11, 12, 5, 6]
]
2:en第一个句子的第一个单词'BOS'在en_dict字典的索引值为2.
7:en第一个句子的第二个单词'I'在en_dict字典的索引值为7.
8:en第一个句子的第三个单词'love'在en_dict字典的索引值为8.
......
"""
(2)将句子的索引列表按照句子长度进行排序–从短到长。
作用: 以便后续分batch做padding时,同批次各句子需要padding的长度相近减少padding量。
from nltk import word_tokenize
UNK=1
en = [
['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS'],
['BOS', 'This', 'is', 'a', 'test', 'sentence', '.', 'EOS','I','love'],
['BOS', 'test', '.', 'EOS']
]
en_dict = {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'This': 10, 'is': 11, 'a': 12, 'test': 13, 'sentence': 14, 'UNK': 1, 'PAD': 0}
def word2id(en, en_dict, sort=True):
length = len(en) # 计算长度
out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]
"""
out_en_ids = [
[2, 7, 8, 9, 3, 4, 5, 6],
[2, 10, 3, 4, 11, 12, 5, 6]
]
"""
# 按照语句长度排序
def len_argsort(seq):
return sorted(range(len(seq)), key=lambda x: len(seq[x]))
if sort:
sorted_index = len_argsort(out_en_ids) #获得按句子长度排序后的索引号,如sorted_index =[2, 0, 1]
out_en_ids = [out_en_ids[idx] for idx in sorted_index] #按照排序后的索引列表的索引,将索引列表排序。
return out_en_ids
print(word2id(en,en_dict))
"""
sort=False 索引列表按en句子的顺序排列
out_en_ids= [[2, 7, 8, 9, 3, 4, 5, 6], [2, 10, 11, 12, 13, 14, 5, 6, 7, 8], [2, 13, 5, 6] ]
sort=True 索引列表按en句子的长度排列,从短到长排列
out_en_ids= [[2, 13, 5, 6], [2, 7, 8, 9, 3, 4, 5, 6], [2, 10, 11, 12, 13, 14, 5, 6, 7, 8] ]
"""
4. 获取所有语句中的最大长度,如果语句小于该长度则填充。
PAD=0
def seq_padding(X, padding=PAD):
"""
按批次(batch)对数据填充、长度对齐
"""
# 计算该批次各条样本语句长度
L = [len(x) for x in X]
# 获取该批次样本中语句长度最大值
ML = max(L)
# 遍历该批次样本,如果语句长度小于最大长度,则用padding填充
return np.array([
np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
])
"""写法二: """
def seq_padding(X, padding=PAD):
"""
按批次(batch)对数据填充、长度对齐
"""
# 计算该批次各条样本语句长度
Length = [len(x) for x in X]
# 获取该批次样本中语句长度最大值
MaxLength = max(Length)
# 遍历该批次样本,如果语句长度小于最大长度,则用padding填充
padded_X = []
for x in X:
if len(x) < MaxLength:
padded_seq = np.concatenate([x, [padding] * (ML - len(x))])
else:
padded_seq = x
padded_X.append(padded_seq)
return np.array(padded_X)
5. 划分batch
"""输入
sentences= [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
"""
def split_batch(self, en, cn, batch_size, shuffle=True):
"""
划分批次
`shuffle=True`表示对各批次顺序随机打乱
"""
# 每隔batch_size取一个索引作为后续batch的起始索引
idx_list = np.arange(0, len(en), batch_size)
# 起始索引随机打乱
if shuffle:
np.random.shuffle(idx_list)
# 存放所有批次的语句索引
batch_indexs = []
for idx in idx_list:
"""
形如[array([4, 5, 6, 7]),
array([0, 1, 2, 3]),
array([8, 9, 10, 11]),
...]
"""
# 起始索引最大的批次可能发生越界,要限定其索引
batch_indexs.append(np.arange(idx, min(idx + batch_size, len(en))))
# 构建批次列表
batches = []
for batch_index in batch_indexs:
# 按当前批次的样本索引采样
batch_en = [en[index] for index in batch_index]
batch_cn = [cn[index] for index in batch_index]
# 对当前批次中所有语句填充、对齐长度
# 维度为:batch_size * 当前批次中语句的最大长度
batch_cn = seq_padding(batch_cn)
batch_en = seq_padding(batch_en)
# 将当前批次添加到批次列表
# Batch类用于实现注意力掩码
batches.append(Batch(batch_en, batch_cn))
return batches