Spaces:
Running
Running
""" | |
Authentication router for user management and JWT tokens. | |
""" | |
from datetime import timedelta | |
from fastapi import APIRouter, Depends, HTTPException, status | |
from fastapi.security import HTTPBearer | |
from sqlalchemy.orm import Session | |
import logging | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from sqlalchemy import select | |
from database import get_async_db | |
from models import User | |
from schemas import UserCreate, UserResponse, UserLogin, Token, APIResponse | |
from auth import verify_password, get_password_hash, create_access_token, create_refresh_token, refresh_access_token | |
from dependencies import get_current_active_user | |
from config import settings | |
logger = logging.getLogger(__name__) | |
router = APIRouter() | |
security = HTTPBearer() | |
async def register_user( | |
user_data: UserCreate, | |
db: AsyncSession = Depends(get_async_db) | |
): | |
"""Register a new user.""" | |
# Check if user already exists | |
existing_user = await db.execute( | |
select(User).where(User.email == user_data.email) | |
) | |
existing_user = existing_user.scalar_one_or_none() | |
if existing_user: | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
detail="Email already registered" | |
) | |
# Create new user | |
hashed_password = get_password_hash(user_data.password) | |
db_user = User( | |
email=user_data.email, | |
full_name=user_data.full_name, | |
hashed_password=hashed_password | |
) | |
db.add(db_user) | |
await db.commit() | |
await db.refresh(db_user) | |
logger.info(f"User registered: {db_user.email}") | |
return APIResponse( | |
success=True, | |
message="User registered successfully", | |
data={"user_id": db_user.id} | |
) | |
async def login_user( | |
user_credentials: UserLogin, | |
db: AsyncSession = Depends(get_async_db) | |
): | |
"""Authenticate user and return JWT tokens.""" | |
# Get user | |
user_result = await db.execute( | |
select(User).where(User.email == user_credentials.email) | |
) | |
user = user_result.scalar_one_or_none() | |
if not user or not verify_password(user_credentials.password, user.hashed_password): | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Incorrect email or password", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
if not user.is_active: | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
detail="Inactive user account" | |
) | |
# Create tokens | |
access_token = create_access_token(data={"sub": user.id}) | |
refresh_token = create_refresh_token(data={"sub": user.id}) | |
logger.info(f"User logged in: {user.email}") | |
return Token( | |
access_token=access_token, | |
refresh_token=refresh_token, | |
token_type="bearer" | |
) | |
from pydantic import BaseModel | |
class RefreshTokenRequest(BaseModel): | |
refresh_token: str | |
async def refresh_token( | |
request: RefreshTokenRequest | |
): | |
"""Refresh access token using refresh token.""" | |
try: | |
new_access_token = refresh_access_token(request.refresh_token) | |
return { | |
"access_token": new_access_token, | |
"token_type": "bearer" | |
} | |
except HTTPException as e: | |
raise e | |
except Exception as e: | |
logger.error(f"Token refresh error: {e}") | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Could not refresh token" | |
) | |
async def get_current_user_info( | |
current_user: User = Depends(get_current_active_user) | |
): | |
"""Get current user information.""" | |
return current_user | |
async def logout_user( | |
current_user: User = Depends(get_current_active_user) | |
): | |
"""Logout user (client should discard tokens).""" | |
logger.info(f"User logged out: {current_user.email}") | |
return APIResponse( | |
success=True, | |
message="User logged out successfully" | |
) | |