import os
import json
import time
import concurrent.futures
from threading import Thread

import faiss
import numpy as np
import pandas as pd
import redis
import requests
from flask import Flask, jsonify, Response, request
from flask_cors import CORS
from openai import OpenAI
from sentence_transformers import SentenceTransformer

app = Flask(__name__)
CORS(app)
app.config["JSON_AS_ASCII"] = False

# Redis queue used to serialize batch-upload jobs
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=100, db=1, password="zhicheng123*")
redis_ = redis.Redis(connection_pool=pool, decode_responses=True)
db_key_query = 'query'
db_key_querying = 'querying'
batch_size = 32

# OpenAI-compatible endpoint (e.g. a locally served LLM) used for streaming generation
openai_api_key = "token-abc123"
openai_api_base = "http://127.0.0.1:12011/v1"
client = OpenAI(
    api_key=openai_api_key,
    base_url=openai_api_base,
)
models = client.models.list()
model = models.data[0].id
# model = "1"

# Embedding model used to vectorize summaries and queries (1024-dim BGE)
model_encode = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5')

# Prompt template that combines the patient's symptoms with the retrieved material
propmt_connect = '''我是一名中医,你是一个中医的医生的助理,我的患者有一个症状,症状如下:
{}
根据这些症状,我通过查找资料,{}
请根据上面的这些资料和方子,并根据每篇文章的转发数确定文章的重要程度,转发数越高的文章,最终答案的参考度越高,反之越低。根据患者的症状和上面的文章的资料的重要程度以及文章和症状的匹配程度,帮我开出正确的药方和治疗方案'''

# Prompt fragment that wraps the retrieved passages for one knowledge base
propmt_connect_ziliao = '''在“{}”资料中,有如下相关内容:
{}'''


def dialog_line_parse(text):
    """
    Send a request payload to the remote model service and return the parsed result.

    :param text: request body forwarded to the model service
    :return: JSON response from the model, or {} on failure
    """
    url_predict = "http://118.178.228.101:12004/predict"
    response = requests.post(
        url_predict,
        json=text,
        timeout=100000
    )

    if response.status_code == 200:
        return response.json()
    else:
        print("【{}】 Failed to get a proper response from remote "
              "server. Status Code: {}. Response: {}"
              "".format(url_predict, response.status_code, response.text))
        return {}
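
# For reference, callers of dialog_line_parse expect an OpenAI-style chat
# completion payload and read result['choices'][0]['message']['content'].
# A minimal sketch of the assumed shape (illustrative only, not a guaranteed
# contract of the remote service):
#
#   {
#       "choices": [
#           {
#               "index": 0,
#               "message": {"role": "assistant", "content": "..."},
#               "finish_reason": "stop"
#           }
#       ]
#   }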

def shengcehng_array(data):
    """Encode a list of texts into normalized embedding vectors."""
    embs = model_encode.encode(data, normalize_embeddings=True)
    return embs


def Building_vector_database(title, df):
    # Select rows that are valid and not yet vectorized
    to_process = df[(df["有效"] == True) & (df["已向量化"] == False)]
    if len(to_process) == 0:
        print("无新增数据需要向量化")
        return

    # Generate embeddings for the summaries of the new rows
    new_vectors = shengcehng_array(to_process["总结"].tolist())

    # Load the existing vector store and index
    vector_path = f"data_np/{title}.npy"
    index_path = f"data_np/{title}_index.json"

    vectors = np.load(vector_path) if os.path.exists(vector_path) else np.empty((0, 1024))
    index_data = {}
    if os.path.exists(index_path):
        with open(index_path, "r") as f:
            index_data = json.load(f)

    # Append the new vectors and record their row offsets in the index
    start_idx = len(vectors)
    vectors = np.vstack([vectors, new_vectors])
    for i, (_, row) in enumerate(to_process.iterrows()):
        index_data[row["ID"]] = {
            "row": start_idx + i,
            "valid": True
        }

    # Persist the vector store and the index
    np.save(vector_path, vectors)
    with open(index_path, "w") as f:
        json.dump(index_data, f)

    # Mark the rows as vectorized and write the CSV back
    df.loc[to_process.index, "已向量化"] = True
    df.to_csv(f"data_file_res/{title}.csv", sep="\t", index=False)


def delete_data(title, data_id):
    # Soft delete: mark the row as invalid in the CSV
    csv_path = f"data_file_res/{title}.csv"
    df = pd.read_csv(csv_path, sep="\t", dtype={"ID": str})
    df.loc[df["ID"] == data_id, "有效"] = False
    df.to_csv(csv_path, sep="\t", index=False)


def check_file_exists(file_path):
    """
    Check whether a file exists.

    :param file_path: path to check
    :return: True if the file exists, otherwise False
    """
    return os.path.isfile(file_path)


def ulit_request_file(new_id, sentence, title):
    file_name_res_save = f"data_file_res/{title}.csv"

    # Create or load the per-title CSV
    if os.path.exists(file_name_res_save):
        df = pd.read_csv(file_name_res_save, sep="\t")
    else:
        df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "已向量化", "向量"])

    new_row = {
        "ID": new_id,
        "正文": sentence,
        "总结": None,
        "有效": True,
        "已向量化": False,
        "向量": None,
    }
    df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

    # Select rows that are valid but still missing a summary
    to_process = df[(df["总结"].isna()) & (df["有效"] == True)]

    # Build one summarization request per row
    data_list = []
    for _, row in to_process.iterrows():
        data_list.append({
            "model": "gpt-4-turbo",
            "messages": [{
                "role": "user",
                "content": f"{row['正文']}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇"
            }],
            "top_p": 0.9,
            "temperature": 0.6
        })

    # Send the requests concurrently
    with concurrent.futures.ThreadPoolExecutor(200) as executor:
        results = list(executor.map(dialog_line_parse, data_list))

    # Write the generated summaries back
    for idx, result in zip(to_process.index, results):
        summary = result['choices'][0]['message']['content']
        df.at[idx, "总结"] = summary

    # Embed every valid row that has a summary and store the vector as JSON
    df_ce = df[(df["有效"] == True) & (df["总结"].notnull())]
    for idx in df_ce.index:
        a = shengcehng_array([df_ce.at[idx, "总结"]])
        df.at[idx, "向量"] = json.dumps(a[0].tolist())
        df.at[idx, "已向量化"] = True

    df.to_csv(file_name_res_save, sep="\t", index=False)
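
# For reference, each knowledge base is persisted as a tab-separated file
# data_file_res/<title>.csv with one row per document. An illustrative row
# (values are made up; the columns are the ones used above):
#
#   ID   正文                 总结         有效   已向量化   向量
#   1    患者近日头痛发热...   头痛、发热   True   True       "[0.0123, -0.0456, ...]"
#
# "向量" holds the 1024-dim embedding of "总结", JSON-encoded so it survives the
# round trip through the CSV file.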

def main(question, title, top):
    db_dict = {
        "1": "yetianshi"
    }
    # Pipeline: load the per-title CSV, collect the stored vectors, build a FAISS
    # index, then retrieve the passages that best match the question.
    # (Matching could also be done by first asking the LLM to turn the text into
    # a question; here the question is matched directly.)
    d = 1024
    db_type_list = title.split(",")

    paper_list_str = ""
    for title_dan in db_type_list:
        embs = shengcehng_array([question])
        index = faiss.IndexFlatIP(d)  # build the index (inner product on normalized vectors)

        # Load the stored vectors for this knowledge base
        file_name_res_save = f"data_file_res/{title_dan}.csv"
        df = pd.read_csv(file_name_res_save, sep="\t", encoding="utf-8")
        df_ce = df[df["有效"] == True]
        print(df_ce.shape)
        data_np = []
        for idx in df_ce.index:
            data_np.append(json.loads(df.loc[idx, "向量"]))

        # FAISS expects a contiguous float32 matrix
        vectors = np.array(data_np, dtype="float32")
        index.add(vectors)
        D, I = index.search(embs, int(top))
        print(I)

        reference_list = []
        for i, j in zip(I[0], D[0]):
            reference_list.append([df_ce.loc[df_ce.index[i], "正文"], j])

        for i, j in enumerate(reference_list):
            paper_list_str += "第{}篇\n{},此篇文章跟问题的相关度为{}%\n".format(str(i + 1), j[0], j[1])

    # Build the prompt
    print("paper_list_str", paper_list_str)
    propmt_connect_ziliao_input = []
    for i in db_type_list:
        propmt_connect_ziliao_input.append(propmt_connect_ziliao.format(i, paper_list_str))

    propmt_connect_ziliao_input_str = ",".join(propmt_connect_ziliao_input)
    propmt_connect_input = propmt_connect.format(question, propmt_connect_ziliao_input_str)

    # Generate the answer as a stream
    return model_generate_stream(propmt_connect_input)


def classify():
    # Background worker: drain the Redis queue and process batch uploads
    while True:
        if redis_.llen(db_key_query) == 0:  # nothing queued, poll again later
            time.sleep(3)
            continue
        query = redis_.lpop(db_key_query).decode('UTF-8')  # the pool returns bytes
        data_dict = json.loads(query)
        if data_dict["state"] == "1":
            new_id = data_dict["id"]
            sentence = data_dict["sentence"]
            title = data_dict["title"]
            ulit_request_file(new_id, sentence, title)


def add_dan_data(new_id, sentence, title):
    file_name_res_save = f"data_file_res/{title}.csv"

    # Reject the insert if the same text is already stored
    df = pd.read_csv(file_name_res_save, sep="\t")
    if sentence in df["正文"].values:
        print("正文已存在,跳过处理")
        return False
    else:
        ulit_request_file(new_id, sentence, title)
        return True


def updata_dan_data(new_id, sentence, title):
    file_name_res_save = f"data_file_res/{title}.csv"
    df = pd.read_csv(file_name_res_save, sep="\t")

    # Ask the model to re-summarize the updated text
    propmt_connect = {
        "model": "gpt-4-turbo",
        "messages": [{
            "role": "user",
            "content": f"{sentence}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇"
        }],
        "top_p": 0.9,
        "temperature": 0.6
    }

    result = dialog_line_parse(propmt_connect)
    print(result)
    summary = result['choices'][0]['message']['content']

    # Update the summary, text and embedding for the matching row
    df.loc[df["ID"] == new_id, "总结"] = summary
    df.loc[df["ID"] == new_id, "正文"] = sentence
    a = shengcehng_array([summary])
    df.loc[df["ID"] == new_id, "向量"] = json.dumps(a[0].tolist())

    df.to_csv(file_name_res_save, sep="\t", index=False)
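
# For reference, entries pushed onto the Redis list db_key_query by /upload_file
# and consumed by classify() are JSON strings of the form (field values are
# illustrative):
#
#   {"id": "123", "sentence": "患者近日头痛发热...", "title": "demo", "state": "1"}
#
# Only state "1" (batch add) goes through the queue; the other states are handled
# synchronously inside the request handler below.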

def model_generate_stream(prompt):
    messages = [
        {"role": "user", "content": prompt}
    ]
    stream = client.chat.completions.create(model=model,
                                            messages=messages,
                                            stream=True)

    printed_reasoning_content = False
    printed_content = False

    for chunk in stream:
        # The delta may carry reasoning_content (thinking tokens) and/or content
        delta = chunk.choices[0].delta
        reasoning_content = getattr(delta, "reasoning_content", None)
        content = getattr(delta, "content", None)

        if reasoning_content is not None:
            if not printed_reasoning_content:
                printed_reasoning_content = True
                print("reasoning_content:", end="", flush=True)
            print(reasoning_content, end="", flush=True)
        elif content is not None:
            if not printed_content:
                printed_content = True
                print("\ncontent:", end="", flush=True)
            # Only the answer content is streamed back to the caller
            print(content)
            yield content


@app.route("/upload_file", methods=["POST"])
def upload_file():
    print(request.remote_addr)
    sentence = request.json['sentence']
    title = request.json["title"]
    new_id = request.json["id"]
    state = request.json["state"]  # 1: batch add  2: single add  3: single update  4: single delete

    state_res = ""
    if state == "1":
        # Queue the job in Redis; the classify() worker will process it
        redis_.rpush(db_key_query, json.dumps({
            "id": new_id,
            "sentence": sentence,
            "state": state,
            "title": title
        }))
        state_res = "上传完成,正在排队处理数据"
    elif state == "2":
        info_bool = add_dan_data(new_id, sentence, title)
        if info_bool == True:
            state_res = "上传完成"
        else:
            state_res = "上传失败,库中有重复数据"
    elif state == "3":
        updata_dan_data(new_id, sentence, title)
        state_res = "修改完成"
    elif state == "4":
        delete_data(title, new_id)
        state_res = "删除完成"

    return_json = {
        "code": 200,
        "info": state_res
    }
    return jsonify(return_json)


@app.route("/upload_file_check", methods=["POST"])
def upload_file_check():
    print(request.remote_addr)
    new_id = request.json["id"]

    # Fetch every queued job (0 is the start, -1 means the end of the list)
    data_list = redis_.lrange(db_key_query, 0, -1)

    # Redis returns bytes, so decode before parsing the JSON
    data_list_id_ = []
    for item in data_list:
        data = json.loads(item.decode("utf-8"))
        data_list_id_.append(data["id"])

    if new_id in data_list_id_:
        return_json = {
            "code": 200,
            "info": "上传中"
        }
    else:
        return_json = {
            "code": 200,
            "info": "已入库"
        }
    return jsonify(return_json)


@app.route("/search", methods=["POST"])
def search():
    print(request.remote_addr)
    texts = request.json["texts"]
    title = request.json["title"]
    top = request.json["top"]
    response = main(texts, title, top)
    return Response(response, mimetype='text/plain; charset=utf-8')  # stream the answer back


# Start the background worker that processes queued uploads
t = Thread(target=classify)
t.start()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=27000, threaded=True, debug=False)
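
# Example client usage (a minimal sketch, not part of the service; it assumes the
# service is reachable at http://127.0.0.1:27000 as configured in app.run above,
# and that a knowledge base named "demo" exists or will be created):
#
#   import requests
#
#   # Queue a batch upload (state "1"); poll /upload_file_check with the same id
#   requests.post("http://127.0.0.1:27000/upload_file",
#                 json={"id": "1", "sentence": "患者头痛发热", "title": "demo", "state": "1"})
#
#   # Ask a question against that knowledge base and stream the answer
#   resp = requests.post("http://127.0.0.1:27000/search",
#                        json={"texts": "头痛发热怎么办", "title": "demo", "top": 3},
#                        stream=True)
#   for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
#       print(chunk, end="")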