|
|
|
|
|
import os
|
|
import json
|
|
from datetime import datetime,timedelta
|
|
from collections import defaultdict
|
|
|
|
LOG_DIR = "/opt/nav-fronted/logs"
|
|
ACCESS_LOG = os.path.join(LOG_DIR, "access.log")
|
|
SUBMISSION_LOG = os.path.join(LOG_DIR, "submissions.log")
|
|
|
|
os.makedirs(LOG_DIR, exist_ok=True)
|
|
|
|
IP_REQUEST_RECORDS = defaultdict(list)
|
|
IP_LIMIT = 5
|
|
|
|
def is_request_allowed(ip: str) -> bool:
|
|
now = datetime.now()
|
|
IP_REQUEST_RECORDS[ip] = [t for t in IP_REQUEST_RECORDS[ip] if now - t < timedelta(minutes=1)]
|
|
if len(IP_REQUEST_RECORDS[ip]) < IP_LIMIT:
|
|
IP_REQUEST_RECORDS[ip].append(now)
|
|
return True
|
|
return False
|
|
|
|
def log_access(user_ip: str = None, user_agent: str = None):
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
log_entry = {
|
|
"timestamp": timestamp,
|
|
"type": "access",
|
|
"user_ip": user_ip or "unknown",
|
|
"user_agent": user_agent or "unknown"
|
|
}
|
|
with open(ACCESS_LOG, "a") as f:
|
|
f.write(json.dumps(log_entry) + "\n")
|
|
|
|
def log_submission(scene: str, prompt: str, model: str, user: str = "anonymous", res: str = "unknown"):
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
log_entry = {
|
|
"timestamp": timestamp,
|
|
"type": "submission",
|
|
"user": user,
|
|
"scene": scene,
|
|
"prompt": prompt,
|
|
"model": model,
|
|
"res": res
|
|
}
|
|
with open(SUBMISSION_LOG, "a") as f:
|
|
f.write(json.dumps(log_entry) + "\n")
|
|
|
|
def read_logs(log_type: str = "all", max_entries: int = 50) -> list:
|
|
logs = []
|
|
if log_type in ["all", "access"]:
|
|
try:
|
|
with open(ACCESS_LOG, "r") as f:
|
|
for line in f:
|
|
logs.append(json.loads(line.strip()))
|
|
except FileNotFoundError:
|
|
pass
|
|
if log_type in ["all", "submission"]:
|
|
try:
|
|
with open(SUBMISSION_LOG, "r") as f:
|
|
for line in f:
|
|
logs.append(json.loads(line.strip()))
|
|
except FileNotFoundError:
|
|
pass
|
|
logs.sort(key=lambda x: x["timestamp"], reverse=True)
|
|
return logs[:max_entries]
|
|
|
|
def format_logs_for_display(logs: list) -> str:
|
|
if not logs:
|
|
return "No log record"
|
|
markdown = "### System Access Log\n\n"
|
|
markdown += "| Time | Type | User/IP | Details |\n"
|
|
markdown += "|------|------|---------|----------|\n"
|
|
for log in logs:
|
|
timestamp = log.get("timestamp", "unknown")
|
|
log_type = "Access" if log.get("type") == "access" else "Submission"
|
|
if log_type == "Access":
|
|
user = log.get("user_ip", "unknown")
|
|
details = f"User-Agent: {log.get('user_agent', 'unknown')}"
|
|
else:
|
|
user = log.get("user", "anonymous")
|
|
result = log.get('res', 'unknown')
|
|
if result != "success":
|
|
if len(result) > 40:
|
|
result = f"{result[:20]}...{result[-20:]}"
|
|
details = f"Scene: {log.get('scene', 'unknown')}, Prompt: {log.get('prompt', '')}, Model: {log.get('model', 'unknown')}, result: {result}"
|
|
markdown += f"| {timestamp} | {log_type} | {user} | {details} |\n"
|
|
return markdown
|
|
|