"""Gradio app that screens uploaded videos for anomalies.

An uploaded video is preprocessed into frames, scored by an anomaly-detection
model and, when flagged, by a second model that separates explosions from
violent activity.  The textual verdict is shown next to the extracted frames
and is also logged to Salesforce on a best-effort basis.
"""

import atexit
import os
import shutil
from datetime import datetime, timezone

import cv2
import gradio as gr
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from simple_salesforce import Salesforce

import constants as const
from get_drive_model import ensure_model_download
from preprocessing import preprocess

# Salesforce object API name
SALESFORCE_OBJECT = 'Anomaly_Result__c'

# Working directories used by the app.
UPLOAD_FOLDER = 'uploads'
FRAMES_FOLDER = 'static/frames'


def _connect_salesforce():
    """Log in to Salesforce with credentials taken from the environment.

    Reads SF_USERNAME, SF_PASSWORD and SF_SECURITY_TOKEN, plus the optional
    SF_DOMAIN ('test' for sandbox, 'login' for production; default 'login').

    SECURITY: credentials used to be embedded in this file.  They must never
    be committed to source control; any credentials that were committed
    should be rotated.

    Returns:
        A connected ``Salesforce`` client, or ``None`` when the credentials
        are missing or the login fails.  Result logging is best-effort, so a
        Salesforce problem must not prevent the app from starting.
    """
    username = os.environ.get('SF_USERNAME')
    password = os.environ.get('SF_PASSWORD')
    security_token = os.environ.get('SF_SECURITY_TOKEN')
    if not (username and password and security_token):
        print("Salesforce Logging Disabled: set SF_USERNAME, SF_PASSWORD "
              "and SF_SECURITY_TOKEN to enable it")
        return None
    try:
        return Salesforce(
            username=username,
            password=password,
            security_token=security_token,
            domain=os.environ.get('SF_DOMAIN', 'login'),
        )
    except Exception as e:  # best-effort boundary: never block startup
        print("Salesforce Login Failed:", e)
        return None


# Salesforce login (None when logging is unavailable).
sf = _connect_salesforce()


def log_to_salesforce(video_path, prediction_text):
    """Create a SALESFORCE_OBJECT record describing one prediction.

    Args:
        video_path: Path of the processed video; only its base name is stored.
        prediction_text: Human-readable verdict produced by ``process_video``.

    Failures are printed and swallowed on purpose so that a logging problem
    never breaks the prediction shown to the user.
    """
    if sf is None:
        print("Salesforce Logging Skipped: no active Salesforce session")
        return
    try:
        result = getattr(sf, SALESFORCE_OBJECT).create({
            'Video_Name__c': os.path.basename(video_path),
            'Prediction_Result__c': prediction_text,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated
            # and yields a naive value).
            'Timestamp__c': datetime.now(timezone.utc).isoformat(),
        })
        print("Salesforce Record Created:", result)
    except Exception as e:  # best-effort boundary, see docstring
        print("Salesforce Logging Failed:", e)


# Create the frames directory if it doesn't exist
os.makedirs(FRAMES_FOLDER, exist_ok=True)

# Ensure models are downloaded
ensure_model_download(const.ANOMALY_DETECTION_MODEL_FILE_ID,
                      'anomaly_detection_model.h5')
ensure_model_download(const.ANOMALY_CLASSIFICATION_MODEL_FILE_ID,
                      'anomaly_classification_model.h5')

# Load the models once at startup
first_model = tf.keras.models.load_model(
    'anomaly_detection_model.h5',
    custom_objects={'KerasLayer': hub.KerasLayer},
)
second_model = tf.keras.models.load_model(
    'anomaly_classification_model.h5',
    custom_objects={'KerasLayer': hub.KerasLayer},
)


def process_video(filepath):
    """Classify an uploaded video and return the verdict with its frames.

    Args:
        filepath: Path of the uploaded video as supplied by the Gradio file
            input; falsy when nothing was uploaded.

    Returns:
        A ``(prediction_label, frame_paths)`` tuple: the verdict text and the
        list of PNG paths written for the gallery.
    """
    if not filepath:
        # Submitting the form without a file would otherwise crash in
        # preprocess().
        return 'Please upload a video file.', []

    print(f"Processing video: {filepath}")
    frames_for_prediction, frames_for_display = preprocess(filepath)
    print(f"Shape of frames for prediction: {frames_for_prediction.shape}")
    print(f"Shape of frames for display: {frames_for_display.shape}")
    # summary() prints the table itself and returns None, so it must not be
    # wrapped in print().
    first_model.summary()

    anomaly_prediction = first_model.predict(frames_for_prediction)[0][0]
    print(f"Anomaly Prediction: {anomaly_prediction}")

    # Scores below 0.5 are treated as "anomaly"; confidence in that verdict is
    # therefore the complement of the raw score.
    if anomaly_prediction < 0.5:
        anomaly_confidence = (1 - anomaly_prediction) * 100
        classification_prediction = second_model.predict(
            frames_for_prediction)[0][0]
        # Second model: below 0.5 means explosion, otherwise violent activity.
        if classification_prediction < 0.5:
            detail = (
                f'Explosion Detected with '
                f'{(1 - classification_prediction) * 100:.2f}% confidence'
            )
        else:
            detail = (
                f'Violent Activity Detected with '
                f'{classification_prediction * 100:.2f}% confidence'
            )
        prediction_label = (
            f'The video is an Anomaly type.\n'
            f'anomaly prediction with {anomaly_confidence:.2f}% confidence\n'
            f'{detail}'
        )
    else:
        prediction_label = (
            f'No Anomalous Activity with '
            f'{anomaly_prediction * 100:.2f}% confidence.'
        )

    frame_paths = save_frames_to_filesystem(frames_for_display)

    # Log prediction to Salesforce
    log_to_salesforce(filepath, prediction_label)

    return prediction_label, frame_paths


def save_frames_to_filesystem(frames):
    """Write each frame as ``frame_<index>.png`` inside FRAMES_FOLDER.

    Args:
        frames: Iterable of array-like frames; each is cast to ``uint8``
            before being written with OpenCV.

    Returns:
        Paths of the frames that were written successfully, in input order.
    """
    # The folder may have been removed since startup; recreate it if needed.
    os.makedirs(FRAMES_FOLDER, exist_ok=True)

    frame_paths = []
    for i, frame in enumerate(frames):
        frame_path = os.path.join(FRAMES_FOLDER, f'frame_{i}.png')
        # cv2.imwrite reports failure through its return value, not by raising.
        if cv2.imwrite(frame_path, frame.astype(np.uint8)):
            frame_paths.append(frame_path)
        else:
            print(f'Failed to write {frame_path}')
    return frame_paths


def _clear_directory(folder):
    """Delete every file, symlink and sub-directory inside *folder*."""
    if not os.path.exists(folder):
        return
    for filename in os.listdir(folder):
        file_path = os.path.join(folder, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except OSError as e:
            # Best-effort cleanup: report and continue with the other entries.
            print(f'Failed to delete {file_path}. Reason: {e}')


def cleanup_uploads_folder():
    """Empty UPLOAD_FOLDER and FRAMES_FOLDER; registered to run at exit."""
    for folder in (UPLOAD_FOLDER, FRAMES_FOLDER):
        _clear_directory(folder)
    print("Uploads and frames folders cleaned")


# Register the cleanup function
atexit.register(cleanup_uploads_folder)

_CUSTOM_CSS = """
body {
    background-color: #f0f8ff;
}
.interface {
    border-radius: 20px;
    background: #ffffff;
    padding: 20px;
    box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2);
}
.interface-title {
    color: #333399;
    font-size: 32px;
}
.interface-description {
    color: #6666cc;
    font-size: 18px;
}
#prediction-box {
    background: #e6e6ff;
    border-radius: 10px;
}
#frame-gallery {
    background: #e6e6ff;
    border-radius: 10px;
}
.gallery-item img {
    border: 2px solid #6666cc;
    border-radius: 10px;
}
.btn-primary {
    background-color: #6666cc;
    border-color: #6666cc;
}
.btn-primary:hover {
    background-color: #333399;
    border-color: #333399;
}
"""

# Create Gradio Interface
iface = gr.Interface(
    fn=process_video,
    inputs=gr.File(type="filepath"),
    outputs=[
        gr.Textbox(label="Prediction", elem_id="prediction-box"),
        gr.Gallery(label="Video Frames", elem_id="frame-gallery",
                   columns=5, rows=10),
    ],
    title="Anomaly Detection in Videos",
    description="Upload a video file and detect anomalies, violent activity, or explosions.",
    theme="default",
    css=_CUSTOM_CSS,
)


def _build_plain_interface(title):
    """Return an unstyled interface around ``process_video`` titled *title*."""
    return gr.Interface(
        fn=process_video,
        inputs=gr.File(type="filepath"),
        outputs=[
            gr.Textbox(label="Prediction"),
            gr.Gallery(label="Video Frames", columns=5, rows=10),
        ],
        title=title,
    )


# Additional interfaces for individual triggers
violent_iface = _build_plain_interface("Violent Detection")
explosion_iface = _build_plain_interface("Explosion Detection")
normal_iface = _build_plain_interface("Normal Detection")

# Combine all interfaces into a single application
# NOTE(review): every tab runs the same process_video pipeline, and the last
# tab is labelled "Suspicious Activities" while its interface is titled
# "Normal Detection" - confirm which label is intended.
combined_iface = gr.TabbedInterface(
    [iface, violent_iface, explosion_iface, normal_iface],
    ["Upload Video", "Violent Detection", "Explosion Detection",
     "Suspicious Activities"],
)

if __name__ == "__main__":
    combined_iface.launch()