Spaces:
Sleeping
Sleeping
File size: 4,220 Bytes
bd161ec d7ed1ea bd161ec d7ed1ea bd161ec d7ed1ea bd161ec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
"""
Authentication router for user management and JWT tokens.
"""
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer
from sqlalchemy.orm import Session
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import get_async_db
from models import User
from schemas import UserCreate, UserResponse, UserLogin, Token, APIResponse
from auth import verify_password, get_password_hash, create_access_token, create_refresh_token, refresh_access_token
from dependencies import get_current_active_user
from config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router that the endpoint decorators in this module attach their routes to.
router = APIRouter()
# HTTP bearer scheme instance; not referenced by any handler visible in this file.
security = HTTPBearer()
@router.post("/register", response_model=APIResponse, status_code=201)
async def register_user(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_async_db)
):
    """Register a new user.

    Looks up an existing ``User`` with the submitted email and rejects the
    request if one is found; otherwise hashes the password, stores a new
    ``User`` row and returns its id.

    Args:
        user_data: Registration payload; ``email``, ``full_name`` and
            ``password`` are read from it.
        db: Async database session supplied by ``get_async_db``.

    Returns:
        An ``APIResponse`` with ``success=True`` and
        ``data={"user_id": <id of the new user>}``.

    Raises:
        HTTPException: 400 when the email is already registered, either
            detected by the up-front lookup or reported by the database
            when the insert is committed.
    """
    # Function-scope import keeps this handler self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    # Check if user already exists
    lookup = await db.execute(
        select(User).where(User.email == user_data.email)
    )
    if lookup.scalar_one_or_none() is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Create new user
    db_user = User(
        email=user_data.email,
        full_name=user_data.full_name,
        hashed_password=get_password_hash(user_data.password)
    )
    db.add(db_user)
    try:
        await db.commit()
    except IntegrityError as exc:
        # The lookup above is not atomic with the insert: a concurrent
        # request can register the same email in between. Presumably the
        # database then rejects this row through a unique constraint on
        # User.email (TODO confirm in models). Roll back so the session is
        # usable again and answer like the pre-check does instead of
        # surfacing an unhandled server error.
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        ) from exc
    # Reload the row so database-generated fields such as ``id`` are populated.
    await db.refresh(db_user)

    logger.info("User registered: %s", db_user.email)
    return APIResponse(
        success=True,
        message="User registered successfully",
        data={"user_id": db_user.id}
    )
@router.post("/login", response_model=Token)
async def login_user(
    user_credentials: UserLogin,
    db: AsyncSession = Depends(get_async_db)
):
    """Authenticate a user and issue an access/refresh token pair.

    Args:
        user_credentials: Login payload; ``email`` and ``password`` are read
            from it.
        db: Async database session supplied by ``get_async_db``.

    Returns:
        A ``Token`` carrying ``access_token``, ``refresh_token`` and
        ``token_type="bearer"``.

    Raises:
        HTTPException: 401 when no user matches the email or the password
            does not verify; 400 when the matched user is not active.
    """
    # Fetch the account that owns the submitted email, if any.
    query = select(User).where(User.email == user_credentials.email)
    account = (await db.execute(query)).scalar_one_or_none()

    # One combined check: an unknown email and a wrong password produce the
    # same response. The password is only verified when an account was found.
    if not (
        account
        and verify_password(user_credentials.password, account.hashed_password)
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user account"
        )

    # Issue both tokens for the same subject; each helper gets its own dict.
    # NOTE(review): RFC 7519 defines "sub" as a string, while user.id is
    # passed through unchanged here; confirm the auth helpers handle its type.
    token_fields = {
        "access_token": create_access_token(data={"sub": account.id}),
        "refresh_token": create_refresh_token(data={"sub": account.id}),
        "token_type": "bearer",
    }

    logger.info(f"User logged in: {account.email}")
    return Token(**token_fields)
from pydantic import BaseModel
# Request body model for the /refresh endpoint.
class RefreshTokenRequest(BaseModel):
    # Refresh token string that the /refresh handler hands to refresh_access_token().
    refresh_token: str
@router.post("/refresh", response_model=dict)
async def refresh_token(
    request: RefreshTokenRequest
):
    """Exchange a refresh token for a new access token.

    Args:
        request: Request body; its ``refresh_token`` field is passed to
            ``refresh_access_token``.

    Returns:
        A dict with ``access_token`` (the newly issued token) and
        ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: Any ``HTTPException`` raised by
            ``refresh_access_token`` is propagated unchanged; every other
            exception is logged and converted to a 401 response.
    """
    try:
        new_access_token = refresh_access_token(request.refresh_token)
    except HTTPException:
        # Already an HTTP-level error; bare ``raise`` keeps the traceback.
        raise
    except Exception as exc:
        # Deliberate catch-all at the endpoint boundary: unexpected failures
        # are logged with their traceback and reported as 401.
        logger.exception("Token refresh error: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not refresh token",
            # Same challenge header the login endpoint sends with its 401.
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    return {
        "access_token": new_access_token,
        "token_type": "bearer"
    }
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
    current_user: User = Depends(get_current_active_user),
):
    """Return the record of the user making the request.

    Args:
        current_user: ``User`` resolved by the ``get_current_active_user``
            dependency.

    Returns:
        The same ``User`` object, serialized through ``UserResponse``.
    """
    # All the work happens in the dependency; the handler only echoes it back.
    return current_user
@router.post("/logout", response_model=APIResponse)
async def logout_user(
    current_user: User = Depends(get_current_active_user),
):
    """Acknowledge a logout request.

    This handler only records the event and confirms it; it does not touch
    any token, so the client is expected to discard its tokens.

    Args:
        current_user: ``User`` resolved by the ``get_current_active_user``
            dependency; only its ``email`` is read, for the log entry.

    Returns:
        An ``APIResponse`` with ``success=True`` and a confirmation message.
    """
    logger.info(f"User logged out: {current_user.email}")

    confirmation = APIResponse(
        success=True,
        message="User logged out successfully",
    )
    return confirmation
|