DocUA's picture
Єдиний коміт - очищення історії
4ad5efa
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from modules.data_management.data_manager import safe_strftime
logger = logging.getLogger(__name__)
class JiraDataAnalyzer:
"""
Клас для аналізу даних Jira
"""
def __init__(self, df):
"""
Ініціалізація аналізатора даних.
Args:
df (pandas.DataFrame): DataFrame з даними Jira
"""
self.df = df
def _get_column_counts(self, column_name, limit=None):
"""
Допоміжний метод для отримання частот значень колонки.
Args:
column_name (str): Назва колонки
limit (int, optional): Обмеження кількості результатів
Returns:
dict: Словник з частотами або порожній словник
"""
if column_name not in self.df.columns:
return {}
counts = self.df[column_name].value_counts()
if limit:
counts = counts.head(limit)
return counts.to_dict()
def _check_datetime_column(self, column_name):
"""
Перевірка наявності та коректності колонки з датами.
Args:
column_name (str): Назва колонки
Returns:
bool: True якщо колонка існує і містить дати, False інакше
"""
return (column_name in self.df.columns and
pd.api.types.is_datetime64_dtype(self.df[column_name]))
def generate_basic_statistics(self):
"""
Генерація базової статистики по даним Jira.
Returns:
dict: Словник з базовою статистикою
"""
try:
stats = {
'total_tickets': len(self.df),
'status_counts': self._get_column_counts('Status'),
'type_counts': self._get_column_counts('Issue Type'),
'priority_counts': self._get_column_counts('Priority'),
'assignee_counts': self._get_column_counts('Assignee', limit=10),
'created_stats': {},
'updated_stats': {}
}
# Статистика за часом створення
if self._check_datetime_column('Created'):
created_min = self.df['Created'].min()
created_max = self.df['Created'].max()
# Групування за місяцями
if 'Created_Month' in self.df.columns:
created_by_month = self.df['Created_Month'].value_counts().sort_index()
stats['created_by_month'] = {str(k): v for k, v in created_by_month.items()}
stats['created_stats'] = {
'min': safe_strftime(created_min, "%Y-%m-%d"),
'max': safe_strftime(created_max, "%Y-%m-%d"),
'last_7_days': len(self.df[self.df['Created'] > (datetime.now() - timedelta(days=7))])
}
# Статистика за часом оновлення
if self._check_datetime_column('Updated'):
updated_min = self.df['Updated'].min()
updated_max = self.df['Updated'].max()
stats['updated_stats'] = {
'min': safe_strftime(updated_min, "%Y-%m-%d"),
'max': safe_strftime(updated_max, "%Y-%m-%d"),
'last_7_days': len(self.df[self.df['Updated'] > (datetime.now() - timedelta(days=7))])
}
logger.info("Базова статистика успішно згенерована")
return stats
except Exception as e:
logger.error(f"Помилка при генерації базової статистики: {e}")
return {'error': str(e)}
def analyze_inactive_issues(self, days=14):
"""
Аналіз неактивних тікетів (не оновлювались протягом певної кількості днів).
Args:
days (int): Кількість днів неактивності
Returns:
dict: Інформація про неактивні тікети
"""
try:
if not self._check_datetime_column('Updated'):
logger.warning("Колонка 'Updated' відсутня або не містить дат")
return {'error': "Неможливо аналізувати неактивні тікети"}
# Визначення неактивних тікетів
cutoff_date = datetime.now() - timedelta(days=days)
inactive_issues = self.df[self.df['Updated'] < cutoff_date]
inactive_data = {
'total_count': len(inactive_issues),
'percentage': round(len(inactive_issues) / len(self.df) * 100, 2) if len(self.df) > 0 else 0,
'by_status': {},
'by_priority': {},
'top_inactive': []
}
# Розподіл за статусами та пріоритетами
if len(inactive_issues) > 0:
inactive_data['by_status'] = inactive_issues['Status'].value_counts().to_dict() if 'Status' in inactive_issues.columns else {}
inactive_data['by_priority'] = inactive_issues['Priority'].value_counts().to_dict() if 'Priority' in inactive_issues.columns else {}
# Топ 5 неактивних тікетів
top_inactive = inactive_issues.sort_values('Updated', ascending=True).head(5)
for _, row in top_inactive.iterrows():
issue_data = {
'key': row.get('Issue key', 'Unknown'),
'summary': row.get('Summary', 'Unknown'),
'status': row.get('Status', 'Unknown'),
'last_updated': safe_strftime(row['Updated'], '%Y-%m-%d'),
'days_inactive': (datetime.now() - row['Updated']).days if pd.notna(row['Updated']) else 'Unknown'
}
inactive_data['top_inactive'].append(issue_data)
logger.info(f"Знайдено {len(inactive_issues)} неактивних тікетів (>{days} днів)")
return inactive_data
except Exception as e:
logger.error(f"Помилка при аналізі неактивних тікетів: {e}")
return {'error': str(e)}
def analyze_timeline(self):
"""
Аналіз часової шкали проекту (зміна стану тікетів з часом).
Returns:
pandas.DataFrame: Дані для візуалізації або None у випадку помилки
"""
try:
if not self._check_datetime_column('Created') or not self._check_datetime_column('Updated'):
logger.warning("Відсутні необхідні колонки з датами для аналізу часової шкали")
return None
# Визначення часового діапазону
min_date = self.df['Created'].min().date()
max_date = self.df['Updated'].max().date()
# Створення часового ряду для кожного дня
date_range = pd.date_range(start=min_date, end=max_date, freq='D')
timeline_data = []
for date in date_range:
current_date = date.date()
date_str = safe_strftime(date, '%Y-%m-%d')
# Тікети, створені до цієї дати
created_until = self.df[self.df['Created'].dt.date <= current_date]
# Статуси тікетів на цю дату
status_counts = {}
# Для кожного тікета визначаємо його статус на цю дату
for _, row in created_until.iterrows():
if pd.notna(row['Updated']) and row['Updated'].date() >= current_date:
status = row.get('Status', 'Unknown')
status_counts[status] = status_counts.get(status, 0) + 1
# Додаємо запис для цієї дати
timeline_data.append({
'Date': date_str,
'Total': len(created_until),
**status_counts
})
# Створення DataFrame
timeline_df = pd.DataFrame(timeline_data)
logger.info("Часова шкала успішно проаналізована")
return timeline_df
except Exception as e:
logger.error(f"Помилка при аналізі часової шкали: {e}")
return None
def analyze_lead_time(self):
"""
Аналіз часу виконання тікетів (Lead Time).
Returns:
dict: Статистика по часу виконання
"""
try:
if not self._check_datetime_column('Created'):
logger.warning("Колонка 'Created' відсутня або не містить дат")
return {'error': "Неможливо аналізувати час виконання"}
if 'Resolved' not in self.df.columns:
logger.warning("Колонка 'Resolved' відсутня")
return {'error': "Неможливо аналізувати час виконання"}
# Конвертація колонки Resolved до datetime, якщо потрібно
resolved_column = self.df['Resolved']
if not pd.api.types.is_datetime64_dtype(resolved_column):
resolved_column = pd.to_datetime(resolved_column, errors='coerce')
# Фільтрація завершених тікетів
df_with_resolved = self.df.copy()
df_with_resolved['Resolved'] = resolved_column
completed_issues = df_with_resolved.dropna(subset=['Resolved'])
if len(completed_issues) == 0:
logger.warning("Немає завершених тікетів для аналізу")
return {'total_count': 0}
# Обчислення Lead Time (в днях)
completed_issues['Lead_Time_Days'] = (completed_issues['Resolved'] - completed_issues['Created']).dt.days
# Фільтрація некоректних значень
valid_lead_time = completed_issues[completed_issues['Lead_Time_Days'] >= 0]
# Якщо немає валідних записів після фільтрації
if len(valid_lead_time) == 0:
logger.warning("Немає валідних записів для аналізу Lead Time")
return {'total_count': 0, 'error': "Немає валідних записів для аналізу Lead Time"}
lead_time_stats = {
'total_count': len(valid_lead_time),
'avg_lead_time': round(valid_lead_time['Lead_Time_Days'].mean(), 2),
'median_lead_time': round(valid_lead_time['Lead_Time_Days'].median(), 2),
'min_lead_time': valid_lead_time['Lead_Time_Days'].min(),
'max_lead_time': valid_lead_time['Lead_Time_Days'].max(),
'by_type': {},
'by_priority': {}
}
# Розподіл за типами і пріоритетами
if 'Issue Type' in valid_lead_time.columns:
lead_time_by_type = valid_lead_time.groupby('Issue Type')['Lead_Time_Days'].mean().round(2)
lead_time_stats['by_type'] = lead_time_by_type.to_dict()
if 'Priority' in valid_lead_time.columns:
lead_time_by_priority = valid_lead_time.groupby('Priority')['Lead_Time_Days'].mean().round(2)
lead_time_stats['by_priority'] = lead_time_by_priority.to_dict()
logger.info("Час виконання успішно проаналізований")
return lead_time_stats
except Exception as e:
logger.error(f"Помилка при аналізі часу виконання: {e}")
return {'error': str(e)}