#coding:utf-8
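"""
Flask service (port 12004) that proxies paper-writing requests to a local LLM
service (port 12001, /predict and /search). For sub-heading prompts it can
optionally search Bing through a headless, proxied Chrome instance, summarize
the pages it finds, and splice the useful summaries back into the prompt
before generation. Proxy IPs are rotated through a Redis-backed queue.
"""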
import os
from flask import Flask, jsonify, request
import requests
import time
import socket
import re
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from selenium.webdriver.common.action_chains import ActionChains
import redis
import json
import socks
from selenium.webdriver.chrome.service import Service
import concurrent.futures
from retrying import retry
# decode_responses must be set on the pool: redis.Redis ignores it when a connection_pool is supplied.
pool = redis.ConnectionPool(host='localhost', port=63179, max_connections=100, db=3, password="zhicheng123*", decode_responses=True)
redis_ = redis.Redis(connection_pool=pool)
app = Flask(__name__)
app.config["JSON_AS_ASCII"] = False
def get_host_ip():
"""
查询本机ip地址
:return: ip
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
class log:
    def __init__(self):
        pass

    def log(*args, **kwargs):
        # Simple file logger: appends timestamped lines to log_file/access-YYYY-MM-DD.log.
        time_format = '%Y/%m/%d-%H:%M:%S'
        time_format_day = '%Y-%m-%d'
        value = time.localtime(int(time.time()))
        dt = time.strftime(time_format, value)
        dt_log_file = time.strftime(time_format_day, value)
        log_file = 'log_file/access-%s.log' % dt_log_file
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        # Mode 'a' creates the file if it does not exist, so no existence check is needed.
        with open(log_file, 'a', encoding='utf-8') as f:
            print(dt, *args, file=f, **kwargs)
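# Chat template used to wrap every prompt sent to the backing LLM service.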
prompt_sys = "<|im_start|>user\n{}<|im_end|>\n<|im_start|>assistant\n\n\n\n\n"
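# Redis list keys: db_key_query_ip holds proxy entries as JSON
# {"ip": "host:port", "time_pont": <expiry timestamp>}; db_key_query_user is not used in this file.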
db_key_query_ip = 'query_ip'
db_key_query_user = 'query_user'
# Regex patterns for extracting the paper title, outline, and sub-heading from the prompt
patterns_title = [
r"根据论文题目《(.*)》,目录是",
r"根据论文题目“(.*)”和目录"
]
patterns_mulu = [
r"目录是“(.*)”,为小标题",
r"目录“(.*)”,为小标题"
]
patterns_small_title = [
r"为小标题“(.*)”填充"
]
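# Endpoints of the local LLM service running on this host (port 12001):
# /predict submits a generation task, /search polls for its result by id.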
chatgpt_url_predict = "http://{}:12001/predict".format(str(get_host_ip()))
chatgpt_url_search = "http://{}:12001/search".format(str(get_host_ip()))
prompt_need = "我是一个博士生,我在写一篇论文,请根据论文题目“{}”和目录“{}”,来判断一下生成目录中“{}”这个小标题对应的内容时,是否需要联网查询相关的背景知识,如果当前小标题跟当前论文关系比较大就不需要,如果当前小标题需要比较强的背景知识就需要,请回答“需要”,或者“不需要”,只需要简单回答这几个字,不要有多余的回答"
prompt_search = "我是一个博士生,我在写一篇论文,我现在已经有论文题目“{}”和目录“{}”,我需要写“{}”这个小标题对应的内容时,需要联网查询相关的背景知识,请帮我生成一个可以放到百度或者google搜索框的问法,通过这个问法问搜索引擎产生的结果可以对我写这个小标题的内容时有所帮助,只需要生成一个可以直接放到百度搜索框的问法,不要生成其他内容"
prompt_panduan = "{}\n这段文本是我从互联网的网页中提取的文本内容,请阅读上面文本,我需要参考上面的文本来帮助我写我论文中的某一段落,我需要完成的论文题目是“{}”,目录是“{}”,需要完成的论文段落标题是“{}”,请帮我判断一下,上面的文本是否对我写这个段落有一定的参考作用,是否可以帮到我,因为有的时候提取的文本是一些垃圾文本,我需要排除这些文本,如果有帮助就回答“有效”,如果没有帮助就回答“无效”,“有效”或者“无效”的判断是根据这个小标题和这篇文章是否跟上面提取的文本是否有关联性,因为有很多反爬虫手段会对网页有限制,导致内容不可用,或者是一些验证码之类的信息,所以需要解释可用或者不可用的原因"
url_ceshi = "http://www.baidu.com"
# Regex extraction: return the first non-empty match across the given patterns
def extract_first_match(patterns, text):
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
            # Return the content of the first non-empty capture group
for group in match.groups():
if group:
return group.strip()
return ""
# Initialize the Redis proxy-IP queue with 20 fresh proxies
def initialization_ip():
    redis_.delete(db_key_query_ip)
    for i in range(20):
        proxy = ip_gen()  # e.g. "113.76.193.198:2763"
        time_pont = int(time.time()) + (60 * 4)  # each proxy is treated as valid for ~4 minutes
        redis_.rpush(db_key_query_ip, json.dumps({"ip": proxy, "time_pont": time_pont}))  # push into Redis
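# Fetch one proxy from the third-party proxy vendor API; retried with exponential
# backoff in case the endpoint fails transiently.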
@retry(stop_max_attempt_number=10, wait_exponential_multiplier=1000, wait_exponential_max=10000)
def get_paid_proxies():
    url = f"http://proxy.siyetian.com/apis_get.html?token=AesJWLNp2a65kaJdXTqFFeNpWT35ERNpnTn1STqFUeORUR31kaNh3TUl0dPRUQy4ERJdXT6lVN.AO4YDN2ADM1cTM&limit=1&type=1&time=&data_format=json"
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # let @retry kick in on HTTP errors
    return response.json()  # response format (per usage below): {"data": [{"ip": ..., "port": ...}, ...]}
def ip_gen():
proxies_dict = get_paid_proxies()
ip = proxies_dict['data'][0]['ip']
port = proxies_dict['data'][0]['port']
return "{}:{}".format(ip, port)
# Random mouse movement (to simulate human interaction)
def human_like_movement(driver):
    try:
        action = ActionChains(driver)
        # Get the window size and compute a safe region
        window_size = driver.get_window_size()
        safe_width = window_size['width'] - 20
        safe_height = window_size['height'] - 20
        # Start from the window center
        start_x = safe_width // 2
        start_y = safe_height // 2
        # Generate a random movement path
        for _ in range(random.randint(2, 5)):
            # Keep the movement inside the safe region
            offset_x = random.randint(-100, 100)
            offset_y = random.randint(-100, 100)
            target_x = max(10, min(start_x + offset_x, safe_width))
            target_y = max(10, min(start_y + offset_y, safe_height))
            # Move by relative offsets, which is less likely to go out of bounds
            action.move_by_offset(
                target_x - start_x,
                target_y - start_y
            ).pause(random.uniform(0.1, 0.5))
            start_x, start_y = target_x, target_y
        action.perform()
    except Exception as e:
        print(f"Mouse-movement simulation failed: {e}")
        # Fall back to a simple scroll on failure
        driver.execute_script("window.scrollBy(0, 200);")
        time.sleep(random.uniform(0.5, 1.5))
def socks_proxy(proxy_host, proxy_port):
    """Check whether a SOCKS5 proxy is usable (without touching global socket settings)."""
    try:
        # Create a dedicated socks socket (does not monkey-patch the global socket module)
        s = socks.socksocket()
        s.set_proxy(socks.SOCKS5, proxy_host, proxy_port)
        s.settimeout(10)  # connection timeout
        # Probe the connection (Baidu is used as the test target)
        s.connect(('www.baidu.com', 80))
        s.close()  # close the probe connection
        return True
    except Exception:
        return False
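# Build a headless Chrome driver with anti-detection tweaks, a random user agent,
# and a SOCKS5 proxy taken from (and returned to) the Redis proxy queue.
# Returns (driver, proxy).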
def driver_config():
    print("driver setup started")
    options = Options()
    # New-style headless mode
    options.add_argument("--headless=new")
    options.add_argument("--disable-gpu")
    options.add_argument("--window-size=1280,720")  # a more sensible default size
    # Anti-detection settings
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)
    # Add a random user agent
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59",
"Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Linux; Android 12; SM-S906N Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/80.0.3987.119 Mobile Safari/537.36"
]
options.add_argument(f"user-agent={random.choice(user_agents)}")
    # Other optimizations
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--no-sandbox")
    # Block until a proxy entry is available in the Redis queue
    while True:
        query = redis_.lpop(db_key_query_ip)
        if query is not None:
            break
        time.sleep(1)
    # TODO: raise an alert when no proxy IP is available
    # Rotate the proxy IP; query = {"ip": proxy, "time_pont": time_pont}
    data_dict_path = json.loads(query)
    proxy = data_dict_path["ip"]
    time_pont = data_dict_path["time_pont"]
    if time.time() > time_pont:
        # The entry has expired: fetch a fresh proxy
        print("fetching new proxy ip")
        proxy = ip_gen()
        time_pont = int(time.time()) + (60 * 4)
        redis_.rpush(db_key_query_ip, json.dumps({"ip": proxy, "time_pont": time_pont}))  # push back into Redis
    else:
        proxy_host, proxy_port = str(proxy).split(":")
        if socks_proxy(proxy_host, int(proxy_port)):
            # Proxy still works: reuse it and return it to the queue
            time.sleep(1)
            redis_.rpush(db_key_query_ip, json.dumps({"ip": proxy, "time_pont": time_pont}))  # push back into Redis
        else:
            # Proxy is dead: fetch a fresh one
            print("fetching new proxy ip")
            proxy = ip_gen()
            time_pont = int(time.time()) + (60 * 4)
            redis_.rpush(db_key_query_ip, json.dumps({"ip": proxy, "time_pont": time_pont}))  # push back into Redis
    print(proxy)
    options.add_argument(f"--proxy-server=socks5://{proxy}")
driver_path = "/home/majiahui/.cache/selenium/chromedriver/linux64/137.0.7151.119/chromedriver"
service = Service(executable_path=driver_path)
    # Initialize the driver
    driver = webdriver.Chrome(service=service, options=options)
    # Inject a more thorough anti-detection script before any page scripts run
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': '''
Object.defineProperty(navigator, 'webdriver', {
get: () => false
});
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh']
});
'''
})
print("driver结束")
return driver, proxy
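# Query Bing through the given driver and return up to 10 results as
# [{"title": ..., "link": ..., "desc": ...}]. The driver is always quit before returning.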
@retry(stop_max_attempt_number=10, wait_exponential_multiplier=1000, wait_exponential_max=10000)
def bing(query, driver):
print("bing请求开始")
# human_like_movement(driver)
return_data = []
url = f"https://www.bing.com/search?q={query}"
    try:
        # Random delay
        time.sleep(random.uniform(1, 3))
        driver.get(url)
        # Simulate human scrolling behavior
        for _ in range(random.randint(1, 3)):
            ActionChains(driver).scroll_by_amount(
                random.randint(200, 500),
                random.randint(200, 500)
            ).perform()
            time.sleep(random.uniform(0.5, 2))
        # Random wait
        time.sleep(random.uniform(2, 5))
        # Grab the page content
        soup = BeautifulSoup(driver.page_source, "html.parser")
        results = soup.find_all("li", class_="b_algo")
        # Extract the results
        for result in results[:10]:
            title_tag = result.find('h2')
            print("title_tag", title_tag)
            title = title_tag.get_text(strip=True) if title_tag else "无标题"
            link = ""
            if title_tag:
                # Option 1: look for an <a> with an href directly inside the heading
                link_tag = title_tag.find('a', href=True)
                if link_tag:
                    link = link_tag['href']
                else:
                    # Option 2: look for a parent <a> carrying the href
                    parent_link = title_tag.find_parent('a', href=True)
                    if parent_link:
                        link = parent_link['href']
desc_tag = result.find('p', class_='b_lineclamp2') or result.find('p', class_='b_lineclamp3')
desc = desc_tag.get_text(strip=True) if desc_tag else "无描述"
return_data.append({
"title": title,
"link": link,
"desc": desc
})
    finally:
        # Make sure the browser is closed. Note: because the driver is quit here,
        # the @retry decorator cannot usefully retry with the same driver instance.
        time.sleep(random.uniform(1, 3))
        driver.quit()
return return_data
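# Log cases where the model produced formulas, tables or pictures that the prompt did not ask for.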
def check_problems(input, output):
    pantten_formula = r'\\\[.*?\\\]'
    # NOTE: the literal markers in the two patterns below (and possibly in the membership
    # checks further down) appear to have been lost, likely stripped as HTML tags.
    # The <表格>/<图片> markers here are assumptions and may need to be adjusted to
    # whatever markers the model actually emits.
    pantten_picture = r'<图片>.*?</图片>'
    pantten_tb = r'<表格>.*?</表格>'
    error_data = ""
    # Is this a "generate sub-heading content" task?
    if "任务:生成论文小标题内容" in input:
        # Check formulas / tables / pictures in the generated output
        formula_bool_list = re.findall(pantten_formula, output, re.DOTALL)
        tb_bool_list = re.findall(pantten_tb, output, re.DOTALL)
        picture_bool_list = re.findall(pantten_picture, output, re.DOTALL)
if "数学公式用\\[\\]进行包裹" not in input and formula_bool_list != []:
error_data += "多生成公式问题:\n"
error_data += "input:\n"
error_data += input
error_data += "output:\n"
error_data += output
error_data += "\n========================================================================\n"
        # Check tables
if "表格部分开始必须用标识,表格部分结束必须用标识,必须返回html格式的表格" not in input and tb_bool_list != []:
error_data += "多生成表格问题:\n"
error_data += "input:\n"
error_data += input
error_data += "output:\n"
error_data += output
error_data += "\n========================================================================\n"
if "图片要求在文字中插入一张图" not in input and picture_bool_list != []:
error_data += "多生成图片问题:\n"
error_data += "input:\n"
error_data += input
error_data += "output:\n"
error_data += output
error_data += "\n========================================================================\n"
if error_data != "":
with open("logs/error_xiaobiaoti.log", "a", encoding="utf-8") as f:
f.write(error_data)
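# Classify what the generated sub-heading content contains: "1" = table, "2" = formula, "3" = picture.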
def return_type(input, output):
    pantten_formula = r'\\\[.*?\\\]'
    # NOTE: same assumption as in check_problems -- the original marker literals were lost.
    pantten_picture = r'<图片>.*?</图片>'
    pantten_tb = r'<表格>.*?</表格>'
    return_type_list = []
    # Is this a "generate sub-heading content" task?
    if "任务:生成论文小标题内容" in input:
        # Look for tables / formulas / pictures in the generated output
        tb_bool_list = re.findall(pantten_tb, output, re.DOTALL)
        formula_bool_list = re.findall(pantten_formula, output, re.DOTALL)
        picture_bool_list = re.findall(pantten_picture, output, re.DOTALL)
if tb_bool_list != []:
return_type_list.append("1")
if formula_bool_list != []:
return_type_list.append("2")
if picture_bool_list != []:
return_type_list.append("3")
return return_type_list
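# Submit a wrapped prompt to the LLM service's /predict endpoint. The returned JSON
# carries a task id under ["texts"]["id"], which is later polled via uuid_search.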
@retry(stop_max_attempt_number=10, wait_exponential_multiplier=1000, wait_exponential_max=10000)
def request_api_chatgpt(content, model, top_p, temperature):
data = {
"content": content,
"model": model,
"top_p": top_p,
"temperature": temperature
}
response = requests.post(
chatgpt_url_predict,
json=data,
timeout=100000
)
if response.status_code == 200:
return response.json()
else:
print("Failed to get a proper response from remote "
"server. Status Code: {}. Response: {}"
"".format(response.status_code, response.text))
return {}
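# Poll the LLM service's /search endpoint once for the result of a given task id.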
@retry(stop_max_attempt_number=10, wait_exponential_multiplier=1000, wait_exponential_max=10000)
def uuid_search(uuid):
data = {
"id": uuid
}
response = requests.post(
chatgpt_url_search,
json=data,
timeout=100000
)
if response.status_code == 200:
return response.json()
else:
print("Failed to get a proper response from remote "
"server. Status Code: {}. Response: {}"
"".format(response.status_code, response.text))
return {}
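# Given a list of /predict responses, poll until each task has produced text and
# return the texts in the same order.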
def uuid_search_mp(results):
    results_list = [""] * len(results)
    # Keep polling until every slot has been filled
    while not all(results_list):
        for i in range(len(results)):
            if results_list[i]:
                continue  # this task already returned, skip re-polling it
            uuid = results[i]["texts"]["id"]
            result = uuid_search(uuid)
            if result["code"] == 200:
                results_list[i] = result["text"]
        time.sleep(3)
    return results_list
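# Open a URL in a fresh proxied headless Chrome and return the visible body text
# (or a placeholder string when nothing could be extracted).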
def get_content(url):
    driver, proxy = driver_config()
    try:
        driver.get(url)
        # Simulate some human behavior
        time.sleep(3)  # wait for the page to load
        # Scroll the page
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(1)
        text = driver.find_element("tag name", "body").text
        print("page content:", text)
    except Exception:
        print("==========================")
        text = "无可提取内容"
    finally:
        # Always quit the driver, even when extraction fails, to avoid leaking browsers
        driver.quit()
    return text
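# For sub-heading prompts: ask the model whether background knowledge is needed; if so,
# generate a search query, fetch and summarize Bing results, keep only the summaries the
# model judges relevant, and splice them into the original prompt before returning it.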
def gen_prompt(prompt_content, model, top_p, temperature):
if "任务:生成论文小标题内容" in prompt_content:
title = extract_first_match(patterns_title, prompt_content)
mulu = extract_first_match(patterns_mulu, prompt_content)
small_title = extract_first_match(patterns_small_title, prompt_content)
prompt_input = prompt_need.format(title, mulu, small_title)
prompt_input = prompt_sys.format(prompt_input)
uid = request_api_chatgpt(prompt_input, model, top_p, temperature)
is_need = uuid_search_mp([uid])[0]
# is_need = request_api_chatgpt(prompt_input)["choices"][0]["message"]["content"]
if "不需要" in is_need:
print("不需要")
return prompt_content
else:
t1 = time.time()
print("title", title)
print("mulu", mulu)
print("small_title", small_title)
prompt_input = prompt_search.format(title, mulu, small_title)
prompt_input = prompt_sys.format(prompt_input)
uid = request_api_chatgpt(prompt_input, model, top_p, temperature)
search = uuid_search_mp([uid])[0]
# search = request_api_chatgpt(prompt_input)["choices"][0]["message"]["content"]
            search = str(search).strip("“”")
print("查询问题", search)
# query = "共享交通出行者特征及其分担率预测研究"
driver, proxy = driver_config()
            try:
                data_list = bing(search, driver)
            except Exception:
                print("*************************************************")
                driver.quit()
                print("bing request failed")
                print("proxy ip: {}, query: {}".format(proxy, search))
                print("*************************************************")
                return prompt_content
print(data_list)
t2 = time.time()
print("查询bing用时:", str(t2-t1))
main_content_zong = ""
text_zongjie_list = []
            input_url = [item['link'] for item in data_list]
            with concurrent.futures.ThreadPoolExecutor(64) as executor:
                # Fetch every result page concurrently (each call opens its own proxied headless Chrome)
                html_content_list = list(executor.map(get_content, input_url))
            # Time spent extracting each page
            t3 = time.time()
            print("page extraction took:", str(t3 - t2))
print(html_content_list)
for html_content in html_content_list:
text = "先看文本“{}”请总结一下上面文字的主要内容,尤其介绍到技术点的时候需要介绍的仔细一点".format(html_content[:15000])
text = prompt_sys.format(text)
text_zongjie_list.append(text)
nums = len(text_zongjie_list)
print("网页个数:", str(nums))
model_list = ["openbuddy-qwen2.5llamaify-7b_train_11_prompt_mistral_gpt_xiaobiaot_real_paper_2"] * nums
top_p_list = [0.7] * nums
temperature_list = [0.3] * nums
# TODO
# uid = request_api_chatgpt(text, model, top_p, temperature)
# main_content = uuid_search_mp([uid])[0]
            with concurrent.futures.ThreadPoolExecutor(64) as executor:
                # Submit all summarization prompts to the LLM service concurrently
                results_1 = list(executor.map(request_api_chatgpt, text_zongjie_list, model_list, top_p_list, temperature_list))
            # Mapping a single-element list through an executor is equivalent to one direct call
            results = [uuid_search_mp(results_1)]
t4 = time.time()
print("生成主要内容:", str(t4 - t3))
text_panduan_list = []
main_content_list = []
for main_content in results[0]:
main_content_list.append(main_content)
text = prompt_panduan.format(main_content, title, mulu, small_title)
text = prompt_sys.format(text)
text_panduan_list.append(text)
# TODO
# uid = request_api_chatgpt(text, model, top_p, temperature)
# panduan = uuid_search_mp([uid])[0]
# panduan = request_api_chatgpt(input_)["choices"][0]["message"]["content"]
nums = len(text_panduan_list)
model_list = ["openbuddy-qwen2.5llamaify-7b_train_11_prompt_mistral_gpt_xiaobiaot_real_paper_2"] * nums
top_p_list = [0.7] * nums
temperature_list = [0.3] * nums
            with concurrent.futures.ThreadPoolExecutor(64) as executor:
                # Submit all relevance-judgement prompts concurrently
                results_1 = list(
                    executor.map(request_api_chatgpt, text_panduan_list, model_list, top_p_list, temperature_list))
            # Mapping a single-element list through an executor is equivalent to one direct call
            results = [uuid_search_mp(results_1)]
print("判断有效无效", results[0])
index = 1
for i in range(len(results[0])):
panduan = results[0][i]
print("panduan", results[0][i])
panduan = str(panduan).strip("\n")
bool_text = str(panduan).split("\n")[0]
if "有效" in bool_text:
print("bool_text", True)
# main_content = request_api_chatgpt(text)["choices"][0]["message"]["content"]
main_content_zong += "### 第{}篇文章".format(str(index))
index += 1
main_content_zong += main_content_list[i]
main_content_zong += "\n"
print("link", data_list[i]['link'])
print(main_content_list[i])
print("====================================================================")
else:
print("bool_text", False)
continue
            if main_content_zong != "":
                split_content = "要求:根据论文题目"
                content_zong = prompt_content.split(split_content)
                if len(content_zong) < 2:
                    # Marker not found in the prompt; fall back to the original prompt
                    return prompt_content
                content_0 = content_zong[0]
                content_1 = content_zong[1]
                prompt_main_content = "已经查到的信息:\n“{}”".format(main_content_zong[:16000])
                prompt_small_title_content = content_0 + "要求:根据{}\n论文题目".format(prompt_main_content) + content_1
                print(prompt_small_title_content)
                print("+++++++++++++++++++")
                return prompt_small_title_content
            else:
                return prompt_content
else:
return prompt_content
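# POST /predict: OpenAI-style chat endpoint. Expects {"model", "messages", "top_p",
# "temperature", "online_query", ...} and returns a choices-style JSON response.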
@app.route("/predict", methods=["POST"])
def handle_query():
print(request.remote_addr)
data = request.get_json()
model = data["model"]
messages = data["messages"]
    top_p = data.get("top_p", 1.0)  # default 1.0
    temperature = data.get("temperature", 0.7)  # default 0.7
    online_query = data.get("online_query", None)
    user_uuid = data.get("user_uuid", None)
    paper_format = data.get("paper_format", None)
print(model)
print(messages)
print(top_p)
print(temperature)
print(online_query)
# "messages": [
# {"role": "user", "content": "你好"},
# {"role": "assistant", "content": "你好!有什么我可以帮助你的吗?"},
# # {"role": "user", "content": prompt}
# {"role": "user", "content": "一张信用卡为多个gpt4账号付费会风控吗"}
# ],
# text = "User: " + messages[-1]["content"] + "\nAssistant:"
    if online_query is None:
prompt_paper = messages[-1]["content"]
else:
prompt_paper = gen_prompt(messages[-1]["content"], model, top_p, temperature)
content = prompt_sys.format(prompt_paper)
print("content", content)
uid = request_api_chatgpt(content, model, top_p, temperature)
results = uuid_search_mp([uid])[0]
    # Check the generated output against the input prompt for problems
check_problems(messages[0]["content"], results)
return_type_list = return_type(messages[0]["content"], results)
log.log('start at',
'prompt_paper:{},results:{},return_type_list:{}'.format(
prompt_paper, results, str(return_type_list)))
return_text = {
'code': 200,
'id': uid["texts"]["id"],
'object': 0,
'created': 0,
'model': model,
'choices': [
{
'index': 0,
'message': {
'role': 'assistant',
'content': results
},
'logprobs': None,
'finish_reason': 'stop'
}
],
'return_type_list': return_type_list,
'usage': 0,
'system_fingerprint': 0
}
return jsonify(return_text)
if __name__ == '__main__':
initialization_ip()
app.run(host="0.0.0.0", port=12004, threaded=True, debug=False)
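# Illustrative request (values are examples only; the message content is abbreviated):
#   curl -X POST http://127.0.0.1:12004/predict \
#        -H "Content-Type: application/json" \
#        -d '{"model": "openbuddy-qwen2.5llamaify-7b_train_11_prompt_mistral_gpt_xiaobiaot_real_paper_2",
#             "messages": [{"role": "user", "content": "任务:生成论文小标题内容 ..."}],
#             "top_p": 0.7, "temperature": 0.3, "online_query": 1}'
# Any non-null "online_query" value switches on the Bing-augmented prompt path.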