|
from transformers import pipeline |
|
import pandas as pd |
|
import logging |
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def detect_anomalies(df: pd.DataFrame) -> pd.DataFrame:
    """Detect anomalies in device logs using BERT-based text classification.

    Each row is rendered as ``"<status> Usage:<usage_count>"`` and passed to a
    Hugging Face ``text-classification`` pipeline; rows whose predicted label
    is ``"POSITIVE"`` are returned.

    Args:
        df: Frame with ``status`` and ``usage_count`` columns. It is modified
            in place: a ``text`` column (the classifier input) and an
            ``anomaly`` column (the predicted label) are added or overwritten.

    Returns:
        The rows of ``df`` whose ``anomaly`` label equals ``"POSITIVE"``.
        A frame with no rows yields an empty result without loading the model.

    Raises:
        Exception: Any error raised while building the text, loading the
            model, or classifying is logged and re-raised unchanged.
    """
    logger.info("Detecting anomalies...")
    try:
        # Build the classifier input per row. Iterating the two columns
        # directly (instead of a row-wise ``df.apply``) also works for a
        # zero-row frame, where ``apply(axis=1)`` returns a DataFrame that
        # cannot be assigned to a single column.
        texts = [
            f"{status} Usage:{usage}"
            for status, usage in zip(df['status'], df['usage_count'])
        ]
        df['text'] = texts

        if texts:
            classifier = pipeline(
                "text-classification",
                model="prajjwal1/bert-tiny",
                tokenizer="prajjwal1/bert-tiny",
                clean_up_tokenization_spaces=False
            )
            labels = [result['label'] for result in classifier(texts)]
        else:
            # Nothing to classify: skip the (expensive) model load entirely.
            labels = []
        df['anomaly'] = labels

        # NOTE(review): "prajjwal1/bert-tiny" looks like a base checkpoint
        # without a fine-tuned classification head; confirm that it really
        # emits a "POSITIVE" label, otherwise this filter never matches.
        anomalies = df[df['anomaly'] == "POSITIVE"]

        logger.info("Detected %d anomalies...", len(anomalies))
        return anomalies
    except Exception as e:
        logger.error("Failed to detect anomalies: %s", e)
        raise