devme's picture
Upload 90 files
030c0e5 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
JWT Authentication for Warp API
Handles JWT token management, refresh, and validation.
Integrates functionality from refresh_jwt.py.
仅限匿名账户,JWT和refresh token保存在内存中。
"""
import base64
import json
import os
import time
from pathlib import Path
import httpx
import asyncio
from dotenv import load_dotenv, set_key
from ..config.settings import REFRESH_TOKEN_B64, REFRESH_URL, CLIENT_VERSION, OS_CATEGORY, OS_NAME, OS_VERSION
from .logging import logger, log
# 内存中的token存储(仅限匿名账户)
_memory_jwt_token: str = ""
_memory_refresh_token: str = ""
_is_anonymous_user: bool = False
def set_memory_jwt_token(token: str, is_anonymous: bool = False) -> None:
"""设置内存中的JWT token(仅限匿名用户)"""
global _memory_jwt_token, _is_anonymous_user
if is_anonymous:
_memory_jwt_token = token
_is_anonymous_user = True
logger.info("JWT token已保存到内存(匿名用户)")
else:
logger.debug("非匿名用户,JWT token不保存到内存")
def get_memory_jwt_token() -> str:
"""获取内存中的JWT token"""
global _memory_jwt_token
return _memory_jwt_token
def set_memory_refresh_token(token: str, is_anonymous: bool = False) -> None:
"""设置内存中的refresh token(仅限匿名用户)"""
global _memory_refresh_token, _is_anonymous_user
if is_anonymous:
_memory_refresh_token = token
_is_anonymous_user = True
logger.info("Refresh token已保存到内存(匿名用户)")
else:
logger.debug("非匿名用户,refresh token不保存到内存")
def get_memory_refresh_token() -> str:
"""获取内存中的refresh token"""
global _memory_refresh_token
return _memory_refresh_token
def is_anonymous_user() -> bool:
"""检查当前是否为匿名用户"""
global _is_anonymous_user
return _is_anonymous_user
def decode_jwt_payload(token: str) -> dict:
"""Decode JWT payload to check expiration"""
try:
parts = token.split('.')
if len(parts) != 3:
return {}
payload_b64 = parts[1]
padding = 4 - len(payload_b64) % 4
if padding != 4:
payload_b64 += '=' * padding
payload_bytes = base64.urlsafe_b64decode(payload_b64)
payload = json.loads(payload_bytes.decode('utf-8'))
return payload
except Exception as e:
logger.debug(f"Error decoding JWT: {e}")
return {}
def is_token_expired(token: str, buffer_minutes: int = 5) -> bool:
payload = decode_jwt_payload(token)
if not payload or 'exp' not in payload:
return True
expiry_time = payload['exp']
current_time = time.time()
buffer_time = buffer_minutes * 60
return (expiry_time - current_time) <= buffer_time
async def refresh_jwt_token() -> dict:
"""Refresh the JWT token using the refresh token.
优先级:内存中的refresh token(匿名用户) > 环境变量 > 默认token
"""
logger.info("Refreshing JWT token...")
# 优先使用内存中的refresh token(匿名用户)
memory_refresh = get_memory_refresh_token()
if memory_refresh:
logger.info("使用内存中的refresh token(匿名用户)")
payload = f"grant_type=refresh_token&refresh_token={memory_refresh}".encode("utf-8")
else:
# 其次使用环境变量中的refresh token
env_refresh = os.getenv("WARP_REFRESH_TOKEN")
if env_refresh:
logger.info("使用环境变量中的refresh token")
payload = f"grant_type=refresh_token&refresh_token={env_refresh}".encode("utf-8")
else:
logger.info("使用默认的refresh token")
payload = base64.b64decode(REFRESH_TOKEN_B64)
headers = {
"x-warp-client-version": CLIENT_VERSION,
"x-warp-os-category": OS_CATEGORY,
"x-warp-os-name": OS_NAME,
"x-warp-os-version": OS_VERSION,
"content-type": "application/x-www-form-urlencoded",
"accept": "*/*",
"accept-encoding": "gzip, br",
"content-length": str(len(payload))
}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
REFRESH_URL,
headers=headers,
content=payload
)
if response.status_code == 200:
token_data = response.json()
logger.info("Token refresh successful")
return token_data
else:
logger.error(f"Token refresh failed: {response.status_code}")
logger.error(f"Response: {response.text}")
return {}
except Exception as e:
logger.error(f"Error refreshing token: {e}")
return {}
def update_env_file(new_jwt: str, is_anonymous: bool = False) -> bool:
"""更新JWT token,匿名用户保存到内存,普通用户保存到.env文件"""
if is_anonymous:
set_memory_jwt_token(new_jwt, is_anonymous=True)
return True
else:
env_path = Path(".env")
try:
set_key(str(env_path), "WARP_JWT", new_jwt)
logger.info("Updated .env file with new JWT token")
return True
except Exception as e:
logger.error(f"Error updating .env file: {e}")
# 如果.env文件更新失败,尝试保存到内存
logger.warning("尝试将JWT token保存到内存作为备用方案")
set_memory_jwt_token(new_jwt, is_anonymous=True)
return True
def update_env_refresh_token(refresh_token: str, is_anonymous: bool = False) -> bool:
"""更新refresh token,匿名用户保存到内存,普通用户保存到.env文件"""
if is_anonymous:
set_memory_refresh_token(refresh_token, is_anonymous=True)
return True
else:
env_path = Path(".env")
try:
set_key(str(env_path), "WARP_REFRESH_TOKEN", refresh_token)
logger.info("Updated .env with WARP_REFRESH_TOKEN")
return True
except Exception as e:
logger.error(f"Error updating .env WARP_REFRESH_TOKEN: {e}")
# 如果.env文件更新失败,尝试保存到内存
logger.warning("尝试将refresh token保存到内存作为备用方案")
set_memory_refresh_token(refresh_token, is_anonymous=True)
return True
async def check_and_refresh_token() -> bool:
# 优先检查内存中的JWT token(匿名用户)
current_jwt = get_memory_jwt_token()
is_anon = is_anonymous_user()
if not current_jwt:
# 如果内存中没有,检查环境变量
current_jwt = os.getenv("WARP_JWT")
if not current_jwt:
logger.warning("No JWT token found in memory or environment")
token_data = await refresh_jwt_token()
if token_data and "access_token" in token_data:
# 如果使用的是内存中的refresh token,说明是匿名用户
memory_refresh = get_memory_refresh_token()
is_anonymous = bool(memory_refresh)
return update_env_file(token_data["access_token"], is_anonymous=is_anonymous)
return False
logger.debug("Checking current JWT token expiration...")
if is_token_expired(current_jwt, buffer_minutes=15):
logger.info("JWT token is expired or expiring soon, refreshing...")
token_data = await refresh_jwt_token()
if token_data and "access_token" in token_data:
new_jwt = token_data["access_token"]
if not is_token_expired(new_jwt, buffer_minutes=0):
logger.info("New token is valid")
# 判断是否为匿名用户
memory_refresh = get_memory_refresh_token()
is_anonymous = bool(memory_refresh) or is_anon
return update_env_file(new_jwt, is_anonymous=is_anonymous)
else:
logger.warning("New token appears to be invalid or expired")
return False
else:
logger.error("Failed to get new token from refresh")
return False
else:
payload = decode_jwt_payload(current_jwt)
if payload and 'exp' in payload:
expiry_time = payload['exp']
time_left = expiry_time - time.time()
hours_left = time_left / 3600
logger.debug(f"Current token is still valid ({hours_left:.1f} hours remaining)")
else:
logger.debug("Current token appears valid")
return True
async def get_valid_jwt() -> str:
from dotenv import load_dotenv as _load
_load(override=True)
# 优先检查内存中的JWT token(匿名用户)
jwt = get_memory_jwt_token()
if not jwt:
# 如果内存中没有,检查环境变量
jwt = os.getenv("WARP_JWT")
if not jwt:
logger.info("No JWT token found in memory or environment, attempting to refresh...")
if await check_and_refresh_token():
# 刷新后重新获取token
jwt = get_memory_jwt_token()
if not jwt:
_load(override=True)
jwt = os.getenv("WARP_JWT")
if not jwt:
raise RuntimeError("WARP_JWT is not set and refresh failed")
if is_token_expired(jwt, buffer_minutes=2):
logger.info("JWT token is expired or expiring soon, attempting to refresh...")
if await check_and_refresh_token():
# 刷新后重新获取token
jwt = get_memory_jwt_token()
if not jwt:
_load(override=True)
jwt = os.getenv("WARP_JWT")
if not jwt or is_token_expired(jwt, buffer_minutes=0):
logger.warning("Warning: New token has short expiry but proceeding anyway")
else:
logger.warning("Warning: JWT token refresh failed, trying to use existing token")
return jwt
def get_jwt_token() -> str:
"""获取JWT token,优先从内存获取(匿名用户),然后从环境变量获取"""
# 优先检查内存中的JWT token(匿名用户)
jwt = get_memory_jwt_token()
if jwt:
return jwt
# 如果内存中没有,检查环境变量
from dotenv import load_dotenv as _load
_load()
return os.getenv("WARP_JWT", "")
async def refresh_jwt_if_needed() -> bool:
try:
return await check_and_refresh_token()
except Exception as e:
logger.error(f"JWT refresh failed: {e}")
return False
# ============ Anonymous token acquisition (quota refresh) ============
_ANON_GQL_URL = "https://app.warp.dev/graphql/v2?op=CreateAnonymousUser"
_IDENTITY_TOOLKIT_BASE = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithCustomToken"
def _extract_google_api_key_from_refresh_url() -> str:
try:
# REFRESH_URL like: https://app.warp.dev/proxy/token?key=API_KEY
from urllib.parse import urlparse, parse_qs
parsed = urlparse(REFRESH_URL)
qs = parse_qs(parsed.query)
key = qs.get("key", [""])[0]
return key
except Exception:
return ""
async def _create_anonymous_user() -> dict:
headers = {
"accept-encoding": "gzip, br",
"content-type": "application/json",
"x-warp-client-version": CLIENT_VERSION,
"x-warp-os-category": OS_CATEGORY,
"x-warp-os-name": OS_NAME,
"x-warp-os-version": OS_VERSION,
}
# GraphQL payload per anonymous.MD
query = (
"mutation CreateAnonymousUser($input: CreateAnonymousUserInput!, $requestContext: RequestContext!) {\n"
" createAnonymousUser(input: $input, requestContext: $requestContext) {\n"
" __typename\n"
" ... on CreateAnonymousUserOutput {\n"
" expiresAt\n"
" anonymousUserType\n"
" firebaseUid\n"
" idToken\n"
" isInviteValid\n"
" responseContext { serverVersion }\n"
" }\n"
" ... on UserFacingError {\n"
" error { __typename message }\n"
" responseContext { serverVersion }\n"
" }\n"
" }\n"
"}\n"
)
variables = {
"input": {
"anonymousUserType": "NATIVE_CLIENT_ANONYMOUS_USER_FEATURE_GATED",
"expirationType": "NO_EXPIRATION",
"referralCode": None
},
"requestContext": {
"clientContext": {"version": CLIENT_VERSION},
"osContext": {
"category": OS_CATEGORY,
"linuxKernelVersion": None,
"name": OS_NAME,
"version": OS_VERSION,
}
}
}
body = {"query": query, "variables": variables, "operationName": "CreateAnonymousUser"}
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
resp = await client.post(_ANON_GQL_URL, headers=headers, json=body)
if resp.status_code != 200:
raise RuntimeError(f"CreateAnonymousUser failed: HTTP {resp.status_code} {resp.text[:200]}")
data = resp.json()
return data
async def _exchange_id_token_for_refresh_token(id_token: str) -> dict:
key = _extract_google_api_key_from_refresh_url()
url = f"{_IDENTITY_TOOLKIT_BASE}?key={key}" if key else f"{_IDENTITY_TOOLKIT_BASE}?key=AIzaSyBdy3O3S9hrdayLJxJ7mriBR4qgUaUygAs"
headers = {
"accept-encoding": "gzip, br",
"content-type": "application/x-www-form-urlencoded",
"x-warp-client-version": CLIENT_VERSION,
"x-warp-os-category": OS_CATEGORY,
"x-warp-os-name": OS_NAME,
"x-warp-os-version": OS_VERSION,
}
form = {
"returnSecureToken": "true",
"token": id_token,
}
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
resp = await client.post(url, headers=headers, data=form)
if resp.status_code != 200:
raise RuntimeError(f"signInWithCustomToken failed: HTTP {resp.status_code} {resp.text[:200]}")
return resp.json()
async def acquire_anonymous_access_token() -> str:
"""Acquire a new anonymous access token (quota refresh) and persist to memory.
Returns the new access token string. Raises on failure.
"""
logger.info("Acquiring anonymous access token via GraphQL + Identity Toolkit…")
data = await _create_anonymous_user()
id_token = None
try:
id_token = data["data"]["createAnonymousUser"].get("idToken")
except Exception:
pass
if not id_token:
raise RuntimeError(f"CreateAnonymousUser did not return idToken: {data}")
signin = await _exchange_id_token_for_refresh_token(id_token)
refresh_token = signin.get("refreshToken")
if not refresh_token:
raise RuntimeError(f"signInWithCustomToken did not return refreshToken: {signin}")
# 匿名用户的refresh token保存到内存中
update_env_refresh_token(refresh_token, is_anonymous=True)
# Now call Warp proxy token endpoint to get access_token using this refresh token
payload = f"grant_type=refresh_token&refresh_token={refresh_token}".encode("utf-8")
headers = {
"x-warp-client-version": CLIENT_VERSION,
"x-warp-os-category": OS_CATEGORY,
"x-warp-os-name": OS_NAME,
"x-warp-os-version": OS_VERSION,
"content-type": "application/x-www-form-urlencoded",
"accept": "*/*",
"accept-encoding": "gzip, br",
"content-length": str(len(payload))
}
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
resp = await client.post(REFRESH_URL, headers=headers, content=payload)
if resp.status_code != 200:
raise RuntimeError(f"Acquire access_token failed: HTTP {resp.status_code} {resp.text[:200]}")
token_data = resp.json()
access = token_data.get("access_token")
if not access:
raise RuntimeError(f"No access_token in response: {token_data}")
# 匿名用户的access token保存到内存中
update_env_file(access, is_anonymous=True)
return access
def print_token_info():
# 优先检查内存中的JWT token
current_jwt = get_memory_jwt_token()
if not current_jwt:
current_jwt = os.getenv("WARP_JWT")
if not current_jwt:
logger.info("No JWT token found in memory or environment")
return
payload = decode_jwt_payload(current_jwt)
if not payload:
logger.info("Cannot decode JWT token")
return
logger.info("=== JWT Token Information ===")
if 'email' in payload:
logger.info(f"Email: {payload['email']}")
if 'user_id' in payload:
logger.info(f"User ID: {payload['user_id']}")