Spaces:
Running
Running
""" | |
FastAPI dependencies for authentication and database access. | |
""" | |
from typing import Optional | |
from fastapi import Depends, HTTPException, status | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from sqlalchemy.orm import Session | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from sqlalchemy import select | |
import logging | |
from database import get_async_db | |
from models import User, Project, ProjectMember, ProjectRole | |
from auth import verify_token | |
from schemas import TokenData | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# HTTP bearer-token scheme; consumed via Depends(security) in get_current_user.
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_async_db)
) -> User:
    """Resolve the request's bearer token to an active ``User`` (async).

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Async database session used to look the user up.

    Returns:
        The ``User`` row whose ``id`` matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when token verification raises, the payload has
            no ``sub`` claim, ``TokenData`` cannot be built, or no user row
            matches; 400 when the matching user is not active.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = verify_token(credentials.credentials)
        subject: str | None = claims.get("sub")
        if subject is None:
            raise unauthorized
        token_data = TokenData(user_id=subject)
    except Exception as e:
        # Every failure in the block above -- including the explicit raise
        # for a missing "sub" -- is logged and collapsed into the same 401.
        logger.error(f"Token validation error: {e}")
        raise unauthorized

    # Fetch the user the token refers to.
    lookup = select(User).where(User.id == token_data.user_id)
    user = (await db.execute(lookup)).scalar_one_or_none()

    if user is None:
        raise unauthorized
    if user.is_active:
        return user
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user"
    )
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user, rejecting inactive accounts (async).

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``is_active`` flag is truthy.

    Raises:
        HTTPException: 400 with detail ``"Inactive user"`` otherwise.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user",
    )
async def check_project_access(
    project_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_async_db),
    required_role: Optional[ProjectRole] = None
) -> Project:
    """Load a project and verify that ``current_user`` may access it (async).

    The project owner (``project.owner_id == current_user.id``) is always
    granted access. Any other user must have a ``ProjectMember`` row for the
    project and, when ``required_role`` is given, that row's ``role`` must
    equal it.

    Args:
        project_id: Identifier of the project to load.
        current_user: The authenticated user requesting access.
        db: Async database session used for both lookups.
        required_role: Role a non-owner member must hold. ``None`` accepts
            any membership.

    Returns:
        The ``Project`` the user is allowed to access.

    Raises:
        HTTPException: 404 if no project has ``project_id``; 403 if the user
            is neither the owner nor a member, or is a member whose role
            does not match ``required_role``.
    """
    project_result = await db.execute(
        select(Project).where(Project.id == project_id)
    )
    project = project_result.scalar_one_or_none()
    if project is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # The owner bypasses membership and role checks entirely.
    if project.owner_id == current_user.id:
        return project

    member_result = await db.execute(
        select(ProjectMember).where(
            ProjectMember.project_id == project_id,
            ProjectMember.user_id == current_user.id
        )
    )
    member = member_result.scalar_one_or_none()
    if member is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied to this project"
        )

    # Enforce the requested role for every role, not only OWNER. Previously a
    # mismatch on any other role fell through and access was granted.
    # NOTE(review): this is an exact-match comparison because no role
    # ordering is visible in this module; if roles are ranked, replace it
    # with a rank comparison so higher roles satisfy lower requirements.
    if required_role is not None and member.role != required_role:
        if required_role == ProjectRole.OWNER:
            detail = "Only project owner can perform this action"
        else:
            detail = "Insufficient project role for this action"
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=detail
        )

    return project
async def require_project_owner(
    project_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_async_db)
):
    """Dependency that only admits callers with owner access (async).

    Delegates to ``check_project_access`` with
    ``required_role=ProjectRole.OWNER`` and returns whatever it returns;
    any ``HTTPException`` it raises propagates unchanged.

    Args:
        project_id: Identifier of the project being accessed.
        current_user: The authenticated, active user.
        db: Async database session forwarded to the access check.
    """
    project = await check_project_access(
        project_id=project_id,
        current_user=current_user,
        db=db,
        required_role=ProjectRole.OWNER,
    )
    return project
async def require_project_editor(
    project_id: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_async_db)
):
    """Dependency intended to gate editor-level project access (async).

    Delegates to ``check_project_access`` without a ``required_role`` and
    returns whatever it returns; any ``HTTPException`` it raises propagates
    unchanged.

    NOTE(review): because no role is passed, this admits the project owner
    and *any* project member regardless of role. If an editor-or-above
    requirement is intended, a role must be enforced here -- confirm against
    the ``ProjectRole`` definition.

    Args:
        project_id: Identifier of the project being accessed.
        current_user: The authenticated, active user.
        db: Async database session forwarded to the access check.
    """
    project = await check_project_access(
        project_id=project_id,
        current_user=current_user,
        db=db,
    )
    return project