"""Flask HTTP API for user file records stored in ClickHouse.

The module exposes CRUD-style JSON endpoints under ``/users/<user_uuid>/...``
that delegate to ``clickhouse_client``, plus an ``upload_file`` helper that
stores ``.txt`` uploads in ``UPLOAD_FOLDER``.
"""
import functools
import os
from datetime import datetime

from flask import Flask, request, render_template_string, redirect, url_for, jsonify
from flask_cors import CORS
from werkzeug.utils import secure_filename

from clickhouse_client import clickhouse_client
from config import Config

app = Flask(__name__)
CORS(app)  # allow cross-origin requests
app.config.from_object(Config)

# Upload settings
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt'}

# Make sure the upload directory exists (exist_ok avoids a check-then-create race).
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


# ========== Internal helpers ==========

def _json_errors(view):
    """Wrap a view so any uncaught exception becomes a JSON 500 response.

    The exception text is returned to the client (as the handlers always did)
    and the traceback is written to the application log.
    """
    @functools.wraps(view)
    def wrapper(*args, **kwargs):
        try:
            return view(*args, **kwargs)
        except Exception as e:  # top-level boundary for the request
            app.logger.exception('%s failed', view.__name__)
            return jsonify({'error': str(e)}), 500
    return wrapper


def _json_object_body():
    """Return ``(data, None)`` for a JSON-object body, else ``(None, error)``.

    ``silent=True`` makes a missing/invalid JSON body come back as ``None``
    instead of raising, so the caller answers 400 rather than 500.
    """
    data = request.get_json(silent=True)
    if not data:
        return None, (jsonify({'error': 'No data provided'}), 400)
    if not isinstance(data, dict):
        return None, (jsonify({'error': 'Request body must be a JSON object'}), 400)
    return data, None


def _get_owned_record(user_uuid, record_id):
    """Load a record and check that it belongs to ``user_uuid``.

    Returns ``(record, None)`` on success, or ``(None, error)`` where
    ``error`` is a ready-to-return ``(response, status)`` pair: 404 when the
    record is missing, 403 when it belongs to another user.
    """
    record = clickhouse_client.get_user_record_by_id(record_id)
    if not record:
        return None, (jsonify({'error': 'Record not found'}), 404)
    if record['user_uuid'] != user_uuid:
        return None, (jsonify({'error': 'Record does not belong to this user'}), 403)
    return record, None


def _upload_response(message):
    """Build the JSON payload used by ``upload_file``.

    NOTE(review): the original code answers with code 203 for success as well
    as for every failure; kept unchanged because the client contract is not
    visible here -- confirm whether success should use a distinct code.
    """
    return jsonify({"code": 203, "message": message})


# ========== File upload ==========

def upload_file():
    """Store the uploaded ``file`` form part in the upload folder.

    The file must have an extension listed in ``ALLOWED_EXTENSIONS``. The
    stored name is the sanitized original name with a timestamp appended so
    repeated uploads of the same name do not overwrite each other.

    Returns a JSON response with ``code`` and ``message`` in every case.

    NOTE(review): no ``@app.route`` is attached to this function in this
    file, so it is only reachable if it is registered elsewhere.
    """
    # The multipart request must contain a "file" part.
    if 'file' not in request.files:
        return _upload_response('没有文件部分')

    file = request.files['file']

    # An empty filename means the user submitted without choosing a file.
    if file.filename == '':
        return _upload_response('没有选择文件')

    if not (file and allowed_file(file.filename)):
        # Previously this branch fell through and returned None.
        return _upload_response('文件类型不允许')

    # Sanitize the client-supplied name before using it in a path.
    stem, ext = os.path.splitext(secure_filename(file.filename))
    if not ext:
        # secure_filename keeps ASCII only, so a name such as "报告.txt"
        # collapses to "txt" and the extension is lost. Fall back to a generic
        # stem plus the extension that allowed_file() already validated.
        stem = 'upload'
        ext = '.' + file.filename.rsplit('.', 1)[1].lower()

    # Append a timestamp to avoid clobbering an earlier upload of the same name.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{stem}_{timestamp}{ext}"

    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)
    return _upload_response('文件上传成功!')


def allowed_file(filename):
    """Return True if ``filename`` has an extension in ``ALLOWED_EXTENSIONS``."""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


# ========== API routes ==========
# NOTE(review): the URL variables below use Flask's default string converter;
# confirm whether record_id should use a typed converter such as <int:...>.

@app.route('/')
def index():
    """Landing page: list the available endpoints."""
    return jsonify({
        'message': 'ClickHouse Flask API',
        'endpoints': {
            'GET /health': '检查服务健康状态',
            'POST /users/<user_uuid>/records': '创建用户记录',
            'POST /users/<user_uuid>/records/batch': '批量创建记录',
            'GET /users/<user_uuid>/records': '获取用户所有记录',
            'GET /users/<user_uuid>/records/<record_id>': '获取单条记录',
            'PUT /users/<user_uuid>/records/<record_id>': '更新记录',
            'DELETE /users/<user_uuid>/records/<record_id>': '删除单条记录',
            'DELETE /users/<user_uuid>/records': '删除用户所有记录',
            'PUT /users/<user_uuid>/records/<record_id>/restore': '恢复已删除记录',
            'GET /users/<user_uuid>/search': '搜索用户记录',
            'GET /users/<user_uuid>/stats': '获取用户统计信息',
        },
    })


@app.route('/health', methods=['GET'])
def health_check():
    """Health check: report whether a trivial ClickHouse query succeeds."""
    try:
        # Probe the ClickHouse connection.
        clickhouse_client.client.execute('SELECT 1')
        return jsonify({'status': 'healthy', 'database': 'connected'})
    except Exception as e:
        return jsonify({'status': 'unhealthy', 'error': str(e)}), 500


# ========== User record operations ==========

@app.route('/users/<user_uuid>/records', methods=['POST'])
@_json_errors
def create_record(user_uuid):
    """Create one record for ``user_uuid`` from a JSON body.

    Body fields: ``file_path`` (required) and ``is_delete`` (default 0).
    Responds 201 with the new ``record_id``, 400 on a missing/invalid body,
    or 500 if the client reports failure.
    """
    data, error = _json_object_body()
    if error is not None:
        return error

    file_path = data.get('file_path')
    is_delete = data.get('is_delete', 0)
    if not file_path:
        return jsonify({'error': 'file_path is required'}), 400

    record_id = clickhouse_client.create_user_record(user_uuid, file_path, is_delete)
    if not record_id:
        return jsonify({'error': 'Failed to create record'}), 500

    return jsonify({
        'message': 'Record created successfully',
        'user_uuid': user_uuid,
        'record_id': record_id,
        'file_path': file_path,
    }), 201


@app.route('/users/<user_uuid>/records/batch', methods=['POST'])
@_json_errors
def batch_create_records(user_uuid):
    """Create several records for ``user_uuid`` from a JSON list.

    Items that are not objects or that lack a non-empty ``file_path`` are
    skipped. Responds 201 with the created ``record_ids``, 400 when the body
    is not a list or no item is usable, or 500 if the client reports failure.
    """
    data = request.get_json(silent=True)
    if not data or not isinstance(data, list):
        return jsonify({'error': 'Data must be a list of records'}), 400

    # Attach user_uuid to every usable item; drop the rest.
    records = [
        {
            'user_uuid': user_uuid,
            'file_path': item.get('file_path', ''),
            'is_delete': item.get('is_delete', 0),
        }
        for item in data
        if isinstance(item, dict) and item.get('file_path')
    ]
    if not records:
        return jsonify({'error': 'No valid records provided'}), 400

    record_ids = clickhouse_client.batch_create_records(records)
    if not record_ids:
        return jsonify({'error': 'Failed to create records'}), 500

    return jsonify({
        'message': f'{len(record_ids)} records created successfully',
        'user_uuid': user_uuid,
        'record_ids': record_ids,
        'count': len(record_ids),
    }), 201


@app.route('/users/<user_uuid>/records', methods=['GET'])
@_json_errors
def get_records(user_uuid):
    """List the records of ``user_uuid``.

    The ``include_deleted`` query parameter (``true``/``false``, default
    ``false``) is forwarded to the ClickHouse client.
    """
    include_deleted = request.args.get('include_deleted', 'false').lower() == 'true'
    records = clickhouse_client.get_user_records(user_uuid, include_deleted)
    return jsonify({
        'user_uuid': user_uuid,
        'count': len(records),
        'records': records,
    })


@app.route('/users/<user_uuid>/records/<record_id>', methods=['GET'])
@_json_errors
def get_single_record(user_uuid, record_id):
    """Return one record; 404 if missing, 403 if owned by another user."""
    record, error = _get_owned_record(user_uuid, record_id)
    if error is not None:
        return error
    return jsonify(record)


@app.route('/users/<user_uuid>/records/<record_id>', methods=['PUT'])
@_json_errors
def update_record(user_uuid, record_id):
    """Update ``file_path`` and/or ``is_delete`` of a record.

    The record must exist and belong to ``user_uuid`` (404/403 otherwise).
    Fields absent from the JSON body are passed to the client as ``None``.
    """
    # Existence and ownership are checked before the body is read.
    _, error = _get_owned_record(user_uuid, record_id)
    if error is not None:
        return error

    data, error = _json_object_body()
    if error is not None:
        return error

    file_path = data.get('file_path')
    is_delete = data.get('is_delete')

    if not clickhouse_client.update_user_record(record_id, file_path, is_delete):
        return jsonify({'error': 'Failed to update record'}), 500

    return jsonify({
        'message': 'Record updated successfully',
        'record_id': record_id,
        'user_uuid': user_uuid,
    })


@app.route('/users/<user_uuid>/records/<record_id>', methods=['DELETE'])
@_json_errors
def delete_record(user_uuid, record_id):
    """Delete one record.

    The ``soft_delete`` query parameter (default ``true``) is forwarded to
    the client and selects the wording of the success message.
    """
    _, error = _get_owned_record(user_uuid, record_id)
    if error is not None:
        return error

    soft_delete = request.args.get('soft_delete', 'true').lower() == 'true'

    if not clickhouse_client.delete_user_record(record_id, soft_delete):
        return jsonify({'error': 'Failed to delete record'}), 500

    action = 'soft deleted' if soft_delete else 'permanently deleted'
    return jsonify({
        'message': f'Record {action} successfully',
        'record_id': record_id,
        'user_uuid': user_uuid,
    })


@app.route('/users/<user_uuid>/records', methods=['DELETE'])
@_json_errors
def delete_all_records(user_uuid):
    """Delete every record of ``user_uuid`` (soft delete unless ``soft_delete=false``)."""
    soft_delete = request.args.get('soft_delete', 'true').lower() == 'true'

    if not clickhouse_client.delete_all_user_records(user_uuid, soft_delete):
        return jsonify({'error': 'Failed to delete records'}), 500

    action = 'soft deleted' if soft_delete else 'permanently deleted'
    return jsonify({
        'message': f'All records for user {user_uuid} have been {action}',
        'user_uuid': user_uuid,
    })


@app.route('/users/<user_uuid>/records/<record_id>/restore', methods=['PUT'])
@_json_errors
def restore_record(user_uuid, record_id):
    """Restore a previously deleted record owned by ``user_uuid``."""
    _, error = _get_owned_record(user_uuid, record_id)
    if error is not None:
        return error

    if not clickhouse_client.restore_user_record(record_id):
        return jsonify({'error': 'Failed to restore record'}), 500

    return jsonify({
        'message': 'Record restored successfully',
        'record_id': record_id,
        'user_uuid': user_uuid,
    })


@app.route('/users/<user_uuid>/search', methods=['GET'])
@_json_errors
def search_records(user_uuid):
    """Search the records of ``user_uuid``.

    Query parameters: ``keyword`` (optional) and ``is_delete`` (optional
    integer). A non-integer ``is_delete`` is rejected with 400.
    """
    keyword = request.args.get('keyword')
    is_delete_param = request.args.get('is_delete')

    is_delete = None
    if is_delete_param is not None:
        try:
            is_delete = int(is_delete_param)
        except ValueError:
            # Bad client input is a 400, not a server error.
            return jsonify({'error': 'is_delete must be an integer'}), 400

    records = clickhouse_client.search_user_records(user_uuid, keyword, is_delete)
    return jsonify({
        'user_uuid': user_uuid,
        'count': len(records),
        'records': records,
    })


@app.route('/users/<user_uuid>/stats', methods=['GET'])
@_json_errors
def get_user_stats(user_uuid):
    """Return statistics for ``user_uuid``; zeroed stats when the client returns none."""
    stats = clickhouse_client.get_user_stats(user_uuid)
    if not stats:
        stats = {
            'total_count': 0,
            'active_count': 0,
            'deleted_count': 0,
            'first_created': None,
            'last_created': None,
        }
    return jsonify({
        'user_uuid': user_uuid,
        'stats': stats,
    })


# ========== Error handlers ==========

@app.errorhandler(404)
def not_found(error):
    """Answer unknown URLs with a JSON 404."""
    return jsonify({'error': 'Not found'}), 404


@app.errorhandler(500)
def internal_error(error):
    """Answer unhandled server errors with a JSON 500."""
    return jsonify({'error': 'Internal server error'}), 500


# ========== Entry point ==========

if __name__ == '__main__':
    # The upload directory is already created at import time above.
    # Start the Flask application.
    app.run(
        host='0.0.0.0',
        port=28002,
        debug=Config.DEBUG
    )