diff --git a/flask_batch.py b/flask_batch.py index 3708802..38633ff 100644 --- a/flask_batch.py +++ b/flask_batch.py @@ -9,6 +9,10 @@ import json from threading import Thread from vllm import LLM, SamplingParams import time +import threading +import time +import concurrent.futures +import requests app = Flask(__name__) app.config["JSON_AS_ASCII"] = False @@ -16,13 +20,43 @@ pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=50,db= redis_ = redis.Redis(connection_pool=pool, decode_responses=True) db_key_query = 'query' +db_key_query_articles_directory = 'query_articles_directory' db_key_result = 'result' batch_size = 32 -sampling_params = SamplingParams(temperature=0.95, top_p=0.7,presence_penalty=0.9,stop="", max_tokens=2048) +sampling_params = SamplingParams(temperature=0.95, top_p=0.7,presence_penalty=0.9,stop="", max_tokens=4096) models_path = "/home/majiahui/project/models-llm/openbuddy-llama-7b-finetune" llm = LLM(model=models_path, tokenizer_mode="slow") + +def dialog_line_parse(url, text): + """ + 将数据输入模型进行分析并输出结果 + :param url: 模型url + :param text: 进入模型的数据 + :return: 模型返回结果 + """ + + response = requests.post( + url, + json=text, + timeout=1000 + ) + if response.status_code == 200: + return response.json() + else: + # logger.error( + # "【{}】 Failed to get a proper response from remote " + # "server. Status Code: {}. Response: {}" + # "".format(url, response.status_code, response.text) + # ) + print("【{}】 Failed to get a proper response from remote " + "server. Status Code: {}. Response: {}" + "".format(url, response.status_code, response.text)) + print(text) + return [] + + def classify(batch_size): # 调用模型,设置最大batch_size while True: texts = [] @@ -35,8 +69,16 @@ def classify(batch_size): # 调用模型,设置最大batch_size query_ids.append(json.loads(query)['id']) texts.append(json.loads(query)['text']) # 拼接若干text 为batch outputs = llm.generate(texts, sampling_params) # 调用模型 - for (id_, output) in zip(query_ids, outputs): - res = output.outputs[0].text + + generated_text_list = [""] * len(texts) + for i, output in enumerate(outputs): + index = output.request_id + generated_text = output.outputs[0].text + generated_text_list[int(index)] = generated_text + + + for (id_, output) in zip(query_ids, generated_text_list): + res = output redis_.set(id_, json.dumps(res)) # 将模型结果送回队列 @@ -57,6 +99,29 @@ def handle_query(): return jsonify(result_text) # 返回结果 +@app.route("/articles_directory", methods=["POST"]) +def handle_query(): + text = request.json["texts"] # 获取用户query中的文本 例如"I love you" + nums = request.json["nums"] + + nums = int(nums) + url = "http://114.116.25.228:18000/predict" + + input_data = [] + for i in range(nums): + input_data.append([url, {"texts": text}]) + + + with concurrent.futures.ThreadPoolExecutor() as executor: + # 使用submit方法将任务提交给线程池,并获取Future对象 + futures = [executor.submit(dialog_line_parse, i[0], i[1]) for i in input_data] + + # 使用as_completed获取已完成的任务,并获取返回值 + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + return jsonify(results) # 返回结果 + + if __name__ == "__main__": t = Thread(target=classify, args=(batch_size,)) t.start()