中文命名实体识别
本文通过people_daily_ner数据集,介绍两段式训练过程,第一阶段是训练下游任务模型,第二阶段是联合训练下游任务模型和预训练模型,来实现中文命名实体识别任务。
一.任务和数据集介绍
1.命名实体识别任务
NER(Named Entity Recognition)和Pos(Part-of-Speech)是2类典型的标记分类问题。NER是信息抽取基础,识别文本中的实体(比如人名、地点、组织结构名等),本质就是预测每个字对应的标记。DL兴起前,主要是HMM和CRF等模型,现在基本是DL模型。可根据需要设置标注方式,常见方式有BIO、BIESO等。NER数据样例如下所示:

2.数据集介绍
本文使用中文命名实体识别数据集people_daily_ner,样例数据如下所示:

people_daily_ner数据集标签对照表如下所示:

- O:表示不属于一个命名实体。
- B-PER:表示人名的开始。
- I-PER:表示人名的中间和结尾部分。
- B-ORG:表示组织机构名的开始。
- I-ORG:表示组织机构名的中间和结尾部分。
- B-LOC:表示地名的开始。
- I-LOC:表示地名的中间和结尾部分。
3.模型架构
本文使用hfl/rbt3模型[2],参数量约3800万。基本思路为使用一个预训练模型从文本中抽取数据特征,再对每个字的数据特征做分类任务,最终得到和原文一一对应的标签序列(BIO)。
二.准备数据集
1.使用编码工具
使用hfl/rbt3编码器编码工具如下所示:
def load_encode_tool(pretrained_model_name_or_path): """ 加载编码工具 """ tokenizer = AutoTokenizer.from_pretrained(Path(f'{pretrained_model_name_or_path}')) return tokenizer if __name__ == '__main__': # 测试编码工具 pretrained_model_name_or_path = r'L:/20230713_HuggingFaceModel/rbt3' tokenizer = load_encode_tool(pretrained_model_name_or_path) print(tokenizer) # 测试编码句子 out = tokenizer.batch_encode_plus( batch_text_or_text_pairs=[ [ '海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。'], [ '这', '座', '依', '山', '傍', '水', '的', '博', '物', '馆', '由', '国', '内', '′', '一', '流', '的', '设', '计', '师', '主', '持', '设', '计', '。']], truncation=True, # 截断 padding='max_length', # [PAD] max_length=20, # 最大长度 return_tensors='pt', # 返回pytorch张量 is_split_into_words=True # 按词切分 ) # 查看编码输出 for k, v in out.items(): print(k, v.shape) # 将编码还原为句子 print(tokenizer.decode(out['input_ids'][0])) print(tokenizer.decode(out['input_ids'][1]))
输出结果如下所示:
BertTokenizerFast(name_or_path='L:\20230713_HuggingFaceModel\rbt3', vocab_size=21128, model_max_length=1000000000000000019884624838656, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True) input_ids torch.Size([2, 20]) token_type_ids torch.Size([2, 20]) attention_mask torch.Size([2, 20]) [CLS] 海 钓 比 赛 地 点 在 厦 门 与 金 门 之 间 的 海 域 。 [SEP] [CLS] 这 座 依 山 傍 水 的 博 物 馆 由 国 内 一 流 的 设 计 [SEP]
需要说明参数is_split_into_words=True让编码器跳过分词步骤,即告诉编码器输入句子是分好词的,不用再进行分词。
2.定义数据集
定义数据集代码如下所示:
`class Dataset(torch.utils.data.Dataset):
def init(self, split):
# 在线加载数据集
# dataset = load_dataset(path='people_daily_ner', split=split)
# dataset.save_to_disk(dataset_dict_path='L:/20230713_HuggingFaceModel/peoples_daily_ner')
# 离线加载数据集
dataset = load_from_disk(dataset_path='L:/20230713_HuggingFaceModel/peoples_daily_ner')[split]
# print(dataset.features['ner_tags'].feature.num_classes) #7
# print(dataset.features['ner_tags'].feature.names) # ['O','B-PER','I-PER','B-ORG','I-ORG','B-LOC','I-LOC']
self.dataset = dataset
def len(self):
return len(self.dataset)
def getitem(self, i):
tokens = self.dataset[i]['tokens']
labels = self.dataset[i]['ner_tags']
return tokens, labels
if name == 'main':
# 测试编码工具
pretrained_model_name_or_path = r'L:/20230713_HuggingFaceModel/rbt3'
tokenizer = load_encode_tool(pretrained_model_name_or_path)
# 加载数据集
dataset = Dataset('train')
tokens, labels = dataset[0]
print(tokens, labels, dataset)
print(len(dataset))
`
输出结果如下所示:
['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。'] [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0] <__main__.Dataset object at 0x0000027B01DC3940> 20865
其中,20865表示训练数据集的大小。在people_daily_ner数据集中,每条数据包括两个字段,即tokens和ner_tags,分别代表句子和标签,在getitem()函数中把这两个字段取出并返回即可。
3.定义计算设备
device = 'cpu' if torch.cuda.is_available(): device = 'cuda' print(device)
4.定义数据整理函数
def collate_fn(data): tokens = [i[0] for i in data] labels = [i[1] for i in data] inputs = tokenizer.batch_encode_plus(tokens, # 文本列表 truncation=True, # 截断 padding=True, # [PAD] max_length=512, # 最大长度 return_tensors='pt', # 返回pytorch张量 is_split_into_words=True) # 分词完成,无需再次分词 # 求一批数据中最长的句子长度 lens = inputs['input_ids'].shape[1] # 在labels的头尾补充7,把所有的labels补充成统一的长度 for i in range(len(labels)): labels[i] = [7] + labels[i] labels[i] += [7] * lens labels[i] = labels[i][:lens] # 把编码结果移动到计算设备上 for k, v in inputs.items(): inputs[k] = v.to(device) # 把统一长度的labels组装成矩阵,移动到计算设备上 labels = torch.tensor(labels).to(device) return inputs, labels
形参data表示一批数据,主要是对句子和标签进行编码,这里会涉及到一个填充的问题。标签的开头和尾部填充7,因为0-6都有物理意义),而句子开头会被插入[CLS]标签。无论是句子还是标签,最终都被转换为矩阵。测试数据整理函数如下所示:
data = [ ( ['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。'], [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0] ), ( ['这', '座', '依', '山', '傍', '水', '的', '博', '物', '馆', '由', '国', '内', '一', '流', '的', '设', '计', '师', '主', '持', '设', '计', ',', '整', '个', '建', '筑', '群', '精', '美', '而', '恢', '宏', '。'], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] )] inputs, labels = collate_fn(data) for k, v in inputs.items(): print(k, v.shape) print('labels', labels.shape)
输出结果如下所示:
input_ids torch.Size([2, 37]) token_type_ids torch.Size([2, 37]) attention_mask torch.Size([2, 37]) labels torch.Size([2, 37])
5.定义数据集加载器
loader = torch.utils.data.DataLoader(dataset=dataset, batch_size=16, collate_fn=collate_fn, shuffle=True, drop_last=True)
通过数据集加载器查看一批样例数据,如下所示:
for i, (inputs, labels) in enumerate(loader): break print(tokenizer.decode(inputs['input_ids'][0])) print(labels[0]) for k, v in inputs.items(): print(k, v.shape)
输出结果如下所示:
[CLS] 这 种 输 液 器 不 必 再 悬 吊 药 瓶 , 改 用 气 压 推 动 液 体 流 动 , 自 闭 防 回 流 , 安 全 、 简 便 、 抗 污 染 , 堪 称 输 液 器 历 史 上 的 一 次 革 命 。 [SEP] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] tensor([7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7], device='cuda:0') input_ids torch.Size([16, 87]) token_type_ids torch.Size([16, 87]) attention_mask torch.Size([16, 87])
三.定义模型
1.加载预训练模型
`# 加载预训练模型
pretrained = AutoModel.from_pretrained(Path(f'{pretrained_model_name_or_path}'))
统计参数量
print(sum(i.numel() for i in pretrained.parameters()) / 10000)
测试预训练模型
pretrained.to(device)
`
2.定义下游任务模型
先介绍一个两段式训练的概念,通常是先单独对下游任务模型进行训练,然后再连同预训练模型和下游任务模型一起进行训练的模式。
`class Model(torch.nn.Module):
def init(self):
super().init()
# 标识当前模型是否处于tuning模式
self.tuning = False
# 当处于tuning模式时backbone应该属于当前模型的一部分,否则该变量为空
self.pretrained = None
# 当前模型的神经网络层
self.rnn = torch.nn.GRU(input_size=768, hidden_size=768, batch_first=True)
self.fc = torch.nn.Linear(in_features=768, out_features=8)
def forward(self, inputs):
# 根据当前模型是否处于tuning模式而使用外部backbone或内部backbone计算
if self.tuning:
out = self.pretrained(inputs).last_hidden_state
else:
with torch.no_grad():
out = pretrained(inputs).last_hiddenstate
# backbone抽取的特征输入RNN网络进一步抽取特征
out, = self.rnn(out)
# RNN网络抽取的特征最后输入FC神经网络分类
out = self.fc(out).softmax(dim=2)
return out
# 切换下游任务模型的tuning模式
def fine_tuning(self, tuning):
self.tuning = tuning
# tuning模式时,训练backbone的参数
if tuning:
for i in pretrained.parameters():
i.requires_grad = True
pretrained.train()
self.pretrained = pretrained
# 非tuning模式时,不训练backbone的参数
else:
for i in pretrained.parameters():
i.requiresgrad(False)
pretrained.eval()
self.pretrained = None
`
(1)tuning表示当前模型是否处于微调模型,pretrained表示微调模式时预训练模型属于当前模型。 (2)在init()中定义了下游任务模型的2个层,分别为GRU网络和全连接神经网络层,GRU作用是进一步抽取特征,提高模型预测正确率。 (3)fine_tuning()用来切换训练模式pretrained.train()和评估模式pretrained.eval()。
四.训练和测试
1.模型训练
def train(epochs): lr = 2e-5 if model.tuning else 5e-4 # 根据模型的tuning模式设置学习率 optimizer = AdamW(model.parameters(), lr=lr) # 优化器 criterion = torch.nn.CrossEntropyLoss() # 损失函数 scheduler = get_scheduler(name='linear', num_warmup_steps=0, num_training_steps=len(loader) * epochs, optimizer=optimizer) # 学习率衰减策略 model.train() for epoch in range(epochs): for step, (inputs, labels) in enumerate(loader): # 模型计算 # [b,lens] -> [b,lens,8] outs = model(inputs) # 对outs和labels变形,并且移除PAD # outs -> [b, lens, 8] -> [c, 8] # labels -> [b, lens] -> [c] outs, labels = reshape_and_remove_pad(outs, labels, inputs['attention_mask']) # 梯度下降 loss = criterion(outs, labels) # 计算损失 loss.backward() # 反向传播 optimizer.step() # 更新参数 scheduler.step() # 更新学习率 optimizer.zero_grad() # 清空梯度 if step % (len(loader) * epochs // 30) == 0: counts = get_correct_and_total_count(labels, outs) accuracy = counts[0] / counts[1] accuracy_content = counts[2] / counts[3] lr = optimizer.state_dict()['param_groups'][0]['lr'] print(epoch, step, loss.item(), lr, accuracy, accuracy_content) torch.save(model, 'model/中文命名实体识别.model')
训练过程基本步骤如下所示:
(1)从数据集加载器中获取一个批次的数据。
(2)让模型计算预测结果。
(2)使用工具函数对预测结果和labels进行变形,移除预测结果和labels中的PAD。
(4)计算loss并执行梯度下降优化模型参数。
(5)每隔一定的steps,输出一次模型当前的各项数据,便于观察。
(6)每训练完一个epoch,将模型的参数保存到磁盘。
接下来介绍两段式训练过程,第一阶段是训练下游任务模型,第二阶段是联合训练下游任务模型和预训练模型如下所示:
`# 两段式训练第一阶段,训练下游任务模型
model.fine_tuning(False)
print(sum(p.numel() for p in model.parameters() / 10000))
train(1)
两段式训练第二阶段,联合训练下游任务模型和预训练模型
model.fine_tuning(True)
print(sum(p.numel() for p in model.parameters() / 10000))
train(5)
`
2.模型测试
模型测试基本思路:从磁盘加载模型,然后切换到评估模式,将模型移动到计算设备,从测试集中取批次数据,输入模型中,统计正确率。
def test(): # 加载训练完的模型 model_load = torch.load('model/中文命名实体识别.model') model_load.eval() # 切换到评估模式 model_load.to(device) # 测试数据集加载器 loader_test = torch.utils.data.DataLoader(dataset=Dataset('validation'), batch_size=128, collate_fn=collate_fn, shuffle=True, drop_last=True) correct = 0 total = 0 correct_content = 0 total_content = 0 # 遍历测试数据集 for step, (inputs, labels) in enumerate(loader_test): # 测试5个批次即可,不用全部遍历 if step == 5: break print(step) # 计算 with torch.no_grad(): # [b, lens] -> [b, lens, 8] -> [b, lens] outs = model_load(inputs) # 对outs和labels变形,并且移除PAD # fouts -> [b, lens, 8] -> [c, 8] # labels -> [b, lens] -> [c] outs, labels = reshape_and_remove_pad(outs, labels, inputs['attention_mask']) # 统计正确数量 counts = get_correct_and_total_count(labels, outs) correct += counts[0] total += counts[1] correct_content += counts[2] total_content += counts[3] print(correct / total, correct_content / total_content)
3.预测任务
def predict(): # 加载模型 model_load = torch.load('model/中文命名实体识别.model') model_load.eval() model_load.to(device) # 测试数据集加载器 loader_test = torch.utils.data.DataLoader(dataset=Dataset('validation'), batch_size=32, collate_fn=collate_fn, shuffle=True, drop_last=True) # 取一个批次的数据 for i, (inputs, labels) in enumerate(loader_test): break # 计算 with torch.no_grad(): # [b, lens] -> [b, lens, 8] -> [b, lens] outs = model_load(inputs).argmax(dim=2) for i in range(32): # 移除PAD select = inputs['attention_mask'][i] == 1 input_id = inputs['input_ids'][i, select] out = outs[i, select] label = labels[i, select] # 输出原句子 print(tokenizer.decode(input_id).replace(' ', '')) # 输出tag for tag in [label, out]: s = '' for j in range(len(tag)): if tag[j] == 0: s += '.' continue s += tokenizer.decode(input_id[j]) s += str(tag[j].item()) print(s) print('=====================')
参考文献
[1] HuggingFace自然语言处理详解:基于BERT中文模型的任务实战
[2] https://huggingface.co/hfl/rbt3
[3] https://huggingface.co/datasets/peoples\_daily\_ner/tree/main
[4] https://github.com/OYE93/Chinese-NLP-Corpus/
[5] https://github.com/ai408/nlp-engineering/blob/main/20230625\_HuggingFace自然语言处理详解/第10章:中文命名实体识别.py
[6] 中文命名实体识别:https://z0yrmerhgi8.feishu.cn/wiki/YpuuwA6q0iBQEWkFo5tcwV9rnje
(文:NLP工程化)