# dorogan
# Update: changes in search API (teasers and docs texts were separated)
# ac7cbfc
import json
import os
from flask import Flask, jsonify, request
from semantic_search import SemanticSearch
from datetime import datetime
# Shared semantic-search engine instance used by all request handlers.
search = SemanticSearch()

app = Flask(__name__)
# Keep non-ASCII characters readable in JSON responses instead of \uXXXX escapes.
app.config['JSON_AS_ASCII'] = False

# Directory where per-query log files are written (override via env var).
LOGS_BASE_PATH = os.getenv("LOGS_BASE_PATH", "logs")
# exist_ok=True avoids the race between an existence check and creation.
os.makedirs(LOGS_BASE_PATH, exist_ok=True)

# Query logging is opt-in: set ENABLE_LOGS=1 to activate it.
ENABLE_LOGS = os.getenv("ENABLE_LOGS", "0") == "1"
def log_query_result(query, top, request_id, result):
    """Persist one search query and its result as a standalone JSON file.

    No-op unless the module-level ENABLE_LOGS toggle is set. Each call
    writes a separate timestamp-named file under LOGS_BASE_PATH.

    Args:
        query: The search query string.
        top: Number of results that were requested.
        request_id: Caller-supplied correlation id (may be empty).
        result: JSON-serializable list of result dicts to record.
    """
    if not ENABLE_LOGS:
        return
    # Include microseconds (%f) so two queries logged within the same
    # second do not overwrite each other's file (the old second-granular
    # name silently lost all but the last log of any given second).
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    log_file_path = os.path.join(LOGS_BASE_PATH, f"{timestamp}.json")
    log_data = {
        "timestamp": timestamp,
        "query": query,
        "top": top,
        "request_id": request_id,
        "result": result,
    }
    with open(log_file_path, 'w', encoding='utf-8') as log_file:
        # ensure_ascii=False keeps logged text human-readable, matching
        # the app's JSON_AS_ASCII = False response configuration.
        json.dump(log_data, log_file, indent=2, ensure_ascii=False)
@app.route('/health', methods=['GET'])
def health():
    """Liveness probe endpoint: always reports the service as up."""
    payload = {"status": "ok"}
    return jsonify(payload)
@app.route('/search', methods=['POST'])
def search_route():
    """Run a semantic search and return scored results as a JSON array.

    Expects a JSON body with optional keys:
        query (str): search text (default "").
        top (int): number of results to return (default 10).
        use_llm_for_teasers (bool): whether teasers come from an LLM (default False).
        request_id (str): caller correlation id recorded in logs (default "").

    Returns a list of {title, text, teaser, relevance} objects.
    """
    # silent=True plus the `or {}` fallback keeps a missing or malformed
    # JSON body from crashing with an AttributeError before the defaults
    # below can apply.
    data = request.get_json(silent=True) or {}
    query = data.get('query', '')
    top = data.get('top', 10)
    use_llm_for_teasers = data.get('use_llm_for_teasers', False)
    request_id = data.get('request_id', '')

    titles, docs, teasers, scores = search.search(query, top, use_llm_for_teasers)
    # Teasers and document texts are separate fields (the search API split
    # them). Teaser is passed through as-is — it may not be a str; title,
    # text, and relevance are coerced to str as before.
    result = [
        {'title': str(title), 'text': str(doc),
         'teaser': teaser, 'relevance': str(score)}
        for title, doc, teaser, score in zip(titles, docs, teasers, scores)
    ]

    # No-op unless ENABLE_LOGS=1.
    log_query_result(query, top, request_id, result)
    return jsonify(result)
@app.route('/read_logs', methods=['GET'])
def read_logs():
    """Return every logged query/result record as a JSON array.

    Files are read in sorted filename order; since log files are named by
    timestamp, this yields chronological order instead of the
    platform-dependent os.listdir order.
    """
    logs = []
    for log_file in sorted(os.listdir(LOGS_BASE_PATH)):
        if not log_file.endswith(".json"):
            continue
        with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file:
            logs.append(json.load(file))
    return jsonify(logs)
@app.route('/analyze_logs', methods=['GET'])
def analyze_logs():
    """Report log entries whose results were not reproducible.

    Groups log records by (query, top) and returns every record belonging
    to a group where the same query/top pair produced more than one
    distinct result payload.
    """
    logs_by_query_top = {}
    for log_file in os.listdir(LOGS_BASE_PATH):
        if not log_file.endswith(".json"):
            continue
        with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file:
            log_data = json.load(file)
        # Tuple key avoids collisions the old f"{query}_{top}" string key
        # allowed (e.g. query "a_1" / top "" vs query "a" / top "1_").
        key = (log_data.get("query", ""), log_data.get("top", ""))
        logs_by_query_top.setdefault(key, []).append(log_data)

    invalid_logs = []
    for logs in logs_by_query_top.values():
        # sort_keys gives a canonical serialization so dicts with equal
        # content but different key order do not count as different;
        # .get avoids a KeyError on records missing 'result'.
        distinct = {json.dumps(log.get('result'), sort_keys=True) for log in logs}
        if len(distinct) > 1:
            invalid_logs.extend(logs)
    return jsonify(invalid_logs)
if __name__ == '__main__':
    # Bind to all interfaces so the service is reachable from outside a
    # container; debug mode is kept off.
    app.run(host='0.0.0.0', debug=False)