Chinese Text Classification Based on Transformer
Preface

I came across an interesting project on GitHub, Chinese-Text-Classification-Pytorch, which uses PyTorch to implement Transformer-based Chinese text classification.

Chinese dataset

I extracted 200,000 news headlines from THUCNews; each text is 20 to 30 characters long. There are 10 classes with 20,000 samples per class. The model takes character-level input and uses pretrained embeddings (Sogou News, Word+Character, 300d).

Classes: finance (财经), real estate (房产), stocks (股票), education (教育), science and technology (科技), society (社会), current politics (时政), sports (体育), gaming (游戏), entertainment (娱乐).

The dataset is available in the GitHub repo Chinese-Text-Classification-Pytorch.

Dataset split:
- Training set: 180,000
- Validation set: 10,000
- Test set: 10,000

Code implementation

1. Data preprocessing. First, the raw text has to be turned into tensors the model can consume, and the data has to be split into training, validation, and test sets. utils_fasttext.py implements the following:

- Vocabulary construction: build_vocab() counts token frequencies over the training data, drops low-frequency tokens (min_freq=1), caps the vocabulary size (MAX_VOCAB_SIZE=10000), assigns each token an index, and appends the special <UNK> (unknown) and <PAD> (padding) tokens.
- Dataset loading: load_dataset() reads a data file (each line holds the text and its integer label separated by a tab), tokenizes the text, maps tokens to vocabulary indices, and pads (pad_size=32) or truncates every sequence to the same length.
- N-gram features: for every position, bigram and trigram hash features are computed to give the model some sense of local token order. The Transformer model below only uses the token ids; the n-gram features are kept so the same pipeline also works for the FastText model.
- Dataset splitting: build_dataset() loads the training set (config.train_path), validation set (config.dev_path), and test set (config.test_path) and returns the vocabulary together with the three datasets.
- Batch iteration: DataSetIterate groups the samples into batches of batch_size, converts them to PyTorch tensors (torch.LongTensor), and moves them to the configured device (config.device). A small smoke test of the iterator follows the code below.

```python
import os
import time
import pickle as pkl
from datetime import timedelta

import numpy as np
import torch
from tqdm import tqdm

MAX_VOCAB_SIZE = 10000
UNK, PAD = '<UNK>', '<PAD>'


def build_vocab(file_path, tokenizer, max_size, min_freq):
    vocab_dic = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in tqdm(f):
            line = line.strip()
            if not line:
                continue
            content = line.split('\t')[0]
            for word in tokenizer(content):
                vocab_dic[word] = vocab_dic.get(word, 0) + 1
        # keep the most frequent tokens, then append the special tokens
        vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq],
                            key=lambda x: x[1], reverse=True)[:max_size]
        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
    return vocab_dic


def build_dataset(config, ues_word):
    if ues_word:
        tokenizer = lambda x: x.split(' ')    # word level: tokens separated by spaces
    else:
        tokenizer = lambda x: [y for y in x]  # char level: one token per character
    if os.path.exists(config.vocab_path):
        vocab = pkl.load(open(config.vocab_path, 'rb'))
    else:
        vocab = build_vocab(config.train_path, tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
        pkl.dump(vocab, open(config.vocab_path, 'wb'))
    print(f"Vocab size: {len(vocab)}\n")

    def biGramHash(sequence, t, buckets):
        t1 = sequence[t - 1] if t - 1 >= 0 else 0
        return (t1 * 14918087) % buckets

    def triGramHash(sequence, t, buckets):
        t1 = sequence[t - 1] if t - 1 >= 0 else 0
        t2 = sequence[t - 2] if t - 2 >= 0 else 0
        return (t2 * 14918087 * 18408749 + t1 * 14918087) % buckets

    def load_dataset(path, pad_size=32):
        contents = []
        with open(path, 'r', encoding='UTF-8') as f:
            for line in tqdm(f):
                line = line.strip()
                if not line:
                    continue
                content, label = line.split('\t')
                words_line = []
                token = tokenizer(content)
                seq_len = len(token)
                if seq_len < pad_size:
                    token.extend([PAD] * (pad_size - seq_len))
                else:
                    token = token[:pad_size]
                    seq_len = pad_size
                for word in token:
                    words_line.append(vocab.get(word, vocab.get(UNK)))
                # fasttext-style n-gram hash features
                buckets = config.n_gram_vocab
                bigram = []
                trigram = []
                for i in range(pad_size):
                    bigram.append(biGramHash(words_line, i, buckets))
                    trigram.append(triGramHash(words_line, i, buckets))
                contents.append((words_line, int(label), seq_len, bigram, trigram))
        return contents

    train = load_dataset(config.train_path, config.pad_size)
    dev = load_dataset(config.dev_path, config.pad_size)
    test = load_dataset(config.test_path, config.pad_size)
    return vocab, train, dev, test


class DataSetIterate(object):
    def __init__(self, batches, batch_size, device):
        self.batches = batches
        self.batch_size = batch_size
        self.device = device
        self.n_batches = len(batches) // batch_size
        self.residue = False  # True if the number of samples is not a multiple of batch_size
        if len(batches) % self.batch_size != 0:
            self.residue = True
        self.index = 0

    def _to_tensor(self, datas):
        x = torch.LongTensor([_[0] for _ in datas]).to(self.device)
        y = torch.LongTensor([_[1] for _ in datas]).to(self.device)
        bigram = torch.LongTensor([_[3] for _ in datas]).to(self.device)
        trigram = torch.LongTensor([_[4] for _ in datas]).to(self.device)
        seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)
        return (x, seq_len, bigram, trigram), y

    def __next__(self):
        # handle the last, possibly smaller batch first
        if self.residue and self.index == self.n_batches:
            batches = self.batches[self.index * self.batch_size: len(self.batches)]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches
        elif self.index >= self.n_batches:
            self.index = 0
            raise StopIteration
        else:
            batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

    def __iter__(self):
        return self

    def __len__(self):
        if self.residue:
            return self.n_batches + 1
        else:
            return self.n_batches


def build_iterator(dataset, config):
    return DataSetIterate(dataset, config.batch_size, config.device)


def get_time_dif(start_time):
    """Elapsed wall-clock time since start_time."""
    end_time = time.time()
    time_dif = end_time - start_time
    return timedelta(seconds=int(round(time_dif)))
```
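To get a feel for what the iterator yields, here is a small smoke test (a sketch, not part of the original project): the hand-made tuples only mimic what load_dataset() produces, with pad_size shortened to 4 for readability.

```python
import torch

from utils_fasttext import DataSetIterate  # the file defined above

# Illustrative samples only: (word_ids, label, seq_len, bigram, trigram), here with pad_size = 4.
fake_data = [
    ([5, 9, 2, 0], 3, 3, [1, 2, 3, 4], [5, 6, 7, 0]),
    ([7, 1, 0, 0], 0, 2, [2, 2, 2, 2], [3, 3, 3, 3]),
    ([4, 4, 4, 4], 9, 4, [0, 1, 0, 1], [1, 0, 1, 0]),
]

it = DataSetIterate(fake_data, batch_size=2, device=torch.device('cpu'))
print(len(it))                     # 2: one full batch plus a residue batch of size 1
for (x, seq_len, bigram, trigram), y in it:
    print(x.shape, y.tolist())     # torch.Size([2, 4]) [3, 0], then torch.Size([1, 4]) [9]
```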
2. The Transformer-based classification model

The architecture follows the encoder of "Attention Is All You Need": sinusoidal positional encodings are added to the character embeddings, num_encoder encoder blocks (multi-head self-attention plus a position-wise feed-forward network, each with a residual connection and layer normalization) are stacked on top, all positions are flattened, and a linear layer produces the class logits.

```python
import copy

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


class Config(object):
    """Configuration parameters."""
    def __init__(self, dataset, embedding):
        self.model_name = 'Transformer'
        self.train_path = dataset + '/data/train.txt'                  # training set
        self.dev_path = dataset + '/data/dev.txt'                      # validation set
        self.test_path = dataset + '/data/test.txt'                    # test set
        self.class_list = [x.strip() for x in
                           open(dataset + '/data/class.txt', encoding='utf-8').readlines()]  # class labels
        self.vocab_path = dataset + '/data/vocab.pkl'                  # vocabulary
        self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'  # trained model checkpoint
        self.log_path = dataset + '/log/' + self.model_name
        self.embedding_pretrained = torch.tensor(
            np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32')) \
            if embedding != 'random' else None                         # pretrained embeddings
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.dropout = 0.5               # dropout probability
        self.require_improvement = 2000  # stop training if the dev loss has not improved for this many batches
        self.num_classes = len(self.class_list)
        self.n_vocab = 0                 # vocabulary size, filled in at runtime
        self.num_epochs = 20
        self.batch_size = 128
        self.pad_size = 32               # sequence length (shorter texts are padded, longer ones truncated)
        self.learning_rate = 5e-4
        self.embed = self.embedding_pretrained.size(1) \
            if self.embedding_pretrained is not None else 300          # embedding dimension
        self.dim_model = 300
        self.hidden = 1024
        self.last_hidden = 512
        self.num_head = 5
        self.num_encoder = 2
        self.n_gram_vocab = 8


class Positional_Encoding(nn.Module):
    def __init__(self, embed, pad_size, dropout, device):
        super().__init__()
        self.device = device
        # fixed sinusoidal position table of shape [pad_size, embed]
        self.pe = torch.tensor(
            [[pos / (10000.0 ** (i // 2 * 2.0 / embed)) for i in range(embed)] for pos in range(pad_size)])
        self.pe[:, 0::2] = np.sin(self.pe[:, 0::2])
        self.pe[:, 1::2] = np.cos(self.pe[:, 1::2])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = x + nn.Parameter(self.pe, requires_grad=False).to(self.device)
        out = self.dropout(out)
        return out


class Scaled_Dot_Product_Attention(nn.Module):
    """Scaled dot-product attention."""
    def __init__(self):
        super().__init__()

    def forward(self, Q, K, V, scale=None):
        """
        Q: [batch_size, len_Q, dim_Q]
        K: [batch_size, len_K, dim_K]
        V: [batch_size, len_V, dim_V]
        scale: scaling factor, 1/sqrt(dim_K) in the paper
        Returns the attention-weighted values.
        """
        attention = torch.matmul(Q, K.permute(0, 2, 1))
        if scale:
            attention = attention * scale
        attention = F.softmax(attention, dim=-1)
        context = torch.matmul(attention, V)
        return context


class Multi_Head_Attention(nn.Module):
    def __init__(self, dim_model, num_head, dropout=0.0):
        super().__init__()
        self.num_head = num_head
        assert dim_model % num_head == 0
        self.dim_head = dim_model // num_head
        self.fc_Q = nn.Linear(dim_model, num_head * self.dim_head)
        self.fc_K = nn.Linear(dim_model, num_head * self.dim_head)
        self.fc_V = nn.Linear(dim_model, num_head * self.dim_head)
        self.attention = Scaled_Dot_Product_Attention()
        self.fc = nn.Linear(num_head * self.dim_head, dim_model)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(dim_model)

    def forward(self, x):
        batch_size = x.size(0)
        Q = self.fc_Q(x)
        K = self.fc_K(x)
        V = self.fc_V(x)
        # split into heads: [batch_size * num_head, seq_len, dim_head]
        Q = Q.view(batch_size * self.num_head, -1, self.dim_head)
        K = K.view(batch_size * self.num_head, -1, self.dim_head)
        V = V.view(batch_size * self.num_head, -1, self.dim_head)
        scale = K.size(-1) ** -0.5       # 1 / sqrt(dim_head)
        context = self.attention(Q, K, V, scale)
        context = context.view(batch_size, -1, self.num_head * self.dim_head)
        out = self.fc(context)
        out = self.dropout(out)
        out = out + x                    # residual connection
        out = self.layer_norm(out)
        return out


class Position_wise_Feed_Forward(nn.Module):
    def __init__(self, dim_model, hidden, dropout=0.0):
        super().__init__()
        self.fc1 = nn.Linear(dim_model, hidden)
        self.fc2 = nn.Linear(hidden, dim_model)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(dim_model)

    def forward(self, x):
        out = self.fc1(x)
        out = F.relu(out)
        out = self.fc2(out)
        out = self.dropout(out)
        out = out + x                    # residual connection
        out = self.layer_norm(out)
        return out


class Encoder(nn.Module):
    def __init__(self, dim_model, num_head, hidden, dropout):
        super().__init__()
        self.attention = Multi_Head_Attention(dim_model, num_head, dropout)
        self.feed_forward = Position_wise_Feed_Forward(dim_model, hidden, dropout)

    def forward(self, x):
        out = self.attention(x)
        out = self.feed_forward(out)
        return out


class Model(nn.Module):
    def __init__(self, config):
        super().__init__()
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
        self.positional_embedding = Positional_Encoding(config.embed, config.pad_size,
                                                        config.dropout, config.device)
        self.encoder = Encoder(config.dim_model, config.num_head, config.hidden, config.dropout)
        self.encoders = nn.ModuleList([copy.deepcopy(self.encoder) for _ in range(config.num_encoder)])
        self.fc1 = nn.Linear(config.pad_size * config.dim_model, config.num_classes)

    def forward(self, x):
        out = self.embedding(x[0])       # x is (token_ids, seq_len, bigram, trigram); only the ids are used
        out = self.positional_embedding(out)
        for encoder in self.encoders:
            out = encoder(out)
        out = out.view(out.size(0), -1)  # flatten all positions
        # out = torch.mean(out, 1)       # alternative: mean pooling over positions
        out = self.fc1(out)
        return out
```
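To make the tensor shapes concrete, the sketch below (not part of the original post) pushes a random batch through a single encoder block. It bypasses the embedding and positional encoding, and assumes the code above is saved as models/Transformer.py, the layout used by the repo; if you run everything in one session, drop the import.

```python
import torch

from models.Transformer import Encoder  # assumed file layout (models/Transformer.py)

batch_size, pad_size, dim_model = 4, 32, 300
num_head, hidden, dropout = 5, 1024, 0.1

x = torch.rand(batch_size, pad_size, dim_model)  # stands in for the embedded, position-encoded input
encoder = Encoder(dim_model, num_head, hidden, dropout)
out = encoder(x)
print(out.shape)                                 # torch.Size([4, 32, 300]): each block preserves the shape

flattened = out.view(out.size(0), -1)            # what Model.forward does before the final classifier
print(flattened.shape)                           # torch.Size([4, 9600]) = [batch, pad_size * dim_model]
```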
3. Model training and testing code

Training uses Adam with cross-entropy loss. Every 1000 batches the model is evaluated on the validation set; the checkpoint with the lowest validation loss is saved, and training stops early if the validation loss has not improved for config.require_improvement batches. Loss and accuracy curves are written to config.log_path with tensorboardX's SummaryWriter, where TensorBoard can pick them up.

```python
import time

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics
from tensorboardX import SummaryWriter

from utils_fasttext import get_time_dif


def init_network(model, method='xavier', exclude='embedding', seed=123):
    # initialize all parameters except the (pretrained) embedding layer
    for name, w in model.named_parameters():
        if exclude not in name:
            if 'weight' in name:
                if method == 'xavier':
                    nn.init.xavier_normal_(w)
                elif method == 'kaiming':
                    nn.init.kaiming_normal_(w)
                else:
                    nn.init.normal_(w)
            elif 'bias' in name:
                nn.init.constant_(w, 0)
            else:
                pass


def train(config, model, train_iter, dev_iter, test_iter):
    start_time = time.time()
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
    total_batch = 0                # number of batches processed so far
    best_val_loss = float('inf')
    last_improve = 0               # batch index of the last validation-loss improvement
    flag = False                   # set when training should stop early
    writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    for epoch in range(config.num_epochs):
        print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        for i, (trains, labels) in enumerate(train_iter):
            outputs = model(trains)
            model.zero_grad()
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()
            if total_batch % 1000 == 0:
                # evaluate on the current training batch and on the validation set
                true = labels.data.cpu()
                predict = torch.max(outputs, 1)[1].cpu()
                train_acc = metrics.accuracy_score(true, predict)
                dev_acc, dev_loss = evaluate(config, model, dev_iter)
                if dev_loss < best_val_loss:
                    best_val_loss = dev_loss
                    last_improve = total_batch
                    torch.save(model.state_dict(), config.save_path)
                    improve = '*'
                else:
                    improve = ''
                time_dif = get_time_dif(start_time)
                msg = 'Iter: {0:>6},  Train Loss: {1:>5.2},  Train Acc: {2:>6.2%},  Val Loss: {3:>5.2},  Val Acc: {4:>6.2%},  Time: {5} {6}'
                print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))
                writer.add_scalar("loss/train", loss.item(), total_batch)
                writer.add_scalar("loss/dev", dev_loss, total_batch)
                writer.add_scalar("acc/train", train_acc, total_batch)
                writer.add_scalar("acc/dev", dev_acc, total_batch)
                model.train()
            total_batch += 1
            if total_batch - last_improve > config.require_improvement:
                # the validation loss has not improved for too long: stop training
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break
    writer.close()
    test(config, model, test_iter)


def evaluate(config, model, data_iter, test=False):
    model.eval()
    loss_total = 0
    predict_all = np.array([], dtype=int)
    true_all = np.array([], dtype=int)
    with torch.no_grad():
        for texts, labels in data_iter:
            outputs = model(texts)
            loss = F.cross_entropy(outputs, labels)
            loss_total += loss
            predict = torch.max(outputs.data, 1)[1].cpu().numpy()
            label = labels.data.cpu().numpy()
            predict_all = np.append(predict_all, predict)
            true_all = np.append(true_all, label)
    acc = metrics.accuracy_score(true_all, predict_all)
    if test:
        report = metrics.classification_report(true_all, predict_all,
                                               target_names=config.class_list, digits=4)
        confusion = metrics.confusion_matrix(true_all, predict_all)
        return acc, loss_total / len(data_iter), report, confusion
    return acc, loss_total / len(data_iter)


def test(config, model, data_iter):
    model.load_state_dict(torch.load(config.save_path))
    model.eval()
    start_time = time.time()
    test_acc, test_loss, test_report, test_confusion = evaluate(config, model, data_iter, test=True)
    msg = 'Test Loss: {0:>5.2},  Test Acc: {1:>6.2%}'
    print(msg.format(test_loss, test_acc))
    print("Precision, Recall and F1-Score...")
    print(test_report)
    print("Confusion Matrix...")
    print(test_confusion)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)
```
4. Running the code
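A minimal driver script ties the three pieces together. The following is only a sketch under assumptions about the file layout: the preprocessing code from step 1 saved as utils_fasttext.py, the model and Config from step 2 as models/Transformer.py, the training code from step 3 as train_eval.py, and a THUCNews data directory plus embedding file named as in the Chinese-Text-Classification-Pytorch repo; adjust the names to your own setup.

```python
# run.py: minimal driver sketch (assumed layout: utils_fasttext.py, models/Transformer.py,
# train_eval.py, and a THUCNews/ data directory as in Chinese-Text-Classification-Pytorch).
import time

import numpy as np
import torch

from utils_fasttext import build_dataset, build_iterator, get_time_dif
from models.Transformer import Config, Model  # step 2 code (assumed file name)
from train_eval import train                  # step 3 code (assumed file name)

if __name__ == '__main__':
    dataset = 'THUCNews'                       # data directory
    embedding = 'embedding_SougouNews.npz'     # pretrained Sogou News embeddings (file name assumed)
    config = Config(dataset, embedding)

    # fix random seeds for reproducibility
    np.random.seed(1)
    torch.manual_seed(1)
    torch.cuda.manual_seed_all(1)
    torch.backends.cudnn.deterministic = True

    start_time = time.time()
    print("Loading data...")
    vocab, train_data, dev_data, test_data = build_dataset(config, ues_word=False)  # char-level input
    train_iter = build_iterator(train_data, config)
    dev_iter = build_iterator(dev_data, config)
    test_iter = build_iterator(test_data, config)
    print("Time usage:", get_time_dif(start_time))

    config.n_vocab = len(vocab)                # vocabulary size is only known after building the vocab
    model = Model(config).to(config.device)
    # Note: init_network() is not called here; the repo skips it for the Transformer model
    # (Xavier initialization does not apply to the 1-D LayerNorm weights).
    print(model.parameters)
    train(config, model, train_iter, dev_iter, test_iter)
```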