@@ -9,6 +9,10 @@ import json
 from threading import Thread
 from vllm import LLM, SamplingParams
 import time
+import threading
+import time
+import concurrent.futures
+import requests
 
 app = Flask(__name__)
 app.config["JSON_AS_ASCII"] = False
@@ -16,13 +20,43 @@ pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=50,db=
 redis_ = redis.Redis(connection_pool=pool, decode_responses=True)
 
 db_key_query = 'query'
+db_key_query_articles_directory = 'query_articles_directory'
 db_key_result = 'result'
 batch_size = 32
 
-sampling_params = SamplingParams(temperature=0.95, top_p=0.7,presence_penalty=0.9,stop="</s>", max_tokens=2048)
+sampling_params = SamplingParams(temperature=0.95, top_p=0.7,presence_penalty=0.9,stop="</s>", max_tokens=4096)
 models_path = "/home/majiahui/project/models-llm/openbuddy-llama-7b-finetune"
 llm = LLM(model=models_path, tokenizer_mode="slow")
 
+
+def dialog_line_parse(url, text):
+    """
+    Send the data to the model for analysis and return the result.
+    :param url: the model's URL
+    :param text: the data sent to the model
+    :return: the model's response
+    """
+    response = requests.post(
+        url,
+        json=text,
+        timeout=1000
+    )
+    if response.status_code == 200:
+        return response.json()
+    else:
+        # logger.error(
+        #     "【{}】 Failed to get a proper response from remote "
+        #     "server. Status Code: {}. Response: {}"
+        #     "".format(url, response.status_code, response.text)
+        # )
+        print("【{}】 Failed to get a proper response from remote "
+              "server. Status Code: {}. Response: {}"
+              "".format(url, response.status_code, response.text))
+        print(text)
+        return []
+
 def classify(batch_size):  # invoke the model; batch_size caps each batch
     while True:
         texts = []
@@ -35,8 +69,16 @@ def classify(batch_size):  # invoke the model; batch_size caps each batch
             query_ids.append(json.loads(query)['id'])
             texts.append(json.loads(query)['text'])  # concatenate the queued texts into one batch
         outputs = llm.generate(texts, sampling_params)  # run the model on the batch
-        for (id_, output) in zip(query_ids, outputs):
-            res = output.outputs[0].text
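+        # NOTE: this assumes the request_ids for this call run 0..len(texts)-1;
+        # if the engine's request counter is global, int(index) needs a per-batch offset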
+        generated_text_list = [""] * len(texts)
+        for output in outputs:
+            index = output.request_id
+            generated_text = output.outputs[0].text
+            generated_text_list[int(index)] = generated_text
+
+        for (id_, output) in zip(query_ids, generated_text_list):
+            res = output
             redis_.set(id_, json.dumps(res))  # send the model's result back to the queue
 
 
@@ -57,6 +99,29 @@ def handle_query():
     return jsonify(result_text)  # return the result
 
 
+@app.route("/articles_directory", methods=["POST"])
+def handle_articles_directory():
+    text = request.json["texts"]  # the text of the user's query, e.g. "I love you"
+    nums = request.json["nums"]
+    nums = int(nums)
+    url = "http://114.116.25.228:18000/predict"
+
+    input_data = []
+    for i in range(nums):
+        input_data.append([url, {"texts": text}])
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        # submit the tasks to the thread pool and collect the Future objects
+        futures = [executor.submit(dialog_line_parse, i[0], i[1]) for i in input_data]
+        # use as_completed to gather the return values as the tasks finish
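+        # NOTE: as_completed yields in completion order, which may differ from submission order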
+        results = [future.result() for future in concurrent.futures.as_completed(futures)]
+
+    return jsonify(results)  # return the results
+
+
 if __name__ == "__main__":
     t = Thread(target=classify, args=(batch_size,))
     t.start()