#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
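
"""Browser engine that reads pages and runs provider-selectable web
searches through a content-reader API, preferring async aiohttp requests
and falling back to blocking requests when no event loop is usable."""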

import asyncio
from urllib.parse import quote

import aiohttp
import requests

from config import CONTENT_EXTRACTION, SEARCH_SELECTION
from src.core.web_loader import web_loader

class BrowserEngine:
    """HTTP client that proxies page extraction and web search through a
    content-reader API, using async I/O first and blocking I/O as a
    fallback."""

    def __init__(self, configuration):
        self.config = configuration

    def generate_headers(self):
        """Assemble a browser-like header set from web_loader so each
        outgoing request carries a fresh client identity (IPs, user
        agent, origin, referrer, and locale)."""
        ipv4 = web_loader.get_ipv4()
        ipv6 = web_loader.get_ipv6()
        user_agent = web_loader.get_user_agent()
        origin = web_loader.get_origin()
        referrer = web_loader.get_referrer()
        location = web_loader.get_location()

        return {
            "User-Agent": user_agent,
            "X-Forwarded-For": f"{ipv4}, {ipv6}",
            "X-Real-IP": ipv4,
            "X-Originating-IP": ipv4,
            "X-Remote-IP": ipv4,
            "X-Remote-Addr": ipv4,
            "X-Client-IP": ipv4,
            "X-Forwarded-Host": origin.replace("https://", "").replace("http://", ""),
            "Origin": origin,
            "Referer": referrer,
            "Accept-Language": f"{location['language']},en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
            "X-Country": location['country'],
            "X-Timezone": location['timezone']
        }

    def _build_search_url_and_selector(self, search_query: str, search_provider: str = "google"):
        """Return the reader-proxied search URL and the CSS selector
        that scopes extraction to the provider's result list."""
        if search_provider == "baidu":
            return (
                f"{self.config.content_reader_api}{self.config.baidu_endpoint}?wd={quote(search_query)}",
                "#content_left"
            )
        # SearXNG bang syntax: "!go" routes the query to Google, "!bi" to Bing.
        provider_prefix = "!go" if search_provider == "google" else "!bi"
        return (
            f"{self.config.content_reader_api}{self.config.searxng_endpoint}?q={quote(f'{provider_prefix} {search_query}')}",
            "#urls"
        )

    async def _async_post(self, url: str, data: dict, headers: dict):
        """POST asynchronously; on a 4xx/5xx status, raise
        ClientResponseError carrying the response body as the message."""
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, data=data, headers=headers) as response:
                text = await response.text()
                if response.status >= 400:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=text,
                        headers=response.headers
                    )
                return text

    async def _async_get(self, url: str, headers: dict):
        """GET asynchronously; mirrors _async_post's error handling so
        failures surface the response body."""
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers=headers) as response:
                text = await response.text()
                if response.status >= 400:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=text,
                        headers=response.headers
                    )
                return text

    def _sync_post(self, url: str, data: dict, headers: dict):
        """Blocking POST used when the async path cannot run."""
        response = requests.post(url, data=data, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text

    def _sync_get(self, url: str, headers: dict):
        """Blocking GET used when the async path cannot run."""
        response = requests.get(url, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text

    async def async_extract_page_content(self, target_url: str) -> str:
        """Fetch a page's extracted content by POSTing its URL to the
        content-reader API, then append the extraction instructions."""
        headers = self.generate_headers()
        payload = {"url": target_url}
        extracted_content = await self._async_post(self.config.content_reader_api, payload, headers)
        return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"

    def extract_page_content(self, target_url: str) -> str:
        """Synchronous wrapper: try the async path first; if asyncio.run
        fails (e.g. because an event loop is already running), retry
        with the blocking client before giving up."""
        try:
            return asyncio.run(self.async_extract_page_content(target_url))
        except Exception:
            try:
                headers = self.generate_headers()
                payload = {"url": target_url}
                extracted_content = self._sync_post(self.config.content_reader_api, payload, headers)
                return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"
            except Exception as error:
                return f"Error reading URL: {str(error)}"

    async def async_perform_search(self, search_query: str, search_provider: str = "google") -> str:
        """Run a search through the reader proxy and append the
        result-selection instructions."""
        headers = self.generate_headers()
        full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
        # Tell the reader API which CSS selector to extract from the page.
        headers["X-Target-Selector"] = selector
        search_results = await self._async_get(full_url, headers)
        return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"

    def perform_search(self, search_query: str, search_provider: str = "google") -> str:
        """Synchronous wrapper mirroring extract_page_content: async
        first, blocking fallback, error string on total failure."""
        try:
            return asyncio.run(self.async_perform_search(search_query, search_provider))
        except Exception:
            try:
                headers = self.generate_headers()
                full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
                headers["X-Target-Selector"] = selector
                search_results = self._sync_get(full_url, headers)
                return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"
            except Exception as error:
                return f"Error during search: {str(error)}"