
Add test socket requests and update the flow; verified working in production

pull/1/head
majiahui@haimaqingfan.com, 2 weeks ago
commit c751f371a7
1. README.md: 19 changed lines
2. main.py: 237 changed lines
3. main_scokt.py: 74 changed lines

README.md: 19 changed lines

@@ -0,0 +1,19 @@
### Knowledge base CRUD
```
python main.py
```
### Knowledge base streaming Q&A socket
```
python main_scokt.py
```
### DeepSeek-style streaming Q&A socket (non-RAG)
```
python main_scoket_deepspeek.py
```
### For endpoint details, see the API documentation
接口文档.docx
https://console-docs.apipost.cn/preview/55b7d541588142d1/f4645422856c695a
https://console-docs.apipost.cn/preview/f03a79d844523711/2f4079d715d28b32
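
For quick testing, a minimal client sketch for the CRUD service in main.py is shown below. The URL (host, port, and route path) is an assumption, since the route decorator is not part of this commit; the fields state/new_id/sentence/title/zongjie match the upload_file_check handler.
```
# Hypothetical client for the knowledge-base CRUD service in main.py.
# Host, port, and route path are assumptions; the field names come from
# the upload_file_check handler ("1"=create, "2"=delete, "3"=update,
# "4"=list, "5"=fetch one record by UUID).
import requests

payload = {
    "state": "1",
    "new_id": "",                  # record UUID, used by states "2", "3", "5"
    "sentence": "咳嗽三日,伴低热",  # document text
    "title": "yetianshi",          # knowledge-base name
    "zongjie": None,               # optional pre-written summary
}
resp = requests.post("http://127.0.0.1:5000/upload_file_check", json=payload)
print(resp.json())
```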

main.py: 237 changed lines

@@ -16,8 +16,10 @@ from flask_cors import CORS
import pandas as pd
import concurrent.futures
import json
import torch
import uuid
# Flask configuration
app = Flask(__name__)
CORS(app)
app.config["JSON_AS_ASCII"] = False
@@ -30,10 +32,13 @@ client = OpenAI(
base_url=openai_api_base,
)
# Model configuration
models = client.models.list()
model = models.data[0].id
# model = "1"
model_encode = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5')
# Prompt configuration
propmt_connect = '''我是一名中医,你是一个中医的医生的助理,我的患者有一个症状,症状如下:
{}
根据这些症状我通过查找资料{}
@@ -97,12 +102,23 @@ def dialog_line_parse(text):
def shengcehng_array(data):
'''
Generate embedding vectors with the model
:param data:
:return:
'''
embs = model_encode.encode(data, normalize_embeddings=True)
return embs
def Building_vector_database(title, df):
'''
This function is temporarily deprecated
:param title:
:param df:
:return:
'''
# Load the records that need processing (valid and not yet vectorized)
to_process = df[(df["有效"] == True) & (df["已向量化"] == False)]
@@ -143,23 +159,21 @@ def Building_vector_database(title, df):
df.to_csv(f"data_file_res/{title}.csv", sep="\t", index=False)
def delete_data(title, data_id):
def delete_data(title, new_id):
'''
Soft delete: only marks the record valid/invalid
:param title:
:param new_id:
:return:
'''
new_id = str(new_id)
# Update the CSV flag
csv_path = f"data_file_res/{title}.csv"
df = pd.read_csv(csv_path, sep="\t", dtype={"ID": str})
df.loc[df["ID"] == data_id, "有效"] = False
df = pd.read_csv(csv_path, sep="\t")
# df.loc[df["ID"] == new_id, "有效"] = False
df.loc[df['ID'] == new_id, "有效"] = False
df.to_csv(csv_path, sep="\t", index=False)
# Update the index flag
index_path = f"data_np/{title}_index.json"
if os.path.exists(index_path):
with open(index_path, "r+") as f:
index_data = json.load(f)
if data_id in index_data:
index_data[data_id]["valid"] = False
f.seek(0)
json.dump(index_data, f)
f.truncate()
return "删除完成"
def check_file_exists(file_path):
@@ -175,106 +189,141 @@ def check_file_exists(file_path):
return os.path.isfile(file_path)
def ulit_request_file(new_id, sentence, title):
def ulit_request_file(sentence, title, zongjie):
'''
Upload content and populate the fixed columns "ID", "正文", "总结", "有效", "向量"
:param sentence:
:param title:
:param zongjie:
:return:
'''
file_name_res_save = f"data_file_res/{title}.csv"
# Initialize or read the CSV file
# Initialize or read the CSV file: if it exists, read it and append a row;
# if it does not exist, create a new DataFrame
if os.path.exists(file_name_res_save):
df = pd.read_csv(file_name_res_save, sep="\t")
# Check whether the same text already exists
if sentence in df["正文"].values:
print("正文已存在,跳过处理")
return df
if zongjie == None:
return "正文已存在,跳过处理"
else:
result = df[df['正文'] == sentence]
id_ = result['ID'].values[0]
print(id_)
return ulit_request_file_zongjie(id_, sentence, zongjie, title)
else:
df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "已向量化"])
df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "向量"])
# Add the new record (generate a unique ID)
if zongjie == None:
id_ = str(uuid.uuid1())
new_row = {
"ID": str(new_id),
"ID": id_,
"正文": sentence,
"总结": None,
"有效": True,
"向量": False
"向量": None
}
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
# Select the records that need processing
to_process = df[(df["总结"].isna()) & (df["有效"] == True)]
# Call the API to generate a summary (keeps the original logic)
data_list = []
for _, row in to_process.iterrows():
data_list.append({
# Adjust this prompt per project; the goal is to condense the text to make matching easier
data_dan = {
"model": "gpt-4-turbo",
"messages": [{
"role": "user",
"content": f"{row['正文']}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇"
"content": f"{sentence}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇"
}],
"top_p": 0.9,
"temperature": 0.3
})
}
results = dialog_line_parse(data_dan)
summary = results['choices'][0]['message']['content']
# Process the requests concurrently
with concurrent.futures.ThreadPoolExecutor(200) as executor:
results = list(executor.map(dialog_line_parse, data_list))
# This is the embedding function; it vectorizes the summary text
new_vectors = shengcehng_array([summary])
df.loc[df['ID'] == id_, '总结'] = summary
df.loc[df['ID'] == id_, '向量'] = str(new_vectors[0].tolist())
# Update the summary column
for idx, result in zip(to_process.index, results):
summary = result['choices'][0]['message']['content']
df.at[idx, "总结"] = summary
else:
id_ = str(uuid.uuid1())
new_row = {
"ID": id_ ,
"正文": sentence,
"总结": zongjie,
"有效": True,
"向量": None
}
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
new_vectors = shengcehng_array([zongjie])  # shengcehng_array is the embedding function
df.loc[df['ID'] == id_, '总结'] = zongjie
df.loc[df['ID'] == id_, '向量'] = str(new_vectors[0].tolist())
# Save the updated CSV
df.to_csv(file_name_res_save, sep="\t", index=False)
return df
return "上传完成"
def ulit_request_file_zongjie(new_id, sentence, zongjie, title):
new_id = str(new_id)
print(new_id)
print(type(new_id))
file_name_res_save = f"data_file_res/{title}.csv"
# Initialize or read the CSV file
if os.path.exists(file_name_res_save):
df = pd.read_csv(file_name_res_save, sep="\t")
df.loc[df['ID'] == new_id, '正文'] = sentence
if zongjie == None:
pass
else:
df = pd.DataFrame(columns=["ID", "正文", "总结", "有效", "已向量化"])
# # Add the new record (generate a unique ID)
# new_row = {
# "ID": str(new_id),
# "正文": sentence,
# "总结": zongjie,
# "有效": True,
# "已向量化": False
# }
# df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
df.loc[df['ID'] == new_id, '总结'] = zongjie
new_vectors = shengcehng_array([zongjie])  # shengcehng_array is the embedding function
df.loc[df['ID'] == new_id, '向量'] = str(new_vectors[0].tolist())
# Save the updated CSV
df.to_csv(file_name_res_save, sep="\t", index=False)
return "修改完成"
# Select the records that need processing
to_process = df[df["有效"] == True]
# Call the API to generate a summary (keeps the original logic)
data_list = []
for _, row in to_process.iterrows():
data_list.append({
"model": "gpt-4-turbo",
"messages": [{
"role": "user",
"content": f"{row['正文']}\n以上这条中可能包含了一些病情或者症状,请帮我归纳这条中所对应的病情或者症状是哪些,总结出来,不需要很长,简单归纳即可,直接输出症状或者病情,可以包含一些形容词来辅助描述,不需要有辅助词汇"
}],
"top_p": 0.9,
"temperature": 0.6
})
def ulit_request_file_check(title):
file_name_res_save = f"data_file_res/{title}.csv"
# Process the requests concurrently
with concurrent.futures.ThreadPoolExecutor(200) as executor:
results = list(executor.map(dialog_line_parse, data_list))
# Initialize or read the CSV file
# Update the summary column
for idx, result in zip(to_process.index, results):
summary = result['choices'][0]['message']['content']
df.at[idx, "总结"] = summary
# Initialize or read the CSV file
if os.path.exists(file_name_res_save):
df = pd.read_csv(file_name_res_save, sep="\t").values.tolist()
data_new = []
for i in df:
if i[3] == True:
data_new.append([i[0], i[1], i[2]])
return data_new
else:
return "无可展示文件"
# Save the updated CSV
df.to_csv(file_name_res_save, sep="\t", index=False)
return df
def ulit_request_file_check_dan(new_id, title):
new_id = str(new_id)
file_name_res_save = f"data_file_res/{title}.csv"
# Initialize or read the CSV file
if os.path.exists(file_name_res_save):
df = pd.read_csv(file_name_res_save, sep="\t")
zhengwen = df.loc[df['ID'] == new_id, '正文'].values
zongjie = df.loc[df['ID'] == new_id, '总结'].values
# Output the result
if len(zhengwen) > 0:
if df.loc[df['ID'] == new_id, '有效'].values == True:
return [new_id, zhengwen[0], zongjie[0]]
else:
return "未找到对应的ID"
else:
return "未找到对应的ID"
else:
return "无可展示文件"
def main(question, title, top):
@@ -311,22 +360,30 @@ def main(question, title, top):
index = faiss.IndexFlatIP(d) # build the index
# Look up the vectors
vector_path = f"data_np/{title_dan}.npy"
vectors = np.load(vector_path)
# vector_path = f"data_np/{title_dan}.npy"
# vectors = np.load(vector_path)
data_str = pd.read_csv(f"data_file_res/{title_dan}.csv", sep="\t", encoding="utf-8").values.tolist()
data_str_valid = []
for i in data_str:
if i[3] == True:
data_str_valid.append(i)
data_str_vectors_list = []
for i in data_str_valid:
data_str_vectors_list.append(eval(i[-1]))
vectors = np.array(data_str_vectors_list)
index.add(vectors)
D, I = index.search(embs, int(top))
print(I)
reference_list = []
for i,j in zip(I[0], D[0]):
reference_list.append([data_str[i], j])
reference_list.append([data_str_valid[i], j])
for i,j in enumerate(reference_list):
paper_list_str += "{}\n{},此篇文章跟问题的相关度为{}%\n".format(str(i+1), j[0][1], j[1])
'''
Build the prompt
'''
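
Since the embeddings are produced with normalize_embeddings=True, the inner product computed by IndexFlatIP is exactly cosine similarity, so each score in D lies in [-1, 1] (the "{}%" formatting above prints the raw score, not a true percentage). A minimal self-contained check:
```
# Sketch: on unit-normalized vectors, IndexFlatIP scores equal cosine similarity.
import faiss
import numpy as np

vecs = np.random.rand(4, 8).astype("float32")
vecs /= np.linalg.norm(vecs, axis=1, keepdims=True)  # normalize, as the encoder does
index = faiss.IndexFlatIP(vecs.shape[1])             # d = 8
index.add(vecs)
D, I = index.search(vecs[:1], 2)                     # query with the first vector
print(I[0], D[0])                                    # its own index first, score ~1.0
```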
@@ -374,7 +431,7 @@ def model_generate_stream(prompt):
print("\ncontent:", end="", flush=True)
# Extract and print the content
# print(content, end="", flush=True)
print(content)
print(content, end="")
yield content
@@ -397,24 +454,24 @@ def upload_file_check():
# Create
state_res = ""
if state == "1":
df = ulit_request_file(new_id, sentence, title)
Building_vector_database(title, df)
state_res = "上传完成"
state_res = ulit_request_file(sentence, title, zongjie)
# Delete
elif state == "2":
delete_data(title, new_id)
state_res = "删除完成"
state_res = delete_data(title, new_id)
# Update
elif state == "3":
df = ulit_request_file(new_id, sentence, title)
Building_vector_database(title, df)
state_res = "修改完成"
state_res = ulit_request_file_zongjie(new_id, sentence, zongjie,title)
# Query (list all)
elif state == "4":
df = ulit_request_file(new_id, sentence, title)
state_res = ulit_request_file_check(title)
# Fetch a single record by UUID
elif state == "5":
ulit_request_file_check_dan(new_id, title)
state_res = ""
return_json = {

main_scokt.py: 74 changed lines

@@ -44,7 +44,7 @@ client = OpenAI(
models = client.models.list()
model = models.data[0].id
# model = "1"
model_encode = SentenceTransformer('/home/majiahui/project/models-llm/bge-large-zh-v1.5')
model_encode = SentenceTransformer('/home/zhangbaoxun/project/models-llm/bge-large-zh-v1.5')
propmt_connect = '''我是一名中医,你是一个中医的医生的助理,我的患者有一个症状,症状如下:
{}
根据这些症状我通过查找资料{}
@@ -113,6 +113,13 @@ def shengcehng_array(data):
def main(question, title, top):
'''
Main function: match sentences, insert them into the prompt, and generate the answer
:param question:
:param title:
:param top:
:return:
'''
db_dict = {
"1": "yetianshi"
}
@@ -149,23 +156,36 @@ def main(question, title, top):
# vector_path = f"data_np/{title_dan}.npy"
# vectors = np.load(vector_path)
# Read the vector data. CSV columns:
# ID
# 正文 (text)
# 总结 (summary)
# 有效 (valid)
# 向量 (vector)
data_str = pd.read_csv(f"data_file_res/{title_dan}.csv", sep="\t", encoding="utf-8").values.tolist()
data_str_valid = []
for i in data_str:
# i[3] == True means the record has not been deleted; False means it has been deleted
if i[3] == True:
data_str_valid.append(i)
# Collect the vectors of the valid records
data_str_vectors_list = []
for i in data_str_valid:
data_str_vectors_list.append(eval(i[-1]))
# Stack them into a vector matrix
vectors = np.array(data_str_vectors_list)
index.add(vectors)
# Use faiss to find the most similar vectors
D, I = index.search(embs, int(top))
print(I)
reference_list = []
for i,j in zip(I[0], D[0]):
# Append the matching CSV row: data_str_valid[i] is the full row (ID, 正文, 总结, 有效, 向量) and j is the similarity score
reference_list.append([data_str_valid[i], j])
for i,j in enumerate(reference_list):
@@ -178,10 +198,12 @@ def main(question, title, top):
for i in db_type_list:
propmt_connect_ziliao_input.append(propmt_connect_ziliao.format(i, paper_list_str))
# Build the input question, including all of the material above
propmt_connect_ziliao_input_str = "".join(propmt_connect_ziliao_input)
propmt_connect_input = propmt_connect.format(question, propmt_connect_ziliao_input_str)
'''
Generate the answer
Generate the answer; model_generate_stream can be pointed at whichever model is needed
'''
return model_generate_stream(propmt_connect_input)
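
main() returns the generator produced by model_generate_stream, so a caller can forward chunks as they arrive. A short consumption sketch; the question and top value are illustrative, and "1" maps to "yetianshi" in db_dict:
```
# Sketch: consume the streaming answer chunk by chunk (illustrative values).
for chunk in main("咳嗽三日,伴低热", "1", 3):
    print(chunk, end="", flush=True)
```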
@@ -259,38 +281,40 @@ async def handle_websocket(websocket):
async def main_api():
try:
# Whether to load SSL
ssl_context = None
# WSS toggle: True enables the WSS service
wss_bool = False
# Check whether the certificate files exist
ssl_cert = "yitongtang66.com.crt"
ssl_cert = "yitongtang66.com_ca_chains.crt"
ssl_key = "yitongtang66.com.key"
# ssl_cert = "yizherenxin.cn.crt"
# ssl_key = "yizherenxin.cn.key"
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(ssl_cert, ssl_key)
ssl_context.check_hostname = False # hostname verification must be disabled
ssl_context.verify_mode = ssl.CERT_NONE # do not verify the certificate
# if os.path.exists(ssl_cert) and os.path.exists(ssl_key):
# try:
# # Create the SSL context
# ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# # Load the certificate chain
# ssl_context.load_cert_chain(ssl_cert, ssl_key)
# # Disable hostname verification (for self-signed certificates)
# ssl_context.check_hostname = False
# ssl_context.verify_mode = ssl.CERT_NONE
# print("SSL证书已加载,使用WSS协议")
# except Exception as e:
# print(f"SSL证书加载失败: {e}")
# print("将使用WS协议")
# ssl_context = None
# else:
# print("警告:SSL证书文件未找到,将使用WS协议")
# ssl_context = None
# ssl_context.check_hostname = False # hostname verification must be disabled
# ssl_context.verify_mode = ssl.CERT_NONE # do not verify the certificate
if wss_bool == True:
try:
# Create the SSL context
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# Load the certificate chain
ssl_context.load_cert_chain(ssl_cert, ssl_key)
# Disable hostname verification (for self-signed certificates)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
print("SSL证书已加载,使用WSS协议")
except Exception as e:
print(f"SSL证书加载失败: {e}")
print("将使用WS协议")
ssl_context = None
else:
print("警告:SSL证书文件未找到,将使用WS协议")
ssl_context = None
# Create the server
server = await websockets.serve(
@@ -303,6 +327,7 @@ async def main_api():
close_timeout=30 # add a close timeout
)
# Listen on port 27001
if ssl_context:
print("WSS服务器已启动: wss://0.0.0.0:27001")
else:
@@ -323,6 +348,7 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Start the service
try:
asyncio.run(main_api())
except KeyboardInterrupt:
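
To exercise the server end to end, a minimal client sketch follows. Port 27001 and the ws/wss choice come from main_api() above; the JSON message shape is an assumption, since handle_websocket's body is not part of this diff.
```
# Hypothetical client for the streaming Q&A socket in main_scokt.py.
# Port 27001 matches main_api(); the message format is an assumption.
import asyncio
import json
import websockets

async def ask():
    async with websockets.connect("ws://127.0.0.1:27001") as ws:
        await ws.send(json.dumps({"question": "咳嗽三日,伴低热", "title": "1", "top": 3}))
        async for chunk in ws:               # print streamed chunks as they arrive
            print(chunk, end="", flush=True)

asyncio.run(ask())
```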
