
3 changed files with 543 additions and 0 deletions
@ -0,0 +1,18 @@ |
|||
# AI创意工厂 |
|||
根据商品图生成一系列商品信息 |
|||
|
|||
# 安装环境: |
|||
conda create -n ai_creative_workshop python=3.9 |
|||
pip install -r requirements.txt |
|||
|
|||
# 启动项目: |
|||
conda activate ai_creative_workshop |
|||
bash run_api.sh |
|||
|
|||
# 备用方案迁移项目 |
|||
python环境包: Miniconda3-py39_22.11.1-1-Linux-x86_64.sh |
|||
下载显卡驱动: NVIDIA-Linux-x86_64-525.116.04.run |
|||
下载cuda: cuda_11.7.1_515.65.01_linux.run |
|||
下载cudnn: cudnn-linux-x86_64-8.5.0.96_cuda11-archive |
|||
|
|||
|
@ -0,0 +1,518 @@ |
|||
import os |
|||
from flask import Flask, render_template, request, redirect, url_for, jsonify |
|||
from flask import Flask,url_for,redirect,request,render_template,send_from_directory |
|||
|
|||
from werkzeug.utils import secure_filename |
|||
app = Flask(__name__) |
|||
import time |
|||
import re |
|||
import requests |
|||
import uuid |
|||
import socket |
|||
|
|||
# 上传文件存储目录 |
|||
# UPLOAD_FOLDER = '/home/majiahui/ai_creative_workshop/uploads' |
|||
current_path = os.getcwd() |
|||
UPLOAD_FOLDER = os.path.join(current_path, 'uploads') |
|||
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER |
|||
|
|||
# 正则表达式 |
|||
RE_CHINA_NUMS = "[1-9].(.*)" |
|||
# 允许的文件类型 |
|||
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'} |
|||
fenhao_list = [";", ";"] |
|||
moahao_list = [":", ":"] |
|||
|
|||
|
|||
prompt_picture_dict = { |
|||
"1": "图中的商品:{},有什么突出亮点和卖点,请分条列举出来,要求亮点或者卖点要用一个词总结,冒号后面在进行解释,例如:1. 时尚黑色:图中的鞋子是黑色的,符合时尚潮流,适合不同场合的穿搭。", |
|||
"2": "图中的商品:{},有什么亮点,写一段营销话语", |
|||
"3": "图中的商品:{},有以下亮点:\n{}\n根据这些优势亮点,写一段营销文本让商品卖的更好", |
|||
"4": "图中的商品:{},有哪些不足之处可以改进?", |
|||
"5": "图中{}的渲染图做哪些调整可以更吸引消费者,请分条列举,例如:“1.xxx\n2.xxx”", |
|||
"6": "根据图中的商品:{},生成五个商品名称,要求商品名称格式中包含的信息(有品牌名,有产品名,有细分产品种类词,比如猫砂,篮球鞋等,有三到五个卖点和形容词)。请分条列举,例如:“1.xxx \n2.xxx \n3.xxx \n4.xxx \n5.xxx”", |
|||
# "6": "根据图中的商品:{},生成一个商品名称,要求商品名称格式中包含的信息(有品牌名,有产品名,有细分产品种类词,比如猫砂,篮球鞋等,有三到五个卖点和形容词)" |
|||
} |
|||
|
|||
prompt_text_dict = { |
|||
"1": "", |
|||
"2": "User:商品名称:{};卖点:{},请帮我生成一个有很多活泼表情的小红书文案,以商品使用者角度来写作,让人感觉真实\nAssistant:", |
|||
"3": "图中{}有以下亮点:\n{}\n根据这些优势亮点,写一段营销文本让商品买的更好", |
|||
"4": "图中{}有哪些不足之处可以改进?", |
|||
"5": "图中{}的渲染图做哪些调整可以更吸引消费者", |
|||
} |
|||
|
|||
def get_host_ip(): |
|||
""" |
|||
查询本机ip地址 |
|||
:return: ip |
|||
""" |
|||
try: |
|||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
|||
s.connect(('8.8.8.8', 80)) |
|||
ip = s.getsockname()[0] |
|||
finally: |
|||
s.close() |
|||
|
|||
return ip |
|||
|
|||
|
|||
def dialog_line_parse(url, text): |
|||
""" |
|||
将数据输入模型进行分析并输出结果 |
|||
:param url: 模型url |
|||
:param text: 进入模型的数据 |
|||
:return: 模型返回结果 |
|||
""" |
|||
|
|||
response = requests.post( |
|||
url, |
|||
json=text, |
|||
timeout=1000 |
|||
) |
|||
if response.status_code == 200: |
|||
return response.json() |
|||
else: |
|||
# logger.error( |
|||
# "【{}】 Failed to get a proper response from remote " |
|||
# "server. Status Code: {}. Response: {}" |
|||
# "".format(url, response.status_code, response.text) |
|||
# ) |
|||
print("【{}】 Failed to get a proper response from remote " |
|||
"server. Status Code: {}. Response: {}" |
|||
"".format(url, response.status_code, response.text)) |
|||
print(text) |
|||
return {} |
|||
|
|||
|
|||
class log: |
|||
def __init__(self): |
|||
pass |
|||
|
|||
def log(*args, **kwargs): |
|||
format = '%Y/%m/%d-%H:%M:%S' |
|||
format_h = '%Y-%m-%d' |
|||
value = time.localtime(int(time.time())) |
|||
dt = time.strftime(format, value) |
|||
dt_log_file = time.strftime(format_h, value) |
|||
log_file = 'log_file/access-%s' % dt_log_file + ".log" |
|||
if not os.path.exists(log_file): |
|||
with open(os.path.join(log_file), 'w', encoding='utf-8') as f: |
|||
print(dt, *args, file=f, **kwargs) |
|||
else: |
|||
with open(os.path.join(log_file), 'a+', encoding='utf-8') as f: |
|||
print(dt, *args, file=f, **kwargs) |
|||
|
|||
|
|||
# def upload(): |
|||
# """ |
|||
# #头像上传表单页面 |
|||
# :return: |
|||
# """ |
|||
# if request.method=='POST': |
|||
# #接受头像字段 |
|||
# avatar=request.files['avatar'] |
|||
# #判断头像是否上传 |
|||
# if avatar and allowed_file(avatar.filename): |
|||
# filename=random_file(avatar.filename) |
|||
# avatar.save(os.path.join(app.config['UPLOAD_FOLDER'],filename)) |
|||
# return redirect(url_for('uploaded_file',filename=filename)) |
|||
# return render_template('upload.html') |
|||
|
|||
|
|||
# 检查文件扩展名 |
|||
def allowed_file(filename): |
|||
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS |
|||
|
|||
|
|||
def picture_model_predict(image_path, prompt): |
|||
# query = tokenizer.from_list_format([ |
|||
# {'image': image_path}, |
|||
# {'text': prompt}, |
|||
# ]) |
|||
# |
|||
# response, history = model.chat(tokenizer, query=query, history=None) |
|||
# return response |
|||
|
|||
url = "http://{}:19001/predict".format(str(get_host_ip())) |
|||
data = { |
|||
"path_list": [image_path], |
|||
"prompt": prompt |
|||
} |
|||
result = dialog_line_parse(url, data)["data"] |
|||
return result |
|||
|
|||
|
|||
def text_model_predict(prompt): |
|||
# query = tokenizer.from_list_format([ |
|||
# {'image': image_path}, |
|||
# {'text': prompt}, |
|||
# ]) |
|||
# |
|||
# response, history = model.chat(tokenizer, query=query, history=None) |
|||
# return response |
|||
|
|||
url = "http://192.168.31.74:12000/predict" |
|||
data = { |
|||
"texts": prompt, |
|||
} |
|||
result = dialog_line_parse(url, data)["data"] |
|||
return result |
|||
|
|||
|
|||
|
|||
def type_1(path_list, commodity, input_type): |
|||
code = 200 |
|||
result_list_len = False |
|||
dan_result_geshi = True |
|||
dan_result_geshi_maohao = True |
|||
return_list = [] |
|||
prompy_text = prompt_picture_dict[input_type] |
|||
prompy_text = prompy_text.format(commodity) |
|||
for path in path_list: |
|||
|
|||
cycle_num = 0 |
|||
while True: |
|||
result = picture_model_predict(path, prompy_text) |
|||
result_list = str(result).split("\n") |
|||
result_list = [i for i in result_list if i != ""] |
|||
if len(result_list) > 3: |
|||
result_list_len = True |
|||
for result_dan in result_list: |
|||
dan_maohao = False |
|||
response_re = re.findall(RE_CHINA_NUMS, result_dan) |
|||
if response_re == []: |
|||
dan_result_geshi = False |
|||
continue |
|||
for maohao in moahao_list: |
|||
if maohao in result_dan: |
|||
dan_maohao = True |
|||
break |
|||
if dan_maohao == False: |
|||
dan_result_geshi_maohao = False |
|||
break |
|||
|
|||
cycle_num += 1 |
|||
|
|||
if cycle_num == 4: |
|||
return 400, [] |
|||
|
|||
if result_list_len == True and dan_result_geshi == True and dan_result_geshi_maohao == True: |
|||
break |
|||
|
|||
maidian_list = [] |
|||
for i in result_list: |
|||
response_re = re.findall(RE_CHINA_NUMS, i) |
|||
guanjianci = response_re[0].split(":") |
|||
maidian_list.append([i, guanjianci]) |
|||
return_list.append(maidian_list) |
|||
return code, return_list |
|||
|
|||
def type_2(path_list, commodity, input_type, additional): |
|||
code = 200 |
|||
return_list = [] |
|||
return_1_data = type_1(path_list, commodity, "1") |
|||
maidian = [i[0][1][0] for i in return_1_data] |
|||
|
|||
fenhao = "" |
|||
if additional != "": |
|||
for i in fenhao_list: |
|||
if i in additional: |
|||
fenhao = i |
|||
break |
|||
|
|||
if fenhao == "": |
|||
return code, [] |
|||
|
|||
maidian_user = [i for i in additional.split(fenhao) if i != ""] |
|||
maidian += maidian_user |
|||
|
|||
prompt_text = prompt_text_dict[input_type].format(commodity, "、".join(maidian)) |
|||
result = text_model_predict(prompt_text) |
|||
return_list.append(result) |
|||
return code, return_list |
|||
|
|||
def type_3(path_list, commodity, input_type, additional): |
|||
code = 200 |
|||
return_list = [] |
|||
return_1_data = type_1(path_list, commodity, "1") |
|||
maidian = [i[0][1][0] for i in return_1_data] |
|||
|
|||
fenhao = "" |
|||
if additional != "": |
|||
for i in fenhao_list: |
|||
if i in additional: |
|||
fenhao = i |
|||
break |
|||
|
|||
if fenhao == "": |
|||
return code, [] |
|||
|
|||
maidian_user = [i for i in additional.split(fenhao) if i != ""] |
|||
maidian += maidian_user |
|||
|
|||
prompt_text = prompt_text_dict[input_type].format(commodity, "、".join(maidian)) |
|||
result = text_model_predict(prompt_text) |
|||
return_list.append(result) |
|||
return code, return_list |
|||
|
|||
def type_4(path_list, commodity, input_type, additional): |
|||
code = 200 |
|||
return_list = [] |
|||
return_1_data = type_1(path_list, commodity, "1") |
|||
maidian = [i[0][1][0] for i in return_1_data] |
|||
|
|||
fenhao = "" |
|||
if additional != "": |
|||
for i in fenhao_list: |
|||
if i in additional: |
|||
fenhao = i |
|||
break |
|||
|
|||
if fenhao == "": |
|||
return code, [] |
|||
|
|||
maidian_user = [i for i in additional.split(fenhao) if i != ""] |
|||
maidian += maidian_user |
|||
|
|||
prompt_text = prompt_text_dict[input_type].format(commodity, "、".join(maidian)) |
|||
result = text_model_predict(prompt_text) |
|||
return_list.append(result) |
|||
return code, return_list |
|||
|
|||
def type_5(path_list, commodity, input_type): |
|||
code = 200 |
|||
return_list = [] |
|||
prompy_text = prompt_picture_dict[input_type] |
|||
prompy_text = prompy_text.format(commodity) |
|||
result_list_type = False |
|||
|
|||
for path in path_list: |
|||
while True: |
|||
cycle_num = 0 |
|||
if result_list_type == True: |
|||
break |
|||
if cycle_num == 4: |
|||
return 400, [] |
|||
result = picture_model_predict(path, prompy_text) |
|||
result_list = str(result).split("\n") |
|||
result_list = [i for i in result_list if i != ""] |
|||
result_list_new = [] |
|||
for i in result_list: |
|||
response_re = re.findall(RE_CHINA_NUMS, i) |
|||
if response_re == []: |
|||
continue |
|||
else: |
|||
result_list_new.append(i) |
|||
if result_list_new != []: |
|||
result_list_type = True |
|||
return_list.append(result_list_new) |
|||
|
|||
return code, return_list |
|||
|
|||
def type_6(path_list, commodity, input_type): |
|||
code = 200 |
|||
return_list = [] |
|||
commodity_list = [] |
|||
prompy_text = prompt_picture_dict[input_type] |
|||
prompy_text = prompy_text.format(commodity) |
|||
result_list_type = False |
|||
for path in path_list: |
|||
# for i in range(5): |
|||
# result = picture_model_predict(path, prompy_text) |
|||
# commodity_list.append(result) |
|||
# return_list.append(commodity_list) |
|||
# ++++++++++++++++++++++++++++++++++++++++++++++++++++ |
|||
while True: |
|||
cycle_num = 0 |
|||
if result_list_type == True: |
|||
break |
|||
if cycle_num == 4: |
|||
return 400, [] |
|||
result = picture_model_predict(path, prompy_text) |
|||
result_list = str(result).split("\n") |
|||
result_list = [i for i in result_list if i != ""] |
|||
result_list_new = [] |
|||
for i in result_list: |
|||
response_re = re.findall(RE_CHINA_NUMS, i) |
|||
if response_re == []: |
|||
continue |
|||
else: |
|||
result_list_new.append(response_re[0]) |
|||
if result_list_new != []: |
|||
result_list_type = True |
|||
return_list.append(result_list_new) |
|||
|
|||
return code, return_list |
|||
|
|||
|
|||
def type_7(path_list, additional): |
|||
code = 200 |
|||
prompy_text = additional |
|||
return_list = [] |
|||
for path in path_list: |
|||
result = picture_model_predict(path, prompy_text) |
|||
return_list.append(result) |
|||
return code, return_list |
|||
|
|||
|
|||
def picture_main(path_list, commodity, input_type, additional): |
|||
if input_type == "1": |
|||
return type_1(path_list, commodity, input_type) |
|||
|
|||
elif input_type == "2": |
|||
return type_2(path_list, commodity, input_type, additional) |
|||
# |
|||
elif input_type == "3": |
|||
return type_3(path_list, commodity, input_type, additional) |
|||
# |
|||
elif input_type == "4": |
|||
return type_4(path_list, commodity, input_type, additional) |
|||
|
|||
elif input_type == "5": |
|||
return type_5(path_list, commodity, input_type) |
|||
|
|||
elif input_type == "6": |
|||
return type_6(path_list, commodity, input_type) |
|||
|
|||
elif input_type == "7": |
|||
return type_7(path_list, additional) |
|||
|
|||
else: |
|||
return "1111" |
|||
|
|||
|
|||
def main(file_list, type_str, commodity, additional): |
|||
path_list = [] |
|||
for file in file_list: |
|||
if file and allowed_file(file.filename): |
|||
filename = secure_filename(file.filename) |
|||
kuozhan = filename.split(".")[-1] |
|||
uuid_picture = str(uuid.uuid1()) |
|||
filename = ".".join([uuid_picture, kuozhan]) |
|||
path = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
|||
file.save(path) |
|||
path_list.append(path) |
|||
|
|||
# 业务逻辑 |
|||
try: |
|||
type_list = str(type_str).split(",") |
|||
code = 200 |
|||
result = { |
|||
"main": [], |
|||
"spilt": [] |
|||
} |
|||
for type_dan in type_list: |
|||
slice_dan = [] |
|||
|
|||
print("type:", type_dan) |
|||
code, result_dan = picture_main(path_list, commodity, type_dan, additional) |
|||
if code == 400: |
|||
break |
|||
if type_dan == "1": |
|||
result_dan_new = [] |
|||
for i in result_dan[0]: |
|||
result_dan_new.append(i[0]) |
|||
slice_dan.append(i[1]) |
|||
main_dan = [result_dan_new] |
|||
|
|||
else: |
|||
main_dan = result_dan |
|||
|
|||
if slice_dan != []: |
|||
result["spilt"].append({type_dan: slice_dan}) |
|||
result["main"].append({type_dan: main_dan}) |
|||
|
|||
return_text = {"texts": result, "probabilities": None, "status_code": code} |
|||
except: |
|||
return_text = {"texts": "运算出错", "probabilities": None, "status_code": 400} |
|||
return return_text, path_list |
|||
|
|||
|
|||
# 文件上传处理 |
|||
@app.route('/vl_chat_visualization', methods=['GET','POST']) |
|||
def vl_chat_visualization(): |
|||
|
|||
if request.method == 'POST': |
|||
file0 = request.files.get('file0') |
|||
file1 = request.files.get('file1') |
|||
file2 = request.files.get('file2') |
|||
file3 = request.files.get('file3') |
|||
file4 = request.files.get('file4') |
|||
file5 = request.files.get('file5') |
|||
commodity = request.form.get('commodity') |
|||
type_str = request.form.get('type') |
|||
additional = request.form.get("additional") |
|||
file_list = [file0, file1, file2, file3, file4, file5] |
|||
|
|||
# if commodity == False or type_str == False and file0 == False: |
|||
# return str(400) |
|||
try: |
|||
assert file0 |
|||
except: |
|||
return_text = {"texts": "没有主图", "probabilities": None, "status_code": 400} |
|||
return jsonify(return_text) |
|||
|
|||
try: |
|||
assert commodity |
|||
except: |
|||
return_text = {"texts": "没有商品类型", "probabilities": None, "status_code": 400} |
|||
return jsonify(return_text) |
|||
|
|||
try: |
|||
assert type_str |
|||
except: |
|||
return_text = {"texts": "没有生成类型", "probabilities": None, "status_code": 400} |
|||
return jsonify(return_text) |
|||
|
|||
return_text, path_list = main(file_list, type_str, commodity, additional) |
|||
log.log('start at', |
|||
'filename:{}, commodity:{}, type:{}, additional:{}, result:{}'.format( |
|||
str(path_list), commodity, str(type_str), additional, return_text)) |
|||
return return_text["texts"]["main"][0]["7"][0] |
|||
return render_template('upload.html') |
|||
|
|||
|
|||
@app.route('/vl_chat', methods=['POST']) |
|||
def vl_chat(): |
|||
|
|||
file0 = request.files.get('file0') |
|||
file1 = request.files.get('file1') |
|||
file2 = request.files.get('file2') |
|||
file3 = request.files.get('file3') |
|||
file4 = request.files.get('file4') |
|||
file5 = request.files.get('file5') |
|||
commodity = request.form.get('commodity') |
|||
type_str = request.form.get('type') |
|||
additional = request.form.get("additional") |
|||
file_list = [file0, file1, file2, file3, file4, file5] |
|||
|
|||
# if commodity == False or type_str == False and file0 == False: |
|||
# return str(400) |
|||
try: |
|||
assert file0 |
|||
except: |
|||
return_text = {"texts": "没有主图", "probabilities": None, "status_code": 400} |
|||
return jsonify(return_text) |
|||
|
|||
try: |
|||
assert commodity |
|||
except: |
|||
return_text = {"texts": "没有商品类型", "probabilities": None, "status_code": 400} |
|||
return jsonify(return_text) |
|||
|
|||
try: |
|||
assert type_str |
|||
except: |
|||
return_text = {"texts": "没有生成类型", "probabilities": None, "status_code": 400} |
|||
return jsonify(return_text) |
|||
|
|||
return_text, path_list = main(file_list, type_str, commodity, additional) |
|||
log.log('start at', |
|||
'filename:{}, commodity:{}, type:{}, additional:{}, result:{}'.format( |
|||
str(path_list), commodity, str(type_str), additional, return_text)) |
|||
return jsonify(return_text) |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
app.run(host="0.0.0.0", port=19000, threaded=True) |
@ -0,0 +1,7 @@ |
|||
Flask==2.3.2 |
|||
gevent==23.9.1 |
|||
greenlet==3.0.1 |
|||
gunicorn==21.2.0 |
|||
redis==4.2.0 |
|||
requests==2.31.0 |
|||
|
Loading…
Reference in new issue