Compare commits

...

5 Commits

Author SHA1 Message Date
majiahui@haimaqingfan.com a83c265f4e Add a test client for the scokt (WebSocket) request 1 month ago
majiahui@haimaqingfan.com 9c1cc4c768 Add scokt (WebSocket) request support 1 month ago
majiahui@haimaqingfan.com 90af48046f Merge vectors into the CSV and support create/read/update/delete operations 1 month ago
majiahui@haimaqingfan.com 1e93757254 Fix batch-upload errors and improve concurrency capacity 1 month ago
majiahui@haimaqingfan.com 823923c927 Single-record upload 1 month ago
  1. ceshi_scokt.py (85)
  2. main.py (429)
  3. main_scokt.py (241)

85
ceshi_scokt.py

@@ -0,0 +1,85 @@
from openai import OpenAI

openai_api_key = "token-abc123"
openai_api_base = "http://127.0.0.1:12011/v1"

client = OpenAI(
    api_key=openai_api_key,
    base_url=openai_api_base,
)

models = client.models.list()
model = models.data[0].id


def model_generate_stream(prompt):
    messages = [
        {"role": "user", "content": prompt}
    ]

    stream = client.chat.completions.create(model=model,
                                            messages=messages,
                                            stream=True)

    printed_reasoning_content = False
    printed_content = False

    for chunk in stream:
        reasoning_content = None
        content = None
        # Check whether this delta carries reasoning_content or regular content
        if hasattr(chunk.choices[0].delta, "reasoning_content"):
            reasoning_content = chunk.choices[0].delta.reasoning_content
        elif hasattr(chunk.choices[0].delta, "content"):
            content = chunk.choices[0].delta.content

        if reasoning_content is not None:
            if not printed_reasoning_content:
                printed_reasoning_content = True
                print("reasoning_content:", end="", flush=True)
            print(reasoning_content, end="", flush=True)
        elif content is not None:
            if not printed_content:
                printed_content = True
                print("\ncontent:", end="", flush=True)
            # Extract and print the content
            # print(content, end="", flush=True)
            print(content)
            yield content


# if __name__ == '__main__':
#     for i in model_generate_stream("你好"):
#         print(i)

import asyncio
import websockets
import json


async def handle_websocket(websocket):
    print("客户端已连接")
    try:
        while True:
            message = await websocket.recv()
            print("收到消息:", message)
            data = json.loads(message)
            texts = data.get("texts")
            title = data.get("title")
            top = data.get("top")
            response = model_generate_stream(texts)
            # response = message + "111"
            for char in response:
                await websocket.send(char)
                # await asyncio.sleep(0.3)
            await websocket.send("[DONE]")
    except websockets.exceptions.ConnectionClosed:
        print("客户端断开连接")


async def main():
    async with websockets.serve(handle_websocket, "0.0.0.0", 5500):
        print("WebSocket 服务器已启动,监听端口 5500")
        await asyncio.Future()  # run forever


if __name__ == "__main__":
    asyncio.run(main())  # start the asyncio event loop
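
ceshi_scokt.py only contains the server side. A minimal client sketch such as the one below can drive it manually; the payload fields (texts, title, top), the ws://127.0.0.1:5500 address and the "[DONE]" sentinel are taken from handle_websocket above, while the function name ask and the sample prompt are illustrative only, not part of this commit.

# Illustrative test client for the WebSocket server above (not part of this commit).
import asyncio
import json
import websockets

async def ask(prompt):
    # Connect to the local test server started by ceshi_scokt.py
    async with websockets.connect("ws://127.0.0.1:5500") as ws:
        await ws.send(json.dumps({"texts": prompt, "title": "", "top": 5}))
        while True:
            chunk = await ws.recv()
            if chunk == "[DONE]":  # end-of-stream sentinel sent by handle_websocket
                break
            print(chunk, end="", flush=True)

if __name__ == "__main__":
    asyncio.run(ask("你好"))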

429
main.py

@@ -2,21 +2,45 @@
# Press Shift+F10 to run it, or replace it with your own code.
# Double-press Shift to search classes, files, tool windows, actions and settings everywhere.
import os
import faiss
import numpy as np
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import requests
import time
from flask import Flask, jsonify
from flask import request
from flask import Flask, jsonify, Response, request
from openai import OpenAI
from flask_cors import CORS
import pandas as pd
import concurrent.futures
import json
from threading import Thread
import redis
app = Flask(__name__)
CORS(app)
app.config["JSON_AS_ASCII"] = False
model = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5')
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=100, db=1, password="zhicheng123*")
redis_ = redis.Redis(connection_pool=pool, decode_responses=True)
db_key_query = 'query'
db_key_querying = 'querying'
batch_size = 32
openai_api_key = "token-abc123"
openai_api_base = "http://127.0.0.1:12011/v1"
client = OpenAI(
api_key=openai_api_key,
base_url=openai_api_base,
)
models = client.models.list()
model = models.data[0].id
# model = "1"
model_encode = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5')
propmt_connect = '''我是一名中医,你是一个中医的医生的助理,我的患者有一个症状,症状如下:
{}
根据这些症状我通过查找资料{}
@@ -26,7 +50,7 @@ propmt_connect_ziliao = '''在“{}”资料中,有如下相关内容:
{}'''
def dialog_line_parse(url, text):
def dialog_line_parse(text):
"""
Send the request payload to the model and return the result
:param url: model url
@@ -34,8 +58,9 @@ def dialog_line_parse(url, text):
:return: the model's response
"""
url_predict = "http://118.178.228.101:12004/predict"
response = requests.post(
url,
url_predict,
json=text,
timeout=100000
)
@@ -49,46 +74,166 @@ def dialog_line_parse(url, text):
# )
print("【{}】 Failed to get a proper response from remote "
"server. Status Code: {}. Response: {}"
"".format(url, response.status_code, response.text))
"".format(url_predict, response.status_code, response.text))
return {}
# ['choices'][0]['message']['content']
#
# text = text['messages'][0]['content']
# return_text = {
# 'code': 200,
# 'id': "1",
# 'object': 0,
# 'created': 0,
# 'model': 0,
# 'choices': [
# {
# 'index': 0,
# 'message': {
# 'role': 'assistant',
# 'content': text
# },
# 'logprobs': None,
# 'finish_reason': 'stop'
# }
# ],
# 'usage': 0,
# 'system_fingerprint': 0
# }
# return return_text
def shengcehng_array(data):
embs = model.encode(data, normalize_embeddings=True)
embs = model_encode.encode(data, normalize_embeddings=True)
return embs
def Building_vector_database(type, name, df):
data_ndarray = np.empty((0, 1024))
for sen in df:
data_ndarray = np.concatenate((data_ndarray, shengcehng_array([sen[0]])))
print("data_ndarray.shape", data_ndarray.shape)
np.save(f'data_np/{name}.npy', data_ndarray)
def Building_vector_database(title, df):
# Load the rows that still need processing (valid and not yet vectorized)
to_process = df[(df["有效"] == True) & (df["已向量化"] == False)]
if len(to_process) == 0:
print("无新增数据需要向量化")
return
# Generate the embedding array
new_vectors = shengcehng_array(to_process["总结"].tolist()) # assuming this is your embedding-generation function
# Load the existing vector store and index
vector_path = f"data_np/{title}.npy"
index_path = f"data_np/{title}_index.json"
vectors = np.load(vector_path) if os.path.exists(vector_path) else np.empty((0, 1024))
index_data = {}
if os.path.exists(index_path):
with open(index_path, "r") as f:
index_data = json.load(f)
# Update the index and the vector store
start_idx = len(vectors)
vectors = np.vstack([vectors, new_vectors])
for i, (_, row) in enumerate(to_process.iterrows()):
index_data[row["ID"]] = {
"row": start_idx + i,
"valid": True
}
# Persist the data
np.save(vector_path, vectors)
with open(index_path, "w") as f:
json.dump(index_data, f)
# Mark the rows as vectorized
df.loc[to_process.index, "已向量化"] = True
df.to_csv(f"data_file_res/{title}.csv", sep="\t", index=False)
def ulit_request_file(file, title):
file_name = file.filename
file_name_save = "data_file/{}.csv".format(title)
file.save(file_name_save)
def delete_data(title, data_id):
# Update the flag in the CSV
csv_path = f"data_file_res/{title}.csv"
df = pd.read_csv(csv_path, sep="\t", dtype={"ID": str})
df.loc[df["ID"] == data_id, "有效"] = False
df.to_csv(csv_path, sep="\t", index=False)
# try:
# with open(file_name_save, encoding="gbk") as f:
# content = f.read()
# except:
# with open(file_name_save, encoding="utf-8") as f:
# content = f.read()
# elif file_name.split(".")[-1] == "docx":
# content = docx2txt.process(file_name_save)
# content_list = [i for i in content.split("\n")]
df = pd.read_csv(file_name_save, sep="\t", encoding="utf-8").values.tolist()
def check_file_exists(file_path):
"""
Check whether a file exists
Args:
file_path (str): path of the file to check
Returns:
bool: True if the file exists, otherwise False
"""
return os.path.isfile(file_path)
return df
def ulit_request_file(new_id, sentence, title):
file_name_res_save = f"data_file_res/{title}.csv"
def main(question, db_type, top):
# Initialize or load the CSV file
if os.path.exists(file_name_res_save):
df = pd.read_csv(file_name_res_save, sep="\t")
# # 检查是否已存在相同正文
# if sentence in df["正文"].values:
# print("正文已存在,跳过处理")
# return df
else:
df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "已向量化", "向量"])
new_row = {
"ID": new_id,
"正文": sentence,
"总结": None,
"有效": True,
"已向量化": False,
"向量": None,
}
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
# Select the records that still need processing
to_process = df[(df["总结"].isna()) & (df["有效"] == True)]
# Call the API to generate summaries (keeps the original logic as an example)
data_list = []
for _, row in to_process.iterrows():
data_list.append({
"model": "gpt-4-turbo",
"messages": [{
"role": "user",
"content": f"{row['正文']}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇"
}],
"top_p": 0.9,
"temperature": 0.6
})
# Process the requests concurrently
with concurrent.futures.ThreadPoolExecutor(200) as executor:
results = list(executor.map(dialog_line_parse, data_list))
# Update the summary column
for idx, result in zip(to_process.index, results):
summary = result['choices'][0]['message']['content']
df.at[idx, "总结"] = summary
# df.loc[df.index[2], "总结"] = None
# df.loc[df.index[3], "总结"] = None
# df.loc[df.index[4], "总结"] = None
# df.loc[df.index[5], "总结"] = None
df_ce = df[(df["有效"] == True) & (df["总结"].notnull())]
for idx in df_ce.index:
a = shengcehng_array([df_ce.at[idx, "总结"]])
df.at[idx, "向量"] = json.dumps(a[0].tolist())
df.at[idx, "已向量化"] = True
df.to_csv(file_name_res_save, sep="\t", index=False)
def main(question, title, top):
db_dict = {
"1": "yetianshi"
}
@@ -114,25 +259,37 @@ def main(question, db_type, top):
Match reference passages against the question
'''
d = 1024
db_type_list = db_type.split(",")
db_type_list = title.split(",")
paper_list_str = ""
for i in db_type_list:
for title_dan in db_type_list:
embs = shengcehng_array([question])
index = faiss.IndexFlatIP(d) # build the index
data_np = np.load(f"data_np/{i}.npy")
# data_str = open(f"data_file/{i}.txt").read().split("\n")
data_str = pd.read_csv(f"data_file/{i}.csv", sep="\t", encoding="utf-8").values.tolist()
index.add(data_np)
# Look up the vectors
file_name_res_save = f"data_file_res/{title_dan}.csv"
df = pd.read_csv(file_name_res_save, sep="\t", encoding="utf-8")
df_ce = df[df["有效"] == True]
print(df_ce.shape)
data_np = []
for idx in df_ce.index:
data_np.append(json.loads(df.loc[idx, "向量"]))
vectors = np.array(data_np, dtype=np.float32) # faiss needs float32 vectors; dtype=object would break index.add
# data_str = pd.read_csv(file_name_res_save, sep="\t", encoding="utf-8").values.tolist()
index.add(vectors)
D, I = index.search(embs, int(top))
print(I)
reference_list = []
for i,j in zip(I[0], D[0]):
reference_list.append([data_str[i], j])
reference_list.append([df_ce.loc[df_ce.index[i], "正文"], j])
for i,j in enumerate(reference_list):
paper_list_str += "{}\n{},此篇文章的转发数为{},评论数为{},点赞数为{}\n,此篇文章跟问题的相关度为{}%\n".format(str(i+1), j[0][0], j[0][1], j[0][2], j[0][3], j[1])
paper_list_str += "{}\n{},此篇文章跟问题的相关度为{}%\n".format(str(i+1), j[0], j[1])
'''
Build the prompt
@@ -147,47 +304,140 @@ def main(question, db_type, top):
'''
Generate the answer
'''
url_predict = "http://192.168.31.74:26000/predict"
url_search = "http://192.168.31.74:26000/search"
return model_generate_stream(propmt_connect_input)
# data = {
# "content": propmt_connect_input
# }
data = {
"content": propmt_connect_input,
"model": "qwq-32",
"top_p": 0.9,
"temperature": 0.6
}
def classify(): # background worker: pop queued jobs and call the model (max batch_size)
while True:
if redis_.llen(db_key_query) == 0: # keep polling if the queue is empty
time.sleep(3)
continue
query = redis_.lpop(db_key_query).decode('UTF-8') # fetch the queued query text
data_dict = json.loads(query)
if data_dict["state"] == "1":
new_id = data_dict["id"]
sentence = data_dict["sentence"]
title = data_dict["title"]
ulit_request_file(new_id, sentence, title)
res = dialog_line_parse(url_predict, data)
id_ = res["texts"]["id"]
data = {
"id": id_
}
def add_dan_data(new_id, sentence, title):
file_name_res_save = f"data_file_res/{title}.csv"
while True:
res = dialog_line_parse(url_search, data)
if res["code"] == 200:
break
else:
time.sleep(1)
spilt_str = "</think>"
think, response = str(res["text"]).split(spilt_str)
return think, response
# Initialize or load the CSV file
df = pd.read_csv(file_name_res_save, sep="\t")
if sentence in df["正文"].values:
print("正文已存在,跳过处理")
return False
else:
ulit_request_file(new_id, sentence, title)
return True
def updata_dan_data(new_id, sentence, title):
file_name_res_save = f"data_file_res/{title}.csv"
df = pd.read_csv(file_name_res_save, sep="\t")
# Select the record to process
propmt_connect = {
"model": "gpt-4-turbo",
"messages": [{
"role": "user",
"content": f"{sentence}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇"
}],
"top_p": 0.9,
"temperature": 0.6
}
result = dialog_line_parse(propmt_connect)
print(result)
summary = result['choices'][0]['message']['content']
# Update the summary and body columns
df.loc[df["ID"] == new_id, "总结"] = summary
df.loc[df["ID"] == new_id, "正文"] = sentence
a = shengcehng_array([summary])
df.loc[df["ID"] == new_id, "向量"] = json.dumps(a[0].tolist())
df.to_csv(file_name_res_save, sep="\t", index=False)
def model_generate_stream(prompt):
messages = [
{"role": "user", "content": prompt}
]
stream = client.chat.completions.create(model=model,
messages=messages,
stream=True)
printed_reasoning_content = False
printed_content = False
for chunk in stream:
reasoning_content = None
content = None
# Check the content is reasoning_content or content
if hasattr(chunk.choices[0].delta, "reasoning_content"):
reasoning_content = chunk.choices[0].delta.reasoning_content
elif hasattr(chunk.choices[0].delta, "content"):
content = chunk.choices[0].delta.content
if reasoning_content is not None:
if not printed_reasoning_content:
printed_reasoning_content = True
print("reasoning_content:", end="", flush=True)
print(reasoning_content, end="", flush=True)
elif content is not None:
if not printed_content:
printed_content = True
print("\ncontent:", end="", flush=True)
# Extract and print the content
# print(content, end="", flush=True)
print(content)
yield content
@app.route("/upload_file", methods=["POST"])
def upload_file():
print(request.remote_addr)
file = request.files.get('file')
title = request.form.get("title")
df = ulit_request_file(file, title)
Building_vector_database("1", title, df)
sentence = request.json['sentence']
title = request.json["title"]
new_id = request.json["id"]
state = request.json["state"] # 1: batch add  2: single add  3: single update  4: single delete
'''
{
"1": "csv",
"2": "xlsx",
"3": "txt",
"4": "pdf"
}
'''
state_res = ""
if state == "1":
redis_.rpush(db_key_query, json.dumps({
"id": new_id,
"sentence": sentence,
"state": state,
"title": title
})) # push onto the Redis queue
state_res = "上传完成,正在排队处理数据"
elif state == "2":
info_bool = add_dan_data(new_id, sentence, title)
if info_bool == True:
state_res = "上传完成"
else:
state_res = "上传失败,库中有重复数据"
elif state == "3":
updata_dan_data(new_id, sentence, title)
state_res = "修改完成"
elif state == "4":
delete_data(title, new_id)
state_res = "删除完成"
return_json = {
"code": 200,
"info": "上传完成"
"info": state_res
}
return jsonify(return_json) # return the result
@@ -195,31 +445,38 @@ def upload_file():
@app.route("/upload_file_check", methods=["POST"])
def upload_file_check():
print(request.remote_addr)
file = request.files.get('file')
title = request.form.get("title")
df = ulit_request_file(file, title)
Building_vector_database("1", title, df)
return_json = {
"code": 200,
"info": "上传完成"
}
return jsonify(return_json) # return the result
new_id = request.json["id"]
data_list = redis_.lrange(db_key_query, 0, -1) # 0 is the start index, -1 the end (the whole list)
# Parse the JSON entries
data_list_id_ = []
for item in data_list:
data = json.loads(item.decode("utf-8")) # Redis returns bytes, so decode + json.loads
data_list_id_.append(data["id"])
if new_id in data_list_id_:
return_json = {
"code": 200,
"info": "上传中"
}
return jsonify(return_json)
else:
return_json = {
"code": 200,
"info": "已入库"
}
return jsonify(return_json)
@app.route("/search", methods=["POST"])
def search():
print(request.remote_addr)
texts = request.json["texts"]
text_type = request.json["text_type"]
title = request.json["title"]
top = request.json["top"]
think, response = main(texts, text_type, top)
return_json = {
"code": 200,
"think": think,
"response": response
}
return jsonify(return_json) # return the result
response = main(texts, title, top)
return Response(response, mimetype='text/plain; charset=utf-8') # stream the generated answer back
t = Thread(target=classify)
t.start()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=27000, threaded=True, debug=False)
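
For reference, a rough sketch of how a caller would drive the reworked endpoints in main.py. It assumes the service from this diff is running locally on port 27000 (per app.run above); the request fields mirror the handlers in the diff, while the record id, knowledge-base title and example sentences are hypothetical.

# Hypothetical client calls against the endpoints defined above.
import requests

BASE = "http://127.0.0.1:27000"

# Single-record insert (state "2"); state "1" queues batch ingestion via Redis,
# "3" updates an existing record and "4" soft-deletes it.
r = requests.post(BASE + "/upload_file", json={
    "id": "demo-001",                    # hypothetical record id
    "sentence": "患者畏寒肢冷,舌淡苔白",     # hypothetical body text
    "title": "demo",                     # knowledge-base name, i.e. the CSV under data_file_res/
    "state": "2",
})
print(r.json())

# Poll whether a queued record has been processed yet.
r = requests.post(BASE + "/upload_file_check", json={"id": "demo-001"})
print(r.json())

# /search now streams plain text instead of returning a think/response JSON body.
with requests.post(BASE + "/search", stream=True,
                   json={"texts": "畏寒肢冷如何调理", "title": "demo", "top": 5}) as resp:
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)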

241
main_scokt.py

@@ -0,0 +1,241 @@
# This is a sample Python script.
# Press Shift+F10 to run it, or replace it with your own code.
# Double-press Shift to search classes, files, tool windows, actions and settings everywhere.
import os
import faiss
import numpy as np
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import requests
import time
from flask import Flask, jsonify, Response, request
from openai import OpenAI
from flask_cors import CORS
import pandas as pd
import concurrent.futures
import json
from threading import Thread
import redis
import asyncio
import websockets

app = Flask(__name__)
CORS(app)
app.config["JSON_AS_ASCII"] = False

pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=100, db=1, password="zhicheng123*")
redis_ = redis.Redis(connection_pool=pool, decode_responses=True)
db_key_query = 'query'
db_key_querying = 'querying'
batch_size = 32

openai_api_key = "token-abc123"
openai_api_base = "http://127.0.0.1:12011/v1"

client = OpenAI(
    api_key=openai_api_key,
    base_url=openai_api_base,
)

models = client.models.list()
model = models.data[0].id
# model = "1"

model_encode = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5')

propmt_connect = '''我是一名中医,你是一个中医的医生的助理,我的患者有一个症状,症状如下:
{}
根据这些症状我通过查找资料{}
请根据上面的这些资料和方子并根据每篇文章的转发数确定文章的重要程度转发数越高的文章最终答案的参考度越高反之越低根据患者的症状和上面的文章的资料的重要程度以及文章和症状的匹配程度帮我开出正确的药方和治疗方案'''

propmt_connect_ziliao = '''在“{}”资料中,有如下相关内容:
{}'''


def dialog_line_parse(text):
    """
    Send the request payload to the model and return the result
    :param text: request payload for the model
    :return: the model's response
    """
    url_predict = "http://118.178.228.101:12004/predict"
    response = requests.post(
        url_predict,
        json=text,
        timeout=100000
    )

    if response.status_code == 200:
        return response.json()
    else:
        # logger.error(
        #     "【{}】 Failed to get a proper response from remote "
        #     "server. Status Code: {}. Response: {}"
        #     "".format(url, response.status_code, response.text)
        # )
        print("【{}】 Failed to get a proper response from remote "
              "server. Status Code: {}. Response: {}"
              "".format(url_predict, response.status_code, response.text))
        return {}

    # ['choices'][0]['message']['content']
    #
    # text = text['messages'][0]['content']
    # return_text = {
    #     'code': 200,
    #     'id': "1",
    #     'object': 0,
    #     'created': 0,
    #     'model': 0,
    #     'choices': [
    #         {
    #             'index': 0,
    #             'message': {
    #                 'role': 'assistant',
    #                 'content': text
    #             },
    #             'logprobs': None,
    #             'finish_reason': 'stop'
    #         }
    #     ],
    #     'usage': 0,
    #     'system_fingerprint': 0
    # }
    # return return_text


def shengcehng_array(data):
    embs = model_encode.encode(data, normalize_embeddings=True)
    return embs


def main(question, title, top):
    db_dict = {
        "1": "yetianshi"
    }
    '''
    Define the file paths
    '''
    '''
    Load the files
    '''
    '''
    Split the text
    '''
    '''
    Build the vector database
    1. Plain matching
    2. Use the LLM to turn each text into a question first, then match
    '''
    '''
    Match reference passages against the question
    '''
    d = 1024
    db_type_list = title.split(",")

    paper_list_str = ""
    for title_dan in db_type_list:
        embs = shengcehng_array([question])
        index = faiss.IndexFlatIP(d)  # build the index

        # Look up the vectors
        vector_path = f"data_np/{title_dan}.npy"
        vectors = np.load(vector_path)
        data_str = pd.read_csv(f"data_file/{title_dan}.csv", sep="\t", encoding="utf-8").values.tolist()

        index.add(vectors)
        D, I = index.search(embs, int(top))
        print(I)

        reference_list = []
        for i, j in zip(I[0], D[0]):
            reference_list.append([data_str[i], j])

        for i, j in enumerate(reference_list):
            paper_list_str += "{}\n{},此篇文章跟问题的相关度为{}%\n".format(str(i + 1), j[0][1], j[1])

    '''
    Build the prompt
    '''
    print("paper_list_str", paper_list_str)
    propmt_connect_ziliao_input = []
    for i in db_type_list:
        propmt_connect_ziliao_input.append(propmt_connect_ziliao.format(i, paper_list_str))

    propmt_connect_ziliao_input_str = "".join(propmt_connect_ziliao_input)
    propmt_connect_input = propmt_connect.format(question, propmt_connect_ziliao_input_str)
    '''
    Generate the answer
    '''
    return model_generate_stream(propmt_connect_input)


def model_generate_stream(prompt):
    messages = [
        {"role": "user", "content": prompt}
    ]

    stream = client.chat.completions.create(model=model,
                                            messages=messages,
                                            stream=True)

    printed_reasoning_content = False
    printed_content = False

    for chunk in stream:
        reasoning_content = None
        content = None
        # Check whether this delta carries reasoning_content or regular content
        if hasattr(chunk.choices[0].delta, "reasoning_content"):
            reasoning_content = chunk.choices[0].delta.reasoning_content
        elif hasattr(chunk.choices[0].delta, "content"):
            content = chunk.choices[0].delta.content

        if reasoning_content is not None:
            if not printed_reasoning_content:
                printed_reasoning_content = True
                print("reasoning_content:", end="", flush=True)
            print(reasoning_content, end="", flush=True)
        elif content is not None:
            if not printed_content:
                printed_content = True
                print("\ncontent:", end="", flush=True)
            # Extract and print the content
            # print(content, end="", flush=True)
            print(content)
            yield content


async def handle_websocket(websocket):
    print("客户端已连接")
    try:
        while True:
            message = await websocket.recv()
            data = json.loads(message)
            texts = data.get("texts")
            title = data.get("title")
            top = data.get("top")
            print("收到消息:", message)
            response = main(texts, title, top)
            # response = message + "111"
            for char in response:
                await websocket.send(char)
                # await asyncio.sleep(0.3)
            await websocket.send("[DONE]")
    except websockets.exceptions.ConnectionClosed:
        print("客户端断开连接")


async def main_api():
    async with websockets.serve(handle_websocket, "0.0.0.0", 27001):
        print("WebSocket 服务器已启动,监听端口 27001")
        await asyncio.Future()  # run forever


if __name__ == "__main__":
    asyncio.run(main_api())  # start the asyncio event loop
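
As with the Flask variant, main_scokt.py only ships the server. Below is a hedged client sketch for the retrieval-augmented WebSocket endpoint; the port (27001), message fields and "[DONE]" sentinel come from handle_websocket and main_api above, and it presumes data_np/<title>.npy and data_file/<title>.csv already exist for the chosen title, which is itself a placeholder here.

# Illustrative client for the retrieval-augmented WebSocket service above.
import asyncio
import json
import websockets

async def search(question, title, top=5):
    answer = []
    async with websockets.connect("ws://127.0.0.1:27001") as ws:
        await ws.send(json.dumps({"texts": question, "title": title, "top": top}))
        while True:
            chunk = await ws.recv()
            if chunk == "[DONE]":  # end-of-stream sentinel
                break
            answer.append(chunk)
    return "".join(answer)

if __name__ == "__main__":
    # "demo" is a placeholder knowledge-base name
    print(asyncio.run(search("畏寒肢冷如何调理", "demo")))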