|
from loguru import logger |
|
import sys |
|
import os |
|
from datetime import datetime |
|
import shutil |
|
|
|
|
|
# All log files are written to a "logs" folder located next to this module.
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
os.makedirs(log_dir, exist_ok=True)

# Colour-tagged layout used by the console sink.
_CONSOLE_FORMAT = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"

# Plain layout shared by both file sinks (same fields, no colour markup).
_FILE_FORMAT = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}"

# Options common to every file sink; only the path and level differ per sink.
_FILE_SINK_OPTIONS = dict(
    format=_FILE_FORMAT,
    rotation="00:00",
    retention="7 days",
    compression="zip",
    enqueue=True,
)

# Start from a clean slate before registering this module's own sinks.
logger.remove()

# Console sink: INFO and above.
logger.add(sys.stdout, format=_CONSOLE_FORMAT, level="INFO")

# General file sink: DEBUG and above, one dated file per day.
logger.add(
    os.path.join(log_dir, "stock_scanner_{time:YYYY-MM-DD}.log"),
    level="DEBUG",
    **_FILE_SINK_OPTIONS,
)

# Error-only file sink: ERROR and above, kept in its own dated file.
logger.add(
    os.path.join(log_dir, "error_{time:YYYY-MM-DD}.log"),
    level="ERROR",
    **_FILE_SINK_OPTIONS,
)
|
|
|
def clean_old_logs(max_days=7):
    """Delete files in ``log_dir`` that are more than ``max_days`` days old.

    Every entry directly inside ``log_dir`` is examined; sub-directories are
    skipped. A file's age is measured from its last modification time and
    truncated to whole days, so a file is removed once that whole-day count
    is strictly greater than ``max_days``.

    The sweep is best-effort: exceptions raised while listing the directory
    or while handling an individual file are caught and logged instead of
    being raised. A failure on one file does not stop the remaining files
    from being processed.

    Args:
        max_days: Age threshold in whole days; files older than this are
            deleted.
    """
    try:
        filenames = os.listdir(log_dir)
    except Exception as e:
        # Without a listing there is nothing to sweep.
        logger.error(f"清理日志文件时出错: {e}")
        return

    now = datetime.now()
    for filename in filenames:
        file_path = os.path.join(log_dir, filename)
        # Handle each file independently: an entry that vanished or cannot
        # be removed must not abort the clean-up of the other files.
        try:
            if os.path.isdir(file_path):
                continue

            modified_at = datetime.fromtimestamp(os.path.getmtime(file_path))
            days_old = (now - modified_at).days

            if days_old > max_days:
                os.remove(file_path)
                logger.info(f"已删除过期日志文件: {filename}")
        except Exception as e:
            logger.error(f"清理日志文件时出错: {e}")
|
|
|
def get_logger():
    """Return the shared logger after pruning expired log files.

    Each call first runs ``clean_old_logs()`` with its default arguments,
    then returns the module-level ``logger`` object.
    """
    # Housekeeping happens on every call, before the logger is returned.
    clean_old_logs()
    shared_logger = logger
    return shared_logger
|
|