import os import logging import json import shutil from pathlib import Path import pandas as pd from datetime import datetime, timedelta # Імпорт LlamaIndex компонентів from llama_index.core import ( VectorStoreIndex, Document, StorageContext, load_index_from_storage, Settings ) from llama_index.core.node_parser import TokenTextSplitter from llama_index.retrievers.bm25 import BM25Retriever from llama_index.vector_stores.faiss import FaissVectorStore from llama_index.core.schema import TextNode from llama_index.core.storage.docstore import SimpleDocumentStore import faiss from modules.config.paths import INDICES_DIR from modules.data_management.hash_utils import generate_data_hash from modules.data_management.index_utils import ( check_indexing_availability, initialize_embedding_model, check_index_integrity ) from modules.config.ai_settings import ( get_metadata_csv, ) # Встановлюємо формат збереження на бінарний (не JSON) Settings.persist_json_format = False logger = logging.getLogger(__name__) class UnifiedIndexManager: """ Уніфікований менеджер для створення та управління індексами даних. """ def __init__(self, base_indices_dir=None): """ Ініціалізація менеджера індексів. Args: base_indices_dir (str, optional): Базова директорія для зберігання індексів """ self.base_indices_dir = Path(base_indices_dir) if base_indices_dir else INDICES_DIR self.base_indices_dir.mkdir(exist_ok=True, parents=True) # Перевірка доступності модулів для індексування self.indexing_available = check_indexing_availability("temp/indices") if not self.indexing_available: logger.warning("Функціональність індексування недоступна. Встановіть необхідні пакети.") def get_or_create_indices(self, df, session_id=None): """ Отримання або створення індексів для даних. Args: df (pandas.DataFrame): DataFrame з даними session_id (str, optional): Ідентифікатор сесії Returns: dict: Інформація про індекси """ if not self.indexing_available: return {"error": "Функціональність індексування недоступна. Встановіть необхідні пакети."} try: # Генеруємо хеш для даних data_hash = generate_data_hash(df, key_columns=['Issue key', 'Summary', 'Status', 'Issue Type', 'Created', 'Updated']) if not data_hash: return {"error": "Не вдалося згенерувати хеш для даних"} # Перевіряємо, чи існують індекси для цих даних existing_indices = self._find_indices_by_hash(data_hash) if existing_indices: # Перевіряємо цілісність індексів is_valid, message = check_index_integrity(existing_indices) if is_valid: logger.info(f"Знайдено існуючі індекси для даних з хешем {data_hash}") return { "success": True, "indices_dir": str(existing_indices), "data_hash": data_hash, "reused_existing": True } else: logger.warning(f"Знайдено індекси з відповідним хешем, але вони не пройшли перевірку цілісності: {message}") # Створюємо нові індекси # Визначаємо директорію для індексів if session_id: indices_path = self.base_indices_dir / session_id else: # Якщо не вказано session_id, використовуємо поточну дату і час timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") indices_path = self.base_indices_dir / timestamp indices_path.mkdir(exist_ok=True, parents=True) # Створюємо нові індекси result = self._create_new_indices(indices_path, session_id, data_hash, df) # Форматуємо результат if isinstance(result, dict): return result else: return { "success": True, "indices_dir": str(indices_path), "data_hash": data_hash } except Exception as e: logger.error(f"Помилка при отриманні або створенні індексів: {e}") import traceback logger.error(traceback.format_exc()) return {"error": f"Помилка при отриманні або створенні індексів: {str(e)}"} def _find_indices_by_hash(self, data_hash): """ Пошук існуючих індексів за хешем даних. Args: data_hash (str): Хеш даних Returns: Path: Шлях до директорії з індексами або None, якщо не знайдено """ try: # Перебираємо всі піддиректорії в базовій директорії індексів for index_dir in self.base_indices_dir.iterdir(): if not index_dir.is_dir(): continue # Перевіряємо метадані metadata_file = index_dir / "metadata.json" if not metadata_file.exists(): continue try: with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) # Перевіряємо хеш if metadata.get("data_hash") == data_hash: return index_dir except Exception as e: logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}") return None except Exception as e: logger.error(f"Помилка при пошуку індексів за хешем: {e}") return None def _create_new_indices(self, indices_path, session_id, data_hash, df): """ Створення нових індексів. Args: indices_path (Path): Шлях для збереження індексів session_id (str): Ідентифікатор сесії data_hash (str): Хеш даних df (pandas.DataFrame): DataFrame з даними Returns: dict: Інформація про створені індекси """ try: # Ініціалізуємо модель ембедингів embed_model = initialize_embedding_model() if not embed_model: return {"error": "Не вдалося ініціалізувати модель ембедингів"} # Отримуємо розмірність ембедингів sample_embedding = embed_model.get_text_embedding("Test") embedding_dim = len(sample_embedding) logger.info(f"Розмірність ембедингів: {embedding_dim}") # Конвертуємо DataFrame в документи documents = self._convert_dataframe_to_documents(df) if not documents: return {"error": "Не вдалося конвертувати дані в документи"} # Створюємо ноди з документів nodes = [TextNode(text=doc.text, metadata=doc.metadata) for doc in documents] # Створюємо FAISS індекс faiss_index = faiss.IndexFlatL2(embedding_dim) vector_store = FaissVectorStore(faiss_index=faiss_index) # Створюємо документне сховище docstore = SimpleDocumentStore() docstore.add_documents(nodes) # Створюємо контекст зберігання storage_context = StorageContext.from_defaults( docstore=docstore, vector_store=vector_store ) # Встановлюємо модель ембедингів Settings.embed_model = embed_model # Створюємо індекс index = VectorStoreIndex.from_documents( documents, storage_context=storage_context ) # Зберігаємо індекс у файл (бінарний формат) index.storage_context.persist(str(indices_path)) # Створюємо BM25 retriever і зберігаємо його параметри bm25_retriever = BM25Retriever.from_defaults( docstore=docstore, similarity_top_k=10 ) self._save_bm25_data(indices_path, bm25_retriever) # Зберігаємо метадані self._save_indices_metadata(indices_path, { "session_id": session_id, "created_at": datetime.now().isoformat(), "data_hash": data_hash, "documents_count": len(documents), "nodes_count": len(nodes), "rows_count": len(df), "columns_count": len(df.columns), "embedding_model": str(embed_model), "embedding_dim": embedding_dim, "storage_format": "binary" }) # Створюємо маркерний файл для перевірки валідності індексів with open(indices_path / "indices.valid", "w") as f: f.write(f"Indices created at {datetime.now().isoformat()}") logger.info(f"Індекси успішно створено в {indices_path}") # Зберігаємо шлях глобально, якщо доступно self._save_indices_path_globally(str(indices_path)) return { "success": True, "indices_dir": str(indices_path), "data_hash": data_hash, "documents_count": len(documents), "nodes_count": len(nodes), "rows_count": len(df), "reused_existing": False } except Exception as e: logger.error(f"Помилка при створенні нових індексів: {e}") import traceback logger.error(traceback.format_exc()) return {"error": f"Помилка при створенні нових індексів: {str(e)}"} def _save_indices_metadata(self, indices_path, metadata): """Зберігає метадані індексів у файл.""" try: with open(indices_path / "metadata.json", "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"Помилка при збереженні метаданих: {e}") return False def _save_indices_path_globally(self, indices_path): """Зберігає шлях до індексів у глобальних об'єктах (app, index_manager).""" try: import builtins if hasattr(builtins, 'app'): builtins.app.indices_path = indices_path logger.info(f"Шлях до індексів збережено глобально: {indices_path}") # Якщо також є глобальний index_manager, зберігаємо в ньому if hasattr(builtins, 'index_manager'): builtins.index_manager.last_indices_path = indices_path return True except Exception as e: logger.warning(f"Не вдалося зберегти шлях до індексів глобально: {e}") return False def _convert_dataframe_to_documents(self, df): """ Конвертує DataFrame у документи для індексування. Кожен документ представляє один рядок CSV з усіма його полями. """ try: # Перевірка типу даних if not hasattr(df, 'iterrows'): logger.error(f"Отримано не DataFrame: {type(df)}") return None # Конвертація в документи documents = [] for idx, row in df.iterrows(): # Формуємо текст документа, включаючи всі основні поля text_parts = [] # Додаємо основні поля key_fields = [ ('Issue key', 'Ключ задачі'), ('Summary', 'Заголовок'), ('Issue Type', 'Тип задачі'), ('Status', 'Статус'), ('Priority', 'Пріоритет'), ('Assignee', 'Виконавець'), ('Reporter', 'Автор'), ('Created', 'Створено'), ('Updated', 'Оновлено'), ('Project name', 'Проект') ] for field, title in key_fields: if field in row and pd.notna(row[field]): text_parts.append(f"{title}: {str(row[field])}") # Додаємо опис, якщо він є if 'Description' in row and pd.notna(row['Description']): text_parts.append(f"Опис: {str(row['Description'])}") # Додаємо коментарі, якщо вони є comments = [] for col in df.columns: if col.startswith('Comment') and pd.notna(row[col]): comments.append(str(row[col])) if comments: text_parts.append("Коментарі:") for i, comment in enumerate(comments, 1): text_parts.append(f"Коментар {i}: {comment}") # Додаємо інформацію про зв'язки, якщо вона є links = [] for col in df.columns: if col.startswith('Outward issue link') and pd.notna(row[col]): link_type = col.replace('Outward issue link (', '').replace(')', '') links.append(f"{link_type}: {str(row[col])}") if links: text_parts.append("Зв'язки:") for link in links: text_parts.append(link) # Додаємо користувацькі поля custom_fields = [] for col in df.columns: if (col.startswith('Custom field') or col.startswith('Sprint')) and pd.notna(row[col]): field_name = col.replace('Custom field (', '').replace(')', '') custom_fields.append(f"{field_name}: {str(row[col])}") if custom_fields: text_parts.append("Додаткові поля:") for field in custom_fields: text_parts.append(field) # Об'єднуємо все в один текст text = "\n".join(text_parts) # Якщо текст порожній, використовуємо хоча б заголовок if not text and 'Summary' in row and pd.notna(row['Summary']): text = f"Заголовок: {str(row['Summary'])}" elif not text: text = f"Задача {idx}" # Створюємо метадані - включаємо всі основні поля metadata = get_metadata_csv(row, idx) # Додаємо інформацію про зв'язки в метадані if 'Outward issue link (Relates)' in row and pd.notna(row['Outward issue link (Relates)']): metadata["related_issues"] = row['Outward issue link (Relates)'] # Створення документа doc = Document( text=text, metadata=metadata ) documents.append(doc) logger.info(f"Створено {len(documents)} документів з DataFrame") return documents except Exception as e: logger.error(f"Помилка при конвертації DataFrame в документи: {e}") import traceback logger.error(traceback.format_exc()) return [] def _save_bm25_data(self, indices_path, bm25_retriever): """ Збереження даних для BM25 retriever. """ try: # Створюємо директорію для BM25 bm25_dir = indices_path / "bm25" bm25_dir.mkdir(exist_ok=True) # Зберігаємо параметри BM25 bm25_params = { "similarity_top_k": bm25_retriever.similarity_top_k, "alpha": getattr(bm25_retriever, "alpha", 0.75), "beta": getattr(bm25_retriever, "beta", 0.75), "index_creation_time": datetime.now().isoformat() } with open(bm25_dir / "params.json", "w", encoding="utf-8") as f: json.dump(bm25_params, f, ensure_ascii=False, indent=2) logger.info(f"Дані BM25 збережено в {bm25_dir}") return True except Exception as e: logger.error(f"Помилка при збереженні даних BM25: {e}") return False def load_indices(self, indices_dir): """Завантаження індексів з директорії.""" try: # Перевірка наявності директорії indices_path = Path(indices_dir) if not indices_path.exists(): logger.error(f"Директорія індексів не існує: {indices_dir}") return None, None # Перевірка наявності маркерного файлу marker_path = indices_path / "indices.valid" if not marker_path.exists(): logger.warning(f"Файл маркера не знайдено в {indices_dir}. Індекси не завантажено.") return None, None try: # Спробуємо завантажити vector_store vector_store = FaissVectorStore.from_persist_dir(indices_dir) # Створюємо контекст зберігання storage_context = StorageContext.from_defaults( vector_store=vector_store, persist_dir=indices_dir ) # Завантажуємо індекс index = load_index_from_storage( storage_context=storage_context, index_cls=VectorStoreIndex ) # Створюємо BM25 retriever bm25_retriever = BM25Retriever.from_defaults( docstore=storage_context.docstore, similarity_top_k=10 ) # Перевіряємо наявність параметрів BM25 bm25_params_path = indices_path / "bm25" / "params.json" if bm25_params_path.exists(): try: with open(bm25_params_path, "r", encoding="utf-8") as f: bm25_params = json.load(f) if "similarity_top_k" in bm25_params: bm25_retriever.similarity_top_k = bm25_params["similarity_top_k"] except Exception as e: logger.warning(f"Не вдалося завантажити параметри BM25: {e}") logger.info(f"Індекси успішно завантажено з {indices_dir}") return index, bm25_retriever except Exception as e: logger.error(f"Помилка при завантаженні індексів: {e}") import traceback logger.error(traceback.format_exc()) # Діагностичні повідомлення logger.info(f"Файли у директорії {indices_dir}: {[f.name for f in indices_path.iterdir() if f.is_file()]}") return None, None except Exception as e: logger.error(f"Помилка при завантаженні індексів: {e}") return None, None def cleanup_old_indices(self, max_age_days=7, max_indices=20): """ Очищення застарілих індексів. Args: max_age_days (int): Максимальний вік індексів у днях max_indices (int): Максимальна кількість індексів для зберігання Returns: int: Кількість видалених директорій """ try: # Збираємо інформацію про всі директорії індексів index_dirs = [] for index_dir in self.base_indices_dir.iterdir(): if not index_dir.is_dir(): continue # Перевіряємо метадані metadata_file = index_dir / "metadata.json" if not metadata_file.exists(): continue try: with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) # Отримуємо час створення created_at = metadata.get("created_at", "") index_dirs.append({ "path": str(index_dir), "created_at": created_at }) except Exception as e: logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}") # Якщо немає директорій, виходимо if not index_dirs: return 0 # Сортуємо директорії за часом створення (від найновіших до найстаріших) index_dirs.sort(key=lambda x: x["created_at"], reverse=True) # Визначаємо директорії для видалення dirs_to_delete = [] # 1. Залишаємо max_indices найновіших директорій if len(index_dirs) > max_indices: dirs_to_delete.extend(index_dirs[max_indices:]) # 2. Перевіряємо, чи є серед залишених застарілі директорії cutoff_date = (datetime.now() - timedelta(days=max_age_days)).isoformat() for index_info in index_dirs[:max_indices]: if index_info["created_at"] < cutoff_date: dirs_to_delete.append(index_info) # Видаляємо директорії deleted_count = 0 for dir_info in dirs_to_delete: try: dir_path = Path(dir_info["path"]) if dir_path.exists(): shutil.rmtree(dir_path) logger.info(f"Видалено застарілу директорію індексів: {dir_path}") deleted_count += 1 except Exception as e: logger.error(f"Помилка при видаленні директорії {dir_info['path']}: {e}") return deleted_count except Exception as e: logger.error(f"Помилка при очищенні застарілих індексів: {e}") return 0