abnormalanomaly / app.py
karthikmn's picture
Update app.py
fe68a78 verified
import gradio as gr
import tensorflow as tf
import tensorflow_hub as hub
import os
from preprocessing import preprocess
import numpy as np
import shutil
import cv2
import constants as const
from get_drive_model import ensure_model_download
import atexit # Import atexit module
from simple_salesforce import Salesforce
from datetime import datetime
# Salesforce login.
# Credentials are read from environment variables instead of being written
# into the source, so no secret is ever stored in the repository.
#   Required: SALESFORCE_USERNAME, SALESFORCE_PASSWORD,
#             SALESFORCE_SECURITY_TOKEN
#   Optional: SALESFORCE_DOMAIN -- 'test' for sandbox, 'login' for production
#             (defaults to 'login').
# A missing required variable raises KeyError here, at import time.
# NOTE: any credentials that were previously committed in this file should be
# treated as compromised and rotated.
sf = Salesforce(
    username=os.environ['SALESFORCE_USERNAME'],
    password=os.environ['SALESFORCE_PASSWORD'],
    security_token=os.environ['SALESFORCE_SECURITY_TOKEN'],
    domain=os.environ.get('SALESFORCE_DOMAIN', 'login'),
)
# Salesforce object API name
SALESFORCE_OBJECT = 'Anomaly_Result__c'
def log_to_salesforce(video_path, prediction_text):
    """Record one prediction in Salesforce on a best-effort basis.

    Creates a record on the object named by ``SALESFORCE_OBJECT`` holding the
    video's base file name, the prediction text and the current UTC time.
    Any exception is printed and swallowed, so a logging failure never
    propagates to the caller.

    Args:
        video_path: Path of the processed video; only its base name is stored.
        prediction_text: Text stored in the ``Prediction_Result__c`` field.

    Returns:
        None.
    """
    try:
        # Resolve the object through getattr() rather than invoking the
        # __getattr__ dunder directly, so normal attribute lookup applies.
        sf_object = getattr(sf, SALESFORCE_OBJECT)
        result = sf_object.create({
            'Video_Name__c': os.path.basename(video_path),
            'Prediction_Result__c': prediction_text,
            # NOTE(review): utcnow() is naive, so the ISO string carries no
            # UTC offset -- confirm how the Timestamp__c field interprets it.
            'Timestamp__c': datetime.utcnow().isoformat()
        })
        print("Salesforce Record Created:", result)
    except Exception as e:
        # Deliberately broad: logging must not break video processing.
        print("Salesforce Logging Failed:", e)
# Filesystem locations used by the app (each defined exactly once).
UPLOAD_FOLDER = 'uploads'
FRAMES_FOLDER = 'static/frames'
# Create the frames directory if it doesn't exist
os.makedirs(FRAMES_FOLDER, exist_ok=True)
# Ensure both model files are downloaded before they are loaded
ensure_model_download(const.ANOMALY_DETECTION_MODEL_FILE_ID, 'anomaly_detection_model.h5')
ensure_model_download(const.ANOMALY_CLASSIFICATION_MODEL_FILE_ID, 'anomaly_classification_model.h5')
# Load the models once at startup so every request reuses them.
# hub.KerasLayer is registered as a custom object for deserialisation.
first_model = tf.keras.models.load_model('anomaly_detection_model.h5', custom_objects={'KerasLayer': hub.KerasLayer})
second_model = tf.keras.models.load_model('anomaly_classification_model.h5', custom_objects={'KerasLayer': hub.KerasLayer})
def process_video(filepath):
    """Classify a video and return the result text plus saved frame images.

    The video is preprocessed and scored by ``first_model``. A score below
    0.5 is treated as an anomaly, in which case ``second_model`` is consulted:
    a score below 0.5 is reported as an explosion, otherwise as violent
    activity. The display frames are written to disk and the label is logged
    to Salesforce.

    Args:
        filepath: Path of the video file to analyse. May be ``None`` or empty
            when the form is submitted without a file.

    Returns:
        A ``(prediction_label, frame_paths)`` tuple: the human-readable
        result and the list of saved frame image paths. When no file is
        supplied, a short notice and an empty list.
    """
    # A submit without a selected file would otherwise fail inside
    # preprocessing; report it to the user instead.
    if not filepath:
        return 'No video file provided.', []

    print(f"Processing video: {filepath}")
    frames_for_prediction, frames_for_display = preprocess(filepath)
    print(f"Shape of frames for prediction: {frames_for_prediction.shape}")
    print(f"Shape of frames for display: {frames_for_display.shape}")
    # summary() prints the table itself and returns None, so wrapping it in
    # print() only emitted an extra "None" line.
    first_model.summary()

    # Only the first value of the first output row is used as the score.
    anomaly_prediction = first_model.predict(frames_for_prediction)[0][0]
    print(f"Anomaly Prediction: {anomaly_prediction}")

    if anomaly_prediction < 0.5:
        classification_prediction = second_model.predict(frames_for_prediction)[0][0]
        # Shared first two lines of the anomaly report.
        anomaly_header = (
            'The video is an Anomaly type.\n'
            f'anomaly prediction with {(1-anomaly_prediction)*100:.2f}% confidence\n'
        )
        if classification_prediction < 0.5:
            prediction_label = anomaly_header + f'Explosion Detected with {(1-classification_prediction)*100:.2f}% confidence'
        else:
            prediction_label = anomaly_header + f'Violent Activity Detected with {classification_prediction * 100:.2f}% confidence'
    else:
        prediction_label = f'No Anomalous Activity with {anomaly_prediction * 100:.2f}% confidence.'

    frame_paths = save_frames_to_filesystem(frames_for_display)
    # Log prediction to Salesforce
    log_to_salesforce(filepath, prediction_label)
    return prediction_label, frame_paths
def save_frames_to_filesystem(frames):
    """Write each frame to ``FRAMES_FOLDER`` as a PNG and return the paths.

    Frames are cast to ``uint8`` before being written with OpenCV and are
    named ``frame_<index>.png`` in iteration order.

    Args:
        frames: Iterable of image arrays supporting ``.astype``.

    Returns:
        List of the file paths, one per frame, in the same order.
    """
    written_paths = []
    for index, image in enumerate(frames):
        target = os.path.join(FRAMES_FOLDER, f'frame_{index}.png')
        cv2.imwrite(target, image.astype(np.uint8))
        written_paths.append(target)
    return written_paths
def cleanup_uploads_folder():
    """Empty the upload and frame folders, keeping the folders themselves.

    Files and symlinks are unlinked and sub-directories are removed
    recursively. A failure on one entry is printed and does not stop the
    remaining entries from being processed. A folder that does not exist is
    skipped.
    """
    for folder in (UPLOAD_FOLDER, FRAMES_FOLDER):
        if not os.path.exists(folder):
            continue
        for entry in os.listdir(folder):
            entry_path = os.path.join(folder, entry)
            try:
                if os.path.isfile(entry_path) or os.path.islink(entry_path):
                    os.unlink(entry_path)
                elif os.path.isdir(entry_path):
                    shutil.rmtree(entry_path)
            except Exception as err:
                print(f'Failed to delete {entry_path}. Reason: {err}')
    print("Uploads and frames folders cleaned")
# Register the cleanup function so the uploads and frames folders are
# emptied when the interpreter exits.
atexit.register(cleanup_uploads_folder)
# Create Gradio Interface
# Main tab: takes an uploaded file path, runs process_video on it and shows
# the prediction text plus a gallery of frames. The elem_id values are the
# hooks targeted by the #prediction-box / #frame-gallery rules in the CSS.
iface = gr.Interface(
    title="Anomaly Detection in Videos",
    description="Upload a video file and detect anomalies, violent activity, or explosions.",
    fn=process_video,
    inputs=gr.File(type="filepath"),
    outputs=[
        gr.Textbox(label="Prediction", elem_id="prediction-box"),
        gr.Gallery(label="Video Frames", elem_id="frame-gallery", columns=5, rows=10),
    ],
    theme="default",
    css="""
body {
background-color: #f0f8ff;
}
.interface {
border-radius: 20px;
background: #ffffff;
padding: 20px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2);
}
.interface-title {
color: #333399;
font-size: 32px;
}
.interface-description {
color: #6666cc;
font-size: 18px;
}
#prediction-box {
background: #e6e6ff;
border-radius: 10px;
}
#frame-gallery {
background: #e6e6ff;
border-radius: 10px;
}
.gallery-item img {
border: 2px solid #6666cc;
border-radius: 10px;
}
.btn-primary {
background-color: #6666cc;
border-color: #6666cc;
}
.btn-primary:hover {
background-color: #333399;
border-color: #333399;
}
""",
)
# Additional interfaces for individual triggers
# This tab runs the same process_video function as the main interface; only
# the title differs and no custom CSS hooks are attached.
violent_iface = gr.Interface(
    title="Violent Detection",
    fn=process_video,
    inputs=gr.File(type="filepath"),
    outputs=[
        gr.Textbox(label="Prediction"),
        gr.Gallery(label="Video Frames", columns=5, rows=10),
    ],
)
# Explosion tab: same process_video function and output layout, titled
# "Explosion Detection".
explosion_iface = gr.Interface(
    title="Explosion Detection",
    fn=process_video,
    inputs=gr.File(type="filepath"),
    outputs=[
        gr.Textbox(label="Prediction"),
        gr.Gallery(label="Video Frames", columns=5, rows=10),
    ],
)
# Normal tab: same process_video function and output layout, titled
# "Normal Detection".
normal_iface = gr.Interface(
    title="Normal Detection",
    fn=process_video,
    inputs=gr.File(type="filepath"),
    outputs=[
        gr.Textbox(label="Prediction"),
        gr.Gallery(label="Video Frames", columns=5, rows=10),
    ],
)
# Combine all interfaces into a single application
# The two lists are positional pairs: the n-th label names the n-th interface.
# NOTE(review): the fourth tab is labelled "Suspicious Activities" while
# normal_iface is titled "Normal Detection" -- confirm which wording is intended.
combined_iface = gr.TabbedInterface(
    [
        iface,
        violent_iface,
        explosion_iface,
        normal_iface,
    ],
    [
        "Upload Video",
        "Violent Detection",
        "Explosion Detection",
        "Suspicious Activities",
    ],
)
# Start the Gradio server only when this file is run as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    combined_iface.launch()