import pandas as pd from datetime import datetime import logging import os from pathlib import Path import io import hashlib logger = logging.getLogger(__name__) class JiraCsvImporter: """ Клас для імпорту даних з CSV-файлів Jira """ def __init__(self, file_path): """ Ініціалізація імпортера CSV. Args: file_path (str): Шлях до CSV-файлу """ self.file_path = file_path self.df = None self.file_hash = None def load_data(self): """ Завантаження даних з CSV-файлу. Returns: pandas.DataFrame: Завантажені дані або None у випадку помилки """ try: logger.info(f"Завантаження CSV-файлу: {self.file_path}") print(f"Завантаження CSV-файлу: {self.file_path}") # Додаткове логування в консоль # Перевірка існування файлу if not os.path.exists(self.file_path): logger.error(f"Файл не знайдено: {self.file_path}") print(f"Файл не знайдено: {self.file_path}") return None # Перевірка розміру файлу file_size = os.path.getsize(self.file_path) logger.info(f"Розмір файлу: {file_size} байт") if file_size == 0: logger.error("Файл порожній") return None # Генеруємо хеш файлу для відстеження змін self.file_hash = self._generate_file_hash() if self.file_hash: logger.info(f"Згенеровано хеш CSV файлу: {self.file_hash}") # Додаткове логування дозволів на файл try: import stat st = os.stat(self.file_path) permissions = stat.filemode(st.st_mode) logger.info(f"Дозволи файлу: {permissions}") except Exception as e: logger.warning(f"Не вдалося отримати дозволи файлу: {e}") # Спробуємо різні методи зчитування файлу success = False # Метод 1: Стандартний pandas.read_csv try: self.df = pd.read_csv(self.file_path) logger.info("Метод 1 (стандартний read_csv) успішний") success = True except Exception as e1: logger.warning(f"Помилка методу 1: {e1}") # Метод 2: Явно вказуємо кодування if not success: try: self.df = pd.read_csv(self.file_path, encoding='utf-8') logger.info("Метод 2 (utf-8) успішний") success = True except Exception as e2: logger.warning(f"Помилка методу 2: {e2}") # Метод 3: Альтернативне кодування if not success: try: self.df = pd.read_csv(self.file_path, encoding='latin1') logger.info("Метод 3 (latin1) успішний") success = True except Exception as e3: logger.warning(f"Помилка методу 3: {e3}") # Метод 4: Читаємо вміст файлу та використовуємо StringIO if not success: try: with open(self.file_path, 'rb') as f: content = f.read() self.df = pd.read_csv(io.StringIO(content.decode('utf-8', errors='replace'))) logger.info("Метод 4 (StringIO з utf-8 і errors='replace') успішний") success = True except Exception as e4: logger.warning(f"Помилка методу 4: {e4}") # Метод 5: Спроба з latin1 і StringIO if not success: try: with open(self.file_path, 'rb') as f: content = f.read() self.df = pd.read_csv(io.StringIO(content.decode('latin1', errors='replace'))) logger.info("Метод 5 (StringIO з latin1 і errors='replace') успішний") success = True except Exception as e5: logger.warning(f"Помилка методу 5: {e5}") if not success: logger.error("Всі методи зчитування файлу невдалі") return None # Відображення наявних колонок для діагностики print(f"Наявні колонки: {self.df.columns.tolist()}") print(f"Кількість рядків: {len(self.df)}") logger.info(f"Наявні колонки: {self.df.columns.tolist()}") logger.info(f"Кількість рядків: {len(self.df)}") # Обробка дат self._process_dates() # Очищення та підготовка даних self._clean_data() # Перевіряємо наявність індексів для цього CSV if self.file_hash: # Перевіряємо та оновлюємо метадані файлу self._check_indices_metadata() logger.info(f"Успішно завантажено {len(self.df)} записів") print(f"Успішно завантажено {len(self.df)} записів") return self.df except Exception as e: logger.error(f"Помилка при завантаженні CSV-файлу: {e}") import traceback error_details = traceback.format_exc() print(f"Помилка при завантаженні CSV-файлу: {e}") print(f"Деталі помилки: {error_details}") logger.error(error_details) return None def _generate_file_hash(self): """ Генерує хеш для CSV файлу на основі його вмісту Returns: str: Хеш файлу або None у випадку помилки """ try: # Читаємо файл блоками для ефективного хешування великих файлів sha256 = hashlib.sha256() with open(self.file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256.update(byte_block) return sha256.hexdigest() except Exception as e: logger.error(f"Помилка при генерації хешу CSV: {e}") return None def _check_indices_metadata(self): """ Перевіряє наявність індексів для поточного CSV файлу та оновлює метадані при необхідності. """ try: import json from pathlib import Path # Шлях до директорії індексів indices_dir = Path("temp/indices") if not indices_dir.exists(): return # Отримання списку піддиректорій з індексами subdirs = [d for d in indices_dir.iterdir() if d.is_dir()] if not subdirs: return # Перевіряємо кожну директорію на відповідність хешу for directory in subdirs: metadata_path = directory / "metadata.json" if metadata_path.exists(): try: with open(metadata_path, "r", encoding="utf-8") as f: metadata = json.load(f) # Якщо знайдено відповідні індекси, додаємо інформацію про колонки if "csv_hash" in metadata and metadata["csv_hash"] == self.file_hash: # Оновлюємо інформацію про колонки metadata["columns"] = self.df.columns.tolist() metadata["rows_count"] = len(self.df) metadata["last_used"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Оновлюємо файл метаданих with open(metadata_path, "w", encoding="utf-8") as f: json.dump(metadata, f, ensure_ascii=False, indent=2) logger.info(f"Оновлено метадані для індексів: {directory}") break except Exception as md_err: logger.warning(f"Помилка при перевірці метаданих {metadata_path}: {md_err}") except Exception as e: logger.warning(f"Помилка при перевірці метаданих індексів: {e}") def _check_required_columns(self): """ Перевірка наявності необхідних колонок у CSV-файлі. Returns: bool: True, якщо всі необхідні колонки присутні """ # Основні колонки, які очікуються у файлі Jira basic_columns = ['Summary', 'Issue key', 'Status', 'Issue Type', 'Priority', 'Created', 'Updated'] # Альтернативні назви колонок alternative_columns = { 'Summary': ['Summary', 'Короткий опис'], 'Issue key': ['Issue key', 'Key', 'Ключ'], 'Status': ['Status', 'Статус'], 'Issue Type': ['Issue Type', 'Type', 'Тип'], 'Priority': ['Priority', 'Пріоритет'], 'Created': ['Created', 'Створено'], 'Updated': ['Updated', 'Оновлено'] } # Перевірка наявності колонок missing_columns = [] for col in basic_columns: found = False # Перевірка основної назви if col in self.df.columns: found = True else: # Перевірка альтернативних назв for alt_col in alternative_columns.get(col, []): if alt_col in self.df.columns: # Перейменування колонки до стандартного імені self.df.rename(columns={alt_col: col}, inplace=True) found = True break if not found: missing_columns.append(col) if missing_columns: logger.warning(f"Відсутні колонки: {', '.join(missing_columns)}") print(f"Відсутні колонки: {', '.join(missing_columns)}") return False return True def _process_dates(self): """ Обробка дат у DataFrame. """ try: # Перетворення колонок з датами date_columns = ['Created', 'Updated', 'Resolved', 'Due Date'] for col in date_columns: if col in self.df.columns: try: self.df[col] = pd.to_datetime(self.df[col], errors='coerce') print(f"Колонку {col} успішно конвертовано до datetime") except Exception as e: logger.warning(f"Не вдалося конвертувати колонку {col} до datetime: {e}") print(f"Не вдалося конвертувати колонку {col} до datetime: {e}") except Exception as e: logger.error(f"Помилка при обробці дат: {e}") print(f"Помилка при обробці дат: {e}") def _clean_data(self): """ Очищення та підготовка даних. """ try: # Видалення порожніх рядків if 'Issue key' in self.df.columns: self.df.dropna(subset=['Issue key'], inplace=True) print(f"Видалено порожні рядки за колонкою 'Issue key'") # Додаткова обробка даних if 'Status' in self.df.columns: self.df['Status'] = self.df['Status'].fillna('Unknown') print(f"Заповнено відсутні значення в колонці 'Status'") if 'Priority' in self.df.columns: self.df['Priority'] = self.df['Priority'].fillna('Not set') print(f"Заповнено відсутні значення в колонці 'Priority'") # Створення додаткових колонок для аналізу if 'Created' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Created']): self.df['Created_Date'] = self.df['Created'].dt.date self.df['Created_Month'] = self.df['Created'].dt.to_period('M') print(f"Створено додаткові колонки для дат створення") if 'Updated' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Updated']): self.df['Updated_Date'] = self.df['Updated'].dt.date self.df['Days_Since_Update'] = (datetime.now() - self.df['Updated']).dt.days print(f"Створено додаткові колонки для дат оновлення") except Exception as e: logger.error(f"Помилка при очищенні даних: {e}") print(f"Помилка при очищенні даних: {e}") def export_to_csv(self, output_path=None): """ Експорт оброблених даних у CSV-файл. Args: output_path (str): Шлях для збереження файлу. Якщо None, створюється автоматично. Returns: str: Шлях до збереженого файлу або None у випадку помилки """ if self.df is None: logger.error("Немає даних для експорту") return None try: if output_path is None: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_dir = Path("exported_data") output_dir.mkdir(exist_ok=True) output_path = output_dir / f"jira_data_{timestamp}.csv" self.df.to_csv(output_path, index=False, encoding='utf-8') logger.info(f"Дані успішно експортовано у {output_path}") return str(output_path) except Exception as e: logger.error(f"Помилка при експорті даних: {e}") return None