Andrchest's picture
final try 1
e53c2d7
from app.backend.models.users import User, add_new_user, find_user_by_email, find_user_by_access_string, update_user
from bcrypt import gensalt, hashpw, checkpw
from app.settings import very_secret_pepper, jwt_algorithm, max_cookie_lifetime
from fastapi import HTTPException
import jwt
from datetime import datetime, timedelta
from fastapi import Response, Request
from secrets import token_urlsafe
import hmac
import hashlib
# A vot nado bilo izuchat kak web dev rabotaet
'''
Creates a jwt token by access string
Param:
access_string - randomly (safe methods) generated string (by default - 16 len)
expires_delta - time in seconds, defines a token lifetime
Returns:
string with 4 sections (valid jwt token)
'''
def create_access_token(access_string: str, expires_delta: timedelta = timedelta(seconds=max_cookie_lifetime)) -> str:
token_payload = {
"access_string": access_string,
}
token_payload.update({"exp": datetime.now() + expires_delta})
encoded_jwt: str = jwt.encode(token_payload, very_secret_pepper, algorithm=jwt_algorithm)
return encoded_jwt
'''
Safely creates random string of 16 chars
'''
def create_access_string() -> str:
return token_urlsafe(16)
'''
Hashes access string using hmac and sha256
We can not use the same methods as we do to save password
since we need to know a salt to get similar hash, but since
we put a raw string (non-hashed) we won't be able to guess
salt
'''
def hash_access_string(string: str) -> str:
return hmac.new(
key=very_secret_pepper.encode("utf-8"),
msg=string.encode("utf-8"),
digestmod=hashlib.sha256
).hexdigest()
'''
Creates a new user and sets a cookie with jwt token
Params:
response - needed to set a cookie
...
Returns:
Dict to send a response in JSON
'''
def create_user(response: Response, email: str, password: str) -> dict:
user: User = find_user_by_email(email=email)
if user is not None:
return HTTPException(418, "The user with similar email already exists")
salt: bytes = gensalt(rounds=16)
password_hashed: str = hashpw(password.encode("utf-8"), salt).decode("utf-8")
access_string: str = create_access_string()
access_string_hashed: str = hash_access_string(string=access_string)
add_new_user(email=email, password_hash=password_hashed, access_string_hash=access_string_hashed)
access_token: str = create_access_token(access_string=access_string)
response.set_cookie(key="access_token", value=access_token, path='/', max_age=max_cookie_lifetime, httponly=True)
return {"status": "ok"}
'''
Finds user by email. If user is found, sets a cookie with token
'''
def authenticate_user(response: Response, email: str, password: str) -> dict:
user: User = find_user_by_email(email=email)
if not user:
raise HTTPException(418, "User does not exists")
if not checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')):
raise HTTPException(418, "Wrong credentials")
access_string: str = create_access_string()
access_string_hashed: str = hash_access_string(string=access_string)
update_user(user, access_string_hash=access_string_hashed)
access_token = create_access_token(access_string)
response.set_cookie(key="access_token", value=access_token, path='/', max_age=max_cookie_lifetime, httponly=True)
return {"status": "ok"}
'''
Get user from token stored in cookies
'''
def get_current_user(request: Request) -> User | None:
token: str | None = request.cookies.get("access_token")
if not token:
return None
access_string = jwt.decode(
jwt=bytes(token, encoding='utf-8'),
key=very_secret_pepper,
algorithms=[jwt_algorithm]
).get('access_string')
user = find_user_by_access_string(hash_access_string(access_string))
if not user:
return None
return user
'''
Checks if cookie with access token is present
'''
def check_cookie(request: Request) -> dict:
result = {"token": "No token is present"}
token = request.cookies.get("access_token")
if token:
result["token"] = token
return result
def clear_cookie(response: Response) -> dict:
response.set_cookie(key="access_token", value="", httponly=True)
return {"status": "ok"}