import os import shutil import logging import pandas as pd import hashlib from pathlib import Path from datetime import datetime from modules.data_management.session_manager import SessionManager logger = logging.getLogger(__name__) class DataManager: """ Менеджер даних для роботи з файлами CSV та їх обробки. """ def __init__(self, current_data_dir="current_data", session_manager=None): """ Ініціалізація менеджера даних. Args: current_data_dir (str): Директорія з локальними файлами даних session_manager (SessionManager, optional): Менеджер сесій або None для створення нового """ self.current_data_dir = Path(current_data_dir) self.current_data_dir.mkdir(exist_ok=True, parents=True) # Ініціалізація менеджера сесій self.session_manager = session_manager or SessionManager() def get_local_files(self): """ Отримання списку локальних CSV-файлів. Returns: list: Список словників з інформацією про файли """ files_info = [] if not self.current_data_dir.exists(): logger.warning(f"Директорія {self.current_data_dir} не існує") return files_info for file_path in self.current_data_dir.glob("*.csv"): try: # Отримуємо базову інформацію про файл stat = file_path.stat() size_kb = stat.st_size / 1024 modified = datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S') # Спроба зчитати перші рядки для отримання інформації про структуру try: df_preview = pd.read_csv(file_path, nrows=5) rows_preview = len(df_preview) columns_preview = len(df_preview.columns) columns_list = df_preview.columns.tolist() except Exception as e: logger.warning(f"Не вдалося прочитати файл {file_path}: {e}") rows_preview = "?" columns_preview = "?" columns_list = [] # Формуємо інформацію про файл files_info.append({ "path": str(file_path), "name": file_path.name, "size_kb": round(size_kb, 2), "modified": modified, "rows_preview": rows_preview, "columns_preview": columns_preview, "columns_list": columns_list }) except Exception as e: logger.error(f"Помилка при обробці файлу {file_path}: {e}") # Сортуємо за часом модифікації (від найновіших до найстаріших) files_info.sort(key=lambda x: x["modified"], reverse=True) return files_info def validate_csv_file(self, file_path): """ Перевірка валідності CSV-файлу. Args: file_path (str): Шлях до файлу Returns: tuple: (is_valid, info_dict) is_valid - True, якщо файл валідний info_dict - словник з інформацією про файл """ if not Path(file_path).exists(): return False, {"error": f"Файл не знайдено: {file_path}"} try: # Отримуємо інформацію про файл file_stat = Path(file_path).stat() size_kb = file_stat.st_size / 1024 if size_kb == 0: return False, {"error": "Файл порожній"} # Спроба зчитати файл df = pd.read_csv(file_path) # Перевірка наявності очікуваних колонок required_columns = ['Summary', 'Issue key', 'Status'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: return False, { "error": f"Відсутні необхідні колонки: {', '.join(missing_columns)}", "rows": len(df), "columns": len(df.columns), "columns_list": df.columns.tolist() } # Формуємо інформацію про файл info = { "rows": len(df), "columns": len(df.columns), "columns_list": df.columns.tolist(), "size_kb": round(size_kb, 2), "first_rows": df.head(5).to_dict('records') } return True, info except Exception as e: logger.error(f"Помилка при валідації CSV-файлу {file_path}: {e}") return False, {"error": f"Помилка при читанні файлу: {str(e)}"} def copy_files_to_session(self, session_id, file_paths_list): """ Копіювання вибраних файлів до сесії користувача. Args: session_id (str): Ідентифікатор сесії file_paths_list (list): Список шляхів до файлів для копіювання Returns: list: Список скопійованих файлів у сесії """ session_data_dir = self.session_manager.get_session_data_dir(session_id) if not session_data_dir: logger.error(f"Не вдалося отримати директорію даних для сесії {session_id}") return [] copied_files = [] for file_path in file_paths_list: try: source_path = Path(file_path) if not source_path.exists(): logger.warning(f"Файл не знайдено: {file_path}") continue # Створюємо унікальне ім'я файлу в сесії timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') dest_filename = f"local_{timestamp}_{source_path.name}" dest_path = session_data_dir / dest_filename # Копіюємо файл shutil.copyfile(source_path, dest_path) # Додаємо інформацію про файл до сесії if self.session_manager.add_data_file( session_id, str(dest_path), file_type="local", description=f"Local file: {source_path.name}" ): copied_files.append(str(dest_path)) logger.info(f"Файл {source_path.name} скопійовано до сесії {session_id}") except Exception as e: logger.error(f"Помилка при копіюванні файлу {file_path} до сесії {session_id}: {e}") return copied_files def merge_dataframes(self, session_id, dataframes, output_name=None): """ Об'єднання кількох DataFrame та збереження результату в сесії. Args: session_id (str): Ідентифікатор сесії dataframes (list): Список DataFrame для об'єднання output_name (str, optional): Ім'я файлу для збереження результату Returns: tuple: (merged_df, output_path) - об'єднаний DataFrame та шлях до збереженого файлу """ if not dataframes: logger.warning("Немає даних для об'єднання") return None, None try: # Якщо є тільки один DataFrame, використовуємо його як базовий if len(dataframes) == 1: merged_df = dataframes[0].copy() else: # Об'єднуємо всі DataFrame по рядках з ігноруванням індексів merged_df = pd.concat(dataframes, ignore_index=True) # Видаляємо дублікати за ключовими колонками if 'Issue key' in merged_df.columns: merged_df.drop_duplicates(subset=['Issue key'], keep='first', inplace=True) # Зберігаємо результат output_path = self.session_manager.save_merged_data(session_id, merged_df, output_name) return merged_df, output_path except Exception as e: logger.error(f"Помилка при об'єднанні даних: {e}") return None, None def load_data_from_files(self, session_id, file_paths_list): """ Завантаження даних з файлів у DataFrame. Args: session_id (str): Ідентифікатор сесії file_paths_list (list): Список шляхів до файлів для завантаження Returns: list: Список кортежів (file_path, dataframe, success) """ results = [] for file_path in file_paths_list: try: # Перевіряємо, чи існує файл if not Path(file_path).exists(): logger.warning(f"Файл не знайдено: {file_path}") results.append((file_path, None, False)) continue # Завантажуємо файл df = pd.read_csv(file_path) # Обробка дат for date_col in ['Created', 'Updated', 'Resolved', 'Due Date']: if date_col in df.columns: df[date_col] = pd.to_datetime(df[date_col], format='%Y-%m-%dT%H:%M:%S', errors='coerce') # Підготовка додаткових колонок для аналізу if 'Created' in df.columns and pd.api.types.is_datetime64_dtype(df[date_col]): df['Created_Date'] = df['Created'].dt.date df['Created_Month'] = df['Created'].dt.to_period('M') if 'Updated' in df.columns and pd.api.types.is_datetime64_dtype(df[date_col]): df['Updated_Date'] = df['Updated'].dt.date df['Days_Since_Update'] = (datetime.now() - df['Updated']).dt.days results.append((file_path, df, True)) logger.info(f"Успішно завантажено файл {file_path}, {len(df)} рядків") except Exception as e: logger.error(f"Помилка при завантаженні файлу {file_path}: {e}") results.append((file_path, None, False)) return results def initialize_session_data(self, session_id, local_files, uploaded_file=None): """ Ініціалізація даних сесії з локальних та завантажених файлів. Args: session_id (str): Ідентифікатор сесії local_files (list): Список шляхів до локальних файлів uploaded_file (str, optional): Шлях до завантаженого файлу Returns: tuple: (success, result_info) - успішність операції та інформація про результат """ try: # Копіюємо локальні файли до сесії copied_files = self.copy_files_to_session(session_id, local_files) # Додаємо завантажений файл, якщо він є if uploaded_file and Path(uploaded_file).exists(): # Копіюємо файл до сесії session_data_dir = self.session_manager.get_session_data_dir(session_id) if not session_data_dir: return False, {"error": "Не вдалося отримати директорію даних сесії"} # Створюємо унікальне ім'я для завантаженого файлу timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') dest_filename = f"uploaded_{timestamp}_{Path(uploaded_file).name}" dest_path = session_data_dir / dest_filename # Копіюємо файл shutil.copyfile(uploaded_file, dest_path) # Додаємо інформацію про файл до сесії self.session_manager.add_data_file( session_id, str(dest_path), file_type="uploaded", description=f"Uploaded file: {Path(uploaded_file).name}" ) copied_files.append(str(dest_path)) # Якщо немає файлів для обробки, повертаємо помилку if not copied_files: return False, {"error": "Не вибрано жодного файлу для обробки"} # Завантажуємо дані з усіх файлів loaded_data = self.load_data_from_files(session_id, copied_files) # Фільтруємо тільки успішно завантажені файли valid_data = [(path, df) for path, df, success in loaded_data if success and df is not None] if not valid_data: return False, {"error": "Не вдалося завантажити жодного файлу"} # Отримуємо список DataFrame dataframes = [df for _, df in valid_data] # Об'єднуємо дані merged_df, output_path = self.merge_dataframes( session_id, dataframes, output_name=f"merged_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" ) if merged_df is None or not output_path: return False, {"error": "Не вдалося об'єднати дані"} result_info = { "merged_file": output_path, "rows_count": len(merged_df), "columns_count": len(merged_df.columns), "source_files_count": len(valid_data), "merged_df": merged_df # Передаємо DataFrame для подальшого використання } return True, result_info except Exception as e: logger.error(f"Помилка при ініціалізації даних сесії {session_id}: {e}") return False, {"error": f"Помилка при ініціалізації даних: {str(e)}"} def get_file_preview(self, file_path, max_rows=10): """ Отримання попереднього перегляду файлу CSV. Args: file_path (str): Шлях до файлу max_rows (int): Максимальна кількість рядків для перегляду Returns: dict: Словник з інформацією про файл та його вмістом """ try: if not Path(file_path).exists(): return {"error": f"Файл не знайдено: {file_path}"} # Зчитуємо перші max_rows рядків df = pd.read_csv(file_path, nrows=max_rows) # Отримуємо інформацію про файл file_stat = Path(file_path).stat() size_kb = file_stat.st_size / 1024 modified = datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S') # Підраховуємо загальну кількість рядків (обережно з великими файлами) total_rows = sum(1 for _ in open(file_path, 'r')) - 1 # -1 для заголовка # Формуємо результат result = { "filename": Path(file_path).name, "path": file_path, "size_kb": round(size_kb, 2), "modified": modified, "total_rows": total_rows, "columns": df.columns.tolist(), "columns_count": len(df.columns), "preview_rows": df.to_dict('records') } return result except Exception as e: logger.error(f"Помилка при отриманні попереднього перегляду файлу {file_path}: {e}") return {"error": f"Помилка при читанні файлу: {str(e)}"} def cleanup_temp_data(self): """ Очищення тимчасових даних, крім файлів у папці current_data. Returns: dict: Інформація про результати очищення """ try: import shutil import os from pathlib import Path cleanup_stats = { "temp_files_removed": 0, "session_dirs_removed": 0, "indices_dirs_removed": 0, "reports_removed": 0, "temp_directories": [] } # Очищення тимчасових індексів indices_dir = Path("temp/indices") if indices_dir.exists(): for item in indices_dir.iterdir(): if item.is_dir(): try: shutil.rmtree(item) cleanup_stats["indices_dirs_removed"] += 1 except Exception as e: logger.error(f"Помилка при видаленні директорії індексів {item}: {e}") # Очищення тимчасових сесій sessions_dir = Path("temp/sessions") if sessions_dir.exists(): for item in sessions_dir.iterdir(): if item.is_dir(): try: shutil.rmtree(item) cleanup_stats["session_dirs_removed"] += 1 except Exception as e: logger.error(f"Помилка при видаленні директорії сесій {item}: {e}") # Очищення інших файлів у temp temp_dir = Path("temp") if temp_dir.exists(): for item in temp_dir.iterdir(): if item.is_file(): try: item.unlink() cleanup_stats["temp_files_removed"] += 1 except Exception as e: logger.error(f"Помилка при видаленні файлу {item}: {e}") # Очищення тимчасових звітів reports_dir = Path("reports") if reports_dir.exists(): reports_count = 0 # Видаляємо файли у головній директорії reports for item in reports_dir.iterdir(): if item.is_file(): try: item.unlink() reports_count += 1 except Exception as e: logger.error(f"Помилка при видаленні звіту {item}: {e}") # Перевіряємо і очищаємо підпапку візуалізацій viz_dir = reports_dir / "visualizations" if viz_dir.exists(): for item in viz_dir.iterdir(): if item.is_file(): try: item.unlink() reports_count += 1 except Exception as e: logger.error(f"Помилка при видаленні візуалізації {item}: {e}") cleanup_stats["reports_removed"] = reports_count # Запам'ятовуємо всі очищені директорії cleanup_stats["temp_directories"] = ["temp/indices", "temp/sessions", "reports", "temp"] # Створюємо наново всі необхідні директорії for directory in ["temp", "temp/indices", "temp/sessions", "reports", "reports/visualizations"]: Path(directory).mkdir(exist_ok=True, parents=True) logger.info(f"Тимчасові дані успішно очищено: {cleanup_stats}") return { "success": True, "stats": cleanup_stats } except Exception as e: logger.error(f"Помилка при очищенні тимчасових даних: {e}") return { "success": False, "error": str(e) } # Додано функцію в модуль для обробка дат def safe_strftime(date_value, format_str="%Y-%m-%d"): """Безпечне форматування дати з обробкою None та NaT значень.""" import pandas as pd if date_value is None or pd.isna(date_value): return "Н/Д" # або будь-яке інше значення за замовчуванням try: return date_value.strftime(format_str) except Exception: return "Неправильна дата"