You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

519 lines
17 KiB

import os
from flask import Flask, render_template, request, redirect, url_for, jsonify
from flask import Flask,url_for,redirect,request,render_template,send_from_directory
from werkzeug.utils import secure_filename
app = Flask(__name__)
import time
import re
import requests
import uuid
import socket
# Directory where uploaded files are stored: an "uploads" folder under the
# current working directory of the process (not the directory of this file).
# UPLOAD_FOLDER = '/home/majiahui/ai_creative_workshop/uploads'
current_path = os.getcwd()
UPLOAD_FOLDER = os.path.join(current_path, 'uploads')
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Regular expression used to recognise numbered list lines in model answers:
# a digit 1-9, then any single character (the "." is unescaped), then the
# captured remainder of the line.
RE_CHINA_NUMS = "[1-9].(.*)"
# Allowed file types (lower-case extensions accepted by allowed_file).
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# Separator candidates searched for in the user's "additional" text by
# type_2/type_3/type_4.
# NOTE(review): the second entry is an empty string, which is contained in
# every string; possibly a full-width semicolon was lost - confirm.
fenhao_list = [";", ""]
# Colon candidates that type_1 looks for in each line of the model answer.
# NOTE(review): the second entry is an empty string, which makes the check
# always succeed; possibly a full-width colon was lost - confirm.
moahao_list = [":", ""]
# Prompt templates sent together with an image, keyed by generation type.
# In the code shown here type_1, type_5 and type_6 read this table and fill
# the "{}" placeholder with the commodity name.
prompt_picture_dict = {
"1": "图中的商品:{},有什么突出亮点和卖点,请分条列举出来,要求亮点或者卖点要用一个词总结,冒号后面在进行解释,例如:1. 时尚黑色:图中的鞋子是黑色的,符合时尚潮流,适合不同场合的穿搭。",
"2": "图中的商品:{},有什么亮点,写一段营销话语",
"3": "图中的商品:{},有以下亮点:\n{}\n根据这些优势亮点,写一段营销文本让商品卖的更好",
"4": "图中的商品:{},有哪些不足之处可以改进?",
"5": "图中{}的渲染图做哪些调整可以更吸引消费者,请分条列举,例如:“1.xxx\n2.xxx”",
"6": "根据图中的商品:{},生成五个商品名称,要求商品名称格式中包含的信息(有品牌名,有产品名,有细分产品种类词,比如猫砂,篮球鞋等,有三到五个卖点和形容词)。请分条列举,例如:“1.xxx \n2.xxx \n3.xxx \n4.xxx \n5.xxx”",
# A disabled alternative for key "6" asked for a single product name instead
# of five.
}
# Prompt templates for the text model, keyed by generation type.
# type_2/type_3/type_4 format them with (commodity, joined selling points).
prompt_text_dict = {
"1": "",
"2": "User:商品名称:{};卖点:{},请帮我生成一个有很多活泼表情的小红书文案,以商品使用者角度来写作,让人感觉真实\nAssistant:",
"3": "图中{}有以下亮点:\n{}\n根据这些优势亮点,写一段营销文本让商品买的更好",
"4": "图中{}有哪些不足之处可以改进?",
"5": "图中{}的渲染图做哪些调整可以更吸引消费者",
}
def get_host_ip(probe_host='8.8.8.8', probe_port=80):
    """
    Look up the IP address of the local interface used to reach a remote host.

    A UDP socket is connected to ``probe_host`` and the local address the
    operating system bound for that connection is returned.

    :param probe_host: address used to select the outgoing interface
    :param probe_port: port used for the probe connection
    :return: local IP address as a string
    :raises OSError: if the socket cannot be created or connected
    """
    # The context manager closes the socket on every path. The previous
    # try/finally referenced the socket variable even when socket creation
    # itself had failed, which masked the real error with a NameError.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect((probe_host, probe_port))
        return probe.getsockname()[0]
def dialog_line_parse(url, text):
    """
    Post a JSON payload to a model service and return its decoded JSON reply.

    :param url: URL of the model service
    :param text: payload sent as the JSON request body
    :return: decoded JSON body when the service answers HTTP 200,
             otherwise an empty dict
    """
    response = requests.post(
        url,
        json=text,
        timeout=1000
    )
    if response.status_code == 200:
        return response.json()
    # Any other status is reported on stdout together with the payload that
    # triggered it; callers receive an empty dict.
    print("【{}】 Failed to get a proper response from remote "
          "server. Status Code: {}. Response: {}"
          "".format(url, response.status_code, response.text))
    print(text)
    return {}
class log:
    """Minimal access logger that appends to one file per calendar day."""

    def __init__(self):
        pass

    @staticmethod
    def log(*args, **kwargs):
        """
        Append one timestamped line to ``log_file/access-YYYY-MM-DD.log``.

        :param args: values written after the timestamp, as ``print`` would
        :param kwargs: extra keyword arguments forwarded to ``print``
        """
        now = time.localtime(int(time.time()))
        timestamp = time.strftime('%Y/%m/%d-%H:%M:%S', now)
        day = time.strftime('%Y-%m-%d', now)
        # The directory is created on demand; without it the first call
        # failed with FileNotFoundError.
        os.makedirs('log_file', exist_ok=True)
        log_path = os.path.join('log_file', 'access-%s.log' % day)
        # Append mode creates the file when it does not exist yet, so no
        # separate "first write" branch is needed.
        with open(log_path, 'a', encoding='utf-8') as f:
            print(timestamp, *args, file=f, **kwargs)
# def upload():
# """
# #头像上传表单页面
# :return:
# """
# if request.method=='POST':
# #接受头像字段
# avatar=request.files['avatar']
# #判断头像是否上传
# if avatar and allowed_file(avatar.filename):
# filename=random_file(avatar.filename)
# avatar.save(os.path.join(app.config['UPLOAD_FOLDER'],filename))
# return redirect(url_for('uploaded_file',filename=filename))
# return render_template('upload.html')
# Check the file extension
def allowed_file(filename):
    """Return True when ``filename`` ends in an extension from ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    # Only the text after the last dot counts; comparison is case-insensitive.
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
def picture_model_predict(image_path, prompt):
    """
    Ask the image model service for an answer about one image.

    The service is addressed on this machine's own IP (see get_host_ip),
    port 19001.

    :param image_path: path of the image, sent as a one-element path list
    :param prompt: prompt text sent with the image
    :return: the ``"data"`` field of the service reply
    """
    endpoint = "http://{}:19001/predict".format(str(get_host_ip()))
    payload = {
        "path_list": [image_path],
        "prompt": prompt
    }
    reply = dialog_line_parse(endpoint, payload)
    return reply["data"]
def text_model_predict(prompt):
    """
    Ask the text model service to complete ``prompt``.

    :param prompt: prompt text, sent under the ``"texts"`` key
    :return: the ``"data"`` field of the service reply
    """
    endpoint = "http://192.168.31.74:12000/predict"
    payload = {
        "texts": prompt,
    }
    reply = dialog_line_parse(endpoint, payload)
    return reply["data"]
# Number of times the image model is asked per image before giving up.
_TYPE_1_MAX_ATTEMPTS = 4


def _parse_selling_points(raw_answer):
    """
    Turn a model answer into ``[line, keyword_parts]`` pairs.

    :param raw_answer: answer returned by the image model
    :return: one pair per non-empty line, or None when the answer is not a
             list of more than three numbered "keyword: explanation" lines
    """
    # Empty entries of moahao_list are ignored: an empty string is contained
    # in every line and cannot be used as a split separator.
    separators = [sep for sep in moahao_list if sep]
    # NOTE(review): the full-width colon is assumed to be the separator that
    # was lost from moahao_list - confirm against real model output.
    if ":" not in separators:
        separators.append(":")
    lines = [line for line in str(raw_answer).split("\n") if line != ""]
    if len(lines) <= 3:
        return None
    pairs = []
    for line in lines:
        bodies = re.findall(RE_CHINA_NUMS, line)
        if not bodies:
            return None
        separator = next((sep for sep in separators if sep in bodies[0]), None)
        if separator is None:
            return None
        # The text before the first separator is the one-word selling point.
        pairs.append([line, bodies[0].split(separator)])
    return pairs


def type_1(path_list, commodity, input_type):
    """
    List the selling points of the commodity shown in each image.

    Every image is sent to the image model with the prompt for
    ``input_type``. An answer is accepted when it has more than three
    non-empty lines and every line is numbered and contains a colon.
    Each attempt is judged on its own, and an acceptable answer on the last
    attempt is used.

    :param path_list: paths of the images to analyse
    :param commodity: commodity name inserted into the prompt
    :param input_type: key into prompt_picture_dict
    :return: ``(200, per_image)`` where ``per_image`` holds, for each image,
             a list of ``[line, keyword_parts]`` pairs; ``(400, [])`` when
             no acceptable answer was obtained for some image
    """
    prompt = prompt_picture_dict[input_type].format(commodity)
    return_list = []
    for path in path_list:
        for _ in range(_TYPE_1_MAX_ATTEMPTS):
            pairs = _parse_selling_points(picture_model_predict(path, prompt))
            if pairs is not None:
                break
        else:
            return 400, []
        return_list.append(pairs)
    return 200, return_list
def _text_from_selling_points(path_list, commodity, input_type, additional):
    """
    Generate text from image-derived and user-supplied selling points.

    Shared implementation of type_2, type_3 and type_4.

    :param path_list: paths of the images to analyse
    :param commodity: commodity name inserted into the prompt
    :param input_type: key into prompt_text_dict
    :param additional: optional user selling points joined by a separator
                       from fenhao_list; may be empty or None
    :return: ``(200, [text])`` on success; ``(200, [])`` when ``additional``
             is given but contains no known separator; the failure result of
             type_1 when the selling points cannot be extracted
    """
    # type_1 returns a (code, data) pair; it must be unpacked before the
    # per-image data is read.
    code, per_image = type_1(path_list, commodity, "1")
    if code != 200:
        return code, []
    # Keyword of the first selling point of each image.
    maidian = [points[0][1][0] for points in per_image if points]
    if additional:
        separator = next(
            (sep for sep in fenhao_list if sep and sep in additional), "")
        if separator == "":
            return 200, []
        maidian += [part for part in additional.split(separator) if part != ""]
    prompt_text = prompt_text_dict[input_type].format(commodity, "".join(maidian))
    return 200, [text_model_predict(prompt_text)]


def type_2(path_list, commodity, input_type, additional):
    """
    Generate text for generation type "2"; see _text_from_selling_points.

    :return: ``(code, result_list)``
    """
    return _text_from_selling_points(path_list, commodity, input_type, additional)


def type_3(path_list, commodity, input_type, additional):
    """
    Generate text for generation type "3"; see _text_from_selling_points.

    :return: ``(code, result_list)``
    """
    return _text_from_selling_points(path_list, commodity, input_type, additional)


def type_4(path_list, commodity, input_type, additional):
    """
    Generate text for generation type "4"; see _text_from_selling_points.

    :return: ``(code, result_list)``
    """
    return _text_from_selling_points(path_list, commodity, input_type, additional)
def type_5(path_list, commodity, input_type):
    """
    Ask the image model for numbered suggestions about each image.

    Each image is queried up to four times until the answer contains at
    least one line matching RE_CHINA_NUMS. Every image is queried and judged
    independently.

    :param path_list: paths of the images to analyse
    :param commodity: commodity name inserted into the prompt
    :param input_type: key into prompt_picture_dict
    :return: ``(200, per_image)`` where ``per_image`` holds the numbered
             lines for each image; ``(400, [])`` when some image yields no
             numbered line within the attempt limit
    """
    max_attempts = 4
    prompt = prompt_picture_dict[input_type].format(commodity)
    return_list = []
    for path in path_list:
        for _ in range(max_attempts):
            answer = picture_model_predict(path, prompt)
            numbered_lines = [
                line for line in str(answer).split("\n")
                if line != "" and re.findall(RE_CHINA_NUMS, line)
            ]
            if numbered_lines:
                break
        else:
            # No usable answer for this image within the attempt limit.
            return 400, []
        return_list.append(numbered_lines)
    return 200, return_list
def type_6(path_list, commodity, input_type):
    """
    Ask the image model for a numbered list of product names per image.

    Each image is queried up to four times until the answer contains at
    least one line matching RE_CHINA_NUMS. Every image is queried and judged
    independently. Only the text captured after the number is kept.

    :param path_list: paths of the images to analyse
    :param commodity: commodity name inserted into the prompt
    :param input_type: key into prompt_picture_dict
    :return: ``(200, per_image)`` where ``per_image`` holds the extracted
             names for each image; ``(400, [])`` when some image yields no
             numbered line within the attempt limit
    """
    max_attempts = 4
    prompt = prompt_picture_dict[input_type].format(commodity)
    return_list = []
    for path in path_list:
        for _ in range(max_attempts):
            answer = picture_model_predict(path, prompt)
            names = []
            for line in str(answer).split("\n"):
                if line == "":
                    continue
                captured = re.findall(RE_CHINA_NUMS, line)
                if captured:
                    names.append(captured[0])
            if names:
                break
        else:
            # No usable answer for this image within the attempt limit.
            return 400, []
        return_list.append(names)
    return 200, return_list
def type_7(path_list, additional):
    """
    Send the caller-supplied prompt to the image model once per image.

    :param path_list: paths of the images to analyse
    :param additional: prompt text used as-is
    :return: ``(200, answers)`` with one model answer per image
    """
    answers = [picture_model_predict(image_path, additional)
               for image_path in path_list]
    return 200, answers
def picture_main(path_list, commodity, input_type, additional):
    """
    Dispatch one generation request to the handler for ``input_type``.

    :param path_list: paths of the uploaded images
    :param commodity: commodity name
    :param input_type: generation type, one of "1" to "7"
    :param additional: extra user text; only used by types "2", "3", "4", "7"
    :return: the ``(code, result_list)`` pair of the selected handler, or
             ``(400, [])`` for an unknown type
    """
    if input_type == "1":
        return type_1(path_list, commodity, input_type)
    if input_type == "2":
        return type_2(path_list, commodity, input_type, additional)
    if input_type == "3":
        return type_3(path_list, commodity, input_type, additional)
    if input_type == "4":
        return type_4(path_list, commodity, input_type, additional)
    if input_type == "5":
        return type_5(path_list, commodity, input_type)
    if input_type == "6":
        return type_6(path_list, commodity, input_type)
    if input_type == "7":
        return type_7(path_list, additional)
    # Unknown types answer with the same (code, list) shape as every handler
    # so that callers can always unpack two values.
    return 400, []
def main(file_list, type_str, commodity, additional):
    """
    Store the uploaded images and run every requested generation type.

    :param file_list: uploaded file objects; falsy entries and files with a
                      disallowed extension are skipped
    :param type_str: comma-separated generation types, e.g. ``"1,5"``
    :param commodity: commodity name
    :param additional: extra user text passed on to the handlers
    :return: ``(return_text, path_list)`` where ``return_text`` is the
             response dictionary (keys ``texts``, ``probabilities``,
             ``status_code``) and ``path_list`` the stored image paths
    """
    import traceback

    upload_dir = app.config['UPLOAD_FOLDER']
    # Create the upload directory on demand so saving cannot fail on a fresh
    # checkout.
    os.makedirs(upload_dir, exist_ok=True)
    path_list = []
    for file in file_list:
        if file and allowed_file(file.filename):
            safe_name = secure_filename(file.filename)
            extension = safe_name.split(".")[-1]
            # Files are stored under a generated name, keeping the extension.
            stored_name = ".".join([str(uuid.uuid1()), extension])
            path = os.path.join(upload_dir, stored_name)
            file.save(path)
            path_list.append(path)
    # Business logic
    try:
        code = 200
        result = {
            "main": [],
            "spilt": []
        }
        for type_dan in str(type_str).split(","):
            type_dan = type_dan.strip()
            print("type:", type_dan)
            code, result_dan = picture_main(path_list, commodity, type_dan, additional)
            if code == 400:
                break
            slice_dan = []
            if type_dan == "1":
                # Only the first image's selling points are reported: the
                # full lines go to "main", the keyword parts to "spilt".
                main_dan = [[pair[0] for pair in result_dan[0]]]
                slice_dan = [pair[1] for pair in result_dan[0]]
            else:
                main_dan = result_dan
            if slice_dan:
                result["spilt"].append({type_dan: slice_dan})
            result["main"].append({type_dan: main_dan})
        return_text = {"texts": result, "probabilities": None, "status_code": code}
    except Exception:
        # Processing errors become an error response; the traceback is
        # printed so that the failure is not hidden.
        traceback.print_exc()
        return_text = {"texts": "运算出错", "probabilities": None, "status_code": 400}
    return return_text, path_list
# File upload handling
@app.route('/vl_chat_visualization', methods=['GET','POST'])
def vl_chat_visualization():
    """
    Serve the upload form (GET) and answer a submitted form (POST).

    A POST needs the file ``file0`` and the form fields ``commodity`` and
    ``type``; ``file1`` to ``file5`` and ``additional`` are optional.

    :return: the upload page for GET; for POST the first answer of
             generation type "7" when it is the first entry of the result,
             otherwise the full JSON response dictionary
    """
    if request.method != 'POST':
        return render_template('upload.html')
    file_list = [request.files.get('file{}'.format(index)) for index in range(6)]
    commodity = request.form.get('commodity')
    type_str = request.form.get('type')
    additional = request.form.get("additional")
    # Explicit checks instead of assert, which is removed when Python runs
    # with -O.
    required = (
        (file_list[0], "没有主图"),
        (commodity, "没有商品类型"),
        (type_str, "没有生成类型"),
    )
    for value, message in required:
        if not value:
            return jsonify({"texts": message, "probabilities": None, "status_code": 400})
    return_text, path_list = main(file_list, type_str, commodity, additional)
    log.log('start at',
            'filename:{}, commodity:{}, type:{}, additional:{}, result:{}'.format(
                str(path_list), commodity, str(type_str), additional, return_text))
    try:
        return return_text["texts"]["main"][0]["7"][0]
    except (TypeError, KeyError, IndexError):
        # Processing failed or type "7" was not the first result: return the
        # whole response instead of crashing.
        return jsonify(return_text)
@app.route('/vl_chat', methods=['POST'])
def vl_chat():
    """
    JSON endpoint running the requested generation types on uploaded images.

    The request needs the file ``file0`` and the form fields ``commodity``
    and ``type``; ``file1`` to ``file5`` and ``additional`` are optional.

    :return: JSON response with the keys ``texts``, ``probabilities`` and
             ``status_code``
    """
    file_list = [request.files.get('file{}'.format(index)) for index in range(6)]
    commodity = request.form.get('commodity')
    type_str = request.form.get('type')
    additional = request.form.get("additional")
    # Explicit checks instead of assert, which is removed when Python runs
    # with -O.
    required = (
        (file_list[0], "没有主图"),
        (commodity, "没有商品类型"),
        (type_str, "没有生成类型"),
    )
    for value, message in required:
        if not value:
            return jsonify({"texts": message, "probabilities": None, "status_code": 400})
    return_text, path_list = main(file_list, type_str, commodity, additional)
    log.log('start at',
            'filename:{}, commodity:{}, type:{}, additional:{}, result:{}'.format(
                str(path_list), commodity, str(type_str), additional, return_text))
    return jsonify(return_text)
if __name__ == "__main__":
    # Start the Flask server on all interfaces, port 19000, in threaded mode.
    server_options = {"host": "0.0.0.0", "port": 19000, "threaded": True}
    app.run(**server_options)