import os
from re import compile as re_compile
from re import search
from typing import Final, Iterable, Any
from urllib.parse import urlparse, urlunparse, unquote
from wsgiref.types import WSGIEnvironment

from requests.models import PreparedRequest

HTTP_REQUEST_METHOD: Final = "http.request.method"
HTTP_FLAVOR: Final = "http.flavor"
HTTP_HOST: Final = "http.host"
HTTP_SCHEME: Final = "http.scheme"
HTTP_USER_AGENT: Final = "http.user_agent"
HTTP_SERVER_NAME: Final = "http.server_name"
SERVER_ADDRESS: Final = "server.address"
SERVER_PORT: Final = "server.port"
URL_PATH: Final = "url.path"
URL_QUERY: Final = "url.query"
CLIENT_ADDRESS: Final = "client.address"
CLIENT_PORT: Final = "client.port"
URL_FULL: Final = "url.full"
HTTP_REQUEST_BODY_SIZE: Final = "http.request.body.size"
HTTP_REQUEST_HEADER: Final = "http.request.header"
HTTP_REQUEST_SIZE: Final = "http.request.size"
HTTP_RESPONSE_BODY_SIZE: Final = "http.response.body.size"
HTTP_RESPONSE_HEADER: Final = "http.response.header"
HTTP_RESPONSE_SIZE: Final = "http.response.size"
HTTP_RESPONSE_STATUS_CODE: Final = "http.response.status_code"
HTTP_ROUTE: Final = "http.route"
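
# NOTE: the attribute keys above appear to follow OpenTelemetry-style HTTP
# semantic convention names, but they are kept here as plain strings with no
# OpenTelemetry dependency.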


def collect_request_attributes(environ: WSGIEnvironment) -> dict[str, Any]:
    """Collect HTTP attributes from a WSGI environ mapping."""
    attributes: dict[str, Any] = {}
    request_method = environ.get("REQUEST_METHOD", "")
    attributes[HTTP_REQUEST_METHOD] = request_method.upper()
    attributes[HTTP_FLAVOR] = environ.get("SERVER_PROTOCOL", "")
    attributes[HTTP_SCHEME] = environ.get("wsgi.url_scheme", "")
    attributes[HTTP_SERVER_NAME] = environ.get("SERVER_NAME", "")
    attributes[HTTP_HOST] = environ.get("HTTP_HOST", "")
    host_port = environ.get("SERVER_PORT")
    if host_port:
        attributes[SERVER_PORT] = host_port
    # The raw request target is exposed as RAW_URI or REQUEST_URI depending on
    # the server.
    target = environ.get("RAW_URI")
    if target is None:
        target = environ.get("REQUEST_URI")
    if target:
        path, query = _parse_url_query(target)
        attributes[URL_PATH] = path
        attributes[URL_QUERY] = query
    remote_addr = environ.get("REMOTE_ADDR", "")
    attributes[CLIENT_ADDRESS] = remote_addr
    attributes[CLIENT_PORT] = environ.get("REMOTE_PORT", "")
    remote_host = environ.get("REMOTE_HOST")
    if remote_host and remote_host != remote_addr:
        attributes[CLIENT_ADDRESS] = remote_host
    attributes[HTTP_USER_AGENT] = environ.get("HTTP_USER_AGENT", "")
    return attributes
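
# Illustrative usage sketch (kept as a comment so nothing runs at import time):
# the environ values below are made-up examples of the CGI-style keys a WSGI
# server provides.
#
#   environ = {
#       "REQUEST_METHOD": "get",
#       "SERVER_PROTOCOL": "HTTP/1.1",
#       "wsgi.url_scheme": "http",
#       "SERVER_NAME": "localhost",
#       "HTTP_HOST": "localhost:8080",
#       "SERVER_PORT": "8080",
#       "RAW_URI": "/items?limit=10",
#       "REMOTE_ADDR": "127.0.0.1",
#       "HTTP_USER_AGENT": "curl/8.0",
#   }
#   attributes = collect_request_attributes(environ)
#   # attributes["http.request.method"] == "GET"
#   # attributes["url.path"] == "/items", attributes["url.query"] == "limit=10"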


def collect_attributes_from_request(request: PreparedRequest) -> dict[str, Any]:
    """Collect HTTP attributes from an outgoing requests.PreparedRequest."""
    attributes: dict[str, Any] = {}
    url = remove_url_credentials(request.url)
    attributes[HTTP_REQUEST_METHOD] = request.method
    attributes[URL_FULL] = url
    parsed_url = urlparse(url)
    if parsed_url.scheme:
        attributes[HTTP_SCHEME] = parsed_url.scheme
    if parsed_url.hostname:
        attributes[HTTP_HOST] = parsed_url.hostname
    if parsed_url.port:
        attributes[SERVER_PORT] = parsed_url.port
    return attributes
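
# Illustrative usage sketch: a PreparedRequest can be built via requests.Request;
# the URL and credentials here are made-up examples.
#
#   import requests
#   prepared = requests.Request("GET", "https://user:secret@example.com:8443/api").prepare()
#   attributes = collect_attributes_from_request(prepared)
#   # attributes["url.full"] == "https://example.com:8443/api"  (credentials removed)
#   # attributes["http.host"] == "example.com", attributes["server.port"] == 8443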


def url_disabled(url: str, excluded_urls: Iterable[str] | None) -> bool:
    """
    Check whether a URL matches any of the excluded-URL patterns.

    Args:
        url: The URL to check.
        excluded_urls: Regular-expression patterns for excluded URLs, or None.

    Returns:
        True if the URL matches an excluded pattern, False otherwise
        (including when no patterns are given).
    """
    if not excluded_urls:
        return False
    regex = re_compile("|".join(excluded_urls))
    return search(regex, url) is not None


def get_excluded_urls(instrumentation: str) -> list[str]:
    """
    Read the excluded-URL patterns for an instrumentation from the environment.

    Args:
        instrumentation: Prefix of the environment variable to read, i.e.
            "<instrumentation>_EXCLUDED_URLS".

    Returns:
        The list of excluded-URL patterns (empty if the variable is not set).
    """
    excluded_urls = os.environ.get(f"{instrumentation}_EXCLUDED_URLS")
    return parse_excluded_urls(excluded_urls)


def parse_excluded_urls(excluded_urls: str | None) -> list[str]:
    """
    Parse a comma-separated string of excluded-URL patterns into a list.

    Args:
        excluded_urls: Comma-separated patterns, or None/empty.

    Returns:
        The list of stripped patterns (empty if no input was given).
    """
    if excluded_urls:
        excluded_url_list = [
            excluded_url.strip() for excluded_url in excluded_urls.split(",")
        ]
    else:
        excluded_url_list = []
    return excluded_url_list
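
# Illustrative usage sketch: the environment variable name and URLs below are
# made-up examples showing how the three helpers above fit together.
#
#   os.environ["FLASK_EXCLUDED_URLS"] = "healthz, metrics"
#   excluded = get_excluded_urls("FLASK")                  # ["healthz", "metrics"]
#   url_disabled("http://localhost/healthz", excluded)     # True
#   url_disabled("http://localhost/users", excluded)       # False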


def remove_url_credentials(url: str) -> str:
    """Given a URL string, strip the username and password from the netloc if
    it is a valid URL; otherwise return the string unchanged."""
    try:
        parsed = urlparse(url)
        if all([parsed.scheme, parsed.netloc]):  # checks for a valid URL
            # Drop everything up to and including the last "@" in the netloc.
            _, _, netloc = parsed.netloc.rpartition("@")
            return urlunparse(
                (
                    parsed.scheme,
                    netloc,
                    parsed.path,
                    parsed.params,
                    parsed.query,
                    parsed.fragment,
                )
            )
    except ValueError:  # an unparsable URL was passed
        pass
    return url
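
# Illustrative example (made-up URL):
#
#   remove_url_credentials("https://user:secret@example.com/path?q=1")
#   # -> "https://example.com/path?q=1"
#   remove_url_credentials("not a url")   # returned unchanged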


def parser_host_port_url_from_asgi(scope: dict[str, Any]):
    """Returns a (host, port, full_url) tuple derived from an ASGI scope."""
    server = scope.get("server") or ["0.0.0.0", 80]
    port = server[1]
    server_host = server[0] + (":" + str(port) if str(port) != "80" else "")
    full_path = scope.get("path", "")
    http_url = scope.get("scheme", "http") + "://" + server_host + full_path
    return server_host, port, http_url
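
# Illustrative example: an ASGI scope carries a (host, port) "server" pair, so
# a made-up scope such as {"server": ("127.0.0.1", 8000), "scheme": "http",
# "path": "/items"} would yield
# ("127.0.0.1:8000", 8000, "http://127.0.0.1:8000/items").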


def collect_request_attributes_asgi(scope: dict[str, Any]) -> dict[str, Any]:
    """Collect HTTP attributes from an ASGI connection scope."""
    attributes: dict[str, Any] = {}
    server_host, port, http_url = parser_host_port_url_from_asgi(scope)
    query_string = scope.get("query_string")
    if query_string and http_url:
        if isinstance(query_string, bytes):
            query_string = query_string.decode("utf8")
        http_url += "?" + unquote(query_string)
    attributes[HTTP_REQUEST_METHOD] = scope.get("method", "")
    attributes[HTTP_FLAVOR] = scope.get("http_version", "")
    attributes[HTTP_SCHEME] = scope.get("scheme", "")
    attributes[HTTP_HOST] = server_host
    attributes[SERVER_PORT] = port
    attributes[URL_FULL] = remove_url_credentials(http_url)
    attributes[URL_PATH] = scope.get("path", "")
    headers = scope.get("headers")
    if headers:
        for key, value in headers:
            if key == b"user-agent":
                attributes[HTTP_USER_AGENT] = value.decode("utf8")
    client = scope.get("client")
    if client:
        attributes[CLIENT_ADDRESS] = client[0]
        attributes[CLIENT_PORT] = client[1]
    return attributes
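
# Illustrative usage sketch: the scope below mirrors the shape of an ASGI HTTP
# scope; all values are made-up examples.
#
#   scope = {
#       "type": "http",
#       "method": "GET",
#       "http_version": "1.1",
#       "scheme": "http",
#       "path": "/items",
#       "query_string": b"limit=10",
#       "server": ("127.0.0.1", 8000),
#       "client": ("127.0.0.1", 51234),
#       "headers": [(b"user-agent", b"curl/8.0")],
#   }
#   attributes = collect_request_attributes_asgi(scope)
#   # attributes["url.full"] == "http://127.0.0.1:8000/items?limit=10"
#   # attributes["client.address"] == "127.0.0.1", attributes["client.port"] == 51234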


def _parse_url_query(url: str):
    parsed_url = urlparse(url)
    path = parsed_url.path
    query_params = parsed_url.query
    return path, query_params