安装TorchCRF
!pip install TorchCRF==1.0.6
构建BiLSTM-CRF
# encoding:utf-8
import torch
import torch.nn as nn
from TorchCRF import CRF
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
import numpy as np
import torch
import torch.nn as nn
import torch.optim as op
from torch.utils.data import DataLoader
import torch
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
# Named-entity-recognition dataset
class NERDataset(Dataset):
    """Map-style dataset pairing each input sequence with its label sequence.

    Args:
        X: Indexable, sized collection of input sequences (one per sentence).
        Y: Indexable collection of label sequences aligned with ``X``.
        *args, **kwargs: Accepted for call-site compatibility; unused.
    """

    def __init__(self, X, Y, *args, **kwargs):
        # len() rather than X.shape[0] so plain lists/tuples work as well as
        # NumPy arrays (for which both give the same value).
        self.data = [{'x': X[i], 'y': Y[i]} for i in range(len(X))]

    def __getitem__(self, index):
        """Return the ``{'x': ..., 'y': ...}`` sample stored at ``index``."""
        return self.data[index]

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)
# BiLSTM-CRF model
class NERLSTM_CRF(nn.Module):
    """Sequence tagger: embedding -> bidirectional LSTM -> linear -> CRF.

    Args:
        config: Object exposing ``embedding_dim``, ``hidden_dim``,
            ``vocab_size``, ``num_tags`` and ``dropout`` attributes.
    """

    def __init__(self, config):
        super().__init__()
        self.embedding_dim = config.embedding_dim
        self.hidden_dim = config.hidden_dim
        self.vocab_size = config.vocab_size
        self.num_tags = config.num_tags

        self.embeds = nn.Embedding(self.vocab_size, self.embedding_dim)
        self.dropout = nn.Dropout(config.dropout)
        self.lstm = nn.LSTM(
            self.embedding_dim,
            # Each direction gets half the width so the concatenated
            # bidirectional output matches the linear layer's input size
            # (this requires hidden_dim to be even).
            self.hidden_dim // 2,
            num_layers=1,
            bidirectional=True,
            batch_first=True,  # inputs/outputs are laid out batch-first; mind the data shape
        )
        self.linear = nn.Linear(self.hidden_dim, self.num_tags)
        # CRF layer
        self.crf = CRF(self.num_tags)

    def _emissions(self, x):
        """Compute the per-token tag scores that are fed to the CRF."""
        embeddings = self.embeds(x)
        feats, _ = self.lstm(embeddings)
        return self.linear(self.dropout(feats))

    def forward(self, x, mask):
        """Decode tag sequences for a batch with the CRF's Viterbi decoder.

        Args:
            x: Tensor of token ids (assumed batch-first — confirm with caller).
            mask: Mask passed through to the CRF alongside the emissions.

        Returns:
            Whatever ``CRF.viterbi_decode`` returns for the emissions and
            mask (presumably one list of tag ids per sentence — see the
            TorchCRF documentation).
        """
        return self.crf.viterbi_decode(self._emissions(x), mask)

    def log_likelihood(self, x, labels, mask):
        """Return the training loss for a batch.

        The CRF output for ``(emissions, labels, mask)`` is negated and summed
        over the batch, so despite the method name the returned scalar is
        meant to be minimized.
        """
        loss = -self.crf(self._emissions(x), labels, mask)
        return torch.sum(loss)
# ner chinese
加载数据
# Tag name -> integer id; id 0 is the padding tag.
tag2id = {
    "PAD": 0,
    "B-LOC": 1,
    "I-LOC": 2,
    "B-PER": 3,
    "I-PER": 4,
    "B-ORG": 5,
    "I-ORG": 6,
    "O": 7,
}
# Reverse lookup: integer id -> tag name.
id2tag = {idx: tag for tag, idx in tag2id.items()}
# Path template; "{}" is replaced by the data file name.
tlp = "/kaggle/input/ner-bilstm-crf/data_txt/{}"
def load_text(name, word2id=None, maxlen=60):
    """Read a token-per-line NER file and encode it as fixed-length id arrays.

    The file holds one ``token<TAB>tag`` pair per line; blank lines separate
    sentences.

    Args:
        name: File name substituted into the module-level ``tlp`` template.
        word2id: Existing token -> id vocabulary. When omitted (or empty) a
            new vocabulary is built from the file, starting from
            ``{"pad": 0, "unk": 1}``. When given it is only read, and tokens
            it does not contain are encoded as id 1.
        maxlen: Length every sentence is padded (with 0) or truncated to.

    Returns:
        ``(x, y, word2id)``: NumPy arrays of token ids and tag ids (one row
        per sentence) and the vocabulary that was used.

    Raises:
        ValueError: If a non-blank line is not exactly ``token<TAB>tag``.
        KeyError: If a tag is missing from the module-level ``tag2id``.
    """
    # The vocabulary only grows when the caller did not supply one.
    build_vocab = not word2id
    if build_vocab:
        word2id = {"pad": 0, "unk": 1}

    sentences = []
    with open(tlp.format(name), encoding="utf-8") as fp:
        buff = []
        for line in fp:
            token = line.strip()
            if token:
                buff.append(token)
            elif buff:
                # Blank line ends a sentence; repeated blank lines are
                # ignored so no empty (all-padding) sentence is produced.
                sentences.append(buff)
                buff = []
        if buff:
            # Keep the final sentence when the file lacks a trailing blank line.
            sentences.append(buff)

    x_all, y_all = [], []
    for sentence in sentences:
        x, y = [], []
        for token in sentence:
            w, t = token.split("\t")
            if build_vocab and w not in word2id:
                word2id[w] = len(word2id)
            x.append(word2id.get(w, 1))  # 1 is the "unk" id
            y.append(tag2id[t])
        pad = maxlen - len(x)
        if pad > 0:
            x += [0] * pad
            y += [0] * pad
        x_all.append(x[:maxlen])
        y_all.append(y[:maxlen])
    return np.array(x_all), np.array(y_all), word2id
def load_data(test_size=0.1, random_state=None):
    """Load the train/test files and hold out part of the training set.

    Args:
        test_size: Share of the training sentences held out for validation;
            forwarded to ``train_test_split``.
        random_state: Seed forwarded to ``train_test_split``. The default
            ``None`` keeps the split unseeded; pass an int for a
            reproducible split.

    Returns:
        The 9-tuple ``(word2id, tag2id, x_train, x_test, x_valid, y_train,
        y_test, y_valid, id2tag)``.
    """
    x_train, y_train, word2id = load_text("train.txt")
    x_train, x_valid, y_train, y_valid = train_test_split(
        x_train, y_train, test_size=test_size, random_state=random_state
    )
    print("train:", x_train.shape, y_train.shape)
    print("valid:", x_valid.shape, y_valid.shape)
    # The test file is encoded with the vocabulary built from the training file.
    x_test, y_test, _ = load_text("test.txt", word2id)
    print("test len:", len(x_test))
    print(id2tag)
    return word2id, tag2id, x_train, x_test, x_valid, y_train, y_test, y_valid, id2tag
def main():
    """Load the data and print the vocabulary size as a quick smoke test."""
    # load_data() returns a 9-tuple whose first element is the vocabulary;
    # taking len() of the whole tuple would always print 9.
    word2id = load_data()[0]
    print(len(word2id))


if __name__ == '__main__':
    main()
实体类型解码
# load_data is defined earlier in this file; a standalone script would need:
# from read_file_pkl import load_data
# Load the vocabularies and every data split once at module level.
(
    word2id,
    tag2id,
    x_train,
    x_test,
    x_valid,
    y_train,
    y_test,
    y_valid,
    id2tag,
) = load_data()
# Decode a predicted tag path: merge single characters into entity spans
def parse_tags(text, path):
    """Convert a tag-id path into ``(text span, label)`` pairs.

    Consecutive ``B-X`` / ``I-X`` tags of the same type are merged into one
    ``(span, "X")`` entry. An entity ends at the next ``O``, the next ``B-``
    tag, a tag of a different type, or the end of the path. ``E-X`` (closing)
    and ``S-X`` (single-token) tags are also honoured, so BIOES-style tag
    sets decode too. Every ``O`` position yields ``(character, "O")``; tags
    without a type suffix other than ``O`` (such as ``PAD``) are skipped.

    Args:
        text: The input string, one character per tag.
        path: Sequence of tag ids, looked up in the module-level ``id2tag``.

    Returns:
        List of ``(word, label)`` tuples in text order.
    """
    tags = [id2tag[idx] for idx in path]
    res = []
    begin = None  # start index of the entity being collected, if any
    label = None  # type of that entity

    for idx, tag in enumerate(tags):
        prefix, _, kind = tag.partition("-")
        continues = begin is not None and prefix in ("I", "E") and kind == label
        if not continues:
            # Whatever was open ends just before this position.
            if begin is not None:
                res.append((text[begin:idx], label))
                begin = label = None
            if tag == 'O':
                res.append((text[idx], tag))
                continue
            if prefix not in ("B", "I", "E", "S") or not kind:
                continue
            # B-/S- start an entity; an I-/E- tag with nothing to continue
            # is treated leniently as the start of a new entity.
            begin, label = idx, kind
        if prefix in ("E", "S"):
            res.append((text[begin:idx + 1], label))
            begin = label = None

    # Flush an entity that runs to the end of the path.
    if begin is not None:
        res.append((text[begin:len(tags)], label))
    return res
class Config:
    """Hyper-parameters for the model and its optimizer."""

    # Sizes taken from the module-level vocabularies
    vocab_size = len(word2id)
    num_tags = len(tag2id)

    # Model dimensions and regularization
    embedding_dim = 100
    hidden_dim = 200
    dropout = 0.2

    # Optimizer settings
    lr = 1e-3
    weight_decay = 1e-5
def utils_to_train():
    """Build the data loaders, model and optimizer used for training.

    Reads the module-level ``x_*`` / ``y_*`` splits.

    Returns:
        ``(max_epoch, device, train_data_loader, valid_data_loader,
        test_data_loader, optimizer, model)``.
    """
    device = torch.device('cpu')
    max_epoch = 1
    batch_size = 32
    num_workers = 4

    def make_loader(x, y, shuffle):
        # All three loaders share the same batch size and worker count.
        return DataLoader(
            NERDataset(x, y),
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
        )

    train_data_loader = make_loader(x_train, y_train, shuffle=True)
    # Only the training order needs shuffling; evaluation metrics are
    # aggregated over the whole split, so a fixed order is used there.
    valid_data_loader = make_loader(x_valid, y_valid, shuffle=False)
    test_data_loader = make_loader(x_test, y_test, shuffle=False)

    config = Config()
    model = NERLSTM_CRF(config).to(device)
    # The loss comes from the model's CRF layer, so no separate criterion
    # object is created here.
    optimizer = op.Adam(model.parameters(), lr=config.lr, weight_decay=config.weight_decay)
    return max_epoch, device, train_data_loader, valid_data_loader, test_data_loader, optimizer, model
BiLSTM-CRF训练
# Vocabulary used by ChineseNER.predict to encode input characters.
word2id = load_data()[0]
# Shared training objects, read as module-level globals by ChineseNER.
(
    max_epoch,
    device,
    train_data_loader,
    valid_data_loader,
    test_data_loader,
    optimizer,
    model,
) = utils_to_train()
# Chinese named-entity recognition: training, inference and evaluation
class ChineseNER(object):
    """Train, evaluate and run the module-level BiLSTM-CRF ``model``.

    The class keeps no state of its own: it works on the module-level
    ``model``, ``optimizer``, ``device``, ``max_epoch``, data loaders and
    ``word2id``.
    """

    # One checkpoint location shared by train() (save) and predict()/test()
    # (load), so a freshly trained model can be loaded back.
    model_path = 'params1.data_target_pkl'

    def train(self):
        """Train for ``max_epoch`` epochs, validating and saving after each."""
        for epoch in range(max_epoch):
            # Training mode
            model.train()
            for index, batch in enumerate(train_data_loader):
                # Reset gradients
                optimizer.zero_grad()
                # Move the batch to the target device
                x = batch['x'].to(device)
                y = batch['y'].to(device)
                mask = x > 0  # id 0 is padding
                # Forward pass and loss
                loss = model.log_likelihood(x, y, mask)
                # Back-propagation
                loss.backward()
                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(parameters=model.parameters(),
                                               max_norm=10)
                # Parameter update
                optimizer.step()
                if index % 200 == 0:
                    print('epoch:%5d,------------loss:%f' %
                          (epoch, loss.item()))
            # Validation loss and metrics
            self._evaluate(valid_data_loader)
            torch.save(model.state_dict(), self.model_path)

    def _evaluate(self, data_loader):
        """Print the loss and token-level metrics of ``model`` on ``data_loader``.

        Returns:
            ``(aver_loss, precision, recall, f1)`` with macro-averaged
            scores, or ``None`` when the loader yields no labelled tokens.
        """
        # Evaluation mode; gradients are not needed here.
        model.eval()
        total_loss = 0.0
        preds, labels = [], []
        with torch.no_grad():
            for batch in data_loader:
                val_x, val_y = batch['x'].to(device), batch['y'].to(device)
                val_mask = val_x > 0
                paths = model(val_x, val_mask)
                total_loss += model.log_likelihood(val_x, val_y, val_mask).item()
                # Number of non-padding (non-zero) labels in each sentence
                lengths = (val_y > 0).sum(dim=1).tolist()
                for path, gold, n in zip(paths, val_y.tolist(), lengths):
                    # Guard against a decoded path shorter than the label
                    # count so preds and labels always stay aligned.
                    n = min(n, len(path))
                    preds += path[:n]
                    labels += gold[:n]

        if not labels:
            print("no labelled tokens to evaluate")
            return None

        # Loss per sentence (the model returns a loss summed over the batch)
        aver_loss = total_loss / len(data_loader.dataset)
        precision = precision_score(labels, preds, average='macro')
        recall = recall_score(labels, preds, average='macro')
        f1 = f1_score(labels, preds, average='macro')
        report = classification_report(labels, preds)
        print('loss:%f, precision:%f, recall:%f, f1:%f' %
              (aver_loss, precision, recall, f1))
        print(report)
        return aver_loss, precision, recall, f1

    # Predict: takes one sentence, returns its words and labels
    def predict(self, input_str=""):
        """Tag one sentence with the saved model.

        Args:
            input_str: Text to tag; when empty the user is prompted for it.

        Returns:
            The ``parse_tags`` result for the text, or ``[]`` when the text
            is still empty after prompting.
        """
        model.load_state_dict(torch.load(self.model_path, map_location=device))
        model.eval()
        if not input_str:
            input_str = input("请输入文本: ")
        if not input_str:
            return []
        # Characters missing from the vocabulary fall back to its "unk" entry.
        unk_id = word2id['unk']
        input_vec = [word2id.get(char, unk_id) for char in input_str]
        # Convert to a (1, sentence_length) tensor of ids
        sentences = torch.tensor(input_vec, dtype=torch.long).view(1, -1).to(device)
        mask = sentences > 0
        with torch.no_grad():
            paths = model(sentences, mask)
        res = parse_tags(input_str, paths[0])
        return res

    # Evaluate performance on the test set
    def test(self, test_dataloader):
        """Load the saved weights and print metrics for ``test_dataloader``."""
        model.load_state_dict(torch.load(self.model_path, map_location=device))
        self._evaluate(test_dataloader)
if __name__ == "__main__":
    # Script entry point: build the trainer and run training.
    cn = ChineseNER()
    cn.train()
环境 | TPU VM v3-8 | |||
Precision | recall | F1-score | support | |
1 | 0.86 | 0.77 | 0.82 | 3209 |
2 | 0.82 | 0.76 | 0.79 | 4417 |
3 | 0.90 | 0.77 | 0.83 | 1591 |
4 | 0.90 | 0.80 | 0.85 | 3052 |
5 | 0.83 | 0.68 | 0.75 | 1896 |
6 | 0.84 | 0.79 | 0.82 | 7566 |
7 | 0.98 | 0.99 | 0.99 | 167824 |
Accuracy | 0.97 | 189555 | ||
Macro avg | 0.88 | 0.80 | 0.83 | 189555 |
Weighted avg | 0.97 | 0.97 | 0.97 | 189555 |
表1 BiLSTM-CRF训练效果
总体上,模型的Accuracy(整体正确率)为0.97,说明模型的预测效果较好;但需要注意,类别7(即标签"O")占全部样本的绝大多数(167824/189555),因此Accuracy主要由该类别决定。Macro avg和Weighted avg是对各个类别的指标取平均后得到的结果:Macro avg对每个类别的指标直接取算术平均,Weighted avg则按各类别的样本数(support)进行加权平均,即考虑了样本数的影响。类别7的Precision、recall和F1-score都较高,说明该类别的样本预测效果最好;而类别5(即标签"B-ORG")的指标相对较低,说明该类别的样本预测效果最差。
操作异常问题与解决方案
1、ModuleNotFoundError: No module named “TorchCRF”
解决办法:pip install TorchCRF
2、 IndexError: tensors used as indices must be long, byte or bool tensors
解决办法:先卸载TorchCRF然后安装适配版本的TorchCRF
pip uninstall TorchCRF
pip install TorchCRF==1.0.6
3、IndexError:CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect
解决办法:添加以下代码,使CUDA核函数同步执行,让报错堆栈指向真正出错的位置,便于定位问题
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
6、BrokenPipeError: [Errno 32] Broken pipe
解决办法:显存爆满,应该换用更大显存的GPU,因此选择使用kaggle运行
7、RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu
解决办法:换用TPU进行运行
总结
本实验使用BiLSTM-CRF对文本逐字进行分类(命名实体识别),最终整体准确率(Accuracy)达到97%,分类效果较好。LSTM是一种能够对序列数据进行建模的循环神经网络,能够捕捉输入文本的上下文信息。而双向LSTM则是同时考虑正向和反向的上下文信息,进一步提高了模型的上下文信息捕捉能力。CRF(Conditional Random Field,条件随机场)是一种无向图模型,能够对序列标注结果进行后验推断,从而更好地处理序列标注问题。
在文本分类中,可以将每个单词的特征作为节点,利用CRF进行动态规划解码,找出最优的分类结果。