mpt2026 / app /utils /utils.py
chaowenguo's picture
Upload 108 files
e58b6a6 verified
import json
import locale
import os
from pathlib import Path
import threading
from typing import Any
from uuid import uuid4
import urllib3
from loguru import logger
from app.models import const
# Runs at import time: turns off urllib3's warnings for the whole process.
# NOTE(review): this presumably also hides InsecureRequestWarning for
# unverified HTTPS requests made elsewhere in the app — confirm that is intended.
urllib3.disable_warnings()
def get_response(status: int, data: Any = None, message: str = ""):
    """Build a response envelope as a plain dictionary.

    Args:
        status: Status code stored under the ``"status"`` key.
        data: Optional payload; included under ``"data"`` only when truthy.
        message: Optional text; included under ``"message"`` only when truthy.

    Returns:
        A dict that always has ``"status"`` and, when truthy, ``"data"`` and
        ``"message"`` (in that order).
    """
    # Falsy optional values (None, "", 0, empty containers) are left out.
    optional_fields = {"data": data, "message": message}
    return {
        "status": status,
        **{key: value for key, value in optional_fields.items() if value},
    }
def to_json(obj):
    """Render ``obj`` as a pretty-printed JSON string.

    The object is first reduced to JSON-friendly values:

    * ``None``, ``int``, ``float``, ``bool`` and ``str`` are kept as they are;
    * ``bytes`` become the placeholder string ``"*** binary data ***"``;
    * ``dict`` values and ``list``/``tuple`` items are converted recursively
      (dict keys are left untouched, tuples become lists);
    * any other object exposing ``__dict__`` is converted via that mapping;
    * everything else becomes ``None``.

    Returns:
        The JSON text (4-space indent, non-ASCII characters kept verbatim),
        or ``None`` if conversion or encoding raises any ``Exception``.
    """

    def to_plain(value):
        # Primitive values are already JSON-compatible.
        if value is None or isinstance(value, (int, float, bool, str)):
            return value
        # Raw bytes are not dumped; a fixed placeholder stands in for them.
        if isinstance(value, bytes):
            return "*** binary data ***"
        if isinstance(value, dict):
            return {key: to_plain(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [to_plain(item) for item in value]
        # Custom objects are represented by their attribute dictionary.
        if hasattr(value, "__dict__"):
            return to_plain(value.__dict__)
        # Unsupported types (sets, etc.) collapse to null.
        return None

    try:
        return json.dumps(to_plain(obj), ensure_ascii=False, indent=4)
    except Exception:
        return None
def get_uuid(remove_hyphen: bool = False):
u = str(uuid4())
if remove_hyphen:
u = u.replace("-", "")
return u
def root_dir():
    """Return the directory three levels above this file's resolved path."""
    location = os.path.realpath(__file__)
    # Strip the file name, then two more path components.
    for _ in range(3):
        location = os.path.dirname(location)
    return location
def storage_dir(sub_dir: str = "", create: bool = False):
    """Return the path of the ``storage`` directory under the project root.

    Args:
        sub_dir: Optional path joined below ``storage``.
        create: When true, create the directory (and parents) if it is missing.

    Returns:
        The resulting path as a string.
    """
    d = os.path.join(root_dir(), "storage")
    if sub_dir:
        d = os.path.join(d, sub_dir)
    if create and not os.path.exists(d):
        # exist_ok guards against another thread/process creating the
        # directory between the existence check and this call.
        os.makedirs(d, exist_ok=True)
    return d
def resource_dir(sub_dir: str = ""):
    """Return the path of the ``resource`` directory under the project root.

    Args:
        sub_dir: Optional path joined below ``resource``.

    Returns:
        The resulting path as a string; nothing is created on disk.
    """
    components = [root_dir(), "resource"]
    if sub_dir:
        components.append(sub_dir)
    return os.path.join(*components)
def task_dir(sub_dir: str = ""):
    """Return the ``tasks`` directory inside storage, creating it if missing.

    Args:
        sub_dir: Optional path joined below ``tasks``.

    Returns:
        The resulting path as a string.
    """
    d = os.path.join(storage_dir(), "tasks")
    if sub_dir:
        d = os.path.join(d, sub_dir)
    if not os.path.exists(d):
        # exist_ok guards against another thread/process creating the
        # directory between the existence check and this call.
        os.makedirs(d, exist_ok=True)
    return d
def font_dir(sub_dir: str = ""):
    """Return the ``fonts`` resource directory, creating it if missing.

    Args:
        sub_dir: Optional path joined below ``fonts``.

    Returns:
        The resulting path as a string.
    """
    d = resource_dir("fonts")
    if sub_dir:
        d = os.path.join(d, sub_dir)
    if not os.path.exists(d):
        # exist_ok guards against another thread/process creating the
        # directory between the existence check and this call.
        os.makedirs(d, exist_ok=True)
    return d
def song_dir(sub_dir: str = ""):
    """Return the ``songs`` resource directory, creating it if missing.

    Args:
        sub_dir: Optional path joined below ``songs``.

    Returns:
        The resulting path as a string.
    """
    d = resource_dir("songs")
    if sub_dir:
        d = os.path.join(d, sub_dir)
    if not os.path.exists(d):
        # exist_ok guards against another thread/process creating the
        # directory between the existence check and this call.
        os.makedirs(d, exist_ok=True)
    return d
def public_dir(sub_dir: str = ""):
    """Return the ``public`` resource directory, creating it if missing.

    Args:
        sub_dir: Optional path joined below ``public``.

    Returns:
        The resulting path as a string.
    """
    d = resource_dir("public")
    if sub_dir:
        d = os.path.join(d, sub_dir)
    if not os.path.exists(d):
        # exist_ok guards against another thread/process creating the
        # directory between the existence check and this call.
        os.makedirs(d, exist_ok=True)
    return d
def run_in_background(func, *args, **kwargs):
    """Start ``func(*args, **kwargs)`` on a new thread and return that thread.

    Any ``Exception`` raised by ``func`` is caught inside the thread and
    reported through ``logger.error``; the return value of ``func`` is
    discarded.

    Returns:
        The already-started ``threading.Thread``.
    """

    def run():
        # Keep failures inside the worker: log them instead of propagating.
        try:
            func(*args, **kwargs)
        except Exception as exc:
            logger.error(f"run_in_background error: {exc}")

    worker = threading.Thread(target=run)
    worker.start()
    return worker
def time_convert_seconds_to_hmsm(seconds) -> str:
    """Format a duration in seconds as ``HH:MM:SS,mmm``.

    The value is rounded to the nearest millisecond first and then split
    using integer arithmetic. Truncating ``seconds * 1000`` instead would
    lose a millisecond whenever the product lands just below a whole number
    because of floating-point representation (for example ``1.001 * 1000``
    evaluates to ``1000.9999999999999``).

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The formatted timestamp, e.g. ``"01:01:01,500"`` for ``3661.5``.
    """
    total_ms = int(round(seconds * 1000))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    whole_seconds, milliseconds = divmod(remainder, 1000)
    return "{:02d}:{:02d}:{:02d},{:03d}".format(
        hours, minutes, whole_seconds, milliseconds
    )
def text_to_srt(idx: int, msg: str, start_time: float, end_time: float) -> str:
    """Format one subtitle entry as text.

    The result has three lines: the index, the time range written as
    ``<start> --> <end>`` (both produced by ``time_convert_seconds_to_hmsm``),
    and the message, followed by a trailing newline.

    Args:
        idx: Index written on the first line.
        msg: Subtitle text.
        start_time: Start of the entry, in seconds.
        end_time: End of the entry, in seconds.

    Returns:
        The formatted entry.
    """
    begin_stamp = time_convert_seconds_to_hmsm(start_time)
    finish_stamp = time_convert_seconds_to_hmsm(end_time)
    fields = (idx, begin_stamp, finish_stamp, msg)
    return """%d
%s --> %s
%s
""" % fields
def str_contains_punctuation(word):
    """Return True if any entry of ``const.PUNCTUATIONS`` occurs in ``word``."""
    return any(mark in word for mark in const.PUNCTUATIONS)
def split_string_by_punctuations(s):
    """Split ``s`` into trimmed, non-empty fragments.

    A fragment ends at every newline and at every character found in
    ``const.PUNCTUATIONS``; the separator itself is dropped. A ``"."`` that
    sits between two digits (as in ``"2.5"``) is kept as part of the fragment
    rather than treated as a separator.

    Returns:
        The list of fragments, stripped of surrounding whitespace, with empty
        fragments removed.
    """
    fragments = []
    pending = []
    last_index = len(s) - 1

    def flush():
        # Close the fragment collected so far and start a new one.
        fragments.append("".join(pending).strip())
        pending.clear()

    for position, char in enumerate(s):
        if char == "\n":
            flush()
            continue

        before = s[position - 1] if position > 0 else ""
        after = s[position + 1] if position < last_index else ""
        # e.g. "charged at 2.5% fee": the dot in "2.5" is not a break marker.
        is_decimal_point = char == "." and before.isdigit() and after.isdigit()

        if is_decimal_point or char not in const.PUNCTUATIONS:
            pending.append(char)
        else:
            flush()

    flush()
    # Drop empty fragments.
    return [fragment for fragment in fragments if fragment]
def md5(text):
    """Return the hexadecimal MD5 digest of ``text`` encoded as UTF-8."""
    import hashlib

    encoded = text.encode("utf-8")
    digest = hashlib.md5(encoded)
    return digest.hexdigest()
def get_system_locale():
    """Return the language part of the default locale, e.g. ``"zh"`` or ``"en"``.

    The locale name reported by ``locale.getdefaultlocale()`` is cut at the
    first underscore (``zh_CN`` -> ``zh``, ``en_GB`` -> ``en``). If anything
    goes wrong while reading or parsing it, ``"en"`` is returned.
    """
    try:
        locale_name = locale.getdefaultlocale()[0]
        # Keep only what precedes the first "_" (the language code).
        language_code, _, _ = locale_name.partition("_")
        return language_code
    except Exception:
        return "en"
def load_locales(i18n_dir):
    """Load every ``*.json`` file found under ``i18n_dir`` (recursively).

    Args:
        i18n_dir: Directory to walk.

    Returns:
        A dict mapping the part of each file name before its first ``"."``
        to the parsed JSON content of that file. Files are read as UTF-8.
    """
    translations = {}
    for folder, _subdirs, filenames in os.walk(i18n_dir):
        for filename in filenames:
            if not filename.endswith(".json"):
                continue
            language = filename.split(".")[0]
            json_path = os.path.join(folder, filename)
            with open(json_path, "r", encoding="utf-8") as handle:
                translations[language] = json.load(handle)
    return translations
def parse_extension(filename):
    """Return the final extension of ``filename`` in lowercase, without dots.

    Returns an empty string when the name has no suffix.
    """
    suffix = Path(filename).suffix
    lowered = suffix.lower()
    return lowered.lstrip(".")