File size: 3,281 Bytes
ad1357a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# logging_utils.py
# 日志相关工具
import os
import json
from datetime import datetime,timedelta
from collections import defaultdict

LOG_DIR = "/opt/nav-fronted/logs"
ACCESS_LOG = os.path.join(LOG_DIR, "access.log")
SUBMISSION_LOG = os.path.join(LOG_DIR, "submissions.log")

os.makedirs(LOG_DIR, exist_ok=True)

IP_REQUEST_RECORDS = defaultdict(list)
IP_LIMIT = 5

def is_request_allowed(ip: str) -> bool:
    now = datetime.now()
    IP_REQUEST_RECORDS[ip] = [t for t in IP_REQUEST_RECORDS[ip] if now - t < timedelta(minutes=1)]
    if len(IP_REQUEST_RECORDS[ip]) < IP_LIMIT:
        IP_REQUEST_RECORDS[ip].append(now)
        return True
    return False

def log_access(user_ip: str = None, user_agent: str = None):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = {
        "timestamp": timestamp,
        "type": "access",
        "user_ip": user_ip or "unknown",
        "user_agent": user_agent or "unknown"
    }
    with open(ACCESS_LOG, "a") as f:
        f.write(json.dumps(log_entry) + "\n")

def log_submission(scene: str, prompt: str, model: str, user: str = "anonymous", res: str = "unknown"):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = {
        "timestamp": timestamp,
        "type": "submission",
        "user": user,
        "scene": scene,
        "prompt": prompt,
        "model": model,
        "res": res
    }
    with open(SUBMISSION_LOG, "a") as f:
        f.write(json.dumps(log_entry) + "\n")

def read_logs(log_type: str = "all", max_entries: int = 50) -> list:
    logs = []
    if log_type in ["all", "access"]:
        try:
            with open(ACCESS_LOG, "r") as f:
                for line in f:
                    logs.append(json.loads(line.strip()))
        except FileNotFoundError:
            pass
    if log_type in ["all", "submission"]:
        try:
            with open(SUBMISSION_LOG, "r") as f:
                for line in f:
                    logs.append(json.loads(line.strip()))
        except FileNotFoundError:
            pass
    logs.sort(key=lambda x: x["timestamp"], reverse=True)
    return logs[:max_entries]

def format_logs_for_display(logs: list) -> str:
    if not logs:
        return "No log record"
    markdown = "### System Access Log\n\n"
    markdown += "| Time | Type | User/IP | Details |\n"
    markdown += "|------|------|---------|----------|\n"
    for log in logs:
        timestamp = log.get("timestamp", "unknown")
        log_type = "Access" if log.get("type") == "access" else "Submission"
        if log_type == "Access":
            user = log.get("user_ip", "unknown")
            details = f"User-Agent: {log.get('user_agent', 'unknown')}"
        else:
            user = log.get("user", "anonymous")
            result = log.get('res', 'unknown')
            if result != "success":
                if len(result) > 40:
                    result = f"{result[:20]}...{result[-20:]}"
            details = f"Scene: {log.get('scene', 'unknown')}, Prompt: {log.get('prompt', '')}, Model: {log.get('model', 'unknown')}, result: {result}"
        markdown += f"| {timestamp} | {log_type} | {user} | {details} |\n"
    return markdown