import pandas as pd import numpy as np from datetime import datetime, timedelta import logging from modules.data_management.data_manager import safe_strftime logger = logging.getLogger(__name__) class JiraDataAnalyzer: """ Клас для аналізу даних Jira """ def __init__(self, df): """ Ініціалізація аналізатора даних. Args: df (pandas.DataFrame): DataFrame з даними Jira """ self.df = df def _get_column_counts(self, column_name, limit=None): """ Допоміжний метод для отримання частот значень колонки. Args: column_name (str): Назва колонки limit (int, optional): Обмеження кількості результатів Returns: dict: Словник з частотами або порожній словник """ if column_name not in self.df.columns: return {} counts = self.df[column_name].value_counts() if limit: counts = counts.head(limit) return counts.to_dict() def _check_datetime_column(self, column_name): """ Перевірка наявності та коректності колонки з датами. Args: column_name (str): Назва колонки Returns: bool: True якщо колонка існує і містить дати, False інакше """ return (column_name in self.df.columns and pd.api.types.is_datetime64_dtype(self.df[column_name])) def generate_basic_statistics(self): """ Генерація базової статистики по даним Jira. Returns: dict: Словник з базовою статистикою """ try: stats = { 'total_tickets': len(self.df), 'status_counts': self._get_column_counts('Status'), 'type_counts': self._get_column_counts('Issue Type'), 'priority_counts': self._get_column_counts('Priority'), 'assignee_counts': self._get_column_counts('Assignee', limit=10), 'created_stats': {}, 'updated_stats': {} } # Статистика за часом створення if self._check_datetime_column('Created'): created_min = self.df['Created'].min() created_max = self.df['Created'].max() # Групування за місяцями if 'Created_Month' in self.df.columns: created_by_month = self.df['Created_Month'].value_counts().sort_index() stats['created_by_month'] = {str(k): v for k, v in created_by_month.items()} stats['created_stats'] = { 'min': safe_strftime(created_min, "%Y-%m-%d"), 'max': safe_strftime(created_max, "%Y-%m-%d"), 'last_7_days': len(self.df[self.df['Created'] > (datetime.now() - timedelta(days=7))]) } # Статистика за часом оновлення if self._check_datetime_column('Updated'): updated_min = self.df['Updated'].min() updated_max = self.df['Updated'].max() stats['updated_stats'] = { 'min': safe_strftime(updated_min, "%Y-%m-%d"), 'max': safe_strftime(updated_max, "%Y-%m-%d"), 'last_7_days': len(self.df[self.df['Updated'] > (datetime.now() - timedelta(days=7))]) } logger.info("Базова статистика успішно згенерована") return stats except Exception as e: logger.error(f"Помилка при генерації базової статистики: {e}") return {'error': str(e)} def analyze_inactive_issues(self, days=14): """ Аналіз неактивних тікетів (не оновлювались протягом певної кількості днів). Args: days (int): Кількість днів неактивності Returns: dict: Інформація про неактивні тікети """ try: if not self._check_datetime_column('Updated'): logger.warning("Колонка 'Updated' відсутня або не містить дат") return {'error': "Неможливо аналізувати неактивні тікети"} # Визначення неактивних тікетів cutoff_date = datetime.now() - timedelta(days=days) inactive_issues = self.df[self.df['Updated'] < cutoff_date] inactive_data = { 'total_count': len(inactive_issues), 'percentage': round(len(inactive_issues) / len(self.df) * 100, 2) if len(self.df) > 0 else 0, 'by_status': {}, 'by_priority': {}, 'top_inactive': [] } # Розподіл за статусами та пріоритетами if len(inactive_issues) > 0: inactive_data['by_status'] = inactive_issues['Status'].value_counts().to_dict() if 'Status' in inactive_issues.columns else {} inactive_data['by_priority'] = inactive_issues['Priority'].value_counts().to_dict() if 'Priority' in inactive_issues.columns else {} # Топ 5 неактивних тікетів top_inactive = inactive_issues.sort_values('Updated', ascending=True).head(5) for _, row in top_inactive.iterrows(): issue_data = { 'key': row.get('Issue key', 'Unknown'), 'summary': row.get('Summary', 'Unknown'), 'status': row.get('Status', 'Unknown'), 'last_updated': safe_strftime(row['Updated'], '%Y-%m-%d'), 'days_inactive': (datetime.now() - row['Updated']).days if pd.notna(row['Updated']) else 'Unknown' } inactive_data['top_inactive'].append(issue_data) logger.info(f"Знайдено {len(inactive_issues)} неактивних тікетів (>{days} днів)") return inactive_data except Exception as e: logger.error(f"Помилка при аналізі неактивних тікетів: {e}") return {'error': str(e)} def analyze_timeline(self): """ Аналіз часової шкали проекту (зміна стану тікетів з часом). Returns: pandas.DataFrame: Дані для візуалізації або None у випадку помилки """ try: if not self._check_datetime_column('Created') or not self._check_datetime_column('Updated'): logger.warning("Відсутні необхідні колонки з датами для аналізу часової шкали") return None # Визначення часового діапазону min_date = self.df['Created'].min().date() max_date = self.df['Updated'].max().date() # Створення часового ряду для кожного дня date_range = pd.date_range(start=min_date, end=max_date, freq='D') timeline_data = [] for date in date_range: current_date = date.date() date_str = safe_strftime(date, '%Y-%m-%d') # Тікети, створені до цієї дати created_until = self.df[self.df['Created'].dt.date <= current_date] # Статуси тікетів на цю дату status_counts = {} # Для кожного тікета визначаємо його статус на цю дату for _, row in created_until.iterrows(): if pd.notna(row['Updated']) and row['Updated'].date() >= current_date: status = row.get('Status', 'Unknown') status_counts[status] = status_counts.get(status, 0) + 1 # Додаємо запис для цієї дати timeline_data.append({ 'Date': date_str, 'Total': len(created_until), **status_counts }) # Створення DataFrame timeline_df = pd.DataFrame(timeline_data) logger.info("Часова шкала успішно проаналізована") return timeline_df except Exception as e: logger.error(f"Помилка при аналізі часової шкали: {e}") return None def analyze_lead_time(self): """ Аналіз часу виконання тікетів (Lead Time). Returns: dict: Статистика по часу виконання """ try: if not self._check_datetime_column('Created'): logger.warning("Колонка 'Created' відсутня або не містить дат") return {'error': "Неможливо аналізувати час виконання"} if 'Resolved' not in self.df.columns: logger.warning("Колонка 'Resolved' відсутня") return {'error': "Неможливо аналізувати час виконання"} # Конвертація колонки Resolved до datetime, якщо потрібно resolved_column = self.df['Resolved'] if not pd.api.types.is_datetime64_dtype(resolved_column): resolved_column = pd.to_datetime(resolved_column, errors='coerce') # Фільтрація завершених тікетів df_with_resolved = self.df.copy() df_with_resolved['Resolved'] = resolved_column completed_issues = df_with_resolved.dropna(subset=['Resolved']) if len(completed_issues) == 0: logger.warning("Немає завершених тікетів для аналізу") return {'total_count': 0} # Обчислення Lead Time (в днях) completed_issues['Lead_Time_Days'] = (completed_issues['Resolved'] - completed_issues['Created']).dt.days # Фільтрація некоректних значень valid_lead_time = completed_issues[completed_issues['Lead_Time_Days'] >= 0] # Якщо немає валідних записів після фільтрації if len(valid_lead_time) == 0: logger.warning("Немає валідних записів для аналізу Lead Time") return {'total_count': 0, 'error': "Немає валідних записів для аналізу Lead Time"} lead_time_stats = { 'total_count': len(valid_lead_time), 'avg_lead_time': round(valid_lead_time['Lead_Time_Days'].mean(), 2), 'median_lead_time': round(valid_lead_time['Lead_Time_Days'].median(), 2), 'min_lead_time': valid_lead_time['Lead_Time_Days'].min(), 'max_lead_time': valid_lead_time['Lead_Time_Days'].max(), 'by_type': {}, 'by_priority': {} } # Розподіл за типами і пріоритетами if 'Issue Type' in valid_lead_time.columns: lead_time_by_type = valid_lead_time.groupby('Issue Type')['Lead_Time_Days'].mean().round(2) lead_time_stats['by_type'] = lead_time_by_type.to_dict() if 'Priority' in valid_lead_time.columns: lead_time_by_priority = valid_lead_time.groupby('Priority')['Lead_Time_Days'].mean().round(2) lead_time_stats['by_priority'] = lead_time_by_priority.to_dict() logger.info("Час виконання успішно проаналізований") return lead_time_stats except Exception as e: logger.error(f"Помилка при аналізі часу виконання: {e}") return {'error': str(e)}