File size: 6,262 Bytes
48ec4db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b8255d
 
48ec4db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from app.backend.models.users import get_user_last_chat, find_user_by_id, add_new_user, User
from fastapi import Response, Request, HTTPException
from app.settings import settings, logger, BASE_DIR
from datetime import datetime, timedelta, timezone
from app.backend.models.chats import Chat
from uuid import uuid4
import asyncio
import shutil
import jwt
import os


async def remove_user(user_id: str) -> None:
    """Delete the on-disk chat storage directory of a user.

    The directory ``<BASE_DIR>/chats_storage/user_id=<user_id>`` is removed
    recursively via ``shutil.rmtree`` in the loop's default executor, so the
    event loop is not blocked by filesystem work. This is a best-effort
    cleanup: any failure is logged and swallowed.

    Args:
        user_id: Identifier used to build the storage directory name.
    """
    loop = asyncio.get_running_loop()
    path = os.path.join(BASE_DIR, "chats_storage", f"user_id={user_id}")
    try:
        # The future must be awaited: otherwise an rmtree failure is never
        # raised here and the except branch below can never run.
        await loop.run_in_executor(None, shutil.rmtree, path)
    except Exception as e:
        await logger.error(f"Error at remove_user: {e}")


async def extract_user_from_context(request: Request) -> User | None:
    """Return the user stored on ``request.state.current_user``, if any.

    Args:
        request: Incoming request whose ``state`` is inspected.

    Returns:
        The value of ``request.state.current_user`` when that attribute
        exists, otherwise ``None`` (with a debug log entry when
        ``settings.debug`` is enabled).
    """
    state = request.state

    if not hasattr(state, "current_user"):
        if settings.debug:
            await logger.info("No attribute 'current_user'")
        return None

    return state.current_user


async def create_access_token(user_id: str, expires_delta: timedelta = settings.max_cookie_lifetime) -> str:
    """Build a signed JWT carrying ``user_id`` and an expiration claim.

    The token is signed with ``settings.secret_pepper`` using
    ``settings.jwt_algorithm``; encoding runs in the default executor.

    Args:
        user_id: Value stored under the ``"user_id"`` claim.
        expires_delta: Lifetime of the token, added to the current UTC time
            to form the ``"exp"`` claim.

    Returns:
        The encoded JWT.

    Raises:
        HTTPException: With status 500 if encoding fails for any reason.
    """
    token_payload = {
        "user_id": user_id,
        # Must be timezone-aware UTC: PyJWT interprets a naive datetime as
        # UTC, so a naive local time would skew the expiry by the server's
        # UTC offset.
        "exp": datetime.now(timezone.utc) + expires_delta,
    }
    loop = asyncio.get_running_loop()

    try:
        encoded_jwt: str = await loop.run_in_executor(
            None,
            jwt.encode,
            token_payload,
            settings.secret_pepper,
            settings.jwt_algorithm
        )
    except Exception as e:
        # Chain the cause so the real encoding error stays in the traceback.
        raise HTTPException(status_code=500, detail="json encoding error") from e

    if settings.debug:
        # NOTE(review): this writes the full bearer token to the log; keep
        # debug mode off wherever logs are shared.
        await logger.info(f"New JWT token - {encoded_jwt}")

    return encoded_jwt


async def create_user() -> User | None:
    """Create a new user with a freshly generated UUID4 identifier.

    Returns:
        The object returned by ``add_new_user`` for the new id.

    Raises:
        HTTPException: With status 418 if ``add_new_user`` raises; the
            detail is the string form of the underlying error.
    """
    new_user_id = str(uuid4())
    try:
        user = await add_new_user(id=new_user_id)
    except Exception as e:
        # detail must be JSON-serialisable; an exception instance is not, so
        # pass its message and keep the original error as the cause.
        raise HTTPException(status_code=418, detail=str(e)) from e

    if settings.debug:
        await logger.info(f"Created user - {user.id}")

    return user


async def authorize_user(response: Response, user: User) -> dict:
    """Issue an access token for ``user`` and attach it to ``response``.

    A JWT is created for ``user.id`` and stored in the ``access_token``
    cookie (HTTP-only, secure, ``SameSite=None``, path ``/``) whose lifetime
    is ``settings.max_cookie_lifetime``.

    Args:
        response: Response object that receives the ``Set-Cookie`` header.
        user: User whose ``id`` is embedded in the token.

    Returns:
        ``{"status": "ok"}`` once the cookie has been set. If
        ``jwt.ExpiredSignatureError`` is raised, the user's storage is
        removed and the function falls through, returning ``None``.
    """
    if settings.debug:
        await logger.info("START Authorizing User")
    try:
        access_token: str = await create_access_token(user_id=user.id)
        lifetime = settings.max_cookie_lifetime
        expires = datetime.now(timezone.utc) + lifetime

        response.set_cookie(
            key="access_token",
            value=access_token,
            path="/",
            expires=expires.strftime("%a, %d %b %Y %H:%M:%S GMT"),
            # Max-Age is a number of seconds; passing the timedelta itself
            # would emit its string form and produce an invalid attribute.
            max_age=int(lifetime.total_seconds()),
            httponly=True,
            secure=True,
            samesite='None'
        )

        return {"status": "ok"}
    except jwt.ExpiredSignatureError:
        # NOTE(review): nothing in the try body decodes a token, so this
        # branch looks unreachable; kept unchanged for callers - confirm.
        await remove_user(user.id)
    finally:
        if settings.debug:
            await logger.info("END Authorizing User")


async def get_current_user(request: Request) -> User | None:
    """Resolve the user referenced by the request's ``access_token`` cookie.

    The cookie is decoded with ``settings.secret_pepper`` and
    ``settings.jwt_algorithm`` in the default executor, and its ``user_id``
    claim is looked up through ``find_user_by_id``.

    Args:
        request: Incoming request whose cookies are read.

    Returns:
        The matching user, or ``None`` when the cookie is missing/empty or
        the lookup yields nothing.

    Raises:
        Any exception raised by ``jwt.decode`` or ``find_user_by_id`` is
        propagated unchanged.
    """
    if settings.debug:
        await logger.info("START Getting User")

    event_loop = asyncio.get_event_loop()
    try:
        raw_token: str | None = request.cookies.get("access_token")

        if settings.debug:
            await logger.info(f"Token -----> {raw_token if raw_token else 'Empty token!'}")

        # Guard clause: nothing to decode without a cookie value.
        if not raw_token:
            return None

        claims = await event_loop.run_in_executor(
            None,
            jwt.decode,
            raw_token.encode("utf-8"),
            settings.secret_pepper,
            [settings.jwt_algorithm],
        )
        uid = claims.get("user_id")

        if settings.debug:
            await logger.info(f"User id -----> {uid if uid else 'Empty user id!'}")

        found = await find_user_by_id(id=uid)

        if settings.debug:
            await logger.info(f"Found user -----> {found.id if found else 'No user was found!'}")

        return found if found else None
    finally:
        # Runs on every exit path, including early returns and errors.
        if settings.debug:
            await logger.info("END Getting User")


async def check_cookie(request: Request) -> dict:
    """Report the value of the ``access_token`` cookie.

    Args:
        request: Incoming request whose cookies are read.

    Returns:
        ``{"token": <cookie value>}`` when the cookie is present and
        non-empty, otherwise ``{"token": "No token is present"}``.
    """
    cookie_value = request.cookies.get("access_token")
    return {"token": cookie_value if cookie_value else "No token is present"}


async def clear_cookie(response: Response) -> dict:
    """Expire the ``access_token`` cookie on the client.

    Args:
        response: Response object that receives the ``Set-Cookie`` header.

    Returns:
        ``{"status": "ok"}``.
    """
    # Overwriting with an empty value alone leaves a live (empty) cookie in
    # the browser. Expire it explicitly, and mirror the path/secure/samesite
    # attributes used when the cookie is issued so the browser accepts the
    # replacement in the same (cross-site) contexts.
    response.set_cookie(
        key="access_token",
        value="",
        path="/",
        max_age=0,
        expires=0,
        httponly=True,
        secure=True,
        samesite='None',
    )
    return {"status": "ok"}


async def get_latest_chat(user: User) -> Chat | None:
    """Return the result of ``get_user_last_chat`` for ``user``.

    Args:
        user: User passed through to ``get_user_last_chat``.
    """
    latest_chat = await get_user_last_chat(user)
    return latest_chat


async def refresh_cookie(request: Request, response: Response) -> None:
    """Re-issue the ``access_token`` cookie when it is close to expiring.

    The cookie's JWT is decoded; if less than 20% of
    ``settings.max_cookie_lifetime`` remains before its ``exp`` claim, a new
    token is issued for the user found on ``request.state``.

    Args:
        request: Incoming request providing the cookie and the current user.
        response: Response that receives the refreshed cookie.

    Raises:
        HTTPException: 401 if the token signature has expired; 500 for any
            other PyJWT error (detail is the error message).
    """
    if settings.debug:
        await logger.info("START Refreshing cookie")

    loop = asyncio.get_running_loop()
    try:
        token: str | None = request.cookies.get("access_token")

        if settings.debug:
            await logger.info(f"Token -----> {token if token else 'Empty token!'}")

        # Treat an empty cookie like a missing one: an empty value is not a
        # token and would only fail decoding.
        if not token:
            return

        try:
            jwt_token = await loop.run_in_executor(
                None,
                jwt.decode,
                bytes(token, encoding="utf-8"),
                settings.secret_pepper,
                [settings.jwt_algorithm],
            )

            exp_datetime = datetime.fromtimestamp(jwt_token.get("exp"), tz=timezone.utc)

            if settings.debug:
                await logger.info(f"Expires -----> {exp_datetime if exp_datetime else 'No expiration date!'}")
        except jwt.ExpiredSignatureError as e:
            raise HTTPException(status_code=401, detail="jwt signature has expired") from e
        except jwt.PyJWTError as e:
            # detail must be JSON-serialisable, so send the message rather
            # than the exception object.
            raise HTTPException(status_code=500, detail=str(e)) from e

        diff = exp_datetime - datetime.now(timezone.utc)

        if settings.debug:
            await logger.info(f"Difference -----> {diff if diff else 'No difference in date!'}")

        if diff.total_seconds() < 0.2 * settings.max_cookie_lifetime.total_seconds():

            if settings.debug:
                await logger.info("Refreshing")

            user = await extract_user_from_context(request)
            # Without a resolved user there is nobody to issue a token for;
            # authorize_user would fail on user.id.
            if user is None:
                return
            await authorize_user(response, user)
    finally:
        if settings.debug:
            await logger.info("END Refreshing cookie")