Duibonduil's picture
Upload 11 files
7a3c0ee verified
import os
from re import compile as re_compile
from re import search
from typing import Final, Iterable, Any
from urllib.parse import urlparse, urlunparse, unquote
from wsgiref.types import WSGIEnvironment
from requests.models import PreparedRequest
HTTP_REQUEST_METHOD: Final = "http.request.method"
HTTP_FLAVOR: Final = "http.flavor"
HTTP_HOST: Final = "http.host"
HTTP_SCHEME: Final = "http.scheme"
HTTP_USER_AGENT: Final = "http.user_agent"
HTTP_SERVER_NAME: Final = "http.server_name"
SERVER_ADDRESS: Final = "server.address"
SERVER_PORT: Final = "server.port"
URL_PATH: Final = "url.path"
URL_QUERY: Final = "url.query"
CLIENT_ADDRESS: Final = "client.address"
CLIENT_PORT: Final = "client.port"
URL_FULL: Final = "url.full"
HTTP_REQUEST_BODY_SIZE: Final = "http.request.body.size"
HTTP_REQUEST_HEADER: Final = "http.request.header"
HTTP_REQUEST_SIZE: Final = "http.request.size"
HTTP_RESPONSE_BODY_SIZE: Final = "http.response.body.size"
HTTP_RESPONSE_HEADER: Final = "http.response.header"
HTTP_RESPONSE_SIZE: Final = "http.response.size"
HTTP_RESPONSE_STATUS_CODE: Final = "http.response.status_code"
HTTP_ROUTE = "http.route"
def collect_request_attributes(environ: WSGIEnvironment):
attributes: dict[str] = {}
request_method = environ.get("REQUEST_METHOD", "")
request_method = request_method.upper()
attributes[HTTP_REQUEST_METHOD] = request_method
attributes[HTTP_FLAVOR] = environ.get("SERVER_PROTOCOL", "")
attributes[HTTP_SCHEME] = environ.get("wsgi.url_scheme", "")
attributes[HTTP_SERVER_NAME] = environ.get("SERVER_NAME", "")
attributes[HTTP_HOST] = environ.get("HTTP_HOST", "")
host_port = environ.get("SERVER_PORT")
if host_port:
attributes[SERVER_PORT] = host_port
target = environ.get("RAW_URI")
if target is None:
target = environ.get("REQUEST_URI")
if target:
path, query = _parse_url_query(target)
attributes[URL_PATH] = path
attributes[URL_QUERY] = query
remote_addr = environ.get("REMOTE_ADDR", "")
attributes[CLIENT_ADDRESS] = remote_addr
attributes[CLIENT_PORT] = environ.get("REMOTE_PORT", "")
remote_host = environ.get("REMOTE_HOST")
if remote_host and remote_host != remote_addr:
attributes[CLIENT_ADDRESS] = remote_host
attributes[HTTP_USER_AGENT] = environ.get("HTTP_USER_AGENT", "")
return attributes
def collect_attributes_from_request(request: PreparedRequest) -> dict[str]:
attributes: dict[str] = {}
url = remove_url_credentials(request.url)
attributes[HTTP_REQUEST_METHOD] = request.method
attributes[URL_FULL] = url
parsed_url = urlparse(url)
if parsed_url.scheme:
attributes[HTTP_SCHEME] = parsed_url.scheme
if parsed_url.hostname:
attributes[HTTP_HOST] = parsed_url.hostname
if parsed_url.port:
attributes[SERVER_PORT] = parsed_url.port
return attributes
def url_disabled(url: str, excluded_urls: Iterable[str]) -> bool:
"""
Check if the url is disabled.
Args:
url: The url to check.
excluded_urls: The excluded urls.
Returns:
True if the url is disabled, False otherwise.
"""
if excluded_urls is None:
return False
regex = re_compile("|".join(excluded_urls))
return search(regex, url)
def get_excluded_urls(instrumentation: str) -> list[str]:
"""
Get the excluded urls.
Args:
instrumentation: The instrumentation to get the excluded urls for.
Returns:
The excluded urls.
"""
excluded_urls = os.environ.get(f"{instrumentation}_EXCLUDED_URLS")
return parse_excluded_urls(excluded_urls)
def parse_excluded_urls(excluded_urls: str) -> list[str]:
"""
Parse the excluded urls.
Args:
excluded_urls: The excluded urls.
Returns:
The excluded urls.
"""
if excluded_urls:
excluded_url_list = [
excluded_url.strip() for excluded_url in excluded_urls.split(",")
]
else:
excluded_url_list = []
return excluded_url_list
def remove_url_credentials(url: str) -> str:
"""Given a string url, remove the username and password only if it is a valid url"""
try:
parsed = urlparse(url)
if all([parsed.scheme, parsed.netloc]): # checks for valid url
parsed_url = urlparse(url)
_, _, netloc = parsed.netloc.rpartition("@")
return urlunparse(
(
parsed_url.scheme,
netloc,
parsed_url.path,
parsed_url.params,
parsed_url.query,
parsed_url.fragment,
)
)
except ValueError: # an unparsable url was passed
pass
return url
def parser_host_port_url_from_asgi(scope: dict[str, Any]):
"""Returns (host, port, full_url) tuple."""
server = scope.get("server") or ["0.0.0.0", 80]
port = server[1]
server_host = server[0] + (":" + str(port) if str(port) != "80" else "")
full_path = scope.get("path", "")
http_url = scope.get("scheme", "http") + "://" + server_host + full_path
return server_host, port, http_url
def collect_request_attributes_asgi(scope: dict[str, Any]):
attributes: dict[str] = {}
server_host, port, http_url = parser_host_port_url_from_asgi(scope)
query_string = scope.get("query_string")
if query_string and http_url:
if isinstance(query_string, bytes):
query_string = query_string.decode("utf8")
http_url += "?" + unquote(query_string)
attributes[HTTP_REQUEST_METHOD] = scope.get("method", "")
attributes[HTTP_FLAVOR] = scope.get("http_version", "")
attributes[HTTP_SCHEME] = scope.get("scheme", "")
attributes[HTTP_HOST] = server_host
attributes[SERVER_PORT] = port
attributes[URL_FULL] = remove_url_credentials(http_url)
attributes[URL_PATH] = scope.get("path", "")
header = scope.get("headers")
if header:
for key, value in header:
if key == b"user-agent":
attributes[HTTP_USER_AGENT] = value.decode("utf8")
client = scope.get("client")
if client:
attributes[CLIENT_ADDRESS] = client[0]
attributes[CLIENT_PORT] = client[1]
return attributes
def _parse_url_query(url: str):
parsed_url = urlparse(url)
path = parsed_url.path
query_params = parsed_url.query
return path, query_params