Asistente_EUDR / utils/logger.py
import json
import logging
from datetime import datetime
from uuid import uuid4
import requests
from pathlib import Path
from datasets import load_dataset, Dataset
import os
from huggingface_hub import CommitScheduler, HfApi
import random


class ChatLogger:
    def __init__(self, scheduler):
        """Initialize the chat logger with paths and configurations"""
        if not scheduler:
            raise ValueError("Scheduler is required")
        self.scheduler = scheduler
        self.json_dataset_dir = scheduler.folder_path
        self.logs_path = self.json_dataset_dir / f"logs-{uuid4()}.jsonl"

    def get_client_ip(self, request=None):
        """Get the client IP address from the request context"""
        try:
            if request:
                # Try different headers that might contain the real IP
                ip = request.client.host
                # Check for proxy headers
                forwarded_for = request.headers.get('X-Forwarded-For')
                if forwarded_for:
                    # X-Forwarded-For can contain multiple IPs - first one is the client
                    ip = forwarded_for.split(',')[0].strip()
                logging.debug(f"Client IP detected: {ip}")
                return ip
        except Exception as e:
            logging.error(f"Error getting client IP: {e}")
        return "127.0.0.1"

    def get_client_location(self, ip_address):
        """Get geolocation info using ipapi.co"""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_address}/json/',
                headers=headers,
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                # Add random noise between -0.01 and 0.01 degrees (roughly ±1km)
                lat = data.get('latitude')
                lon = data.get('longitude')
                if lat is not None and lon is not None:
                    lat += random.uniform(-0.01, 0.01)
                    lon += random.uniform(-0.01, 0.01)
                return {
                    'city': data.get('city'),
                    'region': data.get('region'),
                    'country': data.get('country_name'),
                    'latitude': lat,
                    'longitude': lon
                }
            elif response.status_code == 429:
                logging.warning("Rate limit exceeded for IP lookup")
                return None
            else:
                logging.error(f"Error in IP lookup: Status code {response.status_code}")
                return None
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed in IP lookup: {str(e)}")
            return None

    def create_log_entry(self, query, answer, retrieved_content, feedback=None, request=None):
        """Create a structured log entry with all required fields"""
        timestamp = datetime.now().timestamp()
        # Get client location if request is provided
        ip = self.get_client_ip(request) if request else None
        location = self.get_client_location(ip) if ip else None
        log_entry = {
            "record_id": str(uuid4()),
            "session_id": str(uuid4()),  # In practice, this should be passed in from the session
            "time": str(timestamp),
            "client_location": location,
            "question": query,
            "answer": answer,
            "retrieved_content": retrieved_content if isinstance(retrieved_content, list) else [retrieved_content],
            "feedback": feedback
        }
        return log_entry

    def cleanup_local_files(self):
        """Delete local JSON files after successful upload"""
        try:
            # List all files in json_dataset directory
            for file in self.json_dataset_dir.glob("*.json*"):
                try:
                    file.unlink()  # Delete file
                    logging.info(f"Deleted local file: {file}")
                except Exception as e:
                    logging.error(f"Error deleting file {file}: {e}")
            # Optionally remove the directory if empty
            if not any(self.json_dataset_dir.iterdir()):
                self.json_dataset_dir.rmdir()
                logging.info("Removed empty json_dataset directory")
        except Exception as e:
            logging.error(f"Error in cleanup: {e}")

    def save_local(self, log_entry):
        """Save log entry to local JSONL file"""
        try:
            # Reorder fields for consistency
            field_order = [
                "record_id",
                "session_id",
                "time",
                "client_location",
                "question",
                "answer",
                "retrieved_content",
                "feedback"
            ]
            ordered_logs = {k: log_entry.get(k) for k in field_order if k in log_entry}
            # Hold the scheduler lock so a scheduled commit does not run mid-write
            with self.scheduler.lock:
                with open(self.logs_path, 'a') as f:
                    json.dump(ordered_logs, f)
                    f.write('\n')
            logging.info("Log entry saved")
            # After successful write, trigger cleanup
            self.cleanup_local_files()
            return True
        except Exception as e:
            logging.error(f"Error saving to local file: {str(e)}")
            return False

    def log(self, query, answer, retrieved_content, feedback=None, request=None):
        """Main logging method that handles both local and HF storage"""
        # Create log entry
        log_entry = self.create_log_entry(
            query=query,
            answer=answer,
            retrieved_content=retrieved_content,
            feedback=feedback,
            request=request
        )
        # Save locally with thread safety
        return self.save_local(log_entry)
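

# --- Usage sketch (illustrative; not part of the original module) ---
# A minimal example of wiring ChatLogger to a huggingface_hub CommitScheduler.
# The repo id, local folder name, path_in_repo, and commit interval below are
# placeholder assumptions, and running this requires a Hugging Face token with
# write access to the target dataset repo.
if __name__ == "__main__":
    scheduler = CommitScheduler(
        repo_id="your-org/eudr-chat-logs",  # placeholder dataset repo id
        repo_type="dataset",
        folder_path="json_dataset",  # local folder the logger writes JSONL files into
        path_in_repo="data",
        every=5,  # push pending files roughly every 5 minutes
    )
    chat_logger = ChatLogger(scheduler)
    chat_logger.log(
        query="Which commodities are covered by the EUDR?",  # example question
        answer="Example answer text",
        retrieved_content=["Example retrieved passage"],
    )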