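"""Chat logging utilities for the Space.

ChatLogger writes structured JSONL log entries (question, answer, retrieved
content, optional feedback, and an approximate client location) into the
folder watched by a huggingface_hub CommitScheduler, which periodically
commits the files to the Hub.
"""
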
import json
import logging
from datetime import datetime
from uuid import uuid4
import requests
from pathlib import Path
from datasets import load_dataset, Dataset
import os
from huggingface_hub import CommitScheduler, HfApi
import random


class ChatLogger:
    def __init__(self, scheduler):
        """Initialize the chat logger with paths and configurations"""
        if not scheduler:
            raise ValueError("Scheduler is required")
        self.scheduler = scheduler
        self.json_dataset_dir = scheduler.folder_path
        self.logs_path = self.json_dataset_dir / f"logs-{uuid4()}.jsonl"

    def get_client_ip(self, request=None):
        """Get the client IP address from the request context"""
        try:
            if request:
                # Try different headers that might contain the real IP
                ip = request.client.host
                # Check for proxy headers
                forwarded_for = request.headers.get('X-Forwarded-For')
                if forwarded_for:
                    # X-Forwarded-For can contain multiple IPs - the first one is the client
                    ip = forwarded_for.split(',')[0].strip()
                logging.debug(f"Client IP detected: {ip}")
                return ip
        except Exception as e:
            logging.error(f"Error getting client IP: {e}")
        return "127.0.0.1"

    def get_client_location(self, ip_address):
        """Get geolocation info using ipapi.co"""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_address}/json/',
                headers=headers,
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                # Add random noise between -0.01 and 0.01 degrees (roughly ±1 km)
                lat = data.get('latitude')
                lon = data.get('longitude')
                if lat is not None and lon is not None:
                    lat += random.uniform(-0.01, 0.01)
                    lon += random.uniform(-0.01, 0.01)
                return {
                    'city': data.get('city'),
                    'region': data.get('region'),
                    'country': data.get('country_name'),
                    'latitude': lat,
                    'longitude': lon
                }
            elif response.status_code == 429:
                logging.warning("Rate limit exceeded for IP lookup")
                return None
            else:
                logging.error(f"Error in IP lookup: status code {response.status_code}")
                return None
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed in IP lookup: {str(e)}")
            return None

    def create_log_entry(self, query, answer, retrieved_content, feedback=None, request=None):
        """Create a structured log entry with all required fields"""
        timestamp = datetime.now().timestamp()
        # Get client location if request is provided
        ip = self.get_client_ip(request) if request else None
        location = self.get_client_location(ip) if ip else None
        log_entry = {
            "record_id": str(uuid4()),
            "session_id": str(uuid4()),  # In practice, this should be passed in from the session
            "time": str(timestamp),
            "client_location": location,
            "question": query,
            "answer": answer,
            "retrieved_content": retrieved_content if isinstance(retrieved_content, list) else [retrieved_content],
            "feedback": feedback
        }
        return log_entry

    def cleanup_local_files(self):
        """Delete local JSON files after successful upload"""
        try:
            # List all files in the json_dataset directory
            for file in self.json_dataset_dir.glob("*.json*"):
                try:
                    file.unlink()  # Delete file
                    logging.info(f"Deleted local file: {file}")
                except Exception as e:
                    logging.error(f"Error deleting file {file}: {e}")
            # Optionally remove the directory if empty
            if not any(self.json_dataset_dir.iterdir()):
                self.json_dataset_dir.rmdir()
                logging.info("Removed empty json_dataset directory")
        except Exception as e:
            logging.error(f"Error in cleanup: {e}")

    def save_local(self, log_entry):
        """Save log entry to local JSONL file"""
        try:
            # Reorder fields for consistency
            field_order = [
                "record_id",
                "session_id",
                "time",
                "client_location",
                "question",
                "answer",
                "retrieved_content",
                "feedback"
            ]
            ordered_logs = {k: log_entry.get(k) for k in field_order if k in log_entry}
            with self.scheduler.lock:
                with open(self.logs_path, 'a') as f:
                    json.dump(ordered_logs, f)
                    f.write('\n')
                logging.info("Log entry saved")
            # After successful write, trigger cleanup
            self.cleanup_local_files()
            return True
        except Exception as e:
            logging.error(f"Error saving to local file: {str(e)}")
            return False

    def log(self, query, answer, retrieved_content, feedback=None, request=None):
        """Main logging method that handles both local and HF storage"""
        # Create log entry
        log_entry = self.create_log_entry(
            query=query,
            answer=answer,
            retrieved_content=retrieved_content,
            feedback=feedback,
            request=request
        )
        # Save locally with thread safety
        return self.save_local(log_entry)
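

# Example usage (a minimal sketch; the repo_id, folder name, and schedule below
# are illustrative assumptions, not values defined by this module):
#
#     from huggingface_hub import CommitScheduler
#
#     scheduler = CommitScheduler(
#         repo_id="your-username/chat-logs",  # hypothetical dataset repo
#         repo_type="dataset",
#         folder_path="json_dataset",         # local folder the scheduler watches
#         every=5,                            # push every 5 minutes
#     )
#     logger = ChatLogger(scheduler)
#     logger.log(
#         query="What is RAG?",
#         answer="Retrieval-augmented generation ...",
#         retrieved_content=["chunk 1", "chunk 2"],
#     )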