from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from pydantic import BaseModel from datetime import datetime, timedelta, timezone import sqlite3 import os SECRET_KEY = "your-secret-key" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 300 pwd_context = CryptContext(schemes=["django_pbkdf2_sha256"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") auth_router = APIRouter() class User(BaseModel): username: str class Token(BaseModel): access_token: str token_type: str # SQLite setup DB_NAME = "users.db" def init_db(): conn = sqlite3.connect(DB_NAME) cur = conn.cursor() cur.execute('''CREATE TABLE IF NOT EXISTS users (username TEXT PRIMARY KEY, hashed_password TEXT)''') # Add sample users if they don't exist cur.execute("INSERT OR IGNORE INTO users VALUES (?, ?)", ("admin", pwd_context.hash("admin"))) conn.commit() conn.close() # Initialize the database init_db() def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_user(username: str): conn = sqlite3.connect(DB_NAME) cur = conn.cursor() cur.execute("SELECT * FROM users WHERE username=?", (username,)) user = cur.fetchone() conn.close() if user: return User(username=user[0]) return None def authenticate_user(username: str, password: str): conn = sqlite3.connect(DB_NAME) cur = conn.cursor() cur.execute("SELECT * FROM users WHERE username=?", (username,)) user = cur.fetchone() conn.close() if not user: return False if not verify_password(password, user[1]): return False return User(username=user[0]) def create_access_token(data: dict): to_encode = data.copy() expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @auth_router.post("/token", response_model=Token) async def login(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = create_access_token(data={"sub": user.username}) return {"access_token": access_token, "token_type": "bearer"} async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception user = get_user(username) if user is None: raise credentials_exception return user except JWTError: raise credentials_exception