Spaces:
Running
Running
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
""" | |
https://github.com/auqhjjqdo/LiveRecorder/blob/main/live_recorder.py | |
""" | |
import aiofiles | |
import asyncio | |
import collections | |
from datetime import datetime | |
from zoneinfo import ZoneInfo # Python 3.9+ 自带,无需安装 | |
import logging | |
import json | |
import os | |
import random | |
import re | |
import string | |
import time | |
from http.cookies import SimpleCookie | |
from pathlib import Path | |
import traceback | |
from typing import Dict, List, Tuple, Union, Iterable | |
import anyio | |
from bs4 import BeautifulSoup | |
# from cacheout import cache | |
from aiocache import cached | |
import ffmpeg | |
import httpx | |
import numpy as np | |
import streamlink | |
from gradio.layouts import row | |
from pydantic import conbytes | |
from streamlink.stream import StreamIO, HTTPStream | |
from streamlink_cli.main import open_stream | |
from streamlink_cli.output import FileOutput | |
from streamlink_cli.streamrunner import StreamRunner | |
# Module-level logger shared by every recorder in this file; records are
# emitted under the "toolbox" logger name.
logger = logging.getLogger("toolbox")
class DouyinLiveRecorder(object):
    """Poll one Douyin live room and record its stream to disk while it is live.

    Live status is looked up through three strategies tried in order (the
    ``web/enter`` API, the room HTML page, the "follow" feed).  Each strategy
    keeps a sliding window of recent outcomes and is randomly skipped more
    often the more it has failed recently.

    Attributes:
        platform / room_id / room_name: identify the room; also used in log
            prefixes, file names and tags.
        headers / cookies: sent with every HTTP request and handed to
            streamlink.
        check_interval: seconds between polls; also used as the HTTP timeout.
        file_output_fmt: desired container; when it differs from the
            extension of the recorded file the recording is remuxed by ffmpeg.
        file_output_dir: directory that receives the recordings.
        file_info_file: JSON file that indexes finished recordings.
        run_recording: ``True`` while a recording is in progress.
        stream_fd / file_output: handles of the recording in progress, or
            ``None`` when idle.
    """

    # Characters that are unsafe in file names, mapped to their full-width
    # look-alikes.  Written as escapes so that text normalisation of this
    # source file cannot silently turn the table into a no-op.
    _FILENAME_CHAR_TABLE = str.maketrans({
        '"': "\uff02",   # FULLWIDTH QUOTATION MARK
        "*": "\uff0a",   # FULLWIDTH ASTERISK
        ":": "\uff1a",   # FULLWIDTH COLON
        "<": "\uff1c",   # FULLWIDTH LESS-THAN SIGN
        ">": "\uff1e",   # FULLWIDTH GREATER-THAN SIGN
        "?": "\uff1f",   # FULLWIDTH QUESTION MARK
        "/": "\uff0f",   # FULLWIDTH SOLIDUS
        "\\": "\uff3c",  # FULLWIDTH REVERSE SOLIDUS
        "|": "\uff5c",   # FULLWIDTH VERTICAL LINE
    })

    # Matches the inline <script> elements of the room page that push data
    # into ``self.__pace_f``; group 1 is the JSON-encoded payload.
    _PACE_SCRIPT_PATTERN = re.compile(
        r"<script nonce=\"(?:.*?)\">self.__pace_f.push\(\[1,(.*?)\]\)</script>",
        flags=re.IGNORECASE,
    )

    def __init__(self,
                 platform: str,
                 room_id: str,
                 room_name: str,
                 headers: dict = None,
                 cookies: str = None,
                 check_interval: int = 10,
                 file_output_fmt: str = "flv",
                 file_output_dir: str = "output",
                 file_info_file: str = "file_info.json",
                 ):
        """Store the configuration and create the shared HTTP client.

        Args:
            platform: platform label used in logs, file names and tags.
            room_id: Douyin web room id (``web_rid``).
            room_name: human readable room name.
            headers: HTTP headers; defaults to a minimal ``User-Agent``.
            cookies: raw ``Cookie`` header string, or ``None``.
            check_interval: polling interval / HTTP timeout in seconds.
            file_output_fmt: target container format of the recordings.
            file_output_dir: directory that receives the recordings.
            file_info_file: path of the JSON index of finished recordings.
        """
        self.platform = platform
        self.room_id = room_id
        self.room_name = room_name
        self.headers = headers or {
            "User-Agent": "Chrome",
        }
        self.cookies = self.load_cookies(cookies)
        self.check_interval = check_interval
        self.file_output_fmt = file_output_fmt
        self.file_output_dir: Path = Path(file_output_dir)
        self.file_info_file: Path = Path(file_info_file)

        self.flag = f"[{self.__class__.__name__}][{self.platform}][{self.room_name}]"

        # Sliding windows of recent outcomes (1 = success, 0 = failure) per
        # detection strategy.  They start full of successes so that every
        # strategy is tried at its maximum rate at first.
        max_length = 50
        self.success_rate_of_get_live_info_by_web_enter = collections.deque(maxlen=max_length)
        self.success_rate_of_get_live_info_by_room_url = collections.deque(maxlen=max_length)
        self.success_rate_of_get_live_info_by_follow = collections.deque(maxlen=max_length)
        self.success_rate_of_get_live_info_by_web_enter.extend([1] * max_length)
        self.success_rate_of_get_live_info_by_room_url.extend([1] * max_length)
        self.success_rate_of_get_live_info_by_follow.extend([1] * max_length)

        # state
        self.ssl_verify = True
        self.run_recording = False
        self.stream_fd: StreamIO = None
        self.file_output: FileOutput = None

        # client
        self.client: httpx.AsyncClient = self.get_client()

    @staticmethod
    def load_cookies(cookies: str = None):
        """Parse a raw ``Cookie`` header string into a ``{name: value}`` dict.

        Declared as a static method: it is invoked as
        ``self.load_cookies(cookies)`` and takes no ``self`` parameter, so
        without the decorator that call raises ``TypeError``.

        Returns:
            The parsed cookies, or ``None`` when ``cookies`` is ``None``.
        """
        if cookies is None:
            return None
        jar = SimpleCookie()
        jar.load(cookies)
        return {name: morsel.value for name, morsel in jar.items()}

    def get_client(self) -> "httpx.AsyncClient":
        """Build a new HTTP/2 async client from the current headers/cookies."""
        client = httpx.AsyncClient(
            http2=True,
            timeout=self.check_interval,
            limits=httpx.Limits(max_keepalive_connections=100, keepalive_expiry=self.check_interval * 2),
            headers=self.headers,
            cookies=self.cookies,
        )
        return client

    async def request(self, method: str, url: str, **kwargs):
        """Send a request through the shared client.

        Raises:
            ConnectionError: for every httpx / anyio transport failure handled
                below; the original exception is chained as ``__cause__``.
        """
        try:
            return await self.client.request(method, url, **kwargs)
        # The specific httpx errors must stay ahead of ``httpx.HTTPError``,
        # which is listed last as the catch-all.
        except httpx.ProtocolError as error:
            raise ConnectionError(f"{self.flag}直播检测请求协议错误\n{error}") from error
        except httpx.HTTPStatusError as error:
            raise ConnectionError(f"{self.flag}直播检测请求状态码错误\n{error}") from error
        except httpx.ReadError as error:
            raise ConnectionError(f"{self.flag}直播检测请求错误\n{error}") from error
        except anyio.EndOfStream as error:
            raise ConnectionError(f"{self.flag}直播检测代理错误\n{error}") from error
        except httpx.HTTPError as error:
            logger.error(f"网络异常 重试...")
            raise ConnectionError(f"{self.flag}直播检测请求错误\n{repr(error)}") from error

    def get_filename(self, title, fmt: str = "flv"):
        """Return a file-system safe file name for a recording of ``title``.

        Unsafe characters are replaced by full-width look-alikes and the
        title is truncated to 50 characters.
        """
        live_time = time.strftime('%Y%m%d_%H%M')
        safe_title = title.translate(self._FILENAME_CHAR_TABLE)
        return f'[{live_time}]{safe_title[:50]} #{self.platform} #{self.room_name}.{fmt}'

    def get_video_info(self, title):
        """Derive the metadata of a new recording from the live title.

        Returns:
            ``(create_time_str, filename, title, tags, video_id)`` where
            ``video_id`` is the Asia/Shanghai timestamp of the call and
            ``title`` is the live title decorated with that id.
        """
        now = datetime.now(ZoneInfo("Asia/Shanghai"))
        create_time_str = now.strftime("%Y%m%d_%H%M%S")
        video_id = create_time_str

        title = f"{title}({video_id}直播)"
        tags = [self.platform, self.room_name]
        # The title comes from the broadcaster; sanitise it before using it
        # as a file name so that "/" and friends cannot alter the path.
        safe_title = title.translate(self._FILENAME_CHAR_TABLE)
        filename = f"{safe_title[:50]}.flv"
        return create_time_str, filename, title, tags, video_id

    def get_streamlink_session(self):
        """Create a streamlink session carrying the SSL/header/cookie settings."""
        session = streamlink.session.Streamlink({
            "stream-segment-timeout": 60,
            "hls-segment-queue-threshold": 10
        })
        logger.info(f"是否验证SSL:{self.ssl_verify}")
        session.set_option("http-ssl-verify", self.ssl_verify)
        if self.headers:
            session.set_option("http-headers", self.headers)
        if self.cookies:
            session.set_option("http-cookies", self.cookies)
        return session

    def run_record(self, stream: "Union[StreamIO, HTTPStream]", filename: str):
        """Record ``stream`` into ``filename`` and remux it when required.

        Blocking; ``run`` executes it in a worker thread.
        """
        # Extension of the recorded file, without the leading dot.
        fmt = os.path.splitext(filename)[-1][1:]
        if not stream:
            logger.error(f"{self.flag}无可用直播源:{filename}")
            return

        logger.info(f"{self.flag}开始录制:{filename}")
        try:
            recorded = self.stream_writer(stream, filename)
            # Remux only after a successful recording and only when the
            # configured format differs from the recorded one.
            if recorded and self.file_output_fmt and self.file_output_fmt != fmt:
                self.convert_file_fmt_by_ffmpeg(filename, fmt, self.file_output_fmt)
        finally:
            # Always clear the recording state: ``run`` returns early while
            # ``run_recording`` is True, so leaving it set after a failed
            # remux would stop this recorder from ever recording again.
            self.run_recording = False
            self.stream_fd = None
            self.file_output = None
        logger.info(f"{self.flag}停止录制:{filename}")

    def stream_writer(self, stream, filename):
        """Copy ``stream`` to ``file_output_dir / filename`` via streamlink.

        Returns:
            ``True`` when the stream ran to completion, ``False`` when an
            error was caught (it is logged, not raised).
        """
        logger.info(f"{self.flag}获取到直播流链接:{filename}\n{stream.url}")
        file_output = FileOutput(self.file_output_dir / filename)
        try:
            stream_fd, prebuffer = open_stream(stream)
            file_output.open()
            self.run_recording = True
            self.stream_fd = stream_fd
            self.file_output = file_output
            logger.info(f"{self.flag}正在录制:{filename}")
            StreamRunner(stream=stream_fd, output=file_output).run(prebuffer)
            return True
        except Exception as error:
            message = str(error)
            if "timeout" in message:
                logger.warning(f"{self.flag}直播录制超时,请检查主播是否正常开播或网络连接是否正常:{filename}\n{error}")
            elif re.search("SSL: CERTIFICATE_VERIFY_FAILED", message):
                logger.warning(f"{self.flag}SSL错误,将取消SSL验证:{filename}\n{error}")
                # Later streamlink sessions are created without verification.
                self.ssl_verify = False
            elif re.search('(Unable to open URL|No data returned from stream)', message):
                logger.warning(f'{self.flag}直播流打开错误,请检查主播是否正常开播:{filename}\n{error}')
            else:
                logger.exception(f'{self.flag}直播录制错误:{filename}\n{error}')
            return False
        finally:
            file_output.close()

    def convert_file_fmt_by_ffmpeg(self, filename: str, src_fmt: str, tgt_fmt: str):
        """Remux ``filename`` into ``tgt_fmt`` (stream copy) and delete the source."""
        logger.info(f"{self.flag}开始ffmpeg封装:{filename}")
        # Swap only the trailing extension; ``str.replace`` would also rewrite
        # any other ".<src_fmt>" occurring inside the name.
        new_filename = f"{filename.removesuffix(f'.{src_fmt}')}.{tgt_fmt}"
        src_path = self.file_output_dir / filename
        tgt_path = self.file_output_dir / new_filename
        ffmpeg.input(str(src_path)).output(
            str(tgt_path),
            codec="copy",
            map_metadata="-1",
            movflags="faststart",
        ).global_args("-hide_banner").run()
        os.remove(src_path)

    async def get_live_info_by_web_enter(self):
        """Query the ``webcast/room/web/enter`` API for the room.

        Returns:
            ``{"status", "title", "stream_data"}`` or ``None`` when the
            attempt was skipped, the response body was empty or the API
            returned no room data.

        Raises:
            AssertionError: when the API answers with a non-200 status.
        """
        history = self.success_rate_of_get_live_info_by_web_enter
        success_rate = np.mean(list(history))
        logger.info(f"{self.flag} get_live_info_by_web_enter success_rate: {success_rate}")
        # Randomly skip the attempt in proportion to the recent failure rate,
        # but keep at least a 1% chance so the strategy can recover.
        if random.random() > max(float(success_rate), 0.01):
            return None

        if not self.client.cookies:
            # Visit the landing page first so the client picks up cookies.
            await self.request(
                method="GET",
                url="https://live.douyin.com/"
            )
        response = await self.request(
            method="GET",
            url="https://live.douyin.com/webcast/room/web/enter/",
            params={
                "aid": 6383,
                "device_platform": "web",
                "browser_language": "zh-CN",
                "browser_platform": "Win32",
                "browser_name": "Chrome",
                "browser_version": "100.0.0.0",
                "web_rid": self.room_id
            },
        )
        if response.status_code != 200:
            raise AssertionError(f"live enter request failed with status code {response.status_code}")
        if len(response.text) == 0:
            # An empty body is treated as rejected cookies: drop them so the
            # next attempt fetches fresh ones.
            logger.info(f"{self.flag}重置cookie")
            self.client.cookies = httpx.Cookies()
            history.append(0)
            return None

        js = response.json()
        data = js["data"]["data"]
        if len(data) == 0:
            # The request itself worked; there is just no room data.
            history.append(1)
            return None
        data = data[0]

        stream_data = json.loads(data["stream_url"]["live_core_sdk_data"]["pull_data"]["stream_data"])
        result = {
            "status": data["status"],
            "title": data["title"],
            "stream_data": stream_data["data"],
        }
        history.append(1)
        return result

    @classmethod
    def _parse_room_script(cls, content: str):
        """Extract the live info from one ``<script>`` element of the room page.

        Returns ``None`` when the element is not the state-carrying script or
        lacks any of the expected keys.
        """
        match = cls._PACE_SCRIPT_PATTERN.search(content)
        if match is None:
            return None
        try:
            payload = json.loads(match.group(1))
        except json.JSONDecodeError:
            return None
        # The payload of interest is a string shaped like "<c>:[...]" whose
        # tail (from index 2 on) is itself a JSON list.
        if not isinstance(payload, str) or payload[1:3] != ":[":
            return None
        try:
            payload = json.loads(payload[2:])
        except json.JSONDecodeError:
            return None
        if not isinstance(payload, list) or len(payload) != 4:
            return None
        holder = payload[3]
        if not isinstance(holder, dict):
            return None
        state = holder.get("state")
        if state is None:
            return None

        try:
            room_info = state["roomStore"]["roomInfo"]
            room = room_info["room"]
            # status 2 means the room is currently live
            alive_status = room["status"]
            title = room["title"]
            web_rid = room_info["web_rid"]
            anchor = room_info["anchor"]
            sec_uid = anchor["sec_uid"]
            nickname = anchor["nickname"]
            stream_data = state["streamStore"]["streamData"]
            h264_stream_data = stream_data["H264_streamData"]["stream"]
            # The following keys are not returned, but their presence is
            # still required for the script to be accepted.
            room_info["web_stream_url"]
            stream_data["H265_streamData"]["stream"]
            state["cameraStore"]
        except (KeyError, TypeError):
            return None

        return {
            "status": alive_status,
            "title": title,
            "web_rid": web_rid,
            "sec_uid": sec_uid,
            "nickname": nickname,
            "stream_data": h264_stream_data,
        }

    async def get_live_info_by_room_url(self):
        """Scrape the room HTML page for the embedded room state.

        Returns:
            A dict with ``status``, ``title``, ``web_rid``, ``sec_uid``,
            ``nickname`` and ``stream_data`` (the H264 streams), or ``None``
            when the attempt was skipped or no usable script was found.
        """
        history = self.success_rate_of_get_live_info_by_room_url
        success_rate = np.mean(list(history))
        logger.info(f"{self.flag} get_live_info_by_room_url success_rate: {success_rate}")
        if random.random() > max(float(success_rate), 0.03):
            return None

        if not self.client.cookies:
            await self.request(
                method="GET",
                url="https://live.douyin.com/"
            )

        room_url = f"https://live.douyin.com/{self.room_id}"
        ac_nonce = "".join(random.choices(string.ascii_letters + string.digits, k=16))
        headers = {
            "Cookie": f"__ac_nonce={ac_nonce}; ",
            **self.headers,
        }
        response = await self.request(
            method="GET",
            url=room_url,
            headers=headers,
        )

        soup = BeautifulSoup(response.text, "html.parser")
        result = None
        for script in soup.find_all("script"):
            result = self._parse_room_script(str(script))
            if result is not None:
                break

        history.append(0 if result is None else 1)
        return result

    async def get_live_info_by_follow_request(self):
        """Fetch the "follow" feed and return the rooms it lists.

        Returns:
            A list of dicts (``nickname``, ``sec_uid``, ``room_id``,
            ``status``, ``title``, ``stream_data``), or ``None`` when the
            attempt was skipped or the request was rejected.
        """
        history = self.success_rate_of_get_live_info_by_follow
        success_rate = np.mean(list(history))
        logger.info(f"{self.flag} get_live_info_by_follow_request success_rate: {success_rate}")
        # This strategy is attempted at most 5% of the time (rate / 20).
        if random.random() > max(float(success_rate) / 20, 0.01):
            return None

        url = "https://www.douyin.com/webcast/web/feed/follow/"
        ac_nonce = "".join(random.choices(string.ascii_letters + string.digits, k=16))
        # SECURITY NOTE(review): the cookie below embeds what look like real
        # session credentials (sid_guard / ttwid) in source code.  They are
        # kept byte-for-byte to preserve behaviour, but should be moved to
        # configuration and rotated.
        headers = {
            "cookie": ""
                      "sid_guard=166574208279efc3c59c031c7130b41b%7C1754101144%7C5184000%7CWed%2C+01-Oct-2025+02%3A19%3A04+GMT; "
                      "__ac_signature=_02B4Z6wo00f01zYpLFgAAIDBBbzaGE6bBP82CSjAAKUf26; "
                      "ttwid=1%7CN05Iw2OUzw_zu1lcgD2Nnt7DlhY3U6BnjTZUnGZMlj4%7C1755256961%7C86396afa0f78587ebc26f398c5c8335f390cfbd081c341af6a4325fd36f3c860; "
                      f"__ac_nonce={ac_nonce}; "
                      "",
            **self.headers,
        }
        params = {
            "device_platform": "webapp",
            "aid": 6383,
            "channel": "channel_pc_web",
            "scene": "aweme_pc_follow_top",
            "update_version_code": 170400,
            "pc_client_type": 1,
            "pc_libra_divert": "Mac",
            "version_code": "170400",
            "version_name": "17.4.0",
            "cookie_enabled": "true",
        }
        response = await self.request("GET", url, headers=headers, params=params)
        if response.status_code != 200:
            logger.error(f"request failed, status_code: {response.status_code}, text: {response.text}")
            history.append(0)
            return None

        js = response.json()
        status_code = js["status_code"]
        if status_code == 20003:
            # The API asks the user to log in before entering the live room.
            logger.error(f"request failed, status_code: {status_code}, text: {response.text}")
            history.append(0)
            return None

        result = list()
        for row in js["data"]["data"]:
            room = row["room"]
            owner = room["owner"]
            stream_data = json.loads(room["stream_url"]["live_core_sdk_data"]["pull_data"]["stream_data"])
            result.append({
                "nickname": owner["nickname"],
                "sec_uid": owner["sec_uid"],
                "room_id": row["web_rid"],
                # Every entry is reported with status 2 (live).
                "status": 2,
                "title": room["title"],
                "stream_data": stream_data["data"],
            })
        history.append(1)
        return result

    async def get_live_info_by_follow(self):
        """Return this room's entry from the follow feed, or ``None``."""
        live_info_list: List[dict] = await self.get_live_info_by_follow_request()
        if live_info_list is None:
            return None
        for live_info in live_info_list:
            if live_info["room_id"] == self.room_id:
                return live_info
        return None

    async def run(self):
        """Perform one poll: detect the live status and record if live.

        Does nothing while a recording is already in progress.
        """
        room_url = f"https://live.douyin.com/{self.room_id}"
        if self.run_recording:
            return None

        # Try the detection strategies in order until one yields a result.
        js = await self.get_live_info_by_web_enter()
        if js is None:
            js = await self.get_live_info_by_room_url()
        if js is None:
            js = await self.get_live_info_by_follow()
        if js is None:
            logger.info(f"{self.flag} 直播检测失败;")
            return None

        status = js["status"]
        title = js["title"]
        stream_data = js["stream_data"]
        if status != 2:
            logger.info(f"{self.flag} 未开播;")
            return None

        # Pick the first available quality, best first.
        live_url = None
        for quality_code in ("origin", "uhd", "hd", "sd", "md", "ld"):
            quality_data = stream_data.get(quality_code)
            if quality_data is None or len(quality_data) == 0:
                continue
            live_url = quality_data["main"]["flv"]
            break
        if live_url is None:
            return None

        stream = HTTPStream(
            session=self.get_streamlink_session(),
            url=live_url
        )
        logger.info(f"{self.flag}开始录制直播")
        create_time_str, filename, title, tags, video_id = self.get_video_info(title)
        # The recording blocks until the stream ends, so run it off-loop.
        await asyncio.to_thread(self.run_record, stream, filename)
        # NOTE(review): the index entry is written even when the recording
        # failed, and always with the ".flv" name — confirm this is intended.
        await self.save_downloaded_video_info(create_time_str, filename, title, tags, video_id, room_url)
        return None

    async def start(self):
        """Poll forever, sleeping ``check_interval`` seconds between polls."""
        while True:
            try:
                await self.run()
                logger.info(f"{self.flag} 刷新间隔:{self.check_interval}s")
                await asyncio.sleep(self.check_interval)
            except ConnectionError as error:
                # Protocol errors are not logged; every connection error
                # leads to a fresh client.
                if "直播检测请求协议错误" not in str(error):
                    logger.error(error)
                await self.client.aclose()
                self.client: httpx.AsyncClient = self.get_client()
                await asyncio.sleep(self.check_interval)
            except Exception as error:
                logger.exception(f"{self.flag}直播检测错误\n{repr(error)}\n{traceback.format_exc()}")
                await asyncio.sleep(self.check_interval)

    async def save_downloaded_video_info(self, create_time: str, filename: str, title: str, tags: List[str], video_id: str, url: str) -> str:
        """Add one recording to the JSON index and rewrite the index file.

        Returns:
            The POSIX path of the index file.
        """
        filename = (self.file_output_dir / filename).as_posix()

        video_info = await self.load_downloaded_video_info()
        video_info[video_id] = {
            "create_time": create_time,
            "filename": filename,
            "title": title,
            "desc": "",
            "tags": tags,
            "url": url,
            "video_id": video_id,
        }
        self.file_info_file.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(self.file_info_file.as_posix(), "w", encoding="utf-8") as f:
            video_info_ = json.dumps(video_info, ensure_ascii=False, indent=2)
            await f.write(f"{video_info_}\n")
        return self.file_info_file.as_posix()

    async def load_downloaded_video_info(self) -> Dict[str, dict]:
        """Load the JSON index of recordings; empty dict when it does not exist."""
        video_info = dict()
        if self.file_info_file.exists():
            async with aiofiles.open(self.file_info_file.as_posix(), "r", encoding="utf-8") as f:
                data = await f.read()
            video_info: dict = json.loads(data)
        return video_info
async def main():
    """Configure logging, start one recorder task per room and wait for them.

    On cancellation / interrupt the stream and output file of every recorder
    that is currently recording are closed.
    """
    # Project-local modules, imported here so that importing this file as a
    # library does not require them.
    import log
    from project_settings import environment, project_path, log_directory, time_zone_info

    log.setup_size_rotating(log_directory=log_directory, tz_info=time_zone_info)

    tasks = [
        {
            "platform": "Douyin",
            "room_id": "616456885342",
            "room_name": "陈昌文",
            "file_output_dir": (project_path / "data/live_records/douyin")
        }
    ]

    recorders = [
        DouyinLiveRecorder(
            platform=task["platform"],
            room_id=task["room_id"],
            room_name=task["room_name"],
            file_output_dir=task["file_output_dir"],
        )
        for task in tasks
    ]
    future_tasks = [
        asyncio.create_task(recorder.start()) for recorder in recorders
    ]

    try:
        await asyncio.wait(future_tasks)
    except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
        logger.warning("用户中断录制,正在关闭直播流")
        for recorder in recorders:
            # Both handles are None unless a recording is in progress, so
            # closing them unconditionally would raise AttributeError and
            # abort the shutdown of the remaining recorders.
            if recorder.stream_fd is not None:
                recorder.stream_fd.close()
            if recorder.file_output is not None:
                recorder.file_output.close()
# Script entry point: start the recorders only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())