qgyd2021's picture
first commit
f176037
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import asyncio
import httpx
import json
import logging
import os
from pathlib import Path
import random
import string
import aiofiles
logger = logging.getLogger("toolbox")
__ac_nonce = "".join(random.choices(string.ascii_letters + string.digits, k=16))
DEFAULT_HEADERS ={
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"referer": "https://www.douyin.com/",
"cookie": ""
"sid_guard=166574208279efc3c59c031c7130b41b%7C1754101144%7C5184000%7CWed%2C+01-Oct-2025+02%3A19%3A04+GMT; "
"ttwid=1%7CN05Iw2OUzw_zu1lcgD2Nnt7DlhY3U6BnjTZUnGZMlj4%7C1755256961%7C86396afa0f78587ebc26f398c5c8335f390cfbd081c341af6a4325fd36f3c860; "
f"__ac_nonce={__ac_nonce}; "
"__ac_signature=_02B4Z6wo00f01zYpLFgAAIDBBbzaGE6bBP82CSjAAKUf26; "
"",
}
class Douyin(object):
def __init__(self,
platform: str,
headers: dict = None,
check_interval: int = 10 * 60,
file_output_file: str = "output.json",
):
self.platform = platform
self.headers = headers or DEFAULT_HEADERS
self.check_interval = check_interval
self.file_output_file: Path = Path(file_output_file)
# client
self.client: httpx.AsyncClient = self.get_client()
def get_client(self) -> httpx.AsyncClient:
client = httpx.AsyncClient(
http2=True,
timeout=self.check_interval,
limits=httpx.Limits(max_keepalive_connections=100, keepalive_expiry=self.check_interval * 2),
headers=self.headers,
)
return client
async def request(self, method: str, url: str, **kwargs):
try:
response = await self.client.request(method, url, **kwargs)
return response
except httpx.ReadError as e:
raise ConnectionError(f"request failed; error type: {type(e)}, error text: {str(e)}, url: {url}")
except httpx.ConnectError as e:
raise ConnectionError(f"request failed; error type: {type(e)}, error text: {str(e)}, url: {url}")
async def get_follow_live_info(self):
url = "https://www.douyin.com/webcast/web/feed/follow/"
params = {
"device_platform": "webapp",
"aid": 6383,
"channel": "channel_pc_web",
"scene": "aweme_pc_follow_top",
"update_version_code": 170400,
"pc_client_type": 1,
"pc_libra_divert": "Mac",
"version_code": "170400",
"version_name": "17.4.0",
"cookie_enabled": "true",
}
response = await self.request("GET", url, params=params)
if response.status_code != 200:
raise AssertionError(f"request failed, status_code: {response.status_code}, text: {response.text}")
js = response.json()
data = js["data"]["data"]
result = list()
for row in data:
room = row["room"]
title = room["title"]
stream_url = room["stream_url"]
owner = room["owner"]
sec_uid = owner["sec_uid"]
nickname = owner["nickname"]
room_id = row["web_rid"]
stream_data = json.loads(stream_url["live_core_sdk_data"]["pull_data"]["stream_data"])
stream_data = stream_data["data"]
row_ = {
"nickname": nickname,
"sec_uid": sec_uid,
"room_id": room_id,
# "status": 2,
# "title": title,
# "stream_data": stream_data,
}
result.append(row_)
return result
async def save_follow_live_info(self):
live_info_dict = dict()
if self.file_output_file.exists():
async with aiofiles.open(self.file_output_file, "r", encoding="utf-8") as f:
live_info_dict = json.loads(await f.read())
follow_live_info_list = await self.get_follow_live_info()
for live_info in follow_live_info_list:
live_info_ = json.dumps(live_info, ensure_ascii=False)
room_id = live_info["room_id"]
if room_id not in live_info_dict.keys():
logger.info(f"新增直播信息;{live_info_}")
live_info_dict[room_id] = live_info
self.file_output_file.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(self.file_output_file.as_posix(), "w", encoding="utf-8") as f:
live_info_dict_str = json.dumps(live_info_dict, ensure_ascii=False, indent=2)
await f.write(f"{live_info_dict_str}")
return
async def run(self):
await self.save_follow_live_info()
return
async def start(self):
while True:
try:
await self.run()
logger.info(f"关注用户直播信息检测... 刷新间隔 {self.check_interval}s")
await asyncio.sleep(self.check_interval)
except Exception as error:
logger.exception(f"关注用户直播信息检测错误\n{repr(error)}")
await asyncio.sleep(self.check_interval)
continue
async def main():
douyin = Douyin(platform="Douyin")
await douyin.run()
return
if __name__ == "__main__":
asyncio.run(main())