Spaces:
Running
Running
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
import asyncio | |
import httpx | |
import json | |
import logging | |
import os | |
from pathlib import Path | |
import random | |
import string | |
import aiofiles | |
# Logger shared by this module, registered under the name "toolbox".
logger = logging.getLogger("toolbox")

# Random 16-character alphanumeric value, generated once at import time and
# embedded in the default cookie below as the `__ac_nonce` entry.
__ac_nonce = "".join(random.choices(string.ascii_letters + string.digits, k=16))

# Headers used by default for every request when the caller supplies none.
# NOTE(review): the sid_guard / ttwid / __ac_signature cookie values are
# hard-coded in source. They look like session-bound values (sid_guard embeds
# an expiry date) -- confirm whether they should come from configuration.
DEFAULT_HEADERS = {
    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    "referer": "https://www.douyin.com/",
    # Adjacent literals are concatenated into one cookie string; each entry
    # keeps its own trailing "; " separator.
    "cookie": (
        "sid_guard=166574208279efc3c59c031c7130b41b%7C1754101144%7C5184000%7CWed%2C+01-Oct-2025+02%3A19%3A04+GMT; "
        "ttwid=1%7CN05Iw2OUzw_zu1lcgD2Nnt7DlhY3U6BnjTZUnGZMlj4%7C1755256961%7C86396afa0f78587ebc26f398c5c8335f390cfbd081c341af6a4325fd36f3c860; "
        f"__ac_nonce={__ac_nonce}; "
        "__ac_signature=_02B4Z6wo00f01zYpLFgAAIDBBbzaGE6bBP82CSjAAKUf26; "
    ),
}
class Douyin(object):
    """Poll the Douyin "followed users live" web feed and record rooms seen.

    Each poll requests the follow feed through a shared ``httpx.AsyncClient``,
    extracts one small record per returned room, and merges rooms that are not
    yet recorded into a JSON file keyed by room id.
    """

    def __init__(self,
                 platform: str,
                 headers: dict = None,
                 check_interval: int = 10 * 60,
                 file_output_file: str = "output.json",
                 ):
        """Store the configuration and build the HTTP client.

        Args:
            platform: Label stored on the instance; not otherwise used by
                this class.
            headers: Request headers for the HTTP client. A falsy value
                (``None`` or an empty dict) selects ``DEFAULT_HEADERS``.
            check_interval: Seconds to wait between polls in ``start``. The
                same number is used as the client's request timeout and,
                doubled, as its keep-alive expiry.
            file_output_file: Path of the JSON file that accumulates the
                recorded rooms.
        """
        self.platform = platform
        self.headers = headers or DEFAULT_HEADERS
        self.check_interval = check_interval
        self.file_output_file: Path = Path(file_output_file)

        # client
        self.client: httpx.AsyncClient = self.get_client()

    def get_client(self) -> httpx.AsyncClient:
        """Build the HTTP/2-enabled async client configured from this instance."""
        client = httpx.AsyncClient(
            http2=True,
            timeout=self.check_interval,
            limits=httpx.Limits(max_keepalive_connections=100, keepalive_expiry=self.check_interval * 2),
            headers=self.headers,
        )
        return client

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its pooled connections."""
        await self.client.aclose()

    async def request(self, method: str, url: str, **kwargs):
        """Send one HTTP request through the shared client.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            url: Target URL.
            **kwargs: Passed through unchanged to ``httpx.AsyncClient.request``.

        Returns:
            The response object returned by httpx.

        Raises:
            ConnectionError: If httpx raises ``ReadError`` or ``ConnectError``.
                The httpx exception is attached as ``__cause__``.
        """
        try:
            return await self.client.request(method, url, **kwargs)
        except (httpx.ReadError, httpx.ConnectError) as e:
            # Chain the cause so the original httpx traceback is not lost.
            raise ConnectionError(
                f"request failed; error type: {type(e)}, error text: {str(e)}, url: {url}"
            ) from e

    async def get_follow_live_info(self):
        """Fetch the rooms currently listed in the follow feed.

        Returns:
            A list of dicts, one per feed row, with the keys ``nickname``,
            ``sec_uid`` (both from the room owner) and ``room_id`` (the row's
            ``web_rid``).

        Raises:
            AssertionError: If the response status code is not 200.
            ConnectionError: Propagated from ``request``.
            KeyError: If the response JSON lacks one of the fields read here.
        """
        url = "https://www.douyin.com/webcast/web/feed/follow/"
        params = {
            "device_platform": "webapp",
            "aid": 6383,
            "channel": "channel_pc_web",
            "scene": "aweme_pc_follow_top",
            "update_version_code": 170400,
            "pc_client_type": 1,
            "pc_libra_divert": "Mac",
            "version_code": "170400",
            "version_name": "17.4.0",
            "cookie_enabled": "true",
        }
        response = await self.request("GET", url, params=params)
        if response.status_code != 200:
            raise AssertionError(f"request failed, status_code: {response.status_code}, text: {response.text}")
        js = response.json()

        # A null/empty list is treated as "no rooms" instead of failing the poll.
        rows = js["data"]["data"] or []

        result = list()
        for row in rows:
            owner = row["room"]["owner"]
            # Only the identifying fields are exported. The room title and the
            # nested stream data were previously parsed but never used, which
            # made the whole poll fail on any row lacking that payload.
            result.append({
                "nickname": owner["nickname"],
                "sec_uid": owner["sec_uid"],
                "room_id": row["web_rid"],
            })
        return result

    async def save_follow_live_info(self):
        """Merge the current feed rooms into the JSON output file.

        Loads the existing file if present, adds every room whose id is not
        yet recorded (logging each addition), then rewrites the whole file.

        Raises:
            json.JSONDecodeError: If the existing file holds non-blank text
                that is not valid JSON.
            Exception: Anything raised by ``get_follow_live_info``.
        """
        live_info_dict = dict()
        if self.file_output_file.exists():
            async with aiofiles.open(self.file_output_file, "r", encoding="utf-8") as f:
                content = await f.read()
            # A blank file (e.g. left behind by an interrupted write) means
            # "nothing recorded yet", not a parse error on every later poll.
            if content.strip():
                live_info_dict = json.loads(content)

        follow_live_info_list = await self.get_follow_live_info()
        for live_info in follow_live_info_list:
            # JSON object keys are always strings once reloaded, so compare
            # and store under the string form of the id.
            room_key = str(live_info["room_id"])
            # NOTE(review): indentation was lost in the source; this assumes
            # already-recorded rooms are left untouched -- confirm.
            if room_key not in live_info_dict:
                live_info_ = json.dumps(live_info, ensure_ascii=False)
                logger.info(f"新增直播信息;{live_info_}")
                live_info_dict[room_key] = live_info

        self.file_output_file.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(self.file_output_file.as_posix(), "w", encoding="utf-8") as f:
            live_info_dict_str = json.dumps(live_info_dict, ensure_ascii=False, indent=2)
            await f.write(f"{live_info_dict_str}")

    async def run(self):
        """Perform a single poll-and-save cycle."""
        await self.save_follow_live_info()

    async def start(self):
        """Poll forever, sleeping ``check_interval`` seconds between cycles.

        Any ``Exception`` raised by a cycle is logged with its traceback and
        the loop continues, so one failed poll does not stop the watcher.
        """
        while True:
            try:
                await self.run()
                logger.info(f"关注用户直播信息检测... 刷新间隔 {self.check_interval}s")
            except Exception as error:
                logger.exception(f"关注用户直播信息检测错误\n{repr(error)}")
            # One sleep for both outcomes, kept outside the try so an error
            # raised while sleeping is not swallowed by the handler above.
            await asyncio.sleep(self.check_interval)
async def main():
    """Run one poll of the follow feed, then release the HTTP client.

    Builds a ``Douyin`` instance with its default settings, performs a single
    ``run()`` cycle and closes the instance's HTTP client afterwards.
    """
    douyin = Douyin(platform="Douyin")
    try:
        await douyin.run()
    finally:
        # Close pooled connections even when the poll raises, so the client
        # is not left open when the event loop shuts down.
        await douyin.client.aclose()


if __name__ == "__main__":
    # Script entry point: run the coroutine to completion on a new event loop.
    asyncio.run(main())