import os import logging import json import shutil from pathlib import Path import pandas as pd from datetime import datetime, timedelta import hashlib import uuid import faiss from modules.data_management.index_utils import validate_index_directory from modules.data_management.index_utils import check_indexing_availability, initialize_embedding_model from modules.data_management.hash_utils import generate_data_hash from modules.data_management.index_utils import check_index_integrity from modules.config.paths import INDICES_DIR from modules.config.ai_settings import ( CHUNK_SIZE, CHUNK_OVERLAP, EXCLUDED_EMBED_METADATA_KEYS, EXCLUDED_LLM_METADATA_KEYS ) logger = logging.getLogger(__name__) # Перевірка доступності модулів для індексування INDEXING_AVAILABLE = check_indexing_availability() INDEXING_MODULES = { "VectorStoreIndex": None, "StorageContext": None, "SimpleDocumentStore": None, "TokenTextSplitter": None, "BM25Retriever": None, "FaissVectorStore": None, "Settings": None } def _generate_data_hash(self, df): """ Генерація хешу для DataFrame для ідентифікації унікальних даних. Args: df (pandas.DataFrame): DataFrame для хешування Returns: str: Хеш даних """ # Використовуємо основні колонки для хешування key_columns = ['Issue key', 'Summary', 'Status', 'Issue Type', 'Created', 'Updated'] return generate_data_hash(df, key_columns) class IndexManager: """ Менеджер для створення та управління індексами даних (FAISS, BM25). """ def __init__(self, base_indices_dir="temp/indices"): """ Ініціалізація менеджера індексів. Args: base_indices_dir (str): Базова директорія для зберігання індексів """ self.base_indices_dir = Path(base_indices_dir) if base_indices_dir else INDICES_DIR self.base_indices_dir.mkdir(exist_ok=True, parents=True) # Перевірка доступності модулів для індексування self.indexing_available = INDEXING_AVAILABLE if not self.indexing_available: logger.warning("Функціональність індексування недоступна. Встановіть необхідні пакети.") def create_indices_for_session(self, session_id, merged_df, indices_dir=None): """ Створення індексів для даних сесії. Args: session_id (str): Ідентифікатор сесії merged_df (pandas.DataFrame): DataFrame з об'єднаними даними indices_dir (str, optional): Директорія для збереження індексів. Якщо None, використовується директорія сесії. Returns: dict: Інформація про створені індекси """ if not self.indexing_available: return {"error": "Функціональність індексування недоступна. Встановіть необхідні пакети."} try: # Визначаємо директорію для індексів indices_path = Path(indices_dir) if indices_dir else self.base_indices_dir / session_id indices_path.mkdir(exist_ok=True, parents=True) # Генеруємо хеш для даних data_hash = self._generate_data_hash(merged_df) # Перевіряємо, чи існують індекси для цих даних existing_indices = self._find_indices_by_hash(data_hash) if existing_indices: return self._reuse_existing_indices(existing_indices, indices_path, session_id, data_hash, merged_df) # Створюємо нові індекси return self._create_new_indices(indices_path, session_id, data_hash, merged_df) except Exception as e: logger.error(f"Помилка при створенні індексів: {e}") return {"error": f"Помилка при створенні індексів: {str(e)}"} def _reuse_existing_indices(self, existing_indices, indices_path, session_id, data_hash, merged_df): """ Повторне використання існуючих індексів. Args: existing_indices (str): Шлях до існуючих індексів indices_path (Path): Шлях для нових індексів session_id (str): Ідентифікатор сесії data_hash (str): Хеш даних merged_df (pandas.DataFrame): DataFrame з даними Returns: dict: Інформація про скопійовані індекси """ logger.info(f"Знайдено існуючі індекси для даних з хешем {data_hash}") try: # Спочатку очищаємо цільову директорію if indices_path.exists(): for item in indices_path.iterdir(): if item.is_file(): item.unlink() elif item.is_dir(): shutil.rmtree(item) # Копіюємо індекси for item in Path(existing_indices).iterdir(): if item.is_file(): shutil.copy2(item, indices_path) elif item.is_dir(): shutil.copytree(item, indices_path / item.name) logger.info(f"Індекси успішно скопійовано в {indices_path}") # Оновлюємо метадані metadata = { "session_id": session_id, "created_at": datetime.now().isoformat(), "data_hash": data_hash, "rows_count": len(merged_df), "columns_count": len(merged_df.columns), "copied_from": str(existing_indices) } with open(indices_path / "metadata.json", "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) return { "success": True, "indices_dir": str(indices_path), "data_hash": data_hash, "reused_existing": True, "source": str(existing_indices) } except Exception as copy_err: logger.error(f"Помилка при копіюванні індексів: {copy_err}") # Продовжуємо створення нових індексів return self._create_new_indices(indices_path, session_id, data_hash, merged_df) def _create_new_indices(self, indices_path, session_id, data_hash, merged_df): """ Створення нових індексів. Зберігає індекси у форматі, сумісному з jira_hybrid_chat.py. """ if not INDEXING_AVAILABLE: return {"error": "Функціональність індексування недоступна"} try: logger.info(f"Створення нових індексів для сесії {session_id}") # Імпортуємо необхідні модулі напряму from llama_index.core import VectorStoreIndex, StorageContext, Settings from llama_index.core.storage.docstore import SimpleDocumentStore from llama_index.core.node_parser import TokenTextSplitter from llama_index.retrievers.bm25 import BM25Retriever from llama_index.vector_stores.faiss import FaissVectorStore import faiss # Ініціалізуємо модель ембедингів from modules.data_management.index_utils import initialize_embedding_model embed_model = initialize_embedding_model() # Отримуємо розмірність ембедингів динамічно import numpy as np test_embedding = embed_model.get_text_embedding("Тестовий текст") embed_dim = len(test_embedding) logger.info(f"Розмірність ембедингів: {embed_dim}") # Конвертуємо DataFrame в документи documents = self._convert_dataframe_to_documents(merged_df) # Створюємо розділювач тексту text_splitter = TokenTextSplitter( chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP ) # Встановлюємо формат збереження на JSON через глобальні налаштування # Це важливо для сумісності з jira_hybrid_chat.py Settings.persist_json_format = True # Створюємо FAISS індекс faiss_index = faiss.IndexFlatL2(embed_dim) # Створюємо контекст зберігання docstore = SimpleDocumentStore() vector_store = FaissVectorStore(faiss_index=faiss_index) storage_context = StorageContext.from_defaults( docstore=docstore, vector_store=vector_store ) # Встановлюємо модель ембедингів у налаштуваннях Settings.embed_model = embed_model # Створюємо індекс index = VectorStoreIndex.from_documents( documents, storage_context=storage_context, transformations=[text_splitter] ) # Зберігаємо індекс у форматі JSON (через глобальні налаштування) # НЕ передаємо json_format як аргумент index.storage_context.persist(persist_dir=str(indices_path)) # Створюємо BM25 retriever bm25_retriever = BM25Retriever.from_defaults( docstore=docstore, similarity_top_k=10 ) # Зберігаємо параметри BM25 bm25_dir = indices_path / "bm25" bm25_dir.mkdir(exist_ok=True) with open(bm25_dir / "params.json", "w", encoding="utf-8") as f: json.dump({"similarity_top_k": 10}, f) # Зберігаємо метадані metadata = { "session_id": session_id, "created_at": datetime.now().isoformat(), "data_hash": data_hash, "rows_count": len(merged_df), "columns_count": len(merged_df.columns), "embedding_model": embed_model.__class__.__name__, "embedding_dim": embed_dim, "format": "json" # Вказуємо використаний формат збереження } with open(indices_path / "metadata.json", "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) with open(indices_path / "indices.valid", "w", encoding="utf-8") as f: f.write(f"Indices created at {datetime.now().isoformat()}") logger.info(f"Створено файл-маркер indices.valid") return { "success": True, "indices_dir": str(indices_path), "data_hash": data_hash } except Exception as e: logger.error(f"Помилка при створенні нових індексів: {e}") return {"error": f"Помилка при створенні нових індексів: {str(e)}"} def _save_bm25_data(self, indices_path, bm25_retriever): """ Збереження даних для BM25 retriever. Args: indices_path (Path): Шлях до директорії індексів bm25_retriever (BM25Retriever): Об'єкт BM25Retriever Returns: bool: True, якщо дані успішно збережені, False у випадку помилки """ try: # Створюємо директорію для BM25 bm25_dir = indices_path / "bm25" bm25_dir.mkdir(exist_ok=True) # Зберігаємо параметри BM25 bm25_params = { "similarity_top_k": bm25_retriever.similarity_top_k, "alpha": getattr(bm25_retriever, "alpha", 0.75), "beta": getattr(bm25_retriever, "beta", 0.75), "index_creation_time": datetime.now().isoformat() } with open(bm25_dir / "params.json", "w", encoding="utf-8") as f: json.dump(bm25_params, f, ensure_ascii=False, indent=2) logger.info(f"Дані BM25 збережено в {bm25_dir}") return True except Exception as e: logger.error(f"Помилка при збереженні даних BM25: {e}") return False def _convert_dataframe_to_documents(self, df): """ Конвертує DataFrame в документи для індексування. Args: df (pandas.DataFrame): DataFrame для конвертації Returns: list: Список документів """ try: # Імпортуємо Document напряму from llama_index.core import Document documents = [] # Перебираємо рядки DataFrame for idx, row in df.iterrows(): # Створюємо текст документа text = f"Issue Key: {row.get('Issue key', '')}\n" text += f"Summary: {row.get('Summary', '')}\n" text += f"Status: {row.get('Status', '')}\n" text += f"Issue Type: {row.get('Issue Type', '')}\n" # Додаємо опис, якщо він є if 'Description' in row and pd.notna(row['Description']): text += f"Description: {row['Description']}\n" # Додаємо коментарі, якщо вони є if 'Comments' in row and pd.notna(row['Comments']): text += f"Comments: {row['Comments']}\n" # Створюємо метадані metadata = { "issue_key": row.get('Issue key', ''), "summary": row.get('Summary', ''), "status": row.get('Status', ''), "issue_type": row.get('Issue Type', ''), "created": str(row.get('Created', '')), "updated": str(row.get('Updated', '')) } # Створюємо документ doc = Document( text=text, metadata=metadata ) documents.append(doc) logger.info(f"Створено {len(documents)} документів з DataFrame") return documents except Exception as e: logger.error(f"Помилка при конвертації DataFrame в документи: {e}") raise def _generate_data_hash(self, df): """ Генерація хешу для DataFrame для ідентифікації унікальних даних. Args: df (pandas.DataFrame): DataFrame для хешування Returns: str: Хеш даних """ try: # Використовуємо основні колонки для хешування key_columns = ['Issue key', 'Summary', 'Status', 'Issue Type', 'Created', 'Updated'] # Фільтруємо тільки наявні колонки available_columns = [col for col in key_columns if col in df.columns] if not available_columns: # Якщо немає жодної ключової колонки, використовуємо всі дані data_str = df.to_json() else: # Інакше використовуємо тільки ключові колонки data_str = df[available_columns].to_json() # Створюємо хеш hash_object = hashlib.sha256(data_str.encode()) data_hash = hash_object.hexdigest() return data_hash except Exception as e: logger.error(f"Помилка при генерації хешу даних: {e}") # У випадку помилки повертаємо випадковий хеш return str(uuid.uuid4()) def _find_indices_by_hash(self, data_hash): """ Пошук існуючих індексів за хешем даних. Args: data_hash (str): Хеш даних Returns: str: Шлях до директорії з індексами або None, якщо не знайдено """ try: # Перебираємо всі піддиректорії в базовій директорії індексів for index_dir in self.base_indices_dir.iterdir(): if not index_dir.is_dir(): continue # Перевіряємо метадані metadata_file = index_dir / "metadata.json" if not metadata_file.exists(): continue try: with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) # Перевіряємо хеш if metadata.get("data_hash") == data_hash: # Перевіряємо наявність необхідних файлів if validate_index_directory(index_dir): logger.info(f"Знайдено існуючі індекси з відповідним хешем: {index_dir}") return str(index_dir) else: logger.warning(f"Знайдено індекси з відповідним хешем, але вони неповні: {index_dir}") except Exception as e: logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}") logger.info(f"Не знайдено існуючих індексів з хешем {data_hash}") return None except Exception as e: logger.error(f"Помилка при пошуку індексів за хешем: {e}") return None def cleanup_old_indices(self, max_age_days=7, max_indices=20): """ Очищення застарілих індексів. Args: max_age_days (int): Максимальний вік індексів у днях max_indices (int): Максимальна кількість індексів для зберігання Returns: int: Кількість видалених директорій індексів """ try: # Перевіряємо, чи існує базова директорія if not self.base_indices_dir.exists(): return 0 # Отримуємо список директорій індексів index_dirs = [] for index_dir in self.base_indices_dir.iterdir(): if not index_dir.is_dir(): continue # Перевіряємо метадані для отримання часу створення metadata_file = index_dir / "metadata.json" created_at = None if metadata_file.exists(): try: with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) created_at = metadata.get("created_at") except Exception: pass # Якщо немає метаданих, використовуємо час створення директорії if not created_at: created_at = datetime.fromtimestamp(index_dir.stat().st_mtime).isoformat() # Додаємо інформацію про директорію index_dirs.append({ "path": str(index_dir), "created_at": created_at }) # Якщо немає директорій для обробки, повертаємо 0 if not index_dirs: return 0 # Сортуємо директорії за часом створення (від найновіших до найстаріших) index_dirs.sort(key=lambda x: x["created_at"], reverse=True) # Визначаємо директорії для видалення dirs_to_delete = [] # 1. Залишаємо max_indices найновіших директорій if len(index_dirs) > max_indices: dirs_to_delete.extend(index_dirs[max_indices:]) # 2. Перевіряємо, чи є серед залишених застарілі директорії cutoff_date = (datetime.now() - timedelta(days=max_age_days)).isoformat() for index_info in index_dirs[:max_indices]: if index_info["created_at"] < cutoff_date: dirs_to_delete.append(index_info) # Видаляємо директорії deleted_count = 0 for dir_info in dirs_to_delete: try: dir_path = Path(dir_info["path"]) if dir_path.exists(): shutil.rmtree(dir_path) logger.info(f"Видалено застарілу директорію індексів: {dir_path}") deleted_count += 1 except Exception as e: logger.error(f"Помилка при видаленні директорії {dir_info['path']}: {e}") return deleted_count except Exception as e: logger.error(f"Помилка при очищенні застарілих індексів: {e}") return 0 def load_indices(self, indices_dir): """ Завантаження індексів з директорії. Args: indices_dir (str): Шлях до директорії з індексами Returns: tuple: (VectorStoreIndex, BM25Retriever) або (None, None) у випадку помилки """ if not self.indexing_available: logger.warning("Функціональність індексування недоступна. Встановіть необхідні пакети.") return None, None try: # Перевіряємо цілісність індексів is_valid, message = check_index_integrity(indices_dir) if not is_valid: logger.error(f"Індекси не пройшли перевірку цілісності: {message}") return None, None indices_path = Path(indices_dir) if not indices_path.exists(): logger.error(f"Директорія індексів не існує: {indices_dir}") return None, None # Перевіряємо наявність необхідних файлів if not (indices_path / "docstore.json").exists(): logger.error(f"Директорія індексів не містить необхідних файлів: {indices_dir}") return None, None # Імпортуємо необхідні модулі StorageContext = INDEXING_MODULES.get("StorageContext") VectorStoreIndex = INDEXING_MODULES.get("VectorStoreIndex") BM25Retriever = INDEXING_MODULES.get("BM25Retriever") # Завантажуємо контекст зберігання storage_context = StorageContext.from_defaults(persist_dir=str(indices_path)) # Завантажуємо індекс index = VectorStoreIndex.from_storage_context(storage_context) # Створюємо BM25 retriever bm25_retriever = BM25Retriever.from_defaults( docstore=storage_context.docstore, similarity_top_k=10 ) # Завантажуємо параметри BM25, якщо вони є bm25_params_file = indices_path / "bm25" / "params.json" if bm25_params_file.exists(): try: with open(bm25_params_file, "r", encoding="utf-8") as f: bm25_params = json.load(f) # Встановлюємо параметри if "similarity_top_k" in bm25_params: bm25_retriever.similarity_top_k = bm25_params["similarity_top_k"] except Exception as e: logger.warning(f"Помилка при завантаженні параметрів BM25: {e}") logger.info(f"Індекси успішно завантажено з {indices_dir}") return index, bm25_retriever except Exception as e: logger.error(f"Помилка при завантаженні індексів: {e}") return None, None