import os

os.environ["CUDA_VISIBLE_DEVICES"] = "2"

import json
import time
from threading import Thread

import redis
from vllm import LLM, SamplingParams

# Redis connection. Responses arrive as raw bytes and are decoded explicitly below;
# decode_responses has no effect when an explicit ConnectionPool is supplied.
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=50,
                            db=3, password="zhicheng123*")
redis_ = redis.Redis(connection_pool=pool)

db_key_query = 'query'        # list of pending request payloads (file paths)
db_key_querying = 'querying'  # set of request ids currently in flight
db_key_result = 'result'
batch_size = 32

# sampling_params = SamplingParams(temperature=0.95, top_p=0.7, presence_penalty=0.9, stop="", max_tokens=4096)
sampling_params = SamplingParams(temperature=0.95, top_p=0.7, stop="",
                                 presence_penalty=1.1, max_tokens=8192)
models_path = "/home/majiahui/project/LLaMA-Factory-main/lora_openbuddy_mistral_7b_v20_3-32k_paper_model_10"
llm = LLM(model=models_path, tokenizer_mode="slow", max_model_len=8192)

os.makedirs("new_data_logs", exist_ok=True)  # result files are written here
os.makedirs("log_file", exist_ok=True)       # daily access logs are written here


def log(*args, **kwargs):
    """Append a timestamped line to a daily log file under log_file/."""
    now = time.localtime()
    dt = time.strftime('%Y/%m/%d-%H:%M:%S', now)
    log_path = 'log_file/access-%s.log' % time.strftime('%Y-%m-%d', now)
    # 'a' creates the file if it does not exist, so no separate write branch is needed
    with open(log_path, 'a', encoding='utf-8') as f:
        print(dt, *args, file=f, **kwargs)


def classify(batch_size):  # run the model on batches of at most batch_size requests
    while True:
        texts = []
        query_ids = []
        if redis_.llen(db_key_query) == 0:  # nothing queued: wait and poll again
            time.sleep(2)
            continue

        # Drain the queue until it is empty or a full batch has been collected.
        while True:
            query = redis_.lpop(db_key_query)  # each entry is a JSON payload with a file path
            if query is None:
                break
            data_dict_path = json.loads(query.decode('UTF-8'))
            path = data_dict_path['path']
            with open(path, encoding='utf8') as f1:  # load the request object from disk
                data_dict = json.load(f1)
            query_ids.append(data_dict['id'])
            texts.append(data_dict['text'])
            if len(texts) == batch_size:
                break

        if not texts:  # queue was drained by another consumer in the meantime
            continue

        outputs = llm.generate(texts, sampling_params)  # batched inference
        print("outputs", len(outputs))

        # vLLM returns outputs in the same order as the input prompts, so index by
        # position; request_id is a counter that keeps growing across generate()
        # calls and would overflow the list on every batch after the first.
        generated_text_list = [""] * len(texts)
        for i, output in enumerate(outputs):
            generated_text_list[i] = output.outputs[0].text

        for id_, output in zip(query_ids, generated_text_list):
            return_text = {"texts": output, "probabilities": None, "status_code": 200}
            load_result_path = "./new_data_logs/{}.json".format(id_)
            with open(load_result_path, 'w', encoding='utf8') as f2:
                # ensure_ascii=False keeps Chinese text readable instead of \u escapes;
                # indent=4 pretty-prints the JSON
                json.dump(return_text, f2, ensure_ascii=False, indent=4)
            redis_.set(id_, load_result_path, 86400)  # result path expires after 24 h
            # redis_.set(id_, load_result_path, 30)
            redis_.srem(db_key_querying, id_)
            log('start at', 'query_id:{},load_result_path:{},return_text:{}'.format(
                id_, load_result_path, return_text))


if __name__ == '__main__':
    t = Thread(target=classify, args=(batch_size,))
    t.start()
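
# --- Producer-side sketch (an assumption; not part of this worker) ---
# Nothing in this file shows how requests are enqueued. Based on what classify()
# reads, a hypothetical client would write {"id", "text"} to a JSON file, rpush
# {"path": ...} onto the 'query' list (lpop on the worker side makes this FIFO),
# track the id in the 'querying' set, then poll Redis for the result path. The
# helper names submit/fetch and the request_data_logs directory are illustrative.
#
#   import uuid
#
#   def submit(text):
#       request_id = str(uuid.uuid4())
#       os.makedirs("request_data_logs", exist_ok=True)
#       request_path = "./request_data_logs/{}.json".format(request_id)
#       with open(request_path, 'w', encoding='utf8') as f:
#           json.dump({"id": request_id, "text": text}, f, ensure_ascii=False)
#       redis_.rpush(db_key_query, json.dumps({"path": request_path}))
#       redis_.sadd(db_key_querying, request_id)
#       return request_id
#
#   def fetch(request_id, poll=2):
#       while True:
#           result_path = redis_.get(request_id)  # set by the worker when done
#           if result_path is not None:
#               with open(result_path.decode('UTF-8'), encoding='utf8') as f:
#                   return json.load(f)
#           time.sleep(poll)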