# LoremPizza's picture
# Update mediaflow_proxy/main.py
# 4a108e6 verified
import logging
from importlib import resources
import httpx
import re
import string
from fastapi import FastAPI, Depends, Security, HTTPException
from fastapi.security import APIKeyQuery, APIKeyHeader
from starlette.responses import RedirectResponse
from starlette.staticfiles import StaticFiles
from mediaflow_proxy.configs import settings
from mediaflow_proxy.routes import proxy_router
# Root logger: INFO and above, each record prefixed with the timestamp,
# logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

app = FastAPI()

# The API password may be supplied either as a query parameter or as a
# header, both named "api_password". With auto_error=False a missing value
# resolves to None instead of the scheme rejecting the request by itself.
api_password_query = APIKeyQuery(
    name="api_password",
    auto_error=False,
)
api_password_header = APIKeyHeader(
    name="api_password",
    auto_error=False,
)
async def verify_api_key(api_key: str = Security(api_password_query), api_key_alt: str = Security(api_password_header)):
    """
    Verifies the API key for the request.

    The request is accepted when either supplied value equals
    ``settings.api_password``.

    Args:
        api_key (str): The API key taken from the ``api_password`` query parameter.
        api_key_alt (str): The API key taken from the ``api_password`` header.

    Raises:
        HTTPException: 403 if neither value matches the configured password.
    """
    # Local import so this block does not depend on a change to the file's
    # import section.
    import secrets

    expected = settings.api_password
    for supplied in (api_key, api_key_alt):
        if isinstance(supplied, str) and isinstance(expected, str):
            # Constant-time comparison: a plain == on strings can leak, via
            # response timing, how many leading characters of the secret a
            # guess got right. Encoding to bytes lets compare_digest accept
            # non-ASCII passwords; "surrogatepass" keeps encode() from raising
            # on unusual input so the result always equals what == would give.
            if secrets.compare_digest(
                supplied.encode("utf-8", "surrogatepass"),
                expected.encode("utf-8", "surrogatepass"),
            ):
                return
        elif supplied == expected:
            # Non-string case (e.g. a missing value is None): keep the
            # original equality semantics unchanged.
            return
    raise HTTPException(status_code=403, detail="Could not validate credentials")
@app.get("/health")
async def health_check():
    """Health endpoint: always answers with a fixed "healthy" status payload."""
    payload = {"status": "healthy"}
    return payload
def _decode_packed_link(page_text):
    """
    Rebuild the link hidden in the packed script found in ``page_text``.

    The page is expected to contain a fragment shaped like
    ``}('<payload>',<...>,'<word|word|...>'.split`` (presumably the tail of a
    packer-style obfuscated script - confirm against a live page). Each
    character of the relevant payload slice that has an entry in the word
    table is replaced by its word; all other characters are kept as they are.

    Args:
        page_text (str): Body of the fetched embed page.

    Returns:
        str: ``"https:"`` followed by the decoded payload slice.

    Raises:
        ValueError: If the packed script is missing or has an unexpected shape.
    """
    found = re.search(r"\}\('(.+)',.+,'(.+)'\.split", page_text)
    if found is None:
        raise ValueError("packed script not found in page")
    payload, words = found.group(1, 2)

    statements = payload.split(";")
    if len(statements) < 3:
        raise ValueError("packed script has fewer statements than expected")
    # Third ';'-separated statement with its first five characters and its
    # last character stripped (presumably an assignment prefix and a closing
    # quote - verify if the page layout changes).
    schema = statements[2][5:-1]

    # Symbol i of the charset stands for word i; an empty word means the
    # symbol stands for itself. zip() stops at the shorter input, so a word
    # list longer than the 62-symbol charset no longer raises IndexError.
    charset = string.digits + string.ascii_letters
    lookup = {symbol: word or symbol for symbol, word in zip(charset, words.split("|"))}

    return "https:" + "".join(lookup.get(char, char) for char in schema)


@app.get("/real-link")
async def real_link():
    """
    Resolve the direct link behind a fixed MixDrop embed page.

    Fetches the embed page, decodes the packed script it contains, sends a
    HEAD request to the decoded URL (its result is only logged) and returns
    the decoded URL.

    Returns:
        str: The decoded URL.

    Raises:
        HTTPException: 502 if an upstream request fails or the page does not
            contain a decodable packed script.
    """
    log = logging.getLogger(__name__)
    embed_url = "https://mixdrop.ps/e/7ro4946lhee7l8"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.10; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        'Accept-Language': 'en-US,en;q=0.5'
    }

    try:
        async with httpx.AsyncClient() as client:
            page = await client.get(embed_url, headers=headers, follow_redirects=True, timeout=30)
            direct_url = _decode_packed_link(page.text)
            # Diagnostic probe of the decoded URL; the response is not part
            # of the returned value.
            probe = await client.head(direct_url)
    except ValueError as exc:
        # Decoding failed: report it as a bad upstream response instead of
        # letting an AttributeError/IndexError surface as an opaque 500.
        raise HTTPException(status_code=502, detail=f"Could not extract link: {exc}") from exc
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    log.info("HEAD %s returned %s with headers %s", direct_url, probe, probe.headers)
    return direct_url
@app.get("/favicon.ico")
async def get_favicon():
    """Answer favicon requests with a redirect to ``/logo.png``."""
    logo_redirect = RedirectResponse(url="/logo.png")
    return logo_redirect
# Every route of the proxy router lives under /proxy and depends on
# verify_api_key.
app.include_router(
    proxy_router,
    prefix="/proxy",
    tags=["proxy"],
    dependencies=[Depends(verify_api_key)],
)

# Serve the package's bundled "static" directory at the site root. It is
# registered after the routes above, presumably so that this catch-all mount
# does not shadow them.
static_path = resources.files("mediaflow_proxy").joinpath("static")
app.mount(
    "/",
    StaticFiles(directory=str(static_path), html=True),
    name="static",
)
def run():
    """Serve ``app`` with uvicorn on 127.0.0.1:8888."""
    # Imported here rather than at module level, as in the original, so that
    # importing this module does not itself import uvicorn.
    import uvicorn

    server_options = {"host": "127.0.0.1", "port": 8888}
    uvicorn.run(app, **server_options)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run()