|
import gradio as gr |
|
import pandas as pd |
|
import matplotlib.pyplot as plt |
|
from sklearn.ensemble import IsolationForest |
|
from datetime import datetime, timedelta |
|
import os |
|
import logging |
|
from reportlab.lib.pagesizes import letter |
|
from reportlab.pdfgen import canvas |
|
import tempfile |
|
|
|
|
|
# Root logger format. With no ``datefmt`` given, ``%(asctime)s`` already ends
# in ",mmm" (milliseconds), so appending ``%(msecs)03d`` again would print the
# milliseconds twice (e.g. "12:00:00,123,123").
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
|
|
def _uploaded_file_path(uploaded):
    """Return the filesystem path of a single uploaded file as a string."""
    # Uploads may arrive either as plain path strings / path-like objects or
    # as temp-file wrappers that expose the path through ``.name`` (which one
    # depends on the Gradio version and the File component's ``type``).
    if isinstance(uploaded, (str, os.PathLike)):
        return os.fspath(uploaded)
    return uploaded.name


def process_files(uploaded_files):
    """
    Run the full analysis pipeline over the uploaded CSV files.

    The CSV files are loaded and concatenated, then a usage plot is rendered,
    anomalies are detected, AMC expiries are collected and a PDF report is
    written.

    Args:
        uploaded_files: Iterable of uploads. Each item is either a path
            (``str`` / path-like) or an object with a ``name`` attribute
            holding the path. ``None`` or an empty list is treated as
            "nothing uploaded".

    Returns:
        A 4-tuple ``(dataframe, plot_path, pdf_path, message)``:
        - on success, the combined data (plus an ``anomaly`` column when
          detection succeeded), the plot path, the PDF path (``None`` if the
          report could not be written) and the AMC expiry message;
        - ``(None, None, None, <error text>)`` when nothing usable was
          uploaded or a file failed to load;
        - ``(combined_df, None, None, "Failed to generate usage plot.")`` when
          plotting fails (the remaining steps are skipped in that case).
    """
    logging.info(f"Received uploaded files: {uploaded_files}")

    if not uploaded_files:
        logging.warning("No files uploaded.")
        return None, None, None, "Please upload at least one valid CSV file."

    # Compare the extension case-insensitively so "DATA.CSV" is accepted too.
    csv_paths = [
        path
        for path in map(_uploaded_file_path, uploaded_files)
        if path.lower().endswith('.csv')
    ]
    logging.info(f"Processing {len(csv_paths)} valid files: {csv_paths}")

    if not csv_paths:
        logging.warning("No valid CSV files uploaded.")
        return None, None, None, "Please upload at least one valid CSV file."

    logging.info("Loading logs from uploaded files...")
    all_data = []
    for path in csv_paths:
        try:
            df = pd.read_csv(path)
        except Exception as e:
            # A single unreadable file aborts the run so the user can fix it.
            logging.error(f"Failed to load {path}: {str(e)}")
            return None, None, None, f"Error loading {path}: {str(e)}"
        logging.info(f"Loaded {len(df)} records from {path}")
        all_data.append(df)

    if not all_data:
        logging.warning("No data loaded from uploaded files.")
        return None, None, None, "No valid data found in uploaded files."

    combined_df = pd.concat(all_data, ignore_index=True)
    logging.info(f"Combined {len(combined_df)} total records.")

    logging.info("Generating usage plot...")
    plot_path = generate_usage_plot(combined_df)
    if not plot_path:
        logging.error("Failed to generate usage plot.")
        return combined_df, None, None, "Failed to generate usage plot."
    logging.info("Usage plot generated successfully.")

    logging.info("Detecting anomalies...")
    anomaly_df = detect_anomalies(combined_df)
    if anomaly_df is None:
        logging.error("Failed to detect anomalies.")
    else:
        anomaly_count = int((anomaly_df['anomaly'] == -1).sum())
        logging.info(f"Detected {anomaly_count} anomalies.")

    logging.info("Processing AMC expiries...")
    amc_message, amc_df = process_amc_expiries(combined_df)

    logging.info("Generating PDF report...")
    pdf_path = generate_pdf_report(combined_df, anomaly_df, amc_df)
    if pdf_path is None:
        logging.error("Failed to generate PDF report.")

    # Hand back a copy so the displayed table is independent of the frame the
    # pipeline steps worked on.
    output_df = combined_df.copy()
    if anomaly_df is not None:
        output_df['anomaly'] = anomaly_df['anomaly']

    return output_df, plot_path, pdf_path, amc_message
|
|
|
def generate_usage_plot(df):
    """
    Render a bar chart of ``usage_count`` per equipment, grouped by status.

    Args:
        df: DataFrame providing the ``equipment``, ``status`` and
            ``usage_count`` columns.

    Returns:
        Path of the PNG file the chart was saved to, or ``None`` if anything
        went wrong (the error is logged).
    """
    fig = None
    plot_path = None
    try:
        # Work on an explicit figure/axes pair rather than pyplot's implicit
        # "current figure", so this call does not depend on global state.
        fig, ax = plt.subplots(figsize=(10, 6))
        for status in df['status'].unique():
            subset = df[df['status'] == status]
            # astype(str) keeps label building working for non-string names.
            labels = subset['equipment'].astype(str) + f" ({status})"
            ax.bar(labels, subset['usage_count'], label=status)

        ax.set_xlabel("Equipment (Status)")
        ax.set_ylabel("Usage Count")
        ax.set_title("Usage Count by Equipment and Status")
        ax.legend()
        ax.tick_params(axis='x', labelrotation=45)
        fig.tight_layout()

        # Reserve a file name, then close the handle before matplotlib writes
        # to it; writing to a still-open temp file fails on some platforms.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
            plot_path = tmp.name
        fig.savefig(plot_path, format='png')
        return plot_path
    except Exception as e:
        logging.error(f"Failed to generate usage plot: {str(e)}")
        # Do not leave an empty or partial temp file behind.
        if plot_path is not None and os.path.exists(plot_path):
            try:
                os.remove(plot_path)
            except OSError:
                pass
        return None
    finally:
        # Always release the figure, including on the error path.
        if fig is not None:
            plt.close(fig)
|
|
|
def detect_anomalies(df):
    """
    Flag unusual ``usage_count`` values with an Isolation Forest.

    Args:
        df: DataFrame providing a ``usage_count`` column.

    Returns:
        A copy of ``df`` with an extra ``anomaly`` column holding the model's
        labels (-1 for anomalies, 1 for normal), or ``None`` if detection
        failed (the error is logged).
    """
    try:
        # Fixed random_state keeps the labels reproducible between runs.
        forest = IsolationForest(contamination=0.1, random_state=42)
        labels = forest.fit_predict(df[['usage_count']].values)
        labelled = df.assign(anomaly=labels)
    except Exception as e:
        logging.error(f"Failed to detect anomalies: {str(e)}")
        return None
    else:
        return labelled
|
|
|
def process_amc_expiries(df, reference_date=datetime(2025, 6, 5), window_days=7,
                         include_expired=True):
    """
    Find records whose AMC expires on or before the end of the look-ahead
    window.

    The window ends ``window_days`` after ``reference_date``. By default there
    is no lower bound, so contracts that expired before ``reference_date``
    are reported as well; pass ``include_expired=False`` to keep only expiries
    that fall inside ``[reference_date, reference_date + window_days]``.

    The input frame is not modified: dates are parsed into a copy.

    Args:
        df: DataFrame providing ``equipment`` and ``amc_expiry`` columns.
        reference_date: Date the window is measured from
            (defaults to 2025-06-05).
        window_days: Length of the look-ahead window in days (default 7).
        include_expired: Whether expiries before ``reference_date`` count.

    Returns:
        ``(message, expiries_df)`` where ``expiries_df`` holds the matching
        rows with ``amc_expiry`` converted to datetimes. On failure the
        message describes the error and the dataframe is ``None``.
    """
    try:
        window_start = pd.Timestamp(reference_date)
        threshold = window_start + timedelta(days=window_days)

        # Parse into a separate Series so the caller's column is left intact.
        # Unparseable values become NaT instead of aborting the whole step.
        expiry_dates = pd.to_datetime(df['amc_expiry'], errors='coerce')
        unparseable = int((expiry_dates.isna() & df['amc_expiry'].notna()).sum())
        if unparseable:
            logging.warning(f"Ignoring {unparseable} records with unparseable AMC expiry dates.")

        # Comparisons against NaT are False, so such rows drop out here.
        selected = expiry_dates <= threshold
        if not include_expired:
            selected &= expiry_dates >= window_start

        upcoming_expiries = df.assign(amc_expiry=expiry_dates)[selected]
        unique_devices = upcoming_expiries['equipment'].unique()
        message = f"Found {len(unique_devices)} devices with upcoming AMC expiries."
        logging.info(message)
        return message, upcoming_expiries
    except Exception as e:
        logging.error(f"Failed to process AMC expiries: {str(e)}")
        return f"Error processing AMC expiries: {str(e)}", None
|
|
|
def generate_pdf_report(original_df, anomaly_df, amc_df):
    """
    Write a PDF summary of the data, the anomalies and the AMC expiries.

    Args:
        original_df: Combined log data; must provide an ``equipment`` column.
            ``None`` means there is nothing to report.
        anomaly_df: Data with an ``anomaly`` column (-1 marks an anomaly), or
            ``None`` if anomaly detection failed.
        amc_df: Rows with upcoming AMC expiries (``equipment`` and
            ``amc_expiry`` columns), or ``None`` / empty if there are none.

    Returns:
        Path of the generated PDF, or ``None`` if there was no data or the
        report could not be written (the error is logged).
    """
    if original_df is None:
        logging.warning("No data available for PDF generation.")
        return None

    import textwrap  # only needed here, for wrapping long lines

    left_margin = 100
    top_y = 750
    bottom_y = 50
    line_height = 20
    # Rough character budget per line for the canvas' default font; a
    # heuristic rather than an exact width measurement.
    max_chars = 75

    # Reserve a file name and close the handle before reportlab writes to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        pdf_path = tmp.name

    try:
        c = canvas.Canvas(pdf_path, pagesize=letter)
        y = top_y

        def write_line(text):
            """Draw text at the cursor, wrapping and paginating as needed."""
            nonlocal y
            for chunk in textwrap.wrap(str(text), width=max_chars) or [""]:
                if y < bottom_y:
                    # Start a fresh page instead of drawing below the sheet.
                    c.showPage()
                    y = top_y
                c.drawString(left_margin, y, chunk)
                y -= line_height

        def join_names(values):
            """Comma-join values, tolerating non-string entries."""
            return ', '.join(str(v) for v in values)

        write_line("Equipment Log Analysis Report")
        y = 700  # leave a gap below the title

        # --- Data summary ---
        write_line(f"Total Records: {len(original_df)}")
        write_line(f"Devices: {join_names(original_df['equipment'].dropna().unique())}")

        # --- Anomalies ---
        if anomaly_df is not None:
            is_anomaly = anomaly_df['anomaly'] == -1
            num_anomalies = int(is_anomaly.sum())
            write_line(f"Anomalies Detected: {num_anomalies}")
            if num_anomalies > 0:
                anomaly_equipment = anomaly_df.loc[is_anomaly, 'equipment'].dropna().unique()
                write_line(f"Anomalous Devices: {join_names(anomaly_equipment)}")
        else:
            write_line("Anomaly detection failed.")

        # --- AMC expiries ---
        if amc_df is not None and not amc_df.empty:
            write_line(f"Devices with Upcoming AMC Expiries: {len(amc_df['equipment'].unique())}")
            # Several log rows can share one device/expiry pair; list each once.
            pairs = amc_df[['equipment', 'amc_expiry']].drop_duplicates()
            for equipment, expiry in pairs.itertuples(index=False, name=None):
                try:
                    expiry_text = expiry.strftime('%Y-%m-%d')
                except (AttributeError, ValueError):
                    # Not a usable datetime (e.g. raw string or NaT).
                    expiry_text = str(expiry)
                write_line(f"{equipment}: {expiry_text}")
        else:
            write_line("No AMC expiry data available.")

        c.showPage()
        c.save()
        return pdf_path
    except Exception as e:
        logging.error(f"Failed to generate PDF report: {str(e)}")
        # Remove the reserved temp file so failed runs do not pile up files.
        try:
            os.remove(pdf_path)
        except OSError:
            pass
        return None
|
|
|
|
|
# Gradio UI: an upload row, a results row (table + plot) and a status row
# (AMC message + PDF download), wired to process_files.
with gr.Blocks() as demo:
    gr.Markdown("# Equipment Log Analysis")

    # Inputs
    with gr.Row():
        file_input = gr.File(file_count="multiple", label="Upload CSV Files")
        process_button = gr.Button("Process Files")

    # Tabular data next to the usage chart
    with gr.Row():
        output_df = gr.Dataframe(label="Processed Data")
        output_plot = gr.Image(label="Usage Plot")

    # Status text next to the downloadable report
    with gr.Row():
        output_message = gr.Textbox(label="AMC Expiry Status")
        output_pdf = gr.File(label="Download PDF Report")

    # Component order must match the tuple returned by process_files:
    # (dataframe, plot path, PDF path, message).
    result_components = [output_df, output_plot, output_pdf, output_message]
    process_button.click(fn=process_files, inputs=[file_input], outputs=result_components)
|
|
|
# Script entry point: start the web server only when run directly.
if __name__ == "__main__":
    logging.info("Application starting...")
    # 0.0.0.0 makes the server listen on all network interfaces.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)