# -*- coding: utf-8 -*-
# @Time: 18:02
# @Author:ZYP
# @File:util.py
# @mail:zypsunshine1@gmail.com
# @Software: PyCharm
# =========================================================================================
# Utility module
# Loads the stop-word list, the database, and the word2vec / fastText models.
# =========================================================================================
import os
import time
import math
import json
import jieba
import numpy as np
import requests
from collections import defaultdict
from textrank4zh import TextRank4Keyword

jieba.initialize()

stop_word_path = '/home/zc-nlp-zyp/work_file/ssd_data/program/check_paper/fasttext_train/data/total_stopwords.txt'


class Logging:
    def log(self, *args, **kwargs):
        """Append a timestamped line to a per-process, per-day log file."""
        fmt = '%Y/%m/%d-%H:%M:%S'
        fmt_day = '%Y-%m-%d'
        value = time.localtime(int(time.time()))
        dt = time.strftime(fmt, value)
        dt_log_file = time.strftime(fmt_day, value)
        log_file = 'gunicornLogs/access-%s-%s.log' % (os.getpid(), dt_log_file)
        os.makedirs('gunicornLogs', exist_ok=True)  # make sure the log directory exists
        # Mode 'a' creates the file when it does not exist yet, so one branch
        # replaces the original exists()/'w'/'a+' logic with identical behavior.
        with open(log_file, 'a', encoding='utf-8') as f:
            print(dt, *args, file=f, **kwargs)


def load_stopwords(path=stop_word_path):
    """Load the stop-word list, one word per line, into a set."""
    with open(path, 'r', encoding='utf-8') as f:
        stop_words = {line.strip() for line in f}
    return stop_words


def cut_text(text_str, tokenizer='jieba'):
    """Tokenize the text with the chosen tokenizer, count the frequency of every
    non-stop-word token, and return a dict sorted by frequency in descending order."""
    word_dict = defaultdict(int)
    if tokenizer == 'jieba':
        for word in jieba.cut(text_str):
            if word not in stop_word:
                word_dict[word] += 1
    # elif tokenizer == 'hanlp':
    #     for i in HanLP.segment(text_str):
    #         if i.word not in stop_word and i.nature != 'w':
    #             word_dict[i.word] += 1
    else:
        print('Invalid tokenizer argument: %r' % tokenizer)
    return {k: v for k, v in sorted(word_dict.items(), key=lambda x: x[1], reverse=True)}


def l2_normal(tf_idf_dict):
    """L2-normalize the tf-idf scores (into the 0-1 range) and keep the top 15 terms."""
    l2_norm = math.sqrt(sum(v ** 2 for v in tf_idf_dict.values()))
    if l2_norm == 0:  # empty or all-zero input: nothing to normalize
        return {}
    # Dividing by a positive constant does not change the ordering, so sort on
    # the raw scores and normalize only the retained top-15 entries.
    top_items = sorted(tf_idf_dict.items(), key=lambda x: x[1], reverse=True)[:15]
    return {key: value / l2_norm for key, value in top_items}


def get_word_vec(word):
    """Fetch the embedding for `word` from the word2vec service, falling back to
    the fastText service. Returns a numpy array on a hit, or 0 when the word is
    missing from both vocabularies (a miss returns a short error payload)."""
    for url in ('http://192.168.31.74:50001/word2vec',
                'http://192.168.31.74:50002/fasttext'):
        vec = requests.post(url, data=json.dumps({'word': word}), timeout=100)
        if len(vec.text) >= 100:  # long response body => the service found a vector
            json_dict = json.loads(vec.text)
            return np.array([float(j) for j in json_dict['vec'].split(',')], dtype=np.float64)
    return 0


stop_word = load_stopwords()
tr4w = TextRank4Keyword(stop_words_file=stop_word_path)
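

# -----------------------------------------------------------------------------
# Minimal usage sketch (illustration only). It assumes the stop-word file above
# exists and, for get_word_vec(), that the two vector services are reachable;
# the sample sentence and the uniform idf weight of 1.0 are hypothetical
# placeholders, not part of the original module.
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    logger = Logging()

    # Tokenize a sample sentence and count term frequencies.
    sample_text = '自然语言处理是人工智能的一个重要方向。'
    tf_dict = cut_text(sample_text, tokenizer='jieba')
    logger.log('term frequencies:', tf_dict)

    # Turn the raw counts into toy tf-idf scores (idf = 1.0 for every term,
    # standing in for real corpus statistics) and L2-normalize the top terms.
    tf_idf = {word: freq * 1.0 for word, freq in tf_dict.items()}
    print(l2_normal(tf_idf))

    # Look up an embedding; get_word_vec() returns 0 for an out-of-vocabulary word.
    try:
        vec = get_word_vec('自然语言')
        print('vector found' if isinstance(vec, np.ndarray) else 'OOV word')
    except requests.exceptions.RequestException as exc:
        print('vector service unavailable:', exc)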