Spaces:
Sleeping
Sleeping
File size: 6,164 Bytes
be25939 99ba41d be25939 99ba41d be25939 99ba41d be25939 99ba41d be25939 99ba41d be25939 99ba41d be25939 99ba41d be25939 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
from flask import Blueprint, request, jsonify, session, current_app
from app.models.user import User
from app.utils.decorators import login_required, admin_required
from bson.objectid import ObjectId
user_bp = Blueprint('users', __name__)
def get_mongo():
return current_app.extensions['pymongo'][0]
def get_bcrypt():
return current_app.extensions['bcrypt']
@user_bp.route('/profile', methods=['GET'])
def get_profile():
"""Get current user profile"""
try:
user_id = session['user_id']
user = User.find_by_id(user_id)
if not user:
return jsonify({"error": "User not found"}), 404
# Remove sensitive data
user_data = {
"id": str(user['_id']),
"email": user['email'],
"first_name": user['first_name'],
"last_name": user['last_name'],
"role": user.get('role', 'customer'),
"created_at": user.get('created_at'),
"email_verified": user.get('email_verified', False),
"last_login": user.get('last_login')
}
return jsonify({"user": user_data}), 200
except Exception as e:
return jsonify({"error": "Failed to get profile", "details": str(e)}), 500
@user_bp.route('/profile', methods=['PUT'])
def update_profile():
"""Update current user profile"""
try:
data = request.get_json()
# Fields that can be updated
required_fields = ['id']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({"error": f"{field} is required"}), 400
allowed_fields = ['name', 'email','phone', 'address']
update_data = {}
for field in allowed_fields:
if field in data and data[field]:
update_data[field] = data[field]
id = data['id']
# Update user
success = User.update_user(id, update_data)
if not success:
return jsonify({"error": "Failed to update profile"}), 500
# Get updated user data
user = User.find_by_id(id)
user_data = {
"id": user['_id'],
"email": user['email'],
"name": user['name'],
"phone": user['phone'],
"address": user['address']
}
return jsonify({
"message": "Profile updated successfully",
"user": user_data
}), 200
except Exception as e:
return jsonify({"error": "Failed to update profile", "details": str(e)}), 500
@user_bp.route('/change-password', methods=['PUT'])
@login_required
def change_password():
"""Change user password"""
try:
user_id = session['user_id']
data = request.get_json()
# Validate required fields
if not data.get('current_password') or not data.get('new_password'):
return jsonify({"error": "Current password and new password are required"}), 400
current_password = data['current_password']
new_password = data['new_password']
# Get current user
user = User.find_by_id(user_id)
if not user:
return jsonify({"error": "User not found"}), 404
# Verify current password
if not User.check_password(user['password'], current_password):
return jsonify({"error": "Current password is incorrect"}), 400
# Validate new password
is_valid, message = User.validate_password(new_password)
if not is_valid:
return jsonify({"error": message}), 400
# Hash new password
bcrypt = get_bcrypt()
new_password_hash = bcrypt.generate_password_hash(new_password).decode('utf-8')
# Update password
success = User.update_user(user_id, {"password": new_password_hash})
if not success:
return jsonify({"error": "Failed to update password"}), 500
return jsonify({"message": "Password changed successfully"}), 200
except Exception as e:
return jsonify({"error": "Failed to change password", "details": str(e)}), 500
@user_bp.route('/deactivate', methods=['PUT'])
@login_required
def deactivate_account():
"""Deactivate user account"""
try:
user_id = session['user_id']
# Update user status
success = User.update_user(user_id, {"is_active": False})
if not success:
return jsonify({"error": "Failed to deactivate account"}), 500
# Clear session
session.clear()
return jsonify({"message": "Account deactivated successfully"}), 200
except Exception as e:
return jsonify({"error": "Failed to deactivate account", "details": str(e)}), 500
@user_bp.route('/list', methods=['GET'])
@admin_required
def list_users():
"""List all users (admin only)"""
try:
mongo = get_mongo()
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10))
# Calculate skip value for pagination
skip = (page - 1) * per_page
# Get users with pagination
users_cursor = mongo.db.users.find(
{},
{"password": 0} # Exclude password field
).skip(skip).limit(per_page).sort("created_at", -1)
users = []
for user in users_cursor:
user['id'] = str(user['_id'])
del user['_id']
users.append(user)
# Get total count
total_users = mongo.db.users.count_documents({})
return jsonify({
"users": users,
"pagination": {
"page": page,
"per_page": per_page,
"total": total_users,
"pages": (total_users + per_page - 1) // per_page
}
}), 200
except Exception as e:
return jsonify({"error": "Failed to list users", "details": str(e)}), 500 |