From c751f371a79ee36f88508785f81eab65dcd1dcbf Mon Sep 17 00:00:00 2001 From: "majiahui@haimaqingfan.com" Date: Mon, 29 Sep 2025 14:16:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=8B=E8=AF=95scokt?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=EF=BC=8C=E5=B9=B6=E6=9B=B4=E6=94=B9=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=EF=BC=8C=E7=BA=BF=E4=B8=8A=E8=B7=91=E9=80=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 19 +++++ main.py | 251 +++++++++++++++++++++++++++++++++++----------------------- main_scokt.py | 80 ++++++++++++------- 3 files changed, 226 insertions(+), 124 deletions(-) diff --git a/README.md b/README.md index e69de29..ac056fb 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,19 @@ +### 知识库增删改查 +``` +python main.py +``` + +### 知识库流式问答socket +``` +python main_scokt.py +``` + +### 仿deepseek流式问答 socket (非 rag相关内容) +``` +python main_scoket_deepspeek.py +``` + +### 具体接口可以参考接口文档 + 接口文档.docx + https://console-docs.apipost.cn/preview/55b7d541588142d1/f4645422856c695a + https://console-docs.apipost.cn/preview/f03a79d844523711/2f4079d715d28b32 \ No newline at end of file diff --git a/main.py b/main.py index af2b5c7..5afd6d1 100644 --- a/main.py +++ b/main.py @@ -16,8 +16,10 @@ from flask_cors import CORS import pandas as pd import concurrent.futures import json +import torch +import uuid - +# flask配置 app = Flask(__name__) CORS(app) app.config["JSON_AS_ASCII"] = False @@ -30,10 +32,13 @@ client = OpenAI( base_url=openai_api_base, ) +# 模型配置 models = client.models.list() model = models.data[0].id # model = "1" model_encode = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5') + +# 提示配置 propmt_connect = '''我是一名中医,你是一个中医的医生的助理,我的患者有一个症状,症状如下: {} 根据这些症状,我通过查找资料,{} @@ -97,12 +102,23 @@ def dialog_line_parse(text): def shengcehng_array(data): + ''' + 模型生成向量 + :param data: + :return: + ''' embs = model_encode.encode(data, normalize_embeddings=True) return embs def Building_vector_database(title, df): + ''' + 次函数暂时弃用 + :param title: + :param df: + :return: + ''' # 加载需要处理的数据(有效且未向量化) to_process = df[(df["有效"] == True) & (df["已向量化"] == False)] @@ -143,23 +159,21 @@ def Building_vector_database(title, df): df.to_csv(f"data_file_res/{title}.csv", sep="\t", index=False) -def delete_data(title, data_id): +def delete_data(title, new_id): + ''' + 假删除,只是标记有效无效 + :param title: + :param new_id: + :return: + ''' + new_id = str(new_id) # 更新CSV标记 csv_path = f"data_file_res/{title}.csv" - df = pd.read_csv(csv_path, sep="\t", dtype={"ID": str}) - df.loc[df["ID"] == data_id, "有效"] = False + df = pd.read_csv(csv_path, sep="\t") + # df.loc[df["ID"] == new_id, "有效"] = False + df.loc[df['ID'] == new_id, "有效"] = False df.to_csv(csv_path, sep="\t", index=False) - - # 更新索引标记 - index_path = f"data_np/{title}_index.json" - if os.path.exists(index_path): - with open(index_path, "r+") as f: - index_data = json.load(f) - if data_id in index_data: - index_data[data_id]["valid"] = False - f.seek(0) - json.dump(index_data, f) - f.truncate() + return "删除完成" def check_file_exists(file_path): @@ -175,106 +189,141 @@ def check_file_exists(file_path): return os.path.isfile(file_path) -def ulit_request_file(new_id, sentence, title): +def ulit_request_file(sentence, title, zongjie): + ''' + 上传文件,生成固定内容,"ID", "正文", "总结", "有效", "向量" + :param sentence: + :param title: + :param zongjie: + :return: + ''' file_name_res_save = f"data_file_res/{title}.csv" - # 初始化或读取CSV文件 + # 初始化或读取CSV文件,如果存在文件,读取文件,并添加行, + # 如果不存在文件,新建DataFrame if os.path.exists(file_name_res_save): df = pd.read_csv(file_name_res_save, sep="\t") # 检查是否已存在相同正文 if sentence in df["正文"].values: - print("正文已存在,跳过处理") - return df + if zongjie == None: + return "正文已存在,跳过处理" + else: + result = df[df['正文'] == sentence] + id_ = result['ID'].values[0] + print(id_) + return ulit_request_file_zongjie(id_, sentence, zongjie, title) else: - df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "已向量化"]) + df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "向量"]) # 添加新数据(生成唯一ID) - new_row = { - "ID": str(new_id), - "正文": sentence, - "总结": None, - "有效": True, - "已向量化": False - } - df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) - - # 筛选需要处理的记录 - to_process = df[(df["总结"].isna()) & (df["有效"] == True)] + if zongjie == None: + id_ = str(uuid.uuid1()) + new_row = { + "ID": id_, + "正文": sentence, + "总结": None, + "有效": True, + "向量": None + } + df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) - # 调用API生成总结(示例保留原有逻辑) - data_list = [] - for _, row in to_process.iterrows(): - data_list.append({ + # 需要根据不同的项目修改提示,目的是精简内容,为了方便匹配 + data_dan = { "model": "gpt-4-turbo", "messages": [{ "role": "user", - "content": f"{row['正文']}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇" + "content": f"{sentence}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇" }], "top_p": 0.9, "temperature": 0.3 - }) + } + results = dialog_line_parse(data_dan) + summary = results['choices'][0]['message']['content'] - # 并发处理请求 - with concurrent.futures.ThreadPoolExecutor(200) as executor: - results = list(executor.map(dialog_line_parse, data_list)) + # 这是你的向量生成函数,来生成总结的词汇的向量 + new_vectors = shengcehng_array([summary]) + df.loc[df['ID'] == id_, '总结'] = summary + df.loc[df['ID'] == id_, '向量'] = str(new_vectors[0].tolist()) - # 更新总结字段 - for idx, result in zip(to_process.index, results): - summary = result['choices'][0]['message']['content'] - df.at[idx, "总结"] = summary + else: + id_ = str(uuid.uuid1()) + new_row = { + "ID": id_ , + "正文": sentence, + "总结": zongjie, + "有效": True, + "向量": None + } + df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) + new_vectors = shengcehng_array([zongjie]) # 假设这是你的向量生成函数 + df.loc[df['ID'] == id_, '总结'] = zongjie + df.loc[df['ID'] == id_, '向量'] = str(new_vectors[0].tolist()) # 保存更新后的CSV df.to_csv(file_name_res_save, sep="\t", index=False) - return df + return "上传完成" def ulit_request_file_zongjie(new_id, sentence, zongjie, title): + new_id = str(new_id) + print(new_id) + print(type(new_id)) file_name_res_save = f"data_file_res/{title}.csv" # 初始化或读取CSV文件 - if os.path.exists(file_name_res_save): - df = pd.read_csv(file_name_res_save, sep="\t") + + df = pd.read_csv(file_name_res_save, sep="\t") + df.loc[df['ID'] == new_id, '正文'] = sentence + if zongjie == None: + pass else: - df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "已向量化"]) - - # # 添加新数据(生成唯一ID) - # new_row = { - # "ID": str(new_id), - # "正文": sentence, - # "总结": zongjie, - # "有效": True, - # "已向量化": False - # } - # df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) + df.loc[df['ID'] == new_id, '总结'] = zongjie + new_vectors = shengcehng_array([zongjie]) # 假设这是你的向量生成函数 + df.loc[df['ID'] == new_id, '向量'] = str(new_vectors[0].tolist()) - # 筛选需要处理的记录 - to_process = df[df["有效"] == True] + # 保存更新后的CSV + df.to_csv(file_name_res_save, sep="\t", index=False) + return "修改完成" - # 调用API生成总结(示例保留原有逻辑) - data_list = [] - for _, row in to_process.iterrows(): - data_list.append({ - "model": "gpt-4-turbo", - "messages": [{ - "role": "user", - "content": f"{row['正文']}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇" - }], - "top_p": 0.9, - "temperature": 0.6 - }) - # 并发处理请求 - with concurrent.futures.ThreadPoolExecutor(200) as executor: - results = list(executor.map(dialog_line_parse, data_list)) +def ulit_request_file_check(title): + file_name_res_save = f"data_file_res/{title}.csv" - # 更新总结字段 - for idx, result in zip(to_process.index, results): - summary = result['choices'][0]['message']['content'] - df.at[idx, "总结"] = summary + # 初始化或读取CSV文件 - # 保存更新后的CSV - df.to_csv(file_name_res_save, sep="\t", index=False) - return df + # 初始化或读取CSV文件 + if os.path.exists(file_name_res_save): + df = pd.read_csv(file_name_res_save, sep="\t").values.tolist() + data_new = [] + for i in df: + if i[3] == True: + data_new.append([i[0], i[1], i[2]]) + return data_new + else: + return "无可展示文件" + + +def ulit_request_file_check_dan(new_id, title): + new_id = str(new_id) + file_name_res_save = f"data_file_res/{title}.csv" + + # 初始化或读取CSV文件 + + # 初始化或读取CSV文件 + if os.path.exists(file_name_res_save): + df = pd.read_csv(file_name_res_save, sep="\t") + zhengwen = df.loc[df['ID'] == new_id, '正文'].values + zongjie = df.loc[df['ID'] == new_id, '总结'].values + # 输出结果 + if len(zhengwen) > 0: + if df.loc[df['ID'] == new_id, '有效'].values == True: + return [new_id, zhengwen[0], zongjie[0]] + else: + return "未找到对应的ID" + else: + return "未找到对应的ID" + else: + return "无可展示文件" def main(question, title, top): @@ -311,22 +360,30 @@ def main(question, title, top): index = faiss.IndexFlatIP(d) # buid the index # 查找向量 - vector_path = f"data_np/{title_dan}.npy" - vectors = np.load(vector_path) + # vector_path = f"data_np/{title_dan}.npy" + # vectors = np.load(vector_path) data_str = pd.read_csv(f"data_file_res/{title_dan}.csv", sep="\t", encoding="utf-8").values.tolist() + + data_str_valid = [] + for i in data_str: + if i[3] == True: + data_str_valid.append(i) + + data_str_vectors_list = [] + for i in data_str_valid: + data_str_vectors_list.append(eval(i[-1])) + vectors = np.array(data_str_vectors_list) index.add(vectors) D, I = index.search(embs, int(top)) print(I) reference_list = [] for i,j in zip(I[0], D[0]): - reference_list.append([data_str[i], j]) + reference_list.append([data_str_valid[i], j]) for i,j in enumerate(reference_list): paper_list_str += "第{}篇\n{},此篇文章跟问题的相关度为{}%\n".format(str(i+1), j[0][1], j[1]) - - ''' 构造prompt ''' @@ -374,7 +431,7 @@ def model_generate_stream(prompt): print("\ncontent:", end="", flush=True) # Extract and print the content # print(content, end="", flush=True) - print(content) + print(content, end="") yield content @@ -397,24 +454,24 @@ def upload_file_check(): # 增 state_res = "" if state == "1": - df = ulit_request_file(new_id, sentence, title) - Building_vector_database(title, df) - state_res = "上传完成" + state_res = ulit_request_file(sentence, title, zongjie) + # 删 elif state == "2": - delete_data(title, new_id) - state_res = "删除完成" + state_res = delete_data(title, new_id) # 改 elif state == "3": - df = ulit_request_file(new_id, sentence, title) - Building_vector_database(title, df) - state_res = "修改完成" + state_res = ulit_request_file_zongjie(new_id, sentence, zongjie,title) # 查 elif state == "4": - df = ulit_request_file(new_id, sentence, title) + state_res = ulit_request_file_check(title) + + # 通过uuid查单条数据 + elif state == "5": + ulit_request_file_check_dan(new_id, title) state_res = "" return_json = { diff --git a/main_scokt.py b/main_scokt.py index f3756a5..1ac0300 100644 --- a/main_scokt.py +++ b/main_scokt.py @@ -44,7 +44,7 @@ client = OpenAI( models = client.models.list() model = models.data[0].id # model = "1" -model_encode = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5') +model_encode = SentenceTransformer('/home/zhangbaoxun/project/models-llm/bge-large-zh-v1.5') propmt_connect = '''我是一名中医,你是一个中医的医生的助理,我的患者有一个症状,症状如下: {} 根据这些症状,我通过查找资料,{} @@ -113,6 +113,13 @@ def shengcehng_array(data): def main(question, title, top): + ''' + 主函数,用来匹配句子放到prompt中,生成回答 + :param question: + :param title: + :param top: + :return: + ''' db_dict = { "1": "yetianshi" } @@ -149,23 +156,36 @@ def main(question, title, top): # vector_path = f"data_np/{title_dan}.npy" # vectors = np.load(vector_path) + # 读取向量文件 csv文件结构: + # ID + # 正文 + # 总结 + # 有效 + # 向量 data_str = pd.read_csv(f"data_file_res/{title_dan}.csv", sep="\t", encoding="utf-8").values.tolist() data_str_valid = [] for i in data_str: + # i[3] == True 说明数据没有被删除,如果是false说明被删除 if i[3] == True: data_str_valid.append(i) + # 把有效数据的向量汇总出来 data_str_vectors_list = [] for i in data_str_valid: data_str_vectors_list.append(eval(i[-1])) + + # 拼接成向量矩阵 vectors = np.array(data_str_vectors_list) index.add(vectors) + + # 使用faiss找到最相似向量 D, I = index.search(embs, int(top)) print(I) reference_list = [] for i,j in zip(I[0], D[0]): + # 添加 csv对应的数据 data_str_valid[i]表示 csv中一行的所有数据 ID 正文 总结 有效 向量 以及 j表示相关度是多少 reference_list.append([data_str_valid[i], j]) for i,j in enumerate(reference_list): @@ -178,10 +198,12 @@ def main(question, title, top): for i in db_type_list: propmt_connect_ziliao_input.append(propmt_connect_ziliao.format(i, paper_list_str)) + # 构造输入问题,把上面的都展示出来 propmt_connect_ziliao_input_str = ",".join(propmt_connect_ziliao_input) propmt_connect_input = propmt_connect.format(question, propmt_connect_ziliao_input_str) + ''' - 生成回答 + 生成回答,这个model_generate_stream可以根据需要指定模型 ''' return model_generate_stream(propmt_connect_input) @@ -259,38 +281,40 @@ async def handle_websocket(websocket): async def main_api(): try: + # 是否加载 ssl_context = None - + # wss服务开关 True是打开wss服务 + wss_bool = False # 检查证书文件是否存在 - ssl_cert = "yitongtang66.com.crt" + ssl_cert = "yitongtang66.com_ca_chains.crt" ssl_key = "yitongtang66.com.key" # ssl_cert = "yizherenxin.cn.crt" # ssl_key = "yizherenxin.cn.key" - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) - ssl_context.load_cert_chain(ssl_cert, ssl_key) - ssl_context.check_hostname = False # 必须禁用主机名验证 - ssl_context.verify_mode = ssl.CERT_NONE # 不验证证书 - - # if os.path.exists(ssl_cert) and os.path.exists(ssl_key): - # try: - # # 创建SSL上下文 - # ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) - # # 加载证书链 - # ssl_context.load_cert_chain(ssl_cert, ssl_key) - # # 禁用主机名验证(对于自签名证书) - # ssl_context.check_hostname = False - # ssl_context.verify_mode = ssl.CERT_NONE - # print("SSL证书已加载,使用WSS协议") - # except Exception as e: - # print(f"SSL证书加载失败: {e}") - # print("将使用WS协议") - # ssl_context = None - # else: - # print("警告:SSL证书文件未找到,将使用WS协议") - # ssl_context = None + # ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + # ssl_context.load_cert_chain(ssl_cert, ssl_key) + # ssl_context.check_hostname = False # 必须禁用主机名验证 + # ssl_context.verify_mode = ssl.CERT_NONE # 不验证证书 + + if wss_bool == True: + try: + # 创建SSL上下文 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + # 加载证书链 + ssl_context.load_cert_chain(ssl_cert, ssl_key) + # 禁用主机名验证(对于自签名证书) + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + print("SSL证书已加载,使用WSS协议") + except Exception as e: + print(f"SSL证书加载失败: {e}") + print("将使用WS协议") + ssl_context = None + else: + print("警告:SSL证书文件未找到,将使用WS协议") + ssl_context = None # 创建服务器 server = await websockets.serve( @@ -303,6 +327,7 @@ async def main_api(): close_timeout=30 # 添加关闭超时 ) + # 启动27001端口 if ssl_context: print("WSS服务器已启动: wss://0.0.0.0:27001") else: @@ -323,9 +348,10 @@ if __name__ == "__main__": logging.basicConfig(level=logging.INFO) + # 启动服务 try: asyncio.run(main_api()) except KeyboardInterrupt: print("服务器被用户中断") except Exception as e: - print(f"服务器运行错误: {e}") \ No newline at end of file + print(f"服务器运行错误: {e}")