""" Authentication utilities and JWT handling. """ from datetime import datetime, timedelta, timezone from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import HTTPException, status import logging from config import settings logger = logging.getLogger(__name__) # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password.""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create a JWT access token.""" to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes) to_encode.update({"exp": expire, "type": "access"}) encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm) return encoded_jwt def create_refresh_token(data: dict) -> str: """Create a JWT refresh token.""" to_encode = data.copy() expire = datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days) to_encode.update({"exp": expire, "type": "refresh"}) encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm) return encoded_jwt def verify_token(token: str, token_type: str = "access") -> dict: """Verify and decode a JWT token.""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) user_id: str | None = payload.get("sub") token_type_claim: str | None = payload.get("type") if user_id is None or token_type_claim != token_type: raise credentials_exception return payload except JWTError as e: logger.error(f"JWT verification error: {e}") raise credentials_exception def refresh_access_token(refresh_token: str) -> str: """Create a new access token from a refresh token.""" try: payload = verify_token(refresh_token, "refresh") user_id = payload.get("sub") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) access_token = create_access_token(data={"sub": user_id}) return access_token except HTTPException: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" )