Browse Source

首次提交

master
majiahui@haimaqingfan.com 8 months ago
commit
c16c2e38df
  1. 21
      gunicorn_config.py
  2. 131
      mistral_api.py
  3. 111
      mistral_model_predict_vllm_1.py
  4. 1
      run_api_gunicorn.sh
  5. 1
      run_model_1.sh

21
gunicorn_config.py

@ -0,0 +1,21 @@
# 并行工作线程数
workers = 8
# 监听内网端口5000【按需要更改】
bind = '0.0.0.0:12002'
loglevel = 'debug'
worker_class = "gevent"
# 设置守护进程【关闭连接时,程序仍在运行】
daemon = True
# 设置超时时间120s,默认为30s。按自己的需求进行设置
timeout = 120
# 设置访问日志和错误信息日志路径
accesslog = './logs/acess1.log'
errorlog = './logs/error1.log'
# access_log_format = '%(h) - %(t)s - %(u)s - %(s)s %(H)s'
# errorlog = '-' # 记录到标准输出
# 设置最大并发量
worker_connections = 20000

131
mistral_api.py

@ -0,0 +1,131 @@
from flask import Flask, jsonify
from flask import request
from transformers import pipeline
import redis
import uuid
import json
from threading import Thread
from vllm import LLM, SamplingParams
import time
import threading
import time
import concurrent.futures
import requests
import socket
app = Flask(__name__)
app.config["JSON_AS_ASCII"] = False
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=50,db=1, password="zhicheng123*")
redis_ = redis.Redis(connection_pool=pool, decode_responses=True)
db_key_query = 'query'
db_key_querying = 'querying'
db_key_queryset = 'queryset'
db_key_result = 'result'
db_key_error = 'error'
def smtp_f(name):
# 在下面的代码行中使用断点来调试脚本。
import smtplib
from email.mime.text import MIMEText
from email.header import Header
sender = '838878981@qq.com' # 发送邮箱
receivers = ['838878981@qq.com'] # 接收邮箱
auth_code = "jfqtutaiwrtdbcge" # 授权码
message = MIMEText('基础大模型出现错误,紧急', 'plain', 'utf-8')
message['From'] = Header("Sender<%s>" % sender) # 发送者
message['To'] = Header("Receiver<%s>" % receivers[0]) # 接收者
subject = name
message['Subject'] = Header(subject, 'utf-8')
try:
server = smtplib.SMTP_SSL('smtp.qq.com', 465)
server.login(sender, auth_code)
server.sendmail(sender, receivers, message.as_string())
print("邮件发送成功")
server.close()
except smtplib.SMTPException:
print("Error: 无法发送邮件")
@app.route("/predict", methods=["POST"])
def predict():
text = request.json["texts"] # 获取用户query中的文本 例如"I love you"
id_ = str(uuid.uuid1()) # 为query生成唯一标识
print("uuid: ", uuid)
d = {'id': id_, 'text': text} # 绑定文本和query id
try:
load_request_path = './request_data_logs/{}.json'.format(id_)
with open(load_request_path, 'w', encoding='utf8') as f2:
# ensure_ascii=False才能输入中文,否则是Unicode字符
# indent=2 JSON数据的缩进,美观
json.dump(d, f2, ensure_ascii=False, indent=4)
redis_.rpush(db_key_query, json.dumps({"id": id_, "path": load_request_path})) # 加入redis
redis_.sadd(db_key_querying, id_)
redis_.sadd(db_key_queryset, id_)
return_text = {"texts": {'id': id_, }, "probabilities": None, "status_code": 200}
except:
return_text = {"texts": {'id': id_, }, "probabilities": None, "status_code": 400}
smtp_f("vllm-main-paper")
return jsonify(return_text) # 返回结果
@app.route("/search", methods=["POST"])
def search():
id_ = request.json['id'] # 获取用户query中的文本 例如"I love you"
result = redis_.get(id_) # 获取该query的模型结果
try:
if result is not None:
result_path = result.decode('UTF-8')
with open(result_path, encoding='utf8') as f1:
# 加载文件的对象
result_dict = json.load(f1)
code = result_dict["status_code"]
texts = result_dict["texts"]
probabilities = result_dict["probabilities"]
if str(code) == 400:
redis_.rpush(db_key_error, json.dumps({"id": id_}))
return False
result_text = {'code': code, 'text': texts, 'probabilities': probabilities}
else:
querying_list = list(redis_.smembers(db_key_querying))
querying_set = set()
for i in querying_list:
querying_set.add(i.decode())
querying_bool = False
if id_ in querying_set:
querying_bool = True
query_list_json = redis_.lrange(db_key_query, 0, -1)
query_set_ids = set()
for i in query_list_json:
data_dict = json.loads(i)
query_id = data_dict['id']
query_set_ids.add(query_id)
query_bool = False
if id_ in query_set_ids:
query_bool = True
if querying_bool == True and query_bool == True:
result_text = {'code': "201", 'text': "", 'probabilities': None}
elif querying_bool == True and query_bool == False:
result_text = {'code': "202", 'text': "", 'probabilities': None}
else:
result_text = {'code': "203", 'text': "", 'probabilities': None}
load_request_path = './request_data_logs_203/{}.json'.format(id_)
with open(load_request_path, 'w', encoding='utf8') as f2:
# ensure_ascii=False才能输入中文,否则是Unicode字符
# indent=2 JSON数据的缩进,美观
json.dump(result_text, f2, ensure_ascii=False, indent=4)
except:
smtp_f("vllm-main")
result_text = {'code': "400", 'text': "", 'probabilities': None}
return jsonify(result_text) # 返回结果
if __name__ == "__main__":
app.run(debug=False, host='0.0.0.0', port=12002)

111
mistral_model_predict_vllm_1.py

@ -0,0 +1,111 @@
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
from transformers import pipeline
import redis
import uuid
import json
from threading import Thread
from vllm import LLM, SamplingParams
import time
import threading
import time
import concurrent.futures
import requests
import socket
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=50,db=1, password="zhicheng123*")
redis_ = redis.Redis(connection_pool=pool, decode_responses=True)
db_key_query = 'query'
db_key_querying = 'querying'
db_key_result = 'result'
batch_size = 32
# sampling_params = SamplingParams(temperature=0.95, top_p=0.7,presence_penalty=0.9,stop="</s>", max_tokens=4096)
sampling_params = SamplingParams(temperature=0.95, top_p=0.7,stop="</s>", presence_penalty=1.1, max_tokens=8192)
# sampling_params = SamplingParams(temperature=0.95, top_p=0.7,stop="</s>", presence_penalty=1.1 , max_tokens=8192,)
# models_path = "/home/majiahui/project/LLaMA-Factory-main/lora_openbuddy_mistral_7b_v20_3-32k_paper_model_15"
# models_path = "/home/majiahui/project/LLaMA-Factory-main/lora_openbuddy_mistral_7b_v20_3-32k_paper_model_16"
# models_path = "/home/majiahui/project/LLaMA-Factory-long-paper/models/lora_openbuddy_mistral_7b_v20_3-32k_paper_model_16_Image_Enhancement_kongzhi_2"
# models_path = "/home/majiahui/project/LLaMA-Factory-long-paper/models/lora_openbuddy_mistral_7b_v20_3-32k_paper_model_16_Image_Enhancement_2"
models_path = "/home/majiahui/project/models-llm/lora_openbuddy_mistral_7b_v20_3-32k_paper_model_16_kto_picturn_kongzhi_xiaobiaoti_merge"
llm = LLM(model=models_path, tokenizer_mode="slow", max_model_len=8192)
class log:
def __init__(self):
pass
def log(*args, **kwargs):
format = '%Y/%m/%d-%H:%M:%S'
format_h = '%Y-%m-%d'
value = time.localtime(int(time.time()))
dt = time.strftime(format, value)
dt_log_file = time.strftime(format_h, value)
log_file = 'log_file/access-%s' % dt_log_file + ".log"
if not os.path.exists(log_file):
with open(os.path.join(log_file), 'w', encoding='utf-8') as f:
print(dt, *args, file=f, **kwargs)
else:
with open(os.path.join(log_file), 'a+', encoding='utf-8') as f:
print(dt, *args, file=f, **kwargs)
def classify(batch_size): # 调用模型,设置最大batch_size
while True:
texts = []
query_ids = []
if redis_.llen(db_key_query) == 0: # 若队列中没有元素就继续获取
time.sleep(2)
continue
# for i in range(min(redis_.llen(db_key_query), batch_size)):
while True:
query = redis_.lpop(db_key_query) # 获取query的text
if query == None:
break
query = query.decode('UTF-8')
data_dict_path = json.loads(query)
path = data_dict_path['path']
with open(path, encoding='utf8') as f1:
# 加载文件的对象
data_dict = json.load(f1)
# query_ids.append(json.loads(query)['id'])
# texts.append(json.loads(query)['text']) # 拼接若干text 为batch
query_id = data_dict['id']
text = data_dict["text"]
query_ids.append(query_id)
texts.append(text)
if len(texts) == batch_size:
break
outputs = llm.generate(texts, sampling_params) # 调用模型
generated_text_list = [""] * len(texts)
print("outputs", len(outputs))
for i, output in enumerate(outputs):
index = output.request_id
generated_text = output.outputs[0].text
generated_text_list[int(index)] = generated_text
for (id_, output) in zip(query_ids, generated_text_list):
return_text = {"texts": output, "probabilities": None, "status_code": 200}
load_result_path = "./new_data_logs/{}.json".format(id_)
with open(load_result_path, 'w', encoding='utf8') as f2:
# ensure_ascii=False才能输入中文,否则是Unicode字符
# indent=2 JSON数据的缩进,美观
json.dump(return_text, f2, ensure_ascii=False, indent=4)
redis_.set(id_, load_result_path, 86400)
# redis_.set(id_, load_result_path, 30)
redis_.srem(db_key_querying, id_)
log.log('start at',
'query_id:{},load_result_path:{},return_text:{}'.format(
id_, load_result_path, return_text))
if __name__ == '__main__':
t = Thread(target=classify, args=(batch_size,))
t.start()

1
run_api_gunicorn.sh

@ -0,0 +1 @@
gunicorn mistral_api:app -c gunicorn_config.py

1
run_model_1.sh

@ -0,0 +1 @@
nohup python mistral_model_predict_vllm_1.py > myout_mis_model_1.file 2>&1 &
Loading…
Cancel
Save