"""Flask service that fans a text prompt out to a local model endpoint.

The module exposes one route, ``/articles_directory``, plus helpers for
posting JSON to remote model services and polling them for results.
"""

import concurrent.futures
import json
import socket
import threading
import time
import uuid
from threading import Thread

# NOTE(review): redis, transformers and vllm (and json, threading, Thread)
# are not referenced by the code in this file. They are kept because removing
# an import can change start-up side effects; confirm before deleting them.
import redis
import requests
from flask import Flask, jsonify
from flask import request
from transformers import pipeline
from vllm import LLM, SamplingParams


def get_host_ip():
    """Return the local IP address of this machine.

    Opens a UDP socket, connects it to 8.8.8.8:80 and returns the local
    address reported by ``getsockname()``.

    :return: the local IP address as a string
    :raises OSError: if the socket cannot be created or connected
    """
    # The context manager closes the socket on every path. The previous
    # try/finally referenced ``s`` even when socket creation itself failed,
    # which replaced the real OSError with an UnboundLocalError.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]


app = Flask(__name__)
app.config["JSON_AS_ASCII"] = False


def dialog_line_parse(url, text):
    """Post data to a model endpoint and return its JSON answer.

    :param url: URL of the model service
    :param text: JSON-serialisable payload sent to the model
    :return: the decoded JSON response on HTTP 200, otherwise ``[]``
    """
    response = requests.post(
        url,
        json=text,
        timeout=1000
    )
    if response.status_code == 200:
        return response.json()

    print("【{}】 Failed to get a proper response from remote "
          "server. Status Code: {}. Response: {}"
          "".format(url, response.status_code, response.text))
    print(text)
    return []


# NOTE(review): ``chatgpt_url_predict`` and ``chatgpt_url_search`` are read by
# the two functions below but are not defined anywhere in this file, so calling
# them raises NameError unless another module injects those globals. Define
# them here once the real endpoints are known.


def request_api_chatgpt(prompt):
    """Submit a prompt to the ``chatgpt_url_predict`` endpoint.

    :param prompt: value sent under the ``"texts"`` key of the JSON body
    :return: the decoded JSON response on HTTP 200, otherwise ``{}``
    """
    data = {
        "texts": prompt
    }
    response = requests.post(
        chatgpt_url_predict,
        json=data,
        timeout=100000
    )
    if response.status_code == 200:
        return response.json()

    print("Failed to get a proper response from remote "
          "server. Status Code: {}. Response: {}"
          "".format(response.status_code, response.text))
    return {}


def uuid_search(uuid):
    """Query the ``chatgpt_url_search`` endpoint for one task id.

    :param uuid: task identifier sent under the ``"id"`` key of the JSON body
        (the name shadows the ``uuid`` module inside this function; it is kept
        so that keyword callers continue to work)
    :return: the decoded JSON response on HTTP 200, otherwise ``{}``
    """
    data = {
        "id": uuid
    }
    response = requests.post(
        chatgpt_url_search,
        json=data,
        timeout=100000
    )
    if response.status_code == 200:
        return response.json()

    print("Failed to get a proper response from remote "
          "server. Status Code: {}. Response: {}"
          "".format(response.status_code, response.text))
    return {}


def uuid_search_mp(results, poll_interval=3, max_wait=None):
    """Poll the search endpoint until every submitted task has a text.

    Each entry of ``results`` must provide the task id at
    ``entry["texts"]["id"]``. A task counts as finished once ``uuid_search``
    answers with ``"code" == 200`` and a non-empty ``"text"``.

    :param results: submission responses, one per task
    :param poll_interval: seconds to sleep between polling rounds
    :param max_wait: optional overall limit in seconds; ``None`` (the default)
        keeps polling until every task has finished
    :return: list of texts in the same order as ``results``; entries that did
        not finish before ``max_wait`` elapsed are left as ``""``
    """
    results_list = [""] * len(results)
    pending = list(range(len(results)))
    deadline = None if max_wait is None else time.monotonic() + max_wait

    while pending:
        still_pending = []
        # Only unfinished tasks are queried again; finished ones keep the
        # text they already received.
        for index in pending:
            task_id = results[index]["texts"]["id"]
            result = uuid_search(task_id)
            # uuid_search returns {} on an HTTP failure, so "code" may be
            # missing; treat that as "not ready yet" instead of crashing.
            if result.get("code") == 200:
                results_list[index] = result["text"]
            if results_list[index] == "":
                still_pending.append(index)
        pending = still_pending

        if not pending:
            break
        if deadline is not None and time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)

    return results_list


def get_multiple_urls(urls):
    """Submit several prompts concurrently and collect their final texts.

    :param urls: sequence of items whose element at index 1 is the prompt
    :return: list of ``[item, text]`` pairs, in the order of ``urls``
    """
    input_values = [item[1] for item in urls]

    # Submit all prompts in parallel; map() preserves the input order.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        submissions = list(executor.map(request_api_chatgpt, input_values))

    # Polling is a single blocking call, so no extra thread pool is needed.
    texts = uuid_search_mp(submissions)

    return [[item, text] for item, text in zip(urls, texts)]


@app.route("/articles_directory", methods=["POST"])
def articles_directory():
    """Send the same text to the local model ``nums`` times in parallel.

    Expects a JSON body with ``"texts"`` (the user's query text, for example
    "I love you") and ``"nums"`` (how many requests to issue). The requests go
    to ``http://<host ip>:12000/predict``.

    :return: JSON list with one model result per request, in completion
        order; HTTP 400 with an ``"error"`` message when the body is invalid
    """
    body = request.json
    try:
        text = body["texts"]
        nums = int(body["nums"])
    except (KeyError, TypeError, ValueError):
        # Missing keys, a non-object body or a non-integer "nums" are client
        # errors; previously they surfaced as an unhandled exception.
        return jsonify(
            {"error": "JSON body must contain 'texts' and an integer 'nums'"}
        ), 400

    url = "http://{}:12000/predict".format(str(get_host_ip()))

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit every request to the pool and keep the Future objects.
        futures = [
            executor.submit(dialog_line_parse, url, {"texts": text})
            for _ in range(nums)
        ]
        # Collect the return values as the requests finish.
        results = [
            future.result()
            for future in concurrent.futures.as_completed(futures)
        ]

    return jsonify(results)


if __name__ == "__main__":
    app.run(debug=False, host='0.0.0.0', port=18000)