# -*- coding: utf-8 -*-
# @Time: 2023/3/16 18:27
# @Author: ZYP
# @File: SearchSimSentence.py
# @mail: zypsunshine1@gmail.com
# @Software: PyCharm
# =========================================================================================
# Sentence-level duplicate checking
#   · compare sentences with word2vec / fasttext word-vector models
#   · compare sentences with a deep-learning sentence encoder
# =========================================================================================
import os

# JAVA_HOME must be set before pyhanlp is imported, since importing it starts the JVM.
os.environ['JAVA_HOME'] = '/home/zc-nlp-zyp/work_file/software/jdk1.8.0_341'

import re
import json
import numpy as np
from pyhanlp import HanLP
from collections import defaultdict
from CheckPaper.LoadRoformer import return_sent_vec
from CheckPaper.util import stop_word, get_word_vec
from sklearn.metrics.pairwise import cosine_similarity


def text2vec(paper_dict):
    """
    Segment every sentence, look up word2vec vectors for its words, and combine them
    into one weighted sentence vector.
    :param paper_dict: dict with the paper's 'title', 'abst_zh' (Chinese abstract) and 'content'
    :return: {sentence: vector}
    """
    in_check_str = paper_dict['title'] + '。' + paper_dict['abst_zh'] + '。' + paper_dict['content']
    in_check_sent_list = re.split(r'[。,:;!?]', in_check_str)
    sent_dict = {}
    for sent in in_check_sent_list:
        word_list = HanLP.segment(sent)
        sent_vec = []
        value_sum = 0.0
        # keep words that are neither stop words nor punctuation (HanLP nature 'w')
        for i in [word.word for word in word_list if word.word not in stop_word and str(word.nature) != 'w']:
            # weight a word by where it occurs: title > abstract > body
            if i in paper_dict['title']:
                weight = 0.5
            elif i in paper_dict['abst_zh']:
                weight = 0.3
            else:
                weight = 0.2
            word_vec = get_word_vec(i)
            # get_word_vec returns the scalar 0 for out-of-vocabulary words
            if np.isscalar(word_vec) and word_vec == 0:
                continue
            sent_vec.append((weight * word_vec).tolist())
            value_sum += weight
        if not sent_vec:  # every word was a stop word, punctuation, or OOV
            continue
        # weighted average over the kept words  # [1, 300]
        sent_dict[sent] = (np.sum(np.array(sent_vec), axis=0) / value_sum).tolist()
    return sent_dict


def deal_in_paper(paper_dict):
    """Split the paper into sentences and return the sentence list."""
    in_check_str = paper_dict['title'] + '。' + paper_dict['abst_zh'] + '。' + paper_dict['content']
    in_check_sent_list = re.split(r'[。,:;!?]', in_check_str)
    return in_check_sent_list


def check_repeat_by_model(paper_dict, sim_paper_id_dict, threshold):
    """
    Check the submitted paper against the similar candidate papers with the
    deep-learning sentence encoder.
    :param threshold: similarity threshold (e.g. 0.85)
    :param paper_dict: the submitted paper, normalized into a dict
    :param sim_paper_id_dict: similar papers of the same class, formatted as {doc_id: (similarity_score, file_path)}
    :return: per paper, every sentence pair whose similarity exceeds the threshold:
             {doc_id: {original sentence: [similar sentences in doc_id]}}
    """
    # res_dict layout:
    # {
    #     doc_id1: {
    #         submitted sentence 1: [similar sentence 1, similar sentence 2, ...],
    #         submitted sentence 2: [similar sentence 1, similar sentence 2, ...],
    #     },
    #     doc_id2: {...},
    #     ...
    # }
    res_dict = defaultdict(dict)
    in_check_sent_list = deal_in_paper(paper_dict)
    # load the model and encode the submitted sentences
    in_check_sent, in_check_vec = return_sent_vec(in_check_sent_list)
    for doc_id, (_, path) in sim_paper_id_dict.items():
        with open(path, 'r', encoding='utf-8') as f:
            json_dict = json.load(f)
        check_sent_list = deal_in_paper(json_dict)
        out_check_sent, out_check_vec = return_sent_vec(check_sent_list)
        sim_matrix = cosine_similarity(in_check_vec, out_check_vec)
        for index, row in enumerate(sim_matrix):
            sim_id = np.where(row >= threshold)[0].tolist()
            for j in sim_id:
                res_dict[doc_id].setdefault(in_check_sent[index], []).append(out_check_sent[j])
    return res_dict
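
# Usage sketch for check_repeat_by_model (kept as a comment so importing this module
# stays side-effect free). The doc id, score, and path below are made-up examples;
# return_sent_vec from CheckPaper.LoadRoformer is assumed to return
# (sentence_list, vector_matrix) as used above.
#
#     paper = {'title': '...', 'abst_zh': '...', 'content': '...'}
#     candidates = {'doc_001': (0.93, '/data/papers/doc_001.json')}
#     repeats = check_repeat_by_model(paper, candidates, threshold=0.85)
#     # repeats -> {'doc_001': {submitted sentence: [similar sentences], ...}}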


def check_repeat_by_word2vec(paper_dict, sim_paper_id_dict, threshold):
    """
    Check the submitted paper against the similar candidate papers with the
    weighted word2vec sentence vectors from text2vec.
    :param threshold: similarity threshold (e.g. 0.85)
    :param paper_dict: the submitted paper, normalized into a dict
    :param sim_paper_id_dict: similar papers of the same class, formatted as {doc_id: (similarity_score, file_path)}
    :return: per paper, every sentence pair whose similarity exceeds the threshold:
             {doc_id: {original sentence: [similar sentences in doc_id]}}
    """
    in_sent_dict = text2vec(paper_dict)  # {sent1: vec1, sent2: vec2, sent3: vec3, ...}
    check_dict = {}  # {doc_id1: {sent1: vec1, sent2: vec2, ...}, doc_id2: {sent1: vec1, ...}, ...}
    for doc_id, (_, path) in sim_paper_id_dict.items():
        with open(path, 'r', encoding='utf-8') as f:
            text_dict = json.load(f)
        check_dict[doc_id] = text2vec(text_dict)

    in_sent_list = list(in_sent_dict.keys())
    in_sent_vec_list = list(in_sent_dict.values())  # [sent_num_in, 300]

    total_result = {}  # {doc_id: {sent1: [sim_sent1, sim_sent2, ...], sent2: [sim_sent1, ...], ...}}
    for doc_id, sent_dict in check_dict.items():
        result = {sent: [] for sent in in_sent_dict}
        every_sent_list = list(sent_dict.keys())
        every_sent_vec_list = list(sent_dict.values())  # [sent_num_every, 300]
        # similarity matrix between all sentence pairs: [sent_num_in, sent_num_every]
        sim_score = cosine_similarity(np.array(in_sent_vec_list), np.array(every_sent_vec_list))
        for check_index, sim_array in enumerate(sim_score):
            for i in np.where(sim_array >= threshold)[0]:
                result[in_sent_list[check_index]].append(every_sent_list[i])
        total_result[doc_id] = result
    return total_result
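

if __name__ == '__main__':
    # Minimal smoke test, assuming the CheckPaper word vectors and Roformer model
    # are available locally. The paper text and the candidate entry below are
    # made-up examples; './sim.json' must hold a dict with the same
    # 'title' / 'abst_zh' / 'content' layout.
    demo_paper = {
        'title': '基于词向量的句子查重方法',
        'abst_zh': '本文提出一种基于加权词向量的句子级查重方法。',
        'content': '实验结果表明,该方法在句子级查重任务上是有效的。',
    }
    demo_candidates = {'doc_demo': (0.90, './sim.json')}
    repeats = check_repeat_by_word2vec(demo_paper, demo_candidates, threshold=0.85)
    for demo_doc_id, pairs in repeats.items():
        for src_sent, sim_sents in pairs.items():
            if sim_sents:
                print(demo_doc_id, '|', src_sent, '->', sim_sents)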