Spaces:
Running
Running
File size: 5,492 Bytes
f176037 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import asyncio
import httpx
import json
import logging
import os
from pathlib import Path
import random
import string
import aiofiles
logger = logging.getLogger("toolbox")
__ac_nonce = "".join(random.choices(string.ascii_letters + string.digits, k=16))
DEFAULT_HEADERS ={
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"referer": "https://www.douyin.com/",
"cookie": ""
"sid_guard=166574208279efc3c59c031c7130b41b%7C1754101144%7C5184000%7CWed%2C+01-Oct-2025+02%3A19%3A04+GMT; "
"ttwid=1%7CN05Iw2OUzw_zu1lcgD2Nnt7DlhY3U6BnjTZUnGZMlj4%7C1755256961%7C86396afa0f78587ebc26f398c5c8335f390cfbd081c341af6a4325fd36f3c860; "
f"__ac_nonce={__ac_nonce}; "
"__ac_signature=_02B4Z6wo00f01zYpLFgAAIDBBbzaGE6bBP82CSjAAKUf26; "
"",
}
class Douyin(object):
def __init__(self,
platform: str,
headers: dict = None,
check_interval: int = 10 * 60,
file_output_file: str = "output.json",
):
self.platform = platform
self.headers = headers or DEFAULT_HEADERS
self.check_interval = check_interval
self.file_output_file: Path = Path(file_output_file)
# client
self.client: httpx.AsyncClient = self.get_client()
def get_client(self) -> httpx.AsyncClient:
client = httpx.AsyncClient(
http2=True,
timeout=self.check_interval,
limits=httpx.Limits(max_keepalive_connections=100, keepalive_expiry=self.check_interval * 2),
headers=self.headers,
)
return client
async def request(self, method: str, url: str, **kwargs):
try:
response = await self.client.request(method, url, **kwargs)
return response
except httpx.ReadError as e:
raise ConnectionError(f"request failed; error type: {type(e)}, error text: {str(e)}, url: {url}")
except httpx.ConnectError as e:
raise ConnectionError(f"request failed; error type: {type(e)}, error text: {str(e)}, url: {url}")
async def get_follow_live_info(self):
url = "https://www.douyin.com/webcast/web/feed/follow/"
params = {
"device_platform": "webapp",
"aid": 6383,
"channel": "channel_pc_web",
"scene": "aweme_pc_follow_top",
"update_version_code": 170400,
"pc_client_type": 1,
"pc_libra_divert": "Mac",
"version_code": "170400",
"version_name": "17.4.0",
"cookie_enabled": "true",
}
response = await self.request("GET", url, params=params)
if response.status_code != 200:
raise AssertionError(f"request failed, status_code: {response.status_code}, text: {response.text}")
js = response.json()
data = js["data"]["data"]
result = list()
for row in data:
room = row["room"]
title = room["title"]
stream_url = room["stream_url"]
owner = room["owner"]
sec_uid = owner["sec_uid"]
nickname = owner["nickname"]
room_id = row["web_rid"]
stream_data = json.loads(stream_url["live_core_sdk_data"]["pull_data"]["stream_data"])
stream_data = stream_data["data"]
row_ = {
"nickname": nickname,
"sec_uid": sec_uid,
"room_id": room_id,
# "status": 2,
# "title": title,
# "stream_data": stream_data,
}
result.append(row_)
return result
async def save_follow_live_info(self):
live_info_dict = dict()
if self.file_output_file.exists():
async with aiofiles.open(self.file_output_file, "r", encoding="utf-8") as f:
live_info_dict = json.loads(await f.read())
follow_live_info_list = await self.get_follow_live_info()
for live_info in follow_live_info_list:
live_info_ = json.dumps(live_info, ensure_ascii=False)
room_id = live_info["room_id"]
if room_id not in live_info_dict.keys():
logger.info(f"新增直播信息;{live_info_}")
live_info_dict[room_id] = live_info
self.file_output_file.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(self.file_output_file.as_posix(), "w", encoding="utf-8") as f:
live_info_dict_str = json.dumps(live_info_dict, ensure_ascii=False, indent=2)
await f.write(f"{live_info_dict_str}")
return
async def run(self):
await self.save_follow_live_info()
return
async def start(self):
while True:
try:
await self.run()
logger.info(f"关注用户直播信息检测... 刷新间隔 {self.check_interval}s")
await asyncio.sleep(self.check_interval)
except Exception as error:
logger.exception(f"关注用户直播信息检测错误\n{repr(error)}")
await asyncio.sleep(self.check_interval)
continue
async def main():
douyin = Douyin(platform="Douyin")
await douyin.run()
return
if __name__ == "__main__":
asyncio.run(main())
|