Spaces:
Running
Running
File size: 13,397 Bytes
4ad5efa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from modules.data_management.data_manager import safe_strftime
logger = logging.getLogger(__name__)
class JiraDataAnalyzer:
"""
Клас для аналізу даних Jira
"""
def __init__(self, df):
"""
Ініціалізація аналізатора даних.
Args:
df (pandas.DataFrame): DataFrame з даними Jira
"""
self.df = df
def _get_column_counts(self, column_name, limit=None):
"""
Допоміжний метод для отримання частот значень колонки.
Args:
column_name (str): Назва колонки
limit (int, optional): Обмеження кількості результатів
Returns:
dict: Словник з частотами або порожній словник
"""
if column_name not in self.df.columns:
return {}
counts = self.df[column_name].value_counts()
if limit:
counts = counts.head(limit)
return counts.to_dict()
def _check_datetime_column(self, column_name):
"""
Перевірка наявності та коректності колонки з датами.
Args:
column_name (str): Назва колонки
Returns:
bool: True якщо колонка існує і містить дати, False інакше
"""
return (column_name in self.df.columns and
pd.api.types.is_datetime64_dtype(self.df[column_name]))
def generate_basic_statistics(self):
"""
Генерація базової статистики по даним Jira.
Returns:
dict: Словник з базовою статистикою
"""
try:
stats = {
'total_tickets': len(self.df),
'status_counts': self._get_column_counts('Status'),
'type_counts': self._get_column_counts('Issue Type'),
'priority_counts': self._get_column_counts('Priority'),
'assignee_counts': self._get_column_counts('Assignee', limit=10),
'created_stats': {},
'updated_stats': {}
}
# Статистика за часом створення
if self._check_datetime_column('Created'):
created_min = self.df['Created'].min()
created_max = self.df['Created'].max()
# Групування за місяцями
if 'Created_Month' in self.df.columns:
created_by_month = self.df['Created_Month'].value_counts().sort_index()
stats['created_by_month'] = {str(k): v for k, v in created_by_month.items()}
stats['created_stats'] = {
'min': safe_strftime(created_min, "%Y-%m-%d"),
'max': safe_strftime(created_max, "%Y-%m-%d"),
'last_7_days': len(self.df[self.df['Created'] > (datetime.now() - timedelta(days=7))])
}
# Статистика за часом оновлення
if self._check_datetime_column('Updated'):
updated_min = self.df['Updated'].min()
updated_max = self.df['Updated'].max()
stats['updated_stats'] = {
'min': safe_strftime(updated_min, "%Y-%m-%d"),
'max': safe_strftime(updated_max, "%Y-%m-%d"),
'last_7_days': len(self.df[self.df['Updated'] > (datetime.now() - timedelta(days=7))])
}
logger.info("Базова статистика успішно згенерована")
return stats
except Exception as e:
logger.error(f"Помилка при генерації базової статистики: {e}")
return {'error': str(e)}
def analyze_inactive_issues(self, days=14):
"""
Аналіз неактивних тікетів (не оновлювались протягом певної кількості днів).
Args:
days (int): Кількість днів неактивності
Returns:
dict: Інформація про неактивні тікети
"""
try:
if not self._check_datetime_column('Updated'):
logger.warning("Колонка 'Updated' відсутня або не містить дат")
return {'error': "Неможливо аналізувати неактивні тікети"}
# Визначення неактивних тікетів
cutoff_date = datetime.now() - timedelta(days=days)
inactive_issues = self.df[self.df['Updated'] < cutoff_date]
inactive_data = {
'total_count': len(inactive_issues),
'percentage': round(len(inactive_issues) / len(self.df) * 100, 2) if len(self.df) > 0 else 0,
'by_status': {},
'by_priority': {},
'top_inactive': []
}
# Розподіл за статусами та пріоритетами
if len(inactive_issues) > 0:
inactive_data['by_status'] = inactive_issues['Status'].value_counts().to_dict() if 'Status' in inactive_issues.columns else {}
inactive_data['by_priority'] = inactive_issues['Priority'].value_counts().to_dict() if 'Priority' in inactive_issues.columns else {}
# Топ 5 неактивних тікетів
top_inactive = inactive_issues.sort_values('Updated', ascending=True).head(5)
for _, row in top_inactive.iterrows():
issue_data = {
'key': row.get('Issue key', 'Unknown'),
'summary': row.get('Summary', 'Unknown'),
'status': row.get('Status', 'Unknown'),
'last_updated': safe_strftime(row['Updated'], '%Y-%m-%d'),
'days_inactive': (datetime.now() - row['Updated']).days if pd.notna(row['Updated']) else 'Unknown'
}
inactive_data['top_inactive'].append(issue_data)
logger.info(f"Знайдено {len(inactive_issues)} неактивних тікетів (>{days} днів)")
return inactive_data
except Exception as e:
logger.error(f"Помилка при аналізі неактивних тікетів: {e}")
return {'error': str(e)}
def analyze_timeline(self):
"""
Аналіз часової шкали проекту (зміна стану тікетів з часом).
Returns:
pandas.DataFrame: Дані для візуалізації або None у випадку помилки
"""
try:
if not self._check_datetime_column('Created') or not self._check_datetime_column('Updated'):
logger.warning("Відсутні необхідні колонки з датами для аналізу часової шкали")
return None
# Визначення часового діапазону
min_date = self.df['Created'].min().date()
max_date = self.df['Updated'].max().date()
# Створення часового ряду для кожного дня
date_range = pd.date_range(start=min_date, end=max_date, freq='D')
timeline_data = []
for date in date_range:
current_date = date.date()
date_str = safe_strftime(date, '%Y-%m-%d')
# Тікети, створені до цієї дати
created_until = self.df[self.df['Created'].dt.date <= current_date]
# Статуси тікетів на цю дату
status_counts = {}
# Для кожного тікета визначаємо його статус на цю дату
for _, row in created_until.iterrows():
if pd.notna(row['Updated']) and row['Updated'].date() >= current_date:
status = row.get('Status', 'Unknown')
status_counts[status] = status_counts.get(status, 0) + 1
# Додаємо запис для цієї дати
timeline_data.append({
'Date': date_str,
'Total': len(created_until),
**status_counts
})
# Створення DataFrame
timeline_df = pd.DataFrame(timeline_data)
logger.info("Часова шкала успішно проаналізована")
return timeline_df
except Exception as e:
logger.error(f"Помилка при аналізі часової шкали: {e}")
return None
def analyze_lead_time(self):
"""
Аналіз часу виконання тікетів (Lead Time).
Returns:
dict: Статистика по часу виконання
"""
try:
if not self._check_datetime_column('Created'):
logger.warning("Колонка 'Created' відсутня або не містить дат")
return {'error': "Неможливо аналізувати час виконання"}
if 'Resolved' not in self.df.columns:
logger.warning("Колонка 'Resolved' відсутня")
return {'error': "Неможливо аналізувати час виконання"}
# Конвертація колонки Resolved до datetime, якщо потрібно
resolved_column = self.df['Resolved']
if not pd.api.types.is_datetime64_dtype(resolved_column):
resolved_column = pd.to_datetime(resolved_column, errors='coerce')
# Фільтрація завершених тікетів
df_with_resolved = self.df.copy()
df_with_resolved['Resolved'] = resolved_column
completed_issues = df_with_resolved.dropna(subset=['Resolved'])
if len(completed_issues) == 0:
logger.warning("Немає завершених тікетів для аналізу")
return {'total_count': 0}
# Обчислення Lead Time (в днях)
completed_issues['Lead_Time_Days'] = (completed_issues['Resolved'] - completed_issues['Created']).dt.days
# Фільтрація некоректних значень
valid_lead_time = completed_issues[completed_issues['Lead_Time_Days'] >= 0]
# Якщо немає валідних записів після фільтрації
if len(valid_lead_time) == 0:
logger.warning("Немає валідних записів для аналізу Lead Time")
return {'total_count': 0, 'error': "Немає валідних записів для аналізу Lead Time"}
lead_time_stats = {
'total_count': len(valid_lead_time),
'avg_lead_time': round(valid_lead_time['Lead_Time_Days'].mean(), 2),
'median_lead_time': round(valid_lead_time['Lead_Time_Days'].median(), 2),
'min_lead_time': valid_lead_time['Lead_Time_Days'].min(),
'max_lead_time': valid_lead_time['Lead_Time_Days'].max(),
'by_type': {},
'by_priority': {}
}
# Розподіл за типами і пріоритетами
if 'Issue Type' in valid_lead_time.columns:
lead_time_by_type = valid_lead_time.groupby('Issue Type')['Lead_Time_Days'].mean().round(2)
lead_time_stats['by_type'] = lead_time_by_type.to_dict()
if 'Priority' in valid_lead_time.columns:
lead_time_by_priority = valid_lead_time.groupby('Priority')['Lead_Time_Days'].mean().round(2)
lead_time_stats['by_priority'] = lead_time_by_priority.to_dict()
logger.info("Час виконання успішно проаналізований")
return lead_time_stats
except Exception as e:
logger.error(f"Помилка при аналізі часу виконання: {e}")
return {'error': str(e)} |