Spaces:
Sleeping
Sleeping
from app.backend.models.users import User, add_new_user, find_user_by_email, find_user_by_access_string, update_user | |
from bcrypt import gensalt, hashpw, checkpw | |
from app.settings import very_secret_pepper, jwt_algorithm, max_cookie_lifetime | |
from fastapi import HTTPException | |
import jwt | |
from datetime import datetime, timedelta | |
from fastapi import Response, Request | |
from secrets import token_urlsafe | |
import hmac | |
import hashlib | |
# A vot nado bilo izuchat kak web dev rabotaet | |
''' | |
Creates a jwt token by access string | |
Param: | |
access_string - randomly (safe methods) generated string (by default - 16 len) | |
expires_delta - time in seconds, defines a token lifetime | |
Returns: | |
string with 4 sections (valid jwt token) | |
''' | |
def create_access_token(access_string: str, expires_delta: timedelta = timedelta(seconds=max_cookie_lifetime)) -> str: | |
token_payload = { | |
"access_string": access_string, | |
} | |
token_payload.update({"exp": datetime.now() + expires_delta}) | |
encoded_jwt: str = jwt.encode(token_payload, very_secret_pepper, algorithm=jwt_algorithm) | |
return encoded_jwt | |
''' | |
Safely creates random string of 16 chars | |
''' | |
def create_access_string() -> str: | |
return token_urlsafe(16) | |
''' | |
Hashes access string using hmac and sha256 | |
We can not use the same methods as we do to save password | |
since we need to know a salt to get similar hash, but since | |
we put a raw string (non-hashed) we won't be able to guess | |
salt | |
''' | |
def hash_access_string(string: str) -> str: | |
return hmac.new( | |
key=very_secret_pepper.encode("utf-8"), | |
msg=string.encode("utf-8"), | |
digestmod=hashlib.sha256 | |
).hexdigest() | |
''' | |
Creates a new user and sets a cookie with jwt token | |
Params: | |
response - needed to set a cookie | |
... | |
Returns: | |
Dict to send a response in JSON | |
''' | |
def create_user(response: Response, email: str, password: str) -> dict: | |
user: User = find_user_by_email(email=email) | |
if user is not None: | |
return HTTPException(418, "The user with similar email already exists") | |
salt: bytes = gensalt(rounds=16) | |
password_hashed: str = hashpw(password.encode("utf-8"), salt).decode("utf-8") | |
access_string: str = create_access_string() | |
access_string_hashed: str = hash_access_string(string=access_string) | |
add_new_user(email=email, password_hash=password_hashed, access_string_hash=access_string_hashed) | |
access_token: str = create_access_token(access_string=access_string) | |
response.set_cookie(key="access_token", value=access_token, path='/', max_age=max_cookie_lifetime, httponly=True) | |
return {"status": "ok"} | |
''' | |
Finds user by email. If user is found, sets a cookie with token | |
''' | |
def authenticate_user(response: Response, email: str, password: str) -> dict: | |
user: User = find_user_by_email(email=email) | |
if not user: | |
raise HTTPException(418, "User does not exists") | |
if not checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')): | |
raise HTTPException(418, "Wrong credentials") | |
access_string: str = create_access_string() | |
access_string_hashed: str = hash_access_string(string=access_string) | |
update_user(user, access_string_hash=access_string_hashed) | |
access_token = create_access_token(access_string) | |
response.set_cookie(key="access_token", value=access_token, path='/', max_age=max_cookie_lifetime, httponly=True) | |
return {"status": "ok"} | |
''' | |
Get user from token stored in cookies | |
''' | |
def get_current_user(request: Request) -> User | None: | |
token: str | None = request.cookies.get("access_token") | |
if not token: | |
return None | |
access_string = jwt.decode( | |
jwt=bytes(token, encoding='utf-8'), | |
key=very_secret_pepper, | |
algorithms=[jwt_algorithm] | |
).get('access_string') | |
user = find_user_by_access_string(hash_access_string(access_string)) | |
if not user: | |
return None | |
return user | |
''' | |
Checks if cookie with access token is present | |
''' | |
def check_cookie(request: Request) -> dict: | |
result = {"token": "No token is present"} | |
token = request.cookies.get("access_token") | |
if token: | |
result["token"] = token | |
return result | |
def clear_cookie(response: Response) -> dict: | |
response.set_cookie(key="access_token", value="", httponly=True) | |
return {"status": "ok"} |