#!/usr/bin/python3 # -*- coding: utf-8 -*- import asyncio import httpx import json import logging import os from pathlib import Path import random import string import aiofiles logger = logging.getLogger("toolbox") __ac_nonce = "".join(random.choices(string.ascii_letters + string.digits, k=16)) DEFAULT_HEADERS ={ "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36", "referer": "https://www.douyin.com/", "cookie": "" "sid_guard=166574208279efc3c59c031c7130b41b%7C1754101144%7C5184000%7CWed%2C+01-Oct-2025+02%3A19%3A04+GMT; " "ttwid=1%7CN05Iw2OUzw_zu1lcgD2Nnt7DlhY3U6BnjTZUnGZMlj4%7C1755256961%7C86396afa0f78587ebc26f398c5c8335f390cfbd081c341af6a4325fd36f3c860; " f"__ac_nonce={__ac_nonce}; " "__ac_signature=_02B4Z6wo00f01zYpLFgAAIDBBbzaGE6bBP82CSjAAKUf26; " "", } class Douyin(object): def __init__(self, platform: str, headers: dict = None, check_interval: int = 10 * 60, file_output_file: str = "output.json", ): self.platform = platform self.headers = headers or DEFAULT_HEADERS self.check_interval = check_interval self.file_output_file: Path = Path(file_output_file) # client self.client: httpx.AsyncClient = self.get_client() def get_client(self) -> httpx.AsyncClient: client = httpx.AsyncClient( http2=True, timeout=self.check_interval, limits=httpx.Limits(max_keepalive_connections=100, keepalive_expiry=self.check_interval * 2), headers=self.headers, ) return client async def request(self, method: str, url: str, **kwargs): try: response = await self.client.request(method, url, **kwargs) return response except httpx.ReadError as e: raise ConnectionError(f"request failed; error type: {type(e)}, error text: {str(e)}, url: {url}") except httpx.ConnectError as e: raise ConnectionError(f"request failed; error type: {type(e)}, error text: {str(e)}, url: {url}") async def get_follow_live_info(self): url = "https://www.douyin.com/webcast/web/feed/follow/" params = { "device_platform": "webapp", "aid": 6383, "channel": "channel_pc_web", "scene": "aweme_pc_follow_top", "update_version_code": 170400, "pc_client_type": 1, "pc_libra_divert": "Mac", "version_code": "170400", "version_name": "17.4.0", "cookie_enabled": "true", } response = await self.request("GET", url, params=params) if response.status_code != 200: raise AssertionError(f"request failed, status_code: {response.status_code}, text: {response.text}") js = response.json() data = js["data"]["data"] result = list() for row in data: room = row["room"] title = room["title"] stream_url = room["stream_url"] owner = room["owner"] sec_uid = owner["sec_uid"] nickname = owner["nickname"] room_id = row["web_rid"] stream_data = json.loads(stream_url["live_core_sdk_data"]["pull_data"]["stream_data"]) stream_data = stream_data["data"] row_ = { "nickname": nickname, "sec_uid": sec_uid, "room_id": room_id, # "status": 2, # "title": title, # "stream_data": stream_data, } result.append(row_) return result async def save_follow_live_info(self): live_info_dict = dict() if self.file_output_file.exists(): async with aiofiles.open(self.file_output_file, "r", encoding="utf-8") as f: live_info_dict = json.loads(await f.read()) follow_live_info_list = await self.get_follow_live_info() for live_info in follow_live_info_list: live_info_ = json.dumps(live_info, ensure_ascii=False) room_id = live_info["room_id"] if room_id not in live_info_dict.keys(): logger.info(f"新增直播信息;{live_info_}") live_info_dict[room_id] = live_info self.file_output_file.parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(self.file_output_file.as_posix(), "w", encoding="utf-8") as f: live_info_dict_str = json.dumps(live_info_dict, ensure_ascii=False, indent=2) await f.write(f"{live_info_dict_str}") return async def run(self): await self.save_follow_live_info() return async def start(self): while True: try: await self.run() logger.info(f"关注用户直播信息检测... 刷新间隔 {self.check_interval}s") await asyncio.sleep(self.check_interval) except Exception as error: logger.exception(f"关注用户直播信息检测错误\n{repr(error)}") await asyncio.sleep(self.check_interval) continue async def main(): douyin = Douyin(platform="Douyin") await douyin.run() return if __name__ == "__main__": asyncio.run(main())