
9 changed files with 310 additions and 49 deletions
@ -0,0 +1,51 @@ |
|||||
|
from flask import Flask, jsonify |
||||
|
from flask import request |
||||
|
import redis |
||||
|
import uuid |
||||
|
import json |
||||
|
import time |
||||
|
import socket |
||||
|
|
||||
|
def get_host_ip():
    """Return the LAN IP address of this machine.

    "Connects" a UDP socket to a public address and reads back the local
    address the OS selected for that route. For UDP, connect() only sets the
    peer — no packet is actually transmitted.

    :return: local IP address as a string
    :raises OSError: if no route to the probe address exists
    """
    # Create the socket BEFORE the try block: in the original code a failure
    # in socket.socket() left `s` unbound and the finally clause raised
    # NameError, masking the real error.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
    finally:
        s.close()
||||
|
|
||||
|
# Flask application serving the /predict endpoint.
app = Flask(__name__)
# Allow non-ASCII (e.g. Chinese) characters to pass through jsonify unescaped.
app.config["JSON_AS_ASCII"] = False

# Redis connection shared by all request handlers.
# NOTE(review): port 63179 is non-standard (Redis default is 6379) — confirm
# it matches the deployed instance.
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=50,db=11, password="zhicheng123*")
# NOTE(review): decode_responses passed to Redis() is ignored when a
# connection_pool is supplied — set it on the pool if decoding is intended.
redis_ = redis.Redis(connection_pool=pool, decode_responses=True)

# Redis list used as the request queue consumed by the model worker.
db_key_query = 'query'
db_key_query_articles_directory = 'query_articles_directory'
db_key_result = 'result'
# Batch-size hint for the worker side (the worker defines its own value).
batch_size = 32
||||
|
|
||||
|
|
||||
|
@app.route("/predict", methods=["POST"])
def handle_query():
    """Enqueue the posted text for the model worker and poll for its result.

    Expects a JSON body of the form {"texts": "<prompt>"} and returns
    {"code": "200", "data": <model output>} once the worker has answered.
    """
    # Prompt supplied by the client, e.g. "I love you".
    query_text = request.json["texts"]
    # Unique identifier tying this request to its eventual result.
    query_id = str(uuid.uuid1())
    payload = json.dumps({'id': query_id, 'text': query_text})
    # Hand the work item to the model worker via the shared Redis queue.
    redis_.rpush(db_key_query, payload)

    # Poll Redis until the worker publishes a result under our id.
    while True:
        raw = redis_.get(query_id)
        if raw is not None:
            # Consume the result so the key does not linger in Redis.
            redis_.delete(query_id)
            response_body = {'code': "200", 'data': json.loads(raw)}
            break
        time.sleep(1)

    return jsonify(response_body)
||||
|
|
||||
|
if __name__ == "__main__":
    # Development entry point; production runs this app under gunicorn.
    app.run(debug=False, host='0.0.0.0', port=18001)
@ -0,0 +1,21 @@ |
|||||
|
# Number of parallel worker processes.
workers = 8
# Bind address and port (adjust as needed).
bind = '0.0.0.0:12000'

loglevel = 'debug'

# Use gevent-based asynchronous workers.
worker_class = "gevent"
# Daemonize so the server keeps running after the terminal session closes.
daemon = True
# Request timeout in seconds (gunicorn default is 30).
timeout = 120
# Access and error log paths.
# NOTE(review): 'acess.log' looks like a typo for 'access.log' — confirm
# before renaming, since external tooling may reference the current path.
accesslog = './logs/acess.log'
errorlog = './logs/error.log'
# access_log_format = '%(h) - %(t)s - %(u)s - %(s)s %(H)s'
# errorlog = '-'  # log to stdout instead of a file


# Maximum number of simultaneous clients per worker.
worker_connections = 20000
@ -0,0 +1,57 @@ |
|||||
|
import os |
||||
|
os.environ["CUDA_VISIBLE_DEVICES"] = "3" |
||||
|
from transformers import pipeline |
||||
|
import redis |
||||
|
import uuid |
||||
|
import json |
||||
|
from threading import Thread |
||||
|
from vllm import LLM, SamplingParams |
||||
|
import time |
||||
|
import threading |
||||
|
import time |
||||
|
import concurrent.futures |
||||
|
import requests |
||||
|
import socket |
||||
|
|
||||
|
|
||||
|
# Redis connection shared with the Flask front-end (same queue keys).
# NOTE(review): port 63179 is non-standard (Redis default is 6379) — confirm
# it matches the deployed instance.
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=50,db=11, password="zhicheng123*")
# NOTE(review): decode_responses is ignored when connection_pool is given,
# which is why lpop results below are decoded manually.
redis_ = redis.Redis(connection_pool=pool, decode_responses=True)

# Redis list consumed as the request queue (pushed by the Flask app).
db_key_query = 'query'
db_key_query_articles_directory = 'query_articles_directory'
db_key_result = 'result'
# Maximum number of queued requests folded into one vLLM generation batch.
batch_size = 512

# Sampling configuration applied to every generation request.
sampling_params = SamplingParams(temperature=0.95, top_p=0.7,presence_penalty=0.9,stop="</s>", max_tokens=4096)
models_path = "/home/majiahui/project/models-llm/openbuddy-llama-7b-finetune"
# Load the model once at start-up; "slow" selects the Python tokenizer
# implementation rather than the fast (Rust) one.
llm = LLM(model=models_path, tokenizer_mode="slow")
||||
|
|
||||
|
def classify(batch_size):  # serving loop; batch_size caps requests per batch
    """Continuously drain the Redis queue, run vLLM generation in batches,
    and publish each result back to Redis under its query id.

    :param batch_size: maximum number of queued requests per generation batch
    """
    while True:
        # Nothing queued: back off briefly and re-check.
        if redis_.llen(db_key_query) == 0:
            time.sleep(2)
            continue

        texts = []
        query_ids = []
        for _ in range(min(redis_.llen(db_key_query), batch_size)):
            item = redis_.lpop(db_key_query)
            if item is None:
                # Queue drained between llen() and lpop() (e.g. by another
                # consumer) — stop collecting and process what we have.
                break
            # Parse once instead of twice as the original did.
            query = json.loads(item.decode('UTF-8'))
            query_ids.append(query['id'])
            texts.append(query['text'])

        outputs = llm.generate(texts, sampling_params)  # run the model batch
        print("outputs", len(outputs))

        # vLLM returns outputs in the same order as the input prompts, so pair
        # them positionally. (The previous code indexed a local list by
        # output.request_id, but request ids grow monotonically across
        # generate() calls, which raises IndexError from the second batch on.)
        for id_, output in zip(query_ids, outputs):
            generated_text = output.outputs[0].text
            # Publish the result where the waiting HTTP handler polls for it.
            redis_.set(id_, json.dumps(generated_text))
||||
|
|
||||
|
|
||||
|
if __name__ == '__main__':
    # Run the serving loop in a dedicated thread. The thread is non-daemonic,
    # so it keeps the process alive after the main thread returns.
    t = Thread(target=classify, args=(batch_size,))
    t.start()
@ -0,0 +1 @@ |
|||||
|
# Launch the Flask front-end under gunicorn with the settings in gunicorn_config.py.
gunicorn flask_predict:app -c gunicorn_config.py
@ -0,0 +1 @@ |
|||||
|
# Start the vLLM worker in the background, surviving logout (nohup),
# with stdout and stderr redirected to a log file.
nohup python mistral_model_predict_vllm.py > mistral_model_predict_vllm_logs.file 2>&1 &
@ -0,0 +1,47 @@ |
|||||
|
import concurrent.futures |
||||
|
import requests |
||||
|
import socket |
||||
|
|
||||
|
|
||||
|
def dialog_line_parse(url, text):
    """POST *text* to the model service and return its parsed response.

    :param url: model endpoint URL
    :param text: JSON-serialisable payload for the model
    :return: decoded JSON response on HTTP 200, otherwise an empty list
    """
    response = requests.post(
        url,
        json=text,
        timeout=1000
    )
    if response.status_code == 200:
        return response.json()
    # Non-200: report the failure and return an empty result so the caller
    # can keep processing remaining requests. (Dead commented-out logger
    # code removed.)
    print("【{}】 Failed to get a proper response from remote "
          "server. Status Code: {}. Response: {}"
          "".format(url, response.status_code, response.text))
    print(text)
    return []
||||
|
|
||||
|
# Fire a burst of identical requests at the predict endpoint to load-test it.
nums = 1000
url = "http://192.168.31.74:18001/predict"

input_data = []
for i in range(nums):
    input_data.append([url, {"texts": "User:你好\nAssistant:"}])

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit every request to the thread pool and keep the Future objects.
    futures = [executor.submit(dialog_line_parse, i[0], i[1]) for i in input_data]

    # Collect results as each request completes (completion order, not submit order).
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

print(results)
@ -0,0 +1,76 @@ |
|||||
|
import threading |
||||
|
import requests |
||||
|
import time |
||||
|
|
||||
|
|
||||
|
# Global success/failure counters shared across request threads,
# guarded by *lock*.
success_count = 0
failure_count = 0
lock = threading.Lock()
||||
|
|
||||
|
|
||||
|
def dialog_line_parse(url, text):
    """POST *text* to the model service and return its parsed response.

    :param url: model endpoint URL
    :param text: JSON-serialisable payload for the model
    :return: decoded JSON response on HTTP 200, otherwise an empty list
    """
    response = requests.post(
        url,
        json=text,
        timeout=1000
    )
    if response.status_code == 200:
        return response.json()
    # Non-200: report the failure and return an empty result so the caller
    # can keep processing remaining requests. (Dead commented-out logger
    # code removed.)
    print("【{}】 Failed to get a proper response from remote "
          "server. Status Code: {}. Response: {}"
          "".format(url, response.status_code, response.text))
    print(text)
    return []
||||
|
|
||||
|
|
||||
|
# Issue one HTTP request and record whether it succeeded.
def make_request(url):
    """POST a fixed prompt to *url*, print the answer, and update the
    shared success/failure counters (thread-safe via *lock*).

    :param url: predict endpoint to hit
    """
    global success_count, failure_count

    try:
        a = dialog_line_parse(url, {"texts": "User:你好\nAssistant:"})['data']
        print(a)
        with lock:
            success_count += 1
    except Exception:
        # The original bare `except:` also swallowed KeyboardInterrupt and
        # SystemExit; catching Exception keeps Ctrl-C working while still
        # counting any request/parse failure.
        with lock:
            failure_count += 1
||||
|
|
||||
|
# URLs to request concurrently (list duplicated to produce 30 parallel requests).
urls = [
    'http://192.168.31.74:18001/predict',
    # add more URLs here
] * 30


# One thread per request.
threads = []

# Start all request threads and time the whole run.
start= time.time()
for url in urls:
    thread = threading.Thread(target=make_request, args=(url,))
    thread.start()
    threads.append(thread)

# Wait for every request to finish.
for thread in threads:
    thread.join()
end = time.time()
print(end-start)
print(f"Successful requests: {success_count}")
print(f"Failed requests: {failure_count}")
@ -0,0 +1,51 @@ |
|||||
|
import concurrent.futures |
||||
|
import requests |
||||
|
import socket |
||||
|
|
||||
|
|
||||
|
def dialog_line_parse(url, text):
    """POST *text* to the model service and return its parsed response.

    :param url: model endpoint URL
    :param text: JSON-serialisable payload for the model
    :return: decoded JSON response on HTTP 200, otherwise an empty list
    """
    response = requests.post(
        url,
        json=text,
        timeout=1000
    )
    if response.status_code == 200:
        return response.json()
    # Non-200: report the failure and return an empty result so the caller
    # can keep processing remaining requests. (Dead commented-out logger
    # code removed.)
    print("【{}】 Failed to get a proper response from remote "
          "server. Status Code: {}. Response: {}"
          "".format(url, response.status_code, response.text))
    print(text)
    return []
||||
|
|
||||
|
|
||||
|
# Prompt asking the model to generate a thesis table of contents (load test).
text = "User:生成目录#\n问:为论文题目《基于跨文化意识培养的中职英语词汇教学模式及策略行动研究》生成目录,要求只有一级标题和二级标题,一级标题使用中文数字 例如一、xxx;二级标题使用阿拉伯数字 例如1.1 xxx;一级标题不少于7个;每个一级标题至少包含3个二级标题\n答:\n\nAssistant:"  # the user's query text, e.g. "I love you"
nums = 10

nums = int(nums)
url = "http://192.168.31.74:18001/predict"

input_data = []
for i in range(nums):
    input_data.append([url, {"texts": text}])

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit every request to the thread pool and keep the Future objects.
    futures = [executor.submit(dialog_line_parse, i[0], i[1]) for i in input_data]

    # Gather results as each request completes (completion order).
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

print(results)
Loading…
Reference in new issue