You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
295 lines
10 KiB
295 lines
10 KiB
![]()
2 years ago
|
import os
|
||
|
from flask import Flask, render_template, request, redirect, url_for, jsonify
|
||
|
from werkzeug.utils import secure_filename
|
||
|
app = Flask(__name__)
|
||
|
import time
|
||
|
import re
|
||
|
import requests
|
||
|
import uuid
|
||
|
|
||
|
# 上传文件存储目录
|
||
|
UPLOAD_FOLDER = '/home/majiahui/project/ai_creative_workshop/uploads'
|
||
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
|
||
|
|
||
|
# 正则表达式
|
||
|
RE_CHINA_NUMS = "[1-9].(.*)"
|
||
|
# 允许的文件类型
|
||
|
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
|
||
|
|
||
|
|
||
|
prompt_picture = {
|
||
|
"1": "图中的商品:{},有什么突出亮点和卖点,请分条列举出来,要求亮点或者卖点要用一个词总结,冒号后面在进行解释,例如:1. 时尚黑色:图中的鞋子是黑色的,符合时尚潮流,适合不同场合的穿搭。",
|
||
|
"2": "图中的商品:{},有什么亮点,写一段营销话语",
|
||
|
"3": "图中的商品:{},有以下亮点:\n{}\n根据这些优势亮点,写一段营销文本让商品卖的更好",
|
||
|
"4": "图中的商品:{},有哪些不足之处可以改进?",
|
||
|
"5": "图中{}的渲染图做哪些调整可以更吸引消费者",
|
||
|
"6": "根据图中的商品:{},生成一个商品名称,要求商品名称格式中包含的信息(有品牌名,有产品名,有细分产品种类词,比如猫砂,篮球鞋等,有三到五个卖点和形容词)",
|
||
|
}
|
||
|
|
||
|
# prompt_text = {
|
||
|
# "1": "图中{}有什么突出亮点,请列举出来",
|
||
|
# "2": "图中{}有什么亮点,写一段营销话语",
|
||
|
# "3": "图中{}有以下亮点:\n{}\n根据这些优势亮点,写一段营销文本让商品买的更好",
|
||
|
# "4": "图中{}有哪些不足之处可以改进?",
|
||
|
# "5": "图中{}的渲染图做哪些调整可以更吸引消费者",
|
||
|
# "5": "图中{}的渲染图做哪些调整可以更吸引消费者",
|
||
|
# }
|
||
|
|
||
|
|
||
|
def dialog_line_parse(url, text):
|
||
|
"""
|
||
|
将数据输入模型进行分析并输出结果
|
||
|
:param url: 模型url
|
||
|
:param text: 进入模型的数据
|
||
|
:return: 模型返回结果
|
||
|
"""
|
||
|
|
||
|
response = requests.post(
|
||
|
url,
|
||
|
json=text,
|
||
|
timeout=1000
|
||
|
)
|
||
|
if response.status_code == 200:
|
||
|
return response.json()
|
||
|
else:
|
||
|
# logger.error(
|
||
|
# "【{}】 Failed to get a proper response from remote "
|
||
|
# "server. Status Code: {}. Response: {}"
|
||
|
# "".format(url, response.status_code, response.text)
|
||
|
# )
|
||
|
print("【{}】 Failed to get a proper response from remote "
|
||
|
"server. Status Code: {}. Response: {}"
|
||
|
"".format(url, response.status_code, response.text))
|
||
|
print(text)
|
||
|
return {}
|
||
|
|
||
|
|
||
|
class log:
|
||
|
def __init__(self):
|
||
|
pass
|
||
|
|
||
|
def log(*args, **kwargs):
|
||
|
format = '%Y/%m/%d-%H:%M:%S'
|
||
|
format_h = '%Y-%m-%d'
|
||
|
value = time.localtime(int(time.time()))
|
||
|
dt = time.strftime(format, value)
|
||
|
dt_log_file = time.strftime(format_h, value)
|
||
|
log_file = 'log_file/access-%s' % dt_log_file + ".log"
|
||
|
if not os.path.exists(log_file):
|
||
|
with open(os.path.join(log_file), 'w', encoding='utf-8') as f:
|
||
|
print(dt, *args, file=f, **kwargs)
|
||
|
else:
|
||
|
with open(os.path.join(log_file), 'a+', encoding='utf-8') as f:
|
||
|
print(dt, *args, file=f, **kwargs)
|
||
|
|
||
|
|
||
|
# 检查文件扩展名
|
||
|
def allowed_file(filename):
|
||
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
||
|
|
||
|
|
||
|
def picyure_model_predict(image_path, prompt):
|
||
|
# query = tokenizer.from_list_format([
|
||
|
# {'image': image_path},
|
||
|
# {'text': prompt},
|
||
|
# ])
|
||
|
#
|
||
|
# response, history = model.chat(tokenizer, query=query, history=None)
|
||
|
# return response
|
||
|
|
||
|
url = "http://192.168.31.74:19001/predict"
|
||
|
data = {
|
||
|
"path_list": [image_path],
|
||
|
"prompt": prompt
|
||
|
}
|
||
|
result = dialog_line_parse(url, data)["data"]
|
||
|
return result
|
||
|
|
||
|
|
||
|
def picture_main(path_list, commodity, type, additional):
|
||
|
if type == "1":
|
||
|
result_list_len = False
|
||
|
dan_result_geshi = True
|
||
|
dan_result_geshi_maohao = True
|
||
|
|
||
|
prompy_text = prompt_picture[type]
|
||
|
prompy_text = prompy_text.format(commodity)
|
||
|
while True:
|
||
|
result = picyure_model_predict(path_list[0], prompy_text)
|
||
|
result_list = str(result).split("\n")
|
||
|
result_list = [i for i in result_list if i != ""]
|
||
|
if len(result_list) > 3:
|
||
|
result_list_len = True
|
||
|
for i in result_list:
|
||
|
response_re = re.findall(RE_CHINA_NUMS, i)
|
||
|
if response_re == []:
|
||
|
dan_result_geshi = False
|
||
|
continue
|
||
|
if ":" not in i:
|
||
|
dan_result_geshi_maohao = False
|
||
|
continue
|
||
|
if result_list_len == True and dan_result_geshi == True and dan_result_geshi_maohao == True:
|
||
|
break
|
||
|
|
||
|
maidian_list = []
|
||
|
for i in result_list:
|
||
|
response_re = re.findall(RE_CHINA_NUMS, i)
|
||
|
guanjianci = response_re[0].split(":")
|
||
|
maidian_list.append([i, guanjianci])
|
||
|
|
||
|
return maidian_list
|
||
|
|
||
|
elif type == "2":
|
||
|
prompy_text = prompt_picture[type]
|
||
|
prompy_text = prompy_text.format(commodity, additional)
|
||
|
return_list = []
|
||
|
for path in path_list:
|
||
|
result = picyure_model_predict(path, prompy_text)
|
||
|
return_list.append(result)
|
||
|
return return_list
|
||
|
#
|
||
|
elif type == "3":
|
||
|
prompy_text = prompt_picture[type]
|
||
|
prompy_text = prompy_text.format(commodity, additional)
|
||
|
return_list = []
|
||
|
for path in path_list:
|
||
|
result = picyure_model_predict(path, prompy_text)
|
||
|
return_list.append(result)
|
||
|
return return_list
|
||
|
#
|
||
|
elif type == "4":
|
||
|
prompy_text = prompt_picture[type]
|
||
|
prompy_text = prompy_text.format(commodity, additional)
|
||
|
return_list = []
|
||
|
for path in path_list:
|
||
|
for i in range(5):
|
||
|
result = picyure_model_predict(path, prompy_text)
|
||
|
return_list.append(result)
|
||
|
return return_list
|
||
|
|
||
|
elif type == "5":
|
||
|
return_list = []
|
||
|
prompy_text = prompt_picture[type]
|
||
|
prompy_text = prompy_text.format(commodity)
|
||
|
result_list_type = False
|
||
|
|
||
|
for path in path_list:
|
||
|
while True:
|
||
|
if result_list_type == True:
|
||
|
break
|
||
|
result = picyure_model_predict(path, prompy_text)
|
||
|
result_list = str(result).split("\n")
|
||
|
result_list = [i for i in result_list if i != ""]
|
||
|
result_list_new = []
|
||
|
for i in result_list:
|
||
|
response_re = re.findall(RE_CHINA_NUMS, i)
|
||
|
if response_re == []:
|
||
|
continue
|
||
|
else:
|
||
|
result_list_new.append(i)
|
||
|
if result_list_new != []:
|
||
|
result_list_type = True
|
||
|
return_list.append(result_list_new)
|
||
|
|
||
|
return return_list
|
||
|
|
||
|
elif type == "6":
|
||
|
return_list = []
|
||
|
prompy_text = prompt_picture[type]
|
||
|
prompy_text = prompy_text.format(commodity)
|
||
|
for path in path_list:
|
||
|
for i in range(5):
|
||
|
result = picyure_model_predict(path, prompy_text)
|
||
|
return_list.append(result)
|
||
|
return return_list
|
||
|
|
||
|
elif type == "7":
|
||
|
prompy_text = additional
|
||
|
return_list = []
|
||
|
for path in path_list:
|
||
|
result = picyure_model_predict(path, prompy_text)
|
||
|
return_list.append(result)
|
||
|
return return_list
|
||
|
|
||
|
else:
|
||
|
return "1111"
|
||
|
|
||
|
# 文件上传处理
|
||
|
@app.route('/vl_chat', methods=['POST'])
|
||
|
def upload_file():
|
||
|
|
||
|
file0 = request.files.get('file0')
|
||
|
file1 = request.files.get('file1')
|
||
|
file2 = request.files.get('file2')
|
||
|
file3 = request.files.get('file3')
|
||
|
file4 = request.files.get('file4')
|
||
|
file5 = request.files.get('file5')
|
||
|
commodity = request.form.get('commodity')
|
||
|
type_str = request.form.get('type')
|
||
|
additional = request.form.get("additional")
|
||
|
file_list = [file0, file1, file2, file3, file4, file5]
|
||
|
|
||
|
# if commodity == False or type_str == False and file0 == False:
|
||
|
# return str(400)
|
||
|
try:
|
||
|
assert file0
|
||
|
except:
|
||
|
return_text = {"texts": "没有主图", "probabilities": None, "status_code": 400}
|
||
|
return jsonify(return_text)
|
||
|
|
||
|
try:
|
||
|
assert commodity
|
||
|
except:
|
||
|
return_text = {"texts": "没有商品类型", "probabilities": None, "status_code": 400}
|
||
|
return jsonify(return_text)
|
||
|
|
||
|
try:
|
||
|
assert type_str
|
||
|
except:
|
||
|
return_text = {"texts": "没有生成类型", "probabilities": None, "status_code": 400}
|
||
|
return jsonify(return_text)
|
||
|
|
||
|
path_list = []
|
||
|
for file in file_list:
|
||
|
if file and allowed_file(file.filename):
|
||
|
filename = secure_filename(file.filename)
|
||
|
kuozhan = filename.split(".")[-1]
|
||
|
uuid_picture = str(uuid.uuid1())
|
||
|
filename = ".".join([uuid_picture, kuozhan])
|
||
|
path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
|
||
|
file.save(path)
|
||
|
path_list.append(path)
|
||
|
|
||
|
# 业务逻辑
|
||
|
try:
|
||
|
type_list = str(type_str).split(",")
|
||
|
|
||
|
result = []
|
||
|
for type_dan in type_list:
|
||
|
print("type:", type_dan)
|
||
|
result_dan = picture_main(path_list, commodity, type_dan, additional)
|
||
|
result.append(result_dan)
|
||
|
return_text = {"texts": result, "probabilities": None, "status_code": 200}
|
||
|
except:
|
||
|
return_text = {"texts": "运算出错", "probabilities": None, "status_code": 400}
|
||
|
log.log('start at',
|
||
|
'filename:{}, commodity:{}, type:{}, additional:{}, result:{}'.format(
|
||
|
str(path_list), commodity, str(type_str), additional, return_text))
|
||
|
return jsonify(return_text)
|
||
|
|
||
|
|
||
|
|
||
|
# 无文件上传
|
||
|
# @app.route('/chat', methods=['POST'])
|
||
|
# def upload_file():
|
||
|
#
|
||
|
# type = request.files.get('type')
|
||
|
# describe = request.form.get("describe")
|
||
|
# advantage = request.form.get("dadvantage")
|
||
|
#
|
||
|
# return "1"
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
app.run(host="0.0.0.0", port=19000, threaded=True)
|