Spaces:
Running
Running
File size: 6,262 Bytes
48ec4db 5b8255d 48ec4db |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
from app.backend.models.users import get_user_last_chat, find_user_by_id, add_new_user, User
from fastapi import Response, Request, HTTPException
from app.settings import settings, logger, BASE_DIR
from datetime import datetime, timedelta, timezone
from app.backend.models.chats import Chat
from uuid import uuid4
import asyncio
import shutil
import jwt
import os
async def remove_user(user_id: str) -> None:
    """Delete the on-disk chat storage directory that belongs to a user.

    The directory ``<BASE_DIR>/chats_storage/user_id=<user_id>`` is removed
    recursively in a worker thread. Removal is best-effort: any failure is
    logged through ``logger.error`` and swallowed, nothing is re-raised.

    Args:
        user_id: Identifier used to build the storage directory name.
    """
    path = os.path.join(BASE_DIR, "chats_storage", f"user_id={user_id}")
    try:
        # The thread's result has to be awaited: a fire-and-forget future
        # would finish after this try block has already been left, so an
        # rmtree failure would never reach the except clause below.
        await asyncio.to_thread(shutil.rmtree, path)
    except Exception as e:
        await logger.error(f"Error at remove_user: {e}")
async def extract_user_from_context(request: Request) -> User | None:
    """Return the user stored on ``request.state.current_user``, if any.

    When the attribute is absent the function returns ``None`` and, in debug
    mode, logs that the attribute was missing.

    Args:
        request: Incoming request whose ``state`` object is inspected.

    Returns:
        Whatever is stored under ``request.state.current_user``, or ``None``
        when that attribute does not exist.
    """
    state = request.state
    try:
        return state.current_user
    except AttributeError:
        # No user was attached to this request; fall through to the
        # "not found" path below.
        pass

    if settings.debug:
        await logger.info("No attribute 'current_user'")
    return None
async def create_access_token(user_id: str, expires_delta: timedelta = settings.max_cookie_lifetime) -> str:
    """Create a signed JWT that carries the user id and an expiry claim.

    The token is signed with ``settings.secret_pepper`` using
    ``settings.jwt_algorithm``; encoding runs in the default executor.

    Args:
        user_id: Value stored under the ``user_id`` claim.
        expires_delta: Lifetime of the token, added to the current UTC time
            to produce the ``exp`` claim.

    Returns:
        The encoded JWT string.

    Raises:
        HTTPException: With status 500 when encoding fails for any reason.
    """
    token_payload = {
        "user_id": user_id,
        # The expiry must be an aware UTC datetime. A naive local
        # ``datetime.now()`` is taken at face value as UTC when converted to
        # a timestamp, which shifts the real lifetime by the server's UTC
        # offset.
        "exp": datetime.now(timezone.utc) + expires_delta,
    }

    loop = asyncio.get_running_loop()
    try:
        encoded_jwt: str = await loop.run_in_executor(
            None,
            jwt.encode,
            token_payload,
            settings.secret_pepper,
            settings.jwt_algorithm,
        )
    except Exception as exc:
        # Keep the original cause attached for server-side diagnostics.
        raise HTTPException(status_code=500, detail="json encoding error") from exc

    if settings.debug:
        # NOTE(review): this writes a usable credential to the log; confirm
        # that debug logging is never enabled in production.
        await logger.info(f"New JWT token - {encoded_jwt}")

    return encoded_jwt
async def create_user() -> User | None:
    """Create and persist a new user with a freshly generated UUID4 id.

    Returns:
        The object returned by ``add_new_user`` for the new id.

    Raises:
        HTTPException: With status 418 when ``add_new_user`` fails; the
            detail carries the text of the underlying error.
    """
    new_user_id = str(uuid4())

    try:
        user = await add_new_user(id=new_user_id)
    except Exception as e:
        # ``detail`` ends up in a JSON response body, so it must be a plain
        # string; passing the exception object itself cannot be serialised.
        raise HTTPException(status_code=418, detail=str(e)) from e

    if settings.debug:
        await logger.info(f"Created user - {user.id}")

    return user
async def authorize_user(response: Response, user: User) -> dict:
    """Issue a new access token for ``user`` and attach it as a cookie.

    The cookie is named ``access_token``, scoped to ``/`` and flagged
    HttpOnly, Secure and SameSite=None. Both ``Expires`` and ``Max-Age`` are
    derived from ``settings.max_cookie_lifetime``.

    Args:
        response: Response object on which the cookie is set.
        user: User whose ``id`` is embedded in the token.

    Returns:
        ``{"status": "ok"}`` once the cookie has been set. If
        ``jwt.ExpiredSignatureError`` is raised while creating the token, the
        user's storage is removed via ``remove_user`` and the function falls
        through without an explicit return value.
    """
    if settings.debug:
        await logger.info("START Authorizing User")

    try:
        access_token: str = await create_access_token(user_id=user.id)

        lifetime = settings.max_cookie_lifetime
        expires = datetime.now(timezone.utc) + lifetime

        response.set_cookie(
            key="access_token",
            value=access_token,
            path="/",
            expires=expires.strftime("%a, %d %b %Y %H:%M:%S GMT"),
            # Max-Age has to be a whole number of seconds. The lifetime is a
            # timedelta (it is added to a datetime above), and passing it
            # through unchanged renders an attribute clients cannot parse.
            max_age=int(lifetime.total_seconds()),
            httponly=True,
            secure=True,
            samesite='None'
        )

        return {"status": "ok"}
    except jwt.ExpiredSignatureError:
        # Behaviour kept from the original implementation: the user's stored
        # data is dropped and no status dict is returned on this path.
        await remove_user(user.id)
    finally:
        if settings.debug:
            await logger.info("END Authorizing User")
async def get_current_user(request: Request) -> User | None:
    """Resolve the user identified by the request's ``access_token`` cookie.

    The cookie is decoded as a JWT (in the default executor) using
    ``settings.secret_pepper`` and ``settings.jwt_algorithm``; its
    ``user_id`` claim is then looked up with ``find_user_by_id``.

    Args:
        request: Incoming request whose cookies are inspected.

    Returns:
        The matching user, or ``None`` when the cookie is missing or empty,
        the token carries no ``user_id``, or no user is found.

    Raises:
        Any exception raised by ``jwt.decode`` or ``find_user_by_id``
        propagates to the caller unchanged.
    """
    if settings.debug:
        await logger.info("START Getting User")

    try:
        token: str | None = request.cookies.get("access_token")
        if settings.debug:
            await logger.info(f"Token -----> {token if token else 'Empty token!'}")

        if not token:
            return None

        loop = asyncio.get_running_loop()
        token_data = await loop.run_in_executor(
            None,
            jwt.decode,
            token.encode("utf-8"),
            settings.secret_pepper,
            [settings.jwt_algorithm],
        )

        user_id = token_data.get("user_id")
        if settings.debug:
            await logger.info(f"User id -----> {user_id if user_id else 'Empty user id!'}")

        # A token without a user id cannot identify anyone, so the lookup is
        # skipped instead of querying with an empty id.
        user = await find_user_by_id(id=user_id) if user_id else None
        if settings.debug:
            await logger.info(f"Found user -----> {user.id if user else 'No user was found!'}")

        return user if user else None
    finally:
        if settings.debug:
            await logger.info("END Getting User")
async def check_cookie(request: Request) -> dict:
    """Report the raw ``access_token`` cookie sent with the request.

    Args:
        request: Incoming request whose cookies are inspected.

    Returns:
        A dict with the single key ``"token"``: the cookie value when it is
        present and non-empty, otherwise a fixed placeholder message.
    """
    cookie_value = request.cookies.get("access_token")
    return {"token": cookie_value or "No token is present"}
async def clear_cookie(response: Response) -> dict:
    """Instruct the client to discard the ``access_token`` cookie.

    The cookie is overwritten with an empty value that expires immediately.
    Path, Secure, HttpOnly and SameSite mirror the attributes used when the
    cookie is issued in ``authorize_user``, so the replacement targets the
    same cookie and is accepted by clients that require ``Secure`` together
    with ``SameSite=None``.

    Args:
        response: Response object on which the expiring cookie is set.

    Returns:
        ``{"status": "ok"}``.
    """
    response.set_cookie(
        key="access_token",
        value="",
        path="/",
        # Zero lifetime makes the client drop the cookie instead of keeping
        # an empty-valued session cookie around.
        max_age=0,
        expires=0,
        httponly=True,
        secure=True,
        samesite="none",
    )
    return {"status": "ok"}
async def get_latest_chat(user: User) -> Chat | None:
    """Return the result of ``get_user_last_chat`` for the given user.

    Args:
        user: User passed through to ``get_user_last_chat``.

    Returns:
        Whatever ``get_user_last_chat`` yields for ``user``.
    """
    latest_chat = await get_user_last_chat(user)
    return latest_chat
async def refresh_cookie(request: Request, response: Response) -> None:
    """Re-issue the ``access_token`` cookie when it is close to expiring.

    The cookie is decoded as a JWT (in the default executor). If less than
    20% of ``settings.max_cookie_lifetime`` remains before its ``exp`` claim,
    the user attached to the request context is re-authorized via
    ``authorize_user``, which sets a fresh cookie on ``response``.

    Args:
        request: Incoming request carrying the cookie and the user context.
        response: Response on which a refreshed cookie is set when needed.

    Raises:
        HTTPException: 401 when the token's signature has expired; 500 for
            any other PyJWT decoding error.
    """
    if settings.debug:
        await logger.info("START Refreshing cookie")

    try:
        token: str | None = request.cookies.get("access_token")
        if settings.debug:
            await logger.info(f"Token -----> {token if token else 'Empty token!'}")

        # An empty cookie value can never be a valid JWT, so it is treated
        # the same as a missing cookie rather than sent to the decoder.
        if not token:
            return

        loop = asyncio.get_running_loop()
        try:
            jwt_token = await loop.run_in_executor(
                None,
                jwt.decode,
                token.encode("utf-8"),
                settings.secret_pepper,
                [settings.jwt_algorithm],
            )
        except jwt.ExpiredSignatureError as e:
            raise HTTPException(status_code=401, detail="jwt signature has expired") from e
        except jwt.PyJWTError as e:
            # ``detail`` is serialised into the response body, so pass the
            # message text rather than the exception object.
            raise HTTPException(status_code=500, detail=str(e)) from e

        exp_datetime = datetime.fromtimestamp(jwt_token.get("exp"), tz=timezone.utc)
        if settings.debug:
            await logger.info(f"Expires -----> {exp_datetime if exp_datetime else 'No expiration date!'}")

        diff = exp_datetime - datetime.now(timezone.utc)
        if settings.debug:
            await logger.info(f"Difference -----> {diff if diff else 'No difference in date!'}")

        if diff.total_seconds() < 0.2 * settings.max_cookie_lifetime.total_seconds():
            if settings.debug:
                await logger.info("Refreshing")

            user = await extract_user_from_context(request)
            if user is None:
                # Without a user on the request context there is nobody to
                # issue a new token for; leave the current cookie untouched.
                if settings.debug:
                    await logger.info("No user in request context, refresh skipped")
                return

            await authorize_user(response, user)
    finally:
        if settings.debug:
            await logger.info("END Refreshing cookie")
|