Spaces:
Running
Running
import os | |
import logging | |
from pathlib import Path | |
from datetime import datetime | |
import traceback | |
import builtins | |
import uuid | |
import json | |
# Налаштування логування | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler("jira_assistant.log"), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger("jira_assistant") | |
# Створення необхідних директорій | |
for directory in ["data", "reports", "temp", "logs"]: | |
Path(directory).mkdir(exist_ok=True, parents=True) | |
# Імпорт необхідних модулів | |
from modules.data_import.csv_importer import JiraCsvImporter | |
from modules.data_analysis.statistics import JiraDataAnalyzer | |
from modules.data_analysis.visualizations import JiraVisualizer | |
from modules.reporting.report_generator import ReportGenerator | |
from modules.core.app_manager import AppManager | |
from modules.ai_analysis.jira_hybrid_chat import JiraHybridChat | |
class JiraAssistantApp: | |
""" | |
Головний клас додатку, який координує роботу всіх компонентів | |
""" | |
def __init__(self): | |
try: | |
# Отримуємо глобальний менеджер індексів | |
self.index_manager = builtins.index_manager | |
logger.info("Використовуємо глобальний менеджер індексів") | |
except AttributeError: | |
# Якщо глобальний менеджер не знайдено, створюємо новий | |
from modules.data_management.unified_index_manager import UnifiedIndexManager | |
self.index_manager = UnifiedIndexManager() | |
logger.info("Створено новий менеджер індексів") | |
self.app_manager = AppManager() | |
self.current_data = None | |
self.current_analysis = None | |
self.visualizations = None | |
self.last_loaded_csv = None | |
self.current_session_id = None | |
def analyze_csv_file(self, file_path, inactive_days=14, include_ai=False, api_key=None, model_type="openai", skip_indexing=True): | |
""" | |
Аналіз CSV-файлу Jira без створення індексів. | |
Args: | |
file_path (str): Шлях до CSV-файлу | |
inactive_days (int): Кількість днів для визначення неактивних тікетів | |
include_ai (bool): Чи використовувати AI-аналіз | |
api_key (str): API ключ для LLM (якщо include_ai=True) | |
model_type (str): Тип моделі LLM ("openai" або "gemini") | |
skip_indexing (bool): Пропустити створення індексів FAISS/BM25 | |
Returns: | |
dict: Результати аналізу | |
""" | |
try: | |
logger.info(f"Аналіз файлу: {file_path}") | |
# Генеруємо ідентифікатор сесії | |
import uuid | |
from datetime import datetime | |
self.current_session_id = f"{uuid.uuid4()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
# Завантаження даних | |
from modules.data_import.csv_importer import JiraCsvImporter | |
csv_importer = JiraCsvImporter(file_path) | |
self.current_data = csv_importer.load_data() | |
if self.current_data is None: | |
return {"error": "Не вдалося завантажити дані з CSV-файлу"} | |
# Створюємо індекси для даних, тільки якщо не вказано пропустити | |
if not skip_indexing: | |
indices_result = self.index_manager.get_or_create_indices( | |
self.current_data, | |
self.current_session_id | |
) | |
if isinstance(indices_result, dict) and "error" not in indices_result: | |
logger.info(f"Індекси успішно створено: {indices_result.get('indices_dir', 'невідомо')}") | |
self.current_indices_dir = indices_result.get("indices_dir", None) | |
self.indices_path = indices_result.get("indices_dir", None) | |
else: | |
logger.info("Створення індексів пропущено згідно з налаштуваннями") | |
# Аналіз даних | |
from modules.data_analysis.statistics import JiraDataAnalyzer | |
analyzer = JiraDataAnalyzer(self.current_data) | |
# Базова статистика | |
stats = analyzer.generate_basic_statistics() | |
# Аналіз неактивних тікетів | |
inactive_issues = analyzer.analyze_inactive_issues(days=inactive_days) | |
# Створення візуалізацій | |
from modules.data_analysis.visualizations import JiraVisualizer | |
visualizer = JiraVisualizer(self.current_data) | |
self.visualizations = { | |
"status": visualizer.plot_status_counts(), | |
"priority": visualizer.plot_priority_counts(), | |
"type": visualizer.plot_type_counts(), | |
"created_timeline": visualizer.plot_created_timeline(), | |
"inactive": visualizer.plot_inactive_issues(days=inactive_days) | |
} | |
# AI аналіз, якщо потрібен | |
ai_analysis = None | |
if include_ai and api_key: | |
from modules.ai_analysis.llm_connector import LLMConnector | |
llm = LLMConnector(api_key=api_key, model_type=model_type) | |
ai_analysis = llm.analyze_jira_data(stats, inactive_issues) | |
# Генерація звіту | |
from modules.reporting.report_generator import ReportGenerator | |
report_generator = ReportGenerator(self.current_data, stats, inactive_issues, ai_analysis) | |
report = report_generator.create_markdown_report(inactive_days=inactive_days) | |
# Зберігаємо поточний аналіз | |
self.current_analysis = { | |
"stats": stats, | |
"inactive_issues": inactive_issues, | |
"report": report, | |
"ai_analysis": ai_analysis | |
} | |
# Зберігаємо інформацію про сесію | |
session_info = { | |
"session_id": self.current_session_id, | |
"file_path": str(file_path), | |
"file_name": Path(file_path).name, | |
"rows_count": len(self.current_data), | |
"columns_count": len(self.current_data.columns), | |
"indices_dir": getattr(self, "current_indices_dir", None), | |
"created_at": datetime.now().isoformat() | |
} | |
# Зберігаємо інформацію про сесію у файл | |
sessions_dir = Path("temp/sessions") | |
sessions_dir.mkdir(exist_ok=True, parents=True) | |
session_file = sessions_dir / f"{self.current_session_id}.json" | |
with open(session_file, "w", encoding="utf-8") as f: | |
json.dump(session_info, f, ensure_ascii=False, indent=2) | |
return { | |
"report": report, | |
"visualizations": self.visualizations, | |
"ai_analysis": ai_analysis, | |
"error": None, | |
"session_id": self.current_session_id | |
} | |
except Exception as e: | |
error_msg = f"Помилка аналізу: {str(e)}\n\n{traceback.format_exc()}" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
def save_report(self, format_type="markdown", include_visualizations=True, filepath=None): | |
""" | |
Збереження звіту у файл | |
Args: | |
format_type (str): Формат звіту ("markdown", "html", "pdf") | |
include_visualizations (bool): Чи включати візуалізації у звіт | |
filepath (str): Шлях для збереження файлу | |
Returns: | |
str: Шлях до збереженого файлу або повідомлення про помилку | |
""" | |
try: | |
if not self.current_analysis or "report" not in self.current_analysis: | |
return "Помилка: спочатку виконайте аналіз даних" | |
# Створення імені файлу, якщо не вказано | |
if not filepath: | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
report_filename = f"jira_report_{timestamp}" | |
reports_dir = Path("reports") | |
if format_type == "markdown": | |
filepath = reports_dir / f"{report_filename}.md" | |
elif format_type == "html": | |
filepath = reports_dir / f"{report_filename}.html" | |
elif format_type == "pdf": | |
filepath = reports_dir / f"{report_filename}.pdf" | |
# Створення генератора звітів | |
report_generator = ReportGenerator( | |
self.current_data, | |
self.current_analysis.get("stats"), | |
self.current_analysis.get("inactive_issues"), | |
self.current_analysis.get("ai_analysis") | |
) | |
# Збереження звіту | |
saved_path = report_generator.save_report( | |
filepath=filepath, | |
format=format_type, | |
include_visualizations=include_visualizations, | |
visualization_data=self.visualizations if include_visualizations else None | |
) | |
if saved_path: | |
return f"Звіт успішно збережено: {saved_path}" | |
else: | |
return "Не вдалося зберегти звіт" | |
except Exception as e: | |
error_msg = f"Помилка при збереженні звіту: {str(e)}\n\n{traceback.format_exc()}" | |
logger.error(error_msg) | |
return error_msg | |
def test_jira_connection(self, jira_url, username, api_token): | |
""" | |
Тестування підключення до Jira | |
Args: | |
jira_url (str): URL сервера Jira | |
username (str): Ім'я користувача | |
api_token (str): API токен | |
Returns: | |
bool: True якщо підключення успішне, False інакше | |
""" | |
from modules.data_import.jira_api import JiraConnector | |
return JiraConnector.test_connection(jira_url, username, api_token) | |
def generate_visualization(self, viz_type, limit=10, groupby="day"): | |
""" | |
Генерація конкретної візуалізації | |
Args: | |
viz_type (str): Тип візуалізації | |
limit (int): Ліміт для топ-N елементів | |
groupby (str): Групування для часових діаграм ('day', 'week', 'month') | |
Returns: | |
matplotlib.figure.Figure: Об'єкт figure | |
""" | |
if self.current_data is None: | |
logger.error("Немає даних для візуалізації") | |
return None | |
# Створюємо візуалізатор | |
visualizer = JiraVisualizer(self.current_data) | |
# Вибір типу візуалізації | |
if viz_type == "Статуси": | |
return visualizer.plot_status_counts() | |
elif viz_type == "Пріоритети": | |
return visualizer.plot_priority_counts() | |
elif viz_type == "Типи тікетів": | |
return visualizer.plot_type_counts() | |
elif viz_type == "Призначені користувачі": | |
return visualizer.plot_assignee_counts(limit=limit) | |
elif viz_type == "Активність створення": | |
return visualizer.plot_timeline(date_column='Created', groupby=groupby, cumulative=False) | |
elif viz_type == "Активність оновлення": | |
return visualizer.plot_timeline(date_column='Updated', groupby=groupby, cumulative=False) | |
elif viz_type == "Кумулятивне створення": | |
return visualizer.plot_timeline(date_column='Created', groupby=groupby, cumulative=True) | |
elif viz_type == "Неактивні тікети": | |
return visualizer.plot_inactive_issues() | |
elif viz_type == "Теплова карта: Типи/Статуси": | |
return visualizer.plot_heatmap(row_col='Issue Type', column_col='Status') | |
elif viz_type == "Часова шкала проекту": | |
timeline_plots = visualizer.plot_project_timeline() | |
return timeline_plots[0] if timeline_plots[0] is not None else None | |
elif viz_type == "Склад статусів з часом": | |
timeline_plots = visualizer.plot_project_timeline() | |
return timeline_plots[1] if timeline_plots[1] is not None else None | |
else: | |
logger.error(f"Невідомий тип візуалізації: {viz_type}") | |
return None | |
def generate_infographic(self): | |
""" | |
Генерація інфографіки з основними показниками | |
Returns: | |
matplotlib.figure.Figure: Об'єкт figure з інфографікою | |
""" | |
if self.current_data is None or self.current_analysis is None: | |
logger.error("Немає даних для створення інфографіки") | |
return None | |
visualizer = JiraVisualizer(self.current_data) | |
return visualizer.create_infographic(self.current_analysis["stats"]) | |
def generate_ai_report(self, api_key, model_type="gemini", temperature=0.2, custom_prompt=None): | |
""" | |
Генерація AI-звіту на основі даних | |
Args: | |
api_key (str): API ключ для LLM | |
model_type (str): Тип моделі ("openai" або "gemini") | |
temperature (float): Температура генерації | |
custom_prompt (str): Користувацький промпт | |
Returns: | |
str: Згенерований звіт або повідомлення про помилку | |
""" | |
try: | |
if self.current_data is None or self.current_analysis is None: | |
return "Помилка: спочатку виконайте аналіз даних" | |
# Перевіряємо наявність індексів | |
indices_dir = getattr(self, "current_indices_dir", None) | |
# Якщо індекси не створені, створюємо їх | |
if not indices_dir: | |
logger.info("Індекси не знайдено. Створюємо нові індекси.") | |
indices_result = self.index_manager.get_or_create_indices( | |
self.current_data, | |
self.current_session_id or f"temp_{uuid.uuid4()}" | |
) | |
if "error" in indices_result: | |
logger.error(f"Помилка при створенні індексів: {indices_result['error']}") | |
return f"Помилка при створенні індексів: {indices_result['error']}" | |
indices_dir = indices_result["indices_dir"] | |
self.current_indices_dir = indices_dir | |
# Імпортуємо AI асистента | |
JiraHybridChat | |
# Створюємо AI асистента | |
ai_assistant = JiraHybridChat( | |
api_key_openai=api_key if model_type == "openai" else None, | |
api_key_gemini=api_key if model_type == "gemini" else None, | |
model_type=model_type, temperature=temperature | |
) | |
# Генеруємо звіт | |
report_result = ai_assistant.generate_report( | |
self.current_data, | |
indices_dir=indices_dir, | |
custom_prompt=custom_prompt | |
) | |
if "error" in report_result: | |
logger.error(f"Помилка при генерації AI-звіту: {report_result['error']}") | |
return f"Помилка при генерації AI-звіту: {report_result['error']}" | |
# Зберігаємо звіт | |
report_path = Path("reports") / f"ai_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md" | |
with open(report_path, "w", encoding="utf-8") as f: | |
f.write(report_result["report"]) | |
logger.info(f"AI-звіт успішно згенеровано та збережено: {report_path}") | |
return report_result["report"] | |
except Exception as e: | |
error_msg = f"Помилка при генерації AI-звіту: {str(e)}\n\n{traceback.format_exc()}" | |
logger.error(error_msg) | |
return error_msg | |
def chat_with_data(self, question, api_key, model_type="gemini", temperature=0.2, chat_history=None): | |
""" | |
Чат з даними через AI | |
Args: | |
question (str): Питання користувача | |
api_key (str): API ключ для LLM | |
model_type (str): Тип моделі ("openai" або "gemini") | |
temperature (float): Температура генерації | |
chat_history (list): Історія чату | |
Returns: | |
dict: Відповідь AI та метадані | |
""" | |
try: | |
if self.current_data is None: | |
return {"error": "Помилка: спочатку виконайте аналіз даних"} | |
# Перевіряємо наявність індексів | |
indices_dir = getattr(self, "current_indices_dir", None) | |
ai_assistant = JiraHybridChat( | |
indices_dir=indices_dir, # Передаємо індексну директорію | |
app=self, # Передаємо посилання на app | |
api_key_openai=api_key if model_type == "openai" else None, | |
api_key_gemini=api_key if model_type == "gemini" else None, | |
model_type=model_type, | |
temperature=temperature | |
) | |
ai_assistant.df = self.current_data | |
# Виконуємо чат | |
chat_result = ai_assistant.chat_with_hybrid_search(question, chat_history) | |
if "error" in chat_result: | |
logger.error(f"Помилка при виконанні чату: {chat_result['error']}") | |
return {"error": f"Помилка при виконанні чату: {chat_result['error']}"} | |
logger.info(f"Чат успішно виконано, токенів: {chat_result['metadata']['total_tokens']}") | |
return chat_result | |
except Exception as e: | |
error_msg = f"Помилка при виконанні чату: {str(e)}\n\n{traceback.format_exc()}" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
def get_data_statistics(self): | |
""" | |
Отримання статистики даних | |
Returns: | |
dict: Статистика даних | |
""" | |
if self.current_data is None or self.current_analysis is None: | |
return {"error": "Немає даних для отримання статистики"} | |
return self.current_analysis["stats"] | |
def get_inactive_issues(self): | |
""" | |
Отримання неактивних тікетів | |
Returns: | |
dict: Неактивні тікети | |
""" | |
if self.current_data is None or self.current_analysis is None: | |
return {"error": "Немає даних для отримання неактивних тікетів"} | |
return self.current_analysis["inactive_issues"] | |
def get_data_sample(self, rows=5): | |
""" | |
Отримання зразка даних | |
Args: | |
rows (int): Кількість рядків | |
Returns: | |
dict: Зразок даних | |
""" | |
if self.current_data is None: | |
return {"error": "Немає даних для отримання зразка"} | |
try: | |
sample = self.current_data.head(rows).to_dict(orient="records") | |
return {"sample": sample, "columns": list(self.current_data.columns)} | |
except Exception as e: | |
return {"error": f"Помилка при отриманні зразка даних: {str(e)}"} | |
def get_model_info(self, api_key, model_type="gemini"): | |
""" | |
Отримання інформації про модель | |
Args: | |
api_key (str): API ключ для LLM | |
model_type (str): Тип моделі ("openai" або "gemini") | |
Returns: | |
dict: Інформація про модель | |
""" | |
try: | |
ai_assistant = JiraHybridChat( | |
api_key_openai=api_key if model_type == "openai" else None, | |
api_key_gemini=api_key if model_type == "gemini" else None, | |
model_type=model_type | |
) | |
return ai_assistant.get_model_info() | |
except Exception as e: | |
error_msg = f"Помилка при отриманні інформації про модель: {str(e)}" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
def check_api_keys(self, api_key_openai=None, api_key_gemini=None): | |
""" | |
Перевірка API ключів | |
Args: | |
api_key_openai (str): API ключ для OpenAI | |
api_key_gemini (str): API ключ для Gemini | |
Returns: | |
dict: Результати перевірки | |
""" | |
try: | |
ai_assistant = JiraHybridChat( | |
api_key_openai=api_key_openai, | |
api_key_gemini=api_key_gemini | |
) | |
return ai_assistant.check_api_keys() | |
except Exception as e: | |
error_msg = f"Помилка при перевірці API ключів: {str(e)}" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
def cleanup_old_indices(self, max_age_days=7, max_indices=20): | |
""" | |
Очищення застарілих індексів | |
Args: | |
max_age_days (int): Максимальний вік індексів у днях | |
max_indices (int): Максимальна кількість індексів для зберігання | |
Returns: | |
dict: Результат очищення | |
""" | |
try: | |
deleted_count = self.index_manager.cleanup_old_indices(max_age_days, max_indices) | |
logger.info(f"Очищено {deleted_count} застарілих індексів") | |
return { | |
"success": True, | |
"deleted_count": deleted_count, | |
"message": f"Очищено {deleted_count} застарілих індексів" | |
} | |
except Exception as e: | |
error_msg = f"Помилка при очищенні застарілих індексів: {str(e)}" | |
logger.error(error_msg) | |
return {"error": error_msg} |