jira-ai-assistant / modules /interface /csv_analysis_ui.py
DocUA's picture
Єдиний коміт - очищення історії
4ad5efa
import os
import gradio as gr
import logging
from modules.interface.local_data_helper import LocalDataHelper
from datetime import datetime
logger = logging.getLogger("jira_assistant_interface")
def simplified_analyze_csv(file_obj, inactive_days, app):
"""
Спрощений аналіз CSV-файлу, викликає методи app для аналізу (без індексування).
"""
if file_obj is None:
return "Помилка: файл не вибрано"
from pathlib import Path
import shutil
import pandas as pd
try:
logger.info(f"Отримано файл: {file_obj.name}, тип: {type(file_obj)}")
# Створення директорій
Path("temp/indices").mkdir(exist_ok=True, parents=True)
data_dir = Path("data")
data_dir.mkdir(exist_ok=True, parents=True)
# Формування шляху для збереження
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
base_dir = os.path.dirname(os.path.abspath(__file__))
temp_file_path = os.path.join(base_dir, "../../data", f"imported_data_{timestamp}.csv")
logger.info(f"Шлях для збереження: {temp_file_path}")
logger.info(f"Робоча директорія: {os.getcwd()}")
# Копіюємо/записуємо файл
if hasattr(file_obj, 'name'):
source_path = file_obj.name
shutil.copy2(source_path, temp_file_path)
logger.info(f"Файл скопійовано з {source_path} у {temp_file_path}")
else:
with open(temp_file_path, "wb") as f:
f.write(file_obj.read())
logger.info(f"Файл створено у {temp_file_path}")
if not os.path.exists(temp_file_path):
logger.error(f"Помилка: файл {temp_file_path} не було створено")
return "Помилка: не вдалося створити файл даних"
file_size = os.path.getsize(temp_file_path)
logger.info(f"Розмір файлу: {file_size} байт")
if file_size == 0:
logger.error("Помилка: порожній файл")
return "Помилка: файл порожній"
# Перевірка, що CSV читається
try:
df_test = pd.read_csv(temp_file_path)
logger.info(f"Файл успішно прочитано. Кількість рядків: {len(df_test)}, колонок: {len(df_test.columns)}")
app.current_data = df_test
except Exception as csv_err:
logger.error(f"Помилка при читанні CSV: {csv_err}")
# Виклик методу аналізу без AI і без індексування
result = app.analyze_csv_file(
temp_file_path,
inactive_days=inactive_days,
include_ai=False,
skip_indexing=True # Важливо: пропускаємо створення індексів
)
if result.get("error"):
logger.error(f"Помилка аналізу: {result.get('error')}")
return result.get("error")
report = result.get("report", "")
app.last_loaded_csv = temp_file_path
logger.info(f"Шлях до файлу збережено в app.last_loaded_csv: {app.last_loaded_csv}")
if not os.path.exists(app.last_loaded_csv):
logger.error(f"Помилка: файл {app.last_loaded_csv} зник після аналізу")
return "Файл проаналізовано, але не збережено для подальшого використання. Спробуйте ще раз."
# Логування вмісту директорії data
try:
logger.info(f"Вміст директорії data: {os.listdir(os.path.join(base_dir, '../../data'))}")
except Exception as dir_err:
logger.error(f"Не вдалося отримати вміст директорії: {dir_err}")
return report
except Exception as e:
import traceback
error_msg = f"Помилка аналізу: {str(e)}\n\n{traceback.format_exc()}"
logger.error(error_msg)
return error_msg
def local_files_analyze_csv(file_obj, inactive_days, app):
"""
Аналіз CSV з локальних файлів або через нове завантаження.
Якщо file_obj = None, використовуємо дані з app.last_loaded_csv.
"""
if file_obj is None:
if hasattr(app, 'current_data') and app.current_data is not None and \
hasattr(app, 'last_loaded_csv') and app.last_loaded_csv is not None:
try:
temp_file_path = app.last_loaded_csv
if not os.path.exists(temp_file_path):
return "Помилка: файл не знайдено. Спочатку ініціалізуйте дані."
# Аналіз без індексування
result = app.analyze_csv_file(
temp_file_path,
inactive_days=inactive_days,
include_ai=False,
skip_indexing=True # Важливо: пропускаємо створення індексів
)
if result.get("error"):
return result.get("error")
return result.get("report", "")
except Exception as e:
return f"Помилка аналізу: {str(e)}"
else:
return "Помилка: файл не вибрано. Спочатку ініціалізуйте дані або завантажте CSV файл."
return simplified_analyze_csv(file_obj, inactive_days, app)
def init_and_analyze(selected_files, uploaded_file, inactive_days, app, local_helper):
"""
Об'єднує ініціалізацію даних та аналіз без створення індексів FAISS/BM25:
1) Викликається initialize_data_without_indices для підготовки даних без індексування
2) Якщо ініціалізація успішна, викликається local_files_analyze_csv
Повертає об'єднаний звіт у форматі Markdown, який містить статус ініціалізації та результати аналізу.
"""
# КРОК 1: Ініціалізація - без створення індексів
status_md, data_info = initialize_data_without_indices(selected_files, uploaded_file, app, local_helper)
if data_info is None:
return status_md
# КРОК 2: Аналіз
analysis_report = local_files_analyze_csv(uploaded_file, inactive_days, app)
# Об'єднуємо результати (форматуємо як Markdown)
combined_md = (
f"{status_md}\n\n---\n\n"
"### Результати аналізу\n\n"
f"{analysis_report}"
)
return combined_md
def initialize_data_without_indices(selected_files, uploaded_file, app, local_helper):
"""
Модифікована версія initialize_data, яка не створює індекси FAISS/BM25.
Виконує тільки підготовку даних для аналізу.
Args:
selected_files (list): Список вибраних файлів
uploaded_file: Завантажений файл
app: Екземпляр JiraAssistantApp
local_helper: Екземпляр LocalDataHelper
Returns:
tuple: (status_html, data_info) - статус ініціалізації та інформація про дані
"""
try:
session_id = local_helper.get_or_create_session()
app.current_session_id = session_id
# Отримуємо інформацію про локальні файли
local_files_info = local_helper.data_manager.get_local_files()
local_files_dict = {info['name']: info['path'] for info in local_files_info}
# Визначаємо шляхи до вибраних файлів
selected_paths = []
for selected in selected_files:
file_name = selected.split(" (")[0].strip() if " (" in selected else selected.strip()
if file_name in local_files_dict:
selected_paths.append(local_files_dict[file_name])
# Обробка завантаженого файлу
uploaded_file_path = None
if uploaded_file:
if hasattr(uploaded_file, 'name'):
uploaded_file_path = uploaded_file.name
else:
uploaded_file_path = uploaded_file
# Перевірка наявності файлів
if not selected_paths and not uploaded_file_path:
return "<p style='color:red;'>Помилка: не вибрано жодного файлу для обробки</p>", None
# Ініціалізація даних без створення індексів
success, result_info = initialize_session_data_no_indices(
local_helper.data_manager,
session_id,
selected_paths,
uploaded_file_path
)
if not success:
error_msg = result_info.get("error", "Невідома помилка")
return f"<p style='color:red;'>Помилка при ініціалізації даних: {error_msg}</p>", None
# Зберігаємо результати в app
merged_df = result_info.get("merged_df")
if merged_df is not None:
app.current_data = merged_df
app.last_loaded_csv = result_info.get("merged_file")
# ВАЖЛИВО: НЕ встановлюємо шлях до індексів, щоб уникнути їх створення
# Це відрізняється від оригінальної функції initialize_data
logger.info("Успішна ініціалізація даних без створення індексів")
# Формуємо HTML-відповідь про успішну ініціалізацію
status_html = "<h3 style='color:green;'>✅ Дані успішно ініціалізовано</h3>"
status_html += f"<p>Об'єднано {result_info.get('source_files_count', 0)} файлів</p>"
status_html += f"<p>Загальна кількість рядків: {result_info.get('rows_count', 0)}</p>"
status_html += f"<p>Кількість колонок: {result_info.get('columns_count', 0)}</p>"
files_info = {
"session_id": session_id,
"merged_file": result_info.get("merged_file"),
"rows_count": result_info.get("rows_count", 0),
"columns_count": result_info.get("columns_count", 0),
"source_files_count": result_info.get("source_files_count", 0)
}
return status_html, files_info
except Exception as e:
logger.error(f"Помилка при ініціалізації даних без індексів: {e}")
import traceback
error_details = traceback.format_exc()
logger.error(error_details)
return f"<p style='color:red;'>Помилка при ініціалізації даних: {str(e)}</p>", None
def initialize_session_data_no_indices(data_manager, session_id, selected_paths, uploaded_file_path=None):
"""
Модифікована версія initialize_session_data, яка не створює індекси.
Args:
data_manager: Екземпляр DataManager
session_id (str): ID сесії
selected_paths (list): Список шляхів до вибраних файлів
uploaded_file_path (str, optional): Шлях до завантаженого файлу
Returns:
tuple: (success, result_info) - успішність операції та інформація про результат
"""
try:
# Копіюємо вибрані файли в сесію
copied_files = data_manager.copy_files_to_session(session_id, selected_paths)
# Додаємо завантажений файл, якщо він є
if uploaded_file_path and os.path.exists(uploaded_file_path):
# Копіюємо файл до сесії
session_data_dir = data_manager.session_manager.get_session_data_dir(session_id)
if not session_data_dir:
return False, {"error": "Не вдалося отримати директорію даних сесії"}
# Створюємо унікальне ім'я для завантаженого файлу
from pathlib import Path
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
dest_filename = f"uploaded_{timestamp}_{Path(uploaded_file_path).name}"
dest_path = session_data_dir / dest_filename
# Копіюємо файл
import shutil
shutil.copyfile(uploaded_file_path, dest_path)
# Додаємо інформацію про файл до сесії
data_manager.session_manager.add_data_file(
session_id,
str(dest_path),
file_type="uploaded",
description=f"Uploaded file: {Path(uploaded_file_path).name}"
)
copied_files.append(str(dest_path))
# Якщо немає файлів для обробки, повертаємо помилку
if not copied_files:
return False, {"error": "Не вибрано жодного файлу для обробки"}
# Завантажуємо дані з усіх файлів
loaded_data = data_manager.load_data_from_files(session_id, copied_files)
# Фільтруємо тільки успішно завантажені файли
valid_data = [(path, df) for path, df, success in loaded_data if success and df is not None]
if not valid_data:
return False, {"error": "Не вдалося завантажити жодного файлу"}
# Отримуємо список DataFrame
dataframes = [df for _, df in valid_data]
# Об'єднуємо дані
merged_df, output_path = data_manager.merge_dataframes(
session_id,
dataframes,
output_name=f"merged_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)
if merged_df is None or not output_path:
return False, {"error": "Не вдалося об'єднати дані"}
result_info = {
"merged_file": output_path,
"rows_count": len(merged_df),
"columns_count": len(merged_df.columns),
"source_files_count": len(valid_data),
"merged_df": merged_df # Передаємо DataFrame для подальшого використання
}
logger.info(f"Дані успішно ініціалізовано без створення індексів: {output_path}")
return True, result_info
except Exception as e:
logger.error(f"Помилка при ініціалізації даних сесії {session_id}: {e}")
return False, {"error": f"Помилка при ініціалізації даних: {str(e)}"}
def cleanup_temp_data_handler(app):
"""
Обробник для кнопки очищення тимчасових даних.
Виконує очищення даних та скидає відповідні змінні в додатку.
Args:
app: Екземпляр JiraAssistantApp
Returns:
str: HTML-відформатований результат очищення
"""
try:
import builtins
from pathlib import Path
# Перевіряємо, чи є data_manager у додатку
if hasattr(app, 'data_manager'):
data_manager = app.data_manager
else:
# Створюємо новий екземпляр, якщо відсутній
from modules.data_management.data_manager import DataManager
data_manager = DataManager()
# Запам'ятовуємо стан перед очищенням
had_indices_path = hasattr(app, 'indices_path') and app.indices_path is not None
had_session_id = hasattr(app, 'current_session_id') and app.current_session_id is not None
had_loaded_csv = hasattr(app, 'last_loaded_csv') and app.last_loaded_csv is not None
# Виконуємо очищення
result = data_manager.cleanup_temp_data()
# Скидаємо змінні додатку, які вказують на видалені дані
reset_info = ""
# Скидаємо indices_path
if hasattr(app, 'indices_path'):
old_path = app.indices_path
app.indices_path = None
reset_info += f"<p>• Скинуто шлях до індексів: {old_path}</p>"
# Скидаємо current_session_id
if hasattr(app, 'current_session_id'):
old_session = app.current_session_id
app.current_session_id = None
reset_info += f"<p>• Скинуто ID сесії: {old_session}</p>"
# Скидаємо шлях до останнього завантаженого файлу, якщо він був у тимчасовій папці
if hasattr(app, 'last_loaded_csv') and app.last_loaded_csv:
last_file_path = Path(app.last_loaded_csv)
if any(temp_dir in str(last_file_path) for temp_dir in ["temp/", "reports/", "data/"]):
old_path = app.last_loaded_csv
app.last_loaded_csv = None
reset_info += f"<p>• Скинуто шлях до файлу CSV: {old_path}</p>"
# Також скидаємо current_data, якщо він був завантажений з цього файлу
if hasattr(app, 'current_data') and app.current_data is not None:
app.current_data = None
reset_info += "<p>• Очищено завантажені дані DataFrame</p>"
# Скидаємо кешовані індекси в глобальних об'єктах
try:
# Скидаємо глобальні змінні, якщо вони існують
if hasattr(builtins, 'app') and hasattr(builtins.app, 'indices_path'):
builtins.app.indices_path = None
reset_info += "<p>• Скинуто глобальний шлях до індексів</p>"
if hasattr(builtins, 'index_manager') and hasattr(builtins.index_manager, 'last_indices_path'):
builtins.index_manager.last_indices_path = None
reset_info += "<p>• Скинуто глобальний шлях до останніх індексів</p>"
# Якщо є кеш індексів в JiraHybridChat, очищаємо його
if hasattr(app, 'chat_instances_cache'):
app.chat_instances_cache = {}
reset_info += "<p>• Очищено кеш екземплярів чату</p>"
# Перевірка наявності статичного кешу у класі JiraHybridChat
from modules.ai_analysis.jira_hybrid_chat import JiraHybridChat
if hasattr(JiraHybridChat, 'chat_instances_cache') and JiraHybridChat.chat_instances_cache:
JiraHybridChat.chat_instances_cache = {}
reset_info += "<p>• Очищено статичний кеш чату</p>"
except Exception as e:
logger.warning(f"Помилка при очищенні глобальних змінних: {e}")
if result.get("success", False):
stats = result.get("stats", {})
# Формуємо HTML-відповідь
html_response = "<h3 style='color:green;'>✅ Тимчасові дані успішно очищено</h3>"
html_response += "<div style='background-color:#e9f7ef; padding:15px; border-radius:5px; margin-top:10px;'>"
html_response += "<p><b>Результати очищення:</b></p>"
html_response += f"<p>• Видалено тимчасових файлів: {stats.get('temp_files_removed', 0)}</p>"
html_response += f"<p>• Видалено директорій сесій: {stats.get('session_dirs_removed', 0)}</p>"
html_response += f"<p>• Видалено директорій індексів: {stats.get('indices_dirs_removed', 0)}</p>"
html_response += f"<p>• Видалено звітів і візуалізацій: {stats.get('reports_removed', 0)}</p>"
html_response += "</div>"
# Додаємо інформацію про скинуті змінні
if reset_info:
html_response += "<div style='background-color:#FDEBD0; padding:15px; border-radius:5px; margin-top:10px;'>"
html_response += "<p><b>Скинуто наступні посилання на дані:</b></p>"
html_response += reset_info
html_response += "</div>"
# Додаємо інформацію про стан перед/після
if had_indices_path or had_session_id or had_loaded_csv:
html_response += """
<div style='margin-top:15px;'>
<p><i>⚠️ Увага: Для подальшого аналізу потрібно заново ініціалізувати дані</i></p>
</div>
"""
return html_response
else:
error_msg = result.get("error", "Невідома помилка")
return f"<h3 style='color:red;'>❌ Помилка при очищенні тимчасових даних</h3><p>{error_msg}</p>"
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Помилка при очищенні тимчасових даних: {e}\n{error_details}")
return f"<h3 style='color:red;'>❌ Помилка при очищенні тимчасових даних</h3><p>{str(e)}</p>"
def create_csv_analysis_tab(app):
"""
Створює вкладку "CSV Аналіз" у Gradio інтерфейсі:
- Завантаження файлів та перегляд локальних файлів.
- Об'єднаний аналіз: ініціалізація даних та аналіз через одну кнопку.
- Очищення тимчасових даних через кнопку.
В результаті звіт відображається як Markdown.
"""
with gr.Tab("CSV Аналіз"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Завантаження CSV")
local_file_input = gr.File(label="Завантажити CSV файл Jira")
local_inactive_days = gr.Slider(
minimum=1, maximum=90, value=14, step=1,
label="Кількість днів для визначення неактивних тікетів"
)
gr.Markdown("### Локальні файли")
refresh_btn = gr.Button("Оновити список файлів", variant="secondary")
local_helper = LocalDataHelper(app)
local_files_list, local_files_info = local_helper.list_local_files()
local_files_dropdown = gr.Dropdown(
choices=local_files_list,
multiselect=True,
label="Виберіть файли з директорії current_data"
)
local_files_info_md = gr.Markdown(local_files_info)
gr.Markdown("### Перегляд вибраного файлу")
preview_file_dropdown = gr.Dropdown(
choices=local_files_list,
multiselect=False,
label="Виберіть файл для перегляду"
)
preview_btn = gr.Button("Переглянути", variant="secondary")
file_preview_md = gr.Markdown("Виберіть файл для перегляду")
# Секція для очищення тимчасових даних
with gr.Accordion("Обслуговування", open=False):
gr.Markdown("""
### Очищення тимчасових даних
Ця функція видаляє всі тимчасові файли і директорії, крім файлів у папці **current_data**.
**Будуть очищені:**
- Тимчасові файли індексів (temp/indices)
- Сесії користувачів (temp/sessions)
- Тимчасові звіти (reports)
- Інші файли в директорії temp
""")
cleanup_btn = gr.Button("🧹 Очистити тимчасові дані", variant="secondary")
cleanup_result = gr.HTML(label="Результат очищення", visible=True)
gr.Markdown("### Об'єднаний аналіз (Ініціалізація + Аналіз)")
init_analyze_btn = gr.Button("Ініціалізація та Аналіз", variant="primary")
with gr.Column(scale=2):
gr.Markdown("### Результати ініціалізації")
combined_output = gr.Markdown(
label="Об'єднаний звіт",
value="Тут буде відображено статус ініціалізації та результати аналізу"
)
gr.Markdown("""
<style>
/* Додаткова стилізація */
.cleanup-note {
margin-top: 15px;
padding: 10px;
background-color: #f8f9fa;
border-left: 4px solid #6c757d;
}
</style>
""")
def refresh_local_files():
files_list, files_info = local_helper.list_local_files()
return files_list, files_info, files_list
refresh_btn.click(
refresh_local_files,
inputs=[],
outputs=[local_files_dropdown, local_files_info_md, preview_file_dropdown]
)
preview_btn.click(
local_helper.get_file_preview,
inputs=[preview_file_dropdown],
outputs=[file_preview_md]
)
init_analyze_btn.click(
fn=lambda sel_files, upl_file, days: init_and_analyze(sel_files, upl_file, days, app, local_helper),
inputs=[local_files_dropdown, local_file_input, local_inactive_days],
outputs=[combined_output]
)
# Підключаємо обробник до кнопки очищення тимчасових даних
cleanup_btn.click(
fn=lambda: cleanup_temp_data_handler(app),
inputs=[],
outputs=[cleanup_result]
)