Ananthakr1shnan's picture
Updated files
356ac4f
"""
Authentication and authorization utilities
"""
import jwt
import bcrypt
import json
import os
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from functools import wraps
# Import Flask components only when available
try:
from flask import request, jsonify, session, redirect, url_for
FLASK_AVAILABLE = True
except ImportError:
FLASK_AVAILABLE = False
request = None
jsonify = None
session = None
redirect = None
url_for = None
class AuthManager:
def __init__(self, secret_key: str = None):
self.secret_key = secret_key or os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production')
self.users_file = 'data/users.json'
self.active_sessions = {} # Track active sessions for security
self.session_file = 'data/active_sessions.json'
self.ensure_users_file()
def ensure_users_file(self):
"""Ensure users file exists"""
os.makedirs('data', exist_ok=True)
if not os.path.exists(self.users_file):
with open(self.users_file, 'w') as f:
json.dump({}, f)
def hash_password(self, password: str) -> str:
"""Hash password with bcrypt"""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(self, password: str, hashed: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def load_users(self) -> Dict[str, Any]:
"""Load users from file"""
try:
with open(self.users_file, 'r') as f:
return json.load(f)
except:
return {}
def save_users(self, users: Dict[str, Any]):
"""Save users to file"""
with open(self.users_file, 'w') as f:
json.dump(users, f, indent=2)
def create_user(self, username: str, email: str, password: str) -> Dict[str, Any]:
"""Create a new user"""
users = self.load_users()
if username in users:
return {'success': False, 'error': 'Username already exists'}
# Check if email already exists
for user_data in users.values():
if user_data.get('email') == email:
return {'success': False, 'error': 'Email already registered'}
# Create user
user_id = f"user_{len(users) + 1}"
users[username] = {
'user_id': user_id,
'email': email,
'password_hash': self.hash_password(password),
'created_at': datetime.now().isoformat(),
'is_active': True
}
self.save_users(users)
return {'success': True, 'user_id': user_id}
def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
"""Authenticate user credentials"""
users = self.load_users()
if username not in users:
return {'success': False, 'error': 'Invalid username or password'}
user = users[username]
if not self.verify_password(password, user['password_hash']):
return {'success': False, 'error': 'Invalid username or password'}
if not user.get('is_active', True):
return {'success': False, 'error': 'Account is disabled'}
# Generate JWT token with shorter expiration for security
token = jwt.encode({
'user_id': user['user_id'],
'username': username,
'exp': datetime.utcnow() + timedelta(hours=8) # 8 hours instead of 7 days
}, self.secret_key, algorithm='HS256')
# Track active session
self.add_active_session(user['user_id'], token)
return {
'success': True,
'token': token,
'user_id': user['user_id'],
'username': username
}
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"""Verify JWT token and check active session"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
user_id = payload.get('user_id')
# Check if session is still active
if not self.is_session_active(user_id, token):
return None
# Update session activity
self.update_session_activity(user_id)
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def get_current_user(self, request_obj) -> Optional[Dict[str, Any]]:
"""Get current user from request"""
if not FLASK_AVAILABLE or not request_obj:
return None
# Try Authorization header first
auth_header = request_obj.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
return self.verify_token(token)
# Try session
if session:
token = session.get('auth_token')
if token:
return self.verify_token(token)
return None
def create_default_admin(self) -> Dict[str, Any]:
"""Create default admin user if it doesn't exist"""
users = self.load_users()
admin_username = "admin"
admin_user_id = "admin_user"
# Check if admin already exists (by username or user_id)
if admin_username in users:
return {'success': True, 'message': 'Admin user already exists'}
# Check if user_id already exists
for user_data in users.values():
if user_data.get('user_id') == admin_user_id:
return {'success': True, 'message': 'Admin user ID already exists'}
# Create admin user
users[admin_username] = {
'user_id': admin_user_id,
'email': 'admin@researchmate.local',
'password_hash': self.hash_password('admin123'), # Default password
'created_at': datetime.now().isoformat(),
'is_active': True,
'is_admin': True
}
self.save_users(users)
return {
'success': True,
'message': 'Default admin user created',
'username': admin_username,
'password': 'admin123',
'note': 'Please change the default password after first login'
}
def load_active_sessions(self) -> Dict[str, Any]:
"""Load active sessions from file"""
try:
if os.path.exists(self.session_file):
with open(self.session_file, 'r') as f:
return json.load(f)
except:
pass
return {}
def save_active_sessions(self, sessions: Dict[str, Any]):
"""Save active sessions to file"""
try:
with open(self.session_file, 'w') as f:
json.dump(sessions, f, indent=2)
except:
pass
def add_active_session(self, user_id: str, token: str):
"""Add an active session"""
sessions = self.load_active_sessions()
sessions[user_id] = {
'token': token,
'created_at': datetime.now().isoformat(),
'last_activity': datetime.now().isoformat()
}
self.save_active_sessions(sessions)
def remove_active_session(self, user_id: str):
"""Remove an active session"""
sessions = self.load_active_sessions()
if user_id in sessions:
del sessions[user_id]
self.save_active_sessions(sessions)
def is_session_active(self, user_id: str, token: str) -> bool:
"""Check if a session is active"""
sessions = self.load_active_sessions()
if user_id not in sessions:
return False
session = sessions[user_id]
if session.get('token') != token:
return False
# Check if session is expired (8 hours)
created_at = datetime.fromisoformat(session['created_at'])
if datetime.now() - created_at > timedelta(hours=8):
self.remove_active_session(user_id)
return False
return True
def logout_user(self, user_id: str):
"""Logout user and invalidate session"""
self.remove_active_session(user_id)
return {'success': True, 'message': 'Logged out successfully'}
def cleanup_expired_sessions(self):
"""Clean up expired sessions"""
sessions = self.load_active_sessions()
current_time = datetime.now()
expired_sessions = []
for user_id, session in sessions.items():
created_at = datetime.fromisoformat(session['created_at'])
if current_time - created_at > timedelta(hours=8):
expired_sessions.append(user_id)
for user_id in expired_sessions:
del sessions[user_id]
if expired_sessions:
self.save_active_sessions(sessions)
return len(expired_sessions)
def update_session_activity(self, user_id: str):
"""Update last activity time for a session"""
sessions = self.load_active_sessions()
if user_id in sessions:
sessions[user_id]['last_activity'] = datetime.now().isoformat()
self.save_active_sessions(sessions)
# Global auth manager
auth_manager = AuthManager()
def require_auth(f):
"""Decorator to require authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not FLASK_AVAILABLE:
return f(*args, **kwargs)
user = auth_manager.get_current_user(request)
if not user:
if request.is_json:
return jsonify({'success': False, 'error': 'Authentication required'}), 401
else:
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
def get_current_user() -> Optional[Dict[str, Any]]:
"""Get current authenticated user"""
if not FLASK_AVAILABLE:
return None
return auth_manager.get_current_user(request)