# Page-capture residue (not part of the program): "Spaces: Sleeping"
import logging
import re
import time

import gradio as gr
import spaces
import torch
from transformers import pipeline
# Set up logging: configure the root logger at INFO level and create the
# module-level logger used by the model loader and the request handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Professional Dashboard CSS - Complete Textbox Display
# Stylesheet passed to gr.Blocks(css=...). The rules target the custom
# elem_classes assigned to components in the layout (dashboard-grid,
# task-panel, compact-input, compact-output, status-indicator, ...).
professional_css = """
/* Professional SOC Dashboard - Fixed */
.gradio-container {
max-width: 100% !important;
min-height: 100vh !important;
margin: 0 !important;
padding: 0 !important;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif !important;
background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%) !important;
overflow-x: hidden !important;
overflow-y: auto !important;
}
/* Header Section */
.dashboard-header {
background: rgba(255, 255, 255, 0.95) !important;
backdrop-filter: blur(10px) !important;
padding: 8px 20px !important;
margin: 8px !important;
border-radius: 8px !important;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1) !important;
text-align: center !important;
}
.header-title {
font-size: 24px !important;
font-weight: 700 !important;
color: #1e3c72 !important;
margin: 0 !important;
}
.header-subtitle {
font-size: 14px !important;
color: #666 !important;
margin: 4px 0 0 0 !important;
}
/* Main Dashboard Grid - Fixed Heights */
.dashboard-grid {
display: grid !important;
grid-template-columns: 1fr 1fr !important;
gap: 10px !important;
padding: 0 8px !important;
min-height: calc(100vh - 120px) !important;
align-items: start !important;
}
/* Task Panels - Fixed Overflow */
.task-panel {
background: rgba(255, 255, 255, 0.98) !important;
border-radius: 12px !important;
padding: 15px !important;
box-shadow: 0 6px 25px rgba(0, 0, 0, 0.1) !important;
border: 2px solid rgba(255, 255, 255, 0.3) !important;
display: flex !important;
flex-direction: column !important;
min-height: 600px !important;
max-height: none !important;
overflow: visible !important;
}
.task-header {
background: linear-gradient(135deg, #1e3c72, #2a5298) !important;
color: white !important;
padding: 10px 15px !important;
margin: -15px -15px 15px -15px !important;
border-radius: 10px 10px 0 0 !important;
font-weight: 600 !important;
font-size: 16px !important;
text-align: center !important;
}
/* Input Areas - Fixed Sizing */
.compact-input {
border: 2px solid #e1e8ed !important;
border-radius: 6px !important;
padding: 8px 12px !important;
font-size: 13px !important;
margin: 5px 0 !important;
background: #fafbfc !important;
min-height: 40px !important;
width: 100% !important;
box-sizing: border-box !important;
}
.detection-input {
font-family: 'Courier New', monospace !important;
background: #2d3748 !important;
color: #e2e8f0 !important;
border: 2px solid #4a5568 !important;
min-height: 120px !important;
resize: vertical !important;
}
.compact-input:focus {
border-color: #1e3c72 !important;
box-shadow: 0 0 0 2px rgba(30, 60, 114, 0.1) !important;
outline: none !important;
}
/* Output Areas - Fixed Heights */
.compact-output {
background: #f8fafc !important;
border: 1px solid #e2e8f0 !important;
border-radius: 6px !important;
padding: 12px !important;
font-size: 12px !important;
line-height: 1.5 !important;
overflow-y: auto !important;
min-height: 150px !important;
max-height: 250px !important;
width: 100% !important;
box-sizing: border-box !important;
white-space: pre-wrap !important;
}
/* Buttons */
.primary-btn {
background: linear-gradient(135deg, #1e3c72, #2a5298) !important;
border: none !important;
color: white !important;
padding: 10px 18px !important;
border-radius: 6px !important;
font-weight: 600 !important;
font-size: 13px !important;
margin: 3px !important;
transition: all 0.3s ease !important;
cursor: pointer !important;
min-height: 40px !important;
}
.primary-btn:hover {
transform: translateY(-1px) !important;
box-shadow: 0 4px 12px rgba(30, 60, 114, 0.3) !important;
}
.secondary-btn {
background: #64748b !important;
border: none !important;
color: white !important;
padding: 8px 14px !important;
border-radius: 4px !important;
font-size: 12px !important;
margin: 2px !important;
cursor: pointer !important;
min-height: 36px !important;
}
/* Status Indicators */
.status-indicator {
padding: 6px 10px !important;
border-radius: 4px !important;
font-size: 11px !important;
font-weight: 600 !important;
margin: 4px 0 !important;
text-align: center !important;
min-height: 30px !important;
display: flex !important;
align-items: center !important;
justify-content: center !important;
}
.status-success {
background: #d1fae5 !important;
color: #065f46 !important;
border: 1px solid #a7f3d0 !important;
}
.status-warning {
background: #fef3c7 !important;
color: #92400e !important;
border: 1px solid #fcd34d !important;
}
.status-error {
background: #fee2e2 !important;
color: #991b1b !important;
border: 1px solid #fca5a5 !important;
}
/* Control Sections */
.control-section {
margin: 10px 0 !important;
padding: 10px !important;
background: #f1f5f9 !important;
border-radius: 6px !important;
border-left: 4px solid #1e3c72 !important;
}
.control-label {
font-size: 12px !important;
font-weight: 600 !important;
color: #334155 !important;
margin-bottom: 6px !important;
display: block !important;
}
/* Results Display */
.result-section {
flex-grow: 1 !important;
display: flex !important;
flex-direction: column !important;
min-height: 0 !important;
margin: 8px 0 !important;
}
.result-header {
font-size: 13px !important;
font-weight: 600 !important;
color: #1e3c72 !important;
margin: 8px 0 6px 0 !important;
padding: 6px 10px !important;
background: #e2e8f0 !important;
border-radius: 4px !important;
display: block !important;
}
/* Gradio specific fixes */
.gradio-textbox, .gradio-textbox > label, .gradio-textbox > div {
min-height: inherit !important;
}
.gradio-textbox textarea {
min-height: 100px !important;
resize: vertical !important;
}
.gradio-radio {
margin: 8px 0 !important;
}
.gradio-radio > div {
flex-wrap: wrap !important;
gap: 8px !important;
}
/* Responsive adjustments */
@media (max-width: 1200px) {
.dashboard-grid {
grid-template-columns: 1fr !important;
grid-template-rows: auto auto !important;
gap: 15px !important;
}
.task-panel {
min-height: 500px !important;
}
}
@media (max-width: 768px) {
.dashboard-header {
padding: 6px 15px !important;
margin: 6px !important;
}
.header-title {
font-size: 20px !important;
}
.header-subtitle {
font-size: 12px !important;
}
.task-panel {
padding: 12px !important;
min-height: 400px !important;
}
}
/* Custom scrollbar */
.compact-output::-webkit-scrollbar {
width: 6px !important;
}
.compact-output::-webkit-scrollbar-track {
background: #f1f1f1 !important;
border-radius: 3px !important;
}
.compact-output::-webkit-scrollbar-thumb {
background: #1e3c72 !important;
border-radius: 3px !important;
}
.compact-output::-webkit-scrollbar-thumb:hover {
background: #2a5298 !important;
}
/* Sample data styling */
.sample-data {
font-size: 11px !important;
background: #2d3748 !important;
color: #e2e8f0 !important;
padding: 8px !important;
border-radius: 4px !important;
font-family: 'Courier New', monospace !important;
margin: 6px 0 !important;
}
/* Fix for textbox containers */
.gradio-container .gradio-column {
min-width: 0 !important;
}
.gradio-container .gradio-row {
flex-wrap: wrap !important;
}
"""
# Global model variables
# `pipe` is the loaded text-generation pipeline, or None while nothing is
# loaded (initial state, or after a failed load). The request handlers read it
# to choose between AI-backed analysis and their fallback paths.
pipe = None
# Human-readable status of the most recent load attempt.
model_status = "🔄 Loading..."


def load_model():
    """Load the GPT-OSS-20B text-generation pipeline and return a status string.

    The function is wired to ``demo.load`` and therefore runs on every page
    load. It is idempotent: once a pipeline has been loaded it returns the
    current status immediately instead of rebuilding the model.

    Side effects:
        Updates the module globals ``pipe`` and ``model_status``. ``pipe`` is
        only published after the smoke-test generation succeeds, so handlers
        never see a pipeline that failed its test.

    Returns:
        str: ``"✅ GPT-OSS-20B Ready"`` on success, or
        ``"⚠️ Model Loading Failed - Using Fallback"`` if construction or the
        smoke test raised (in which case ``pipe`` is left as ``None``).
    """
    global pipe, model_status

    # Already loaded by an earlier page load: do not rebuild the model.
    if pipe is not None:
        return model_status

    try:
        logger.info("Starting model loading process...")
        model_status = "🔄 Loading GPT-OSS-20B model..."

        # Load the specific model requested
        logger.info("Loading gpt-oss-20b model...")
        # NOTE(review): pad_token_id=50256 looks like the GPT-2 end-of-text id;
        # confirm it is valid for this model's tokenizer.
        candidate = pipeline(
            "text-generation",
            model="openai/gpt-oss-20b",
            torch_dtype=torch.float16,  # Use fp16 for better memory efficiency
            device_map="auto",
            trust_remote_code=True,
            max_length=512,  # Limit context length
            pad_token_id=50256,  # Set pad token
        )

        # Smoke-test the model with a tiny generation; the output itself is
        # not needed, only the fact that the call completes.
        logger.info("Testing model functionality...")
        candidate(
            "Test security analysis:",
            max_new_tokens=10,
            do_sample=True,
            temperature=0.7,
            pad_token_id=50256,
        )
    except Exception as exc:
        # Top-level boundary: any failure switches the app to fallback mode.
        logger.exception("Model loading failed: %s", exc)
        model_status = "⚠️ Model Loading Failed - Using Fallback"
        pipe = None
        return model_status

    pipe = candidate
    model_status = "✅ GPT-OSS-20B Ready"
    logger.info("Model loaded successfully!")
    return model_status
def _scan_log_patterns(logs):
    """Run the regex heuristics over ``logs``; return ``(threats, risk_score)``.

    ``risk_score`` is clamped to 100 because it is displayed as "N/100".
    """
    threats = []
    risk_score = 0

    # Authentication threats
    failed_logins = len(
        re.findall(r'failed.*login|authentication.*failed', logs, re.IGNORECASE)
    )
    if failed_logins > 3:
        threats.append(f"🚨 Brute Force Attack ({failed_logins} failed attempts)")
        risk_score += 30
    elif failed_logins > 0:
        threats.append(f"⚠️ Failed Authentication ({failed_logins} attempts)")
        risk_score += 15

    # Malicious execution
    if re.search(r'powershell.*-enc|cmd\.exe|eval\(|exec\(', logs, re.IGNORECASE):
        threats.append("🚨 Malicious Script Execution")
        risk_score += 35

    # Network anomalies
    if re.search(r'suspicious.*ip|unusual.*connection', logs, re.IGNORECASE):
        threats.append("🚨 Suspicious Network Activity")
        risk_score += 25

    # File anomalies
    if re.search(r'unusual.*file|suspicious.*access', logs, re.IGNORECASE):
        threats.append("⚠️ File System Anomaly")
        risk_score += 20

    return threats, min(risk_score, 100)


def detect_threats(logs, sensitivity):
    """Task 1: AI-powered Threat Detection.

    Combines an optional model-generated analysis (when the global ``pipe`` is
    loaded) with regex-based pattern detection over the raw log text.

    Args:
        logs: Raw log text. ``None``, empty or whitespace-only input is
            rejected with a prompt to provide data.
        sensitivity: Sensitivity label; it is only interpolated into the model
            prompt and does not change the regex heuristics.

    Returns:
        tuple[str, str]: ``(report_text, status_text)``. On an unexpected error
        the report carries the error message and the status is ``"❌ ERROR"``.
    """
    if not logs or not logs.strip():
        return "Please provide log data.", "⚠️ No input"

    start_time = time.time()
    try:
        # Read the global once so the whole request sees a consistent model.
        model = pipe
        if model is not None:
            # Use GPT-OSS-20B for AI-powered detection
            prompt = "\n".join([
                "Analyze these security logs for threats:",
                f"{logs}",
                f"Detection sensitivity: {sensitivity}",
                "Analysis:",
            ])
            # NOTE(review): pad_token_id=50256 looks like the GPT-2
            # end-of-text id; confirm it is valid for the loaded tokenizer.
            response = model(
                prompt,
                max_new_tokens=200,
                do_sample=True,
                temperature=0.3,
                pad_token_id=50256,
                truncation=True,
            )
            generated = response[0]['generated_text']
            if generated.startswith(prompt):
                # Drop the echoed prompt exactly; splitting on "Analysis:"
                # would truncate completions that contain that word.
                ai_analysis = generated[len(prompt):].strip()
            else:
                # Prompt was not echoed verbatim: fall back to the marker.
                ai_analysis = generated.split("Analysis:")[-1].strip()
        else:
            # Fallback to pattern-based detection
            ai_analysis = "AI model unavailable. Using pattern-based detection."

        # Enhanced pattern-based detection as backup/supplement
        threats, risk_score = _scan_log_patterns(logs)

        # Generate final result
        if threats or model is not None:
            if not threats:
                severity = "LOW"
            elif risk_score > 50:
                severity = "CRITICAL"
            elif risk_score > 30:
                severity = "HIGH"
            else:
                severity = "MEDIUM"
            confidence = min(95, 70 + len(threats) * 5)
            if threats:
                patterns = "\n".join(f"• {threat}" for threat in threats)
            else:
                patterns = "• No obvious threat patterns detected"
            containment = (
                "Immediate containment required" if risk_score > 40
                else "Continue monitoring"
            )
            escalation = (
                "Escalate to L2 analyst" if risk_score > 30
                else "Standard response"
            )
            model_name = "GPT-OSS-20B" if model is not None else "Pattern-based"

            result = "\n".join([
                "🚨 THREAT ANALYSIS RESULTS",
                "AI ANALYSIS:",
                ai_analysis,
                "DETECTED PATTERNS:",
                patterns,
                "ASSESSMENT:",
                f"• Risk Score: {risk_score}/100",
                f"• Severity: {severity}",
                f"• Confidence: {confidence}%",
                f"• Model: {model_name}",
                "RECOMMENDATIONS:",
                f"• {containment}",
                f"• {escalation}",
                "• Preserve all evidence",
                "• Update threat intelligence",
            ])
            if threats:
                status = f"🚨 Analysis Complete - {len(threats)} threats found"
            else:
                status = "✅ Analysis Complete"
        else:
            result = "\n".join([
                "✅ NO THREATS DETECTED",
                "Clean log analysis with no suspicious patterns identified.",
                "Continue standard monitoring procedures.",
            ])
            status = "✅ CLEAN"

        time_taken = round(time.time() - start_time, 1)
        return result, f"{status} ({time_taken}s)"
    except Exception as e:
        # UI boundary: report the failure in the output box instead of raising.
        logger.error(f"Detection error: {str(e)}")
        return f"❌ Analysis failed: {str(e)}", "❌ ERROR"
def analyze_threat(threat, level):
    """Task 2: AI-powered Analyst Assistant.

    Produces an analysis of a free-text threat description, either from the
    loaded model (global ``pipe``) or from a static per-level template when no
    model is available.

    Args:
        threat: Free-text threat description. ``None``, empty or
            whitespace-only input is rejected with a prompt to describe it.
        level: Analyst level label ("L1", "L2" or "L3"). Unknown labels use
            the L2 template in fallback mode.

    Returns:
        tuple[str, str]: ``(analysis_text, status_text)``. On an unexpected
        error the text carries the error message and the status is
        ``"❌ ERROR"``.
    """
    if not threat or not threat.strip():
        return "Please describe the threat.", "⚠️ No input"

    start_time = time.time()
    try:
        # Read the global once so the whole request sees a consistent model.
        model = pipe
        if model is not None:
            # Use GPT-OSS-20B for AI analysis
            prompt = "\n".join([
                f"As a Level {level} SOC analyst, analyze this security threat:",
                f"{threat}",
                "Provide detailed analysis including:",
                "1. Threat assessment",
                "2. Recommended actions",
                "3. Priority level",
                "4. Next steps",
                "Analysis:",
            ])
            # NOTE(review): pad_token_id=50256 looks like the GPT-2
            # end-of-text id; confirm it is valid for the loaded tokenizer.
            response = model(
                prompt,
                max_new_tokens=300,
                do_sample=True,
                temperature=0.4,
                pad_token_id=50256,
                truncation=True,
            )
            generated = response[0]['generated_text']
            if generated.startswith(prompt):
                # Drop the echoed prompt exactly; splitting on "Analysis:"
                # would truncate completions that contain that word.
                ai_analysis = generated[len(prompt):].strip()
            else:
                # Prompt was not echoed verbatim: fall back to the marker.
                ai_analysis = generated.split("Analysis:")[-1].strip()

            result = "\n".join([
                f"🤖 AI-POWERED {level} ANALYSIS",
                "THREAT ASSESSMENT:",
                ai_analysis,
                "MODEL: GPT-OSS-20B",
                f"ANALYST LEVEL: {level}",
                "STATUS: AI Analysis Complete",
            ])
        else:
            # Fallback analysis templates.
            # Only mark the summary as shortened when it really was truncated.
            summary = threat[:60] + ("..." if len(threat) > 60 else "")
            templates = {
                "L1": "\n".join([
                    "🚨 L1 TRIAGE ANALYSIS",
                    f"THREAT: {summary}",
                    "IMMEDIATE ACTIONS:",
                    "• Assess severity",
                    "• Isolate systems",
                    "• Document evidence",
                    "• Escalate if high severity",
                    "DECISION: Escalate to L2",
                    "PRIORITY: High",
                ]),
                "L2": "\n".join([
                    "🔍 L2 INVESTIGATION",
                    f"INCIDENT: {summary}",
                    "INVESTIGATION PLAN:",
                    "1. Evidence collection",
                    "2. Timeline analysis",
                    "3. Scope assessment",
                    "4. IOC identification",
                    "5. Containment measures",
                    "NEXT STEPS: Deploy monitoring",
                ]),
                "L3": "\n".join([
                    "🎯 L3 STRATEGIC ANALYSIS",
                    f"THREAT ASSESSMENT: {summary}",
                    "STRATEGIC RESPONSE:",
                    "• Executive notification",
                    "• Business impact review",
                    "• Advanced forensics",
                    "• Recovery planning",
                    "• Security improvements",
                    "RECOMMENDATION: Full IR activation",
                ]),
            }
            result = templates.get(level, templates["L2"])

        time_taken = round(time.time() - start_time, 1)
        return result, f"✅ {level} Complete ({time_taken}s)"
    except Exception as e:
        # UI boundary: report the failure in the output box instead of raising.
        logger.error(f"Analysis error: {str(e)}")
        return f"❌ Analysis failed: {str(e)}", "❌ ERROR"
# Sample data
# Example log excerpt loaded into the log input by the detection panel's
# "Sample" button; one event per line.
SAMPLE_LOGS = "\n".join([
    "2025-08-11 14:30:15 [AUTH] Failed login: 'admin' from 192.168.1.100",
    "2025-08-11 14:30:18 [AUTH] Failed login: 'administrator' from 192.168.1.100",
    "2025-08-11 14:30:45 [PROC] powershell.exe -WindowStyle Hidden -enc ZXhlYyBjYWxjLmV4ZQ==",
    "2025-08-11 14:31:12 [NET] Suspicious connection to 45.33.22.11:443",
    "2025-08-11 14:31:30 [FILE] Unusual file access pattern detected",
    "2025-08-11 14:32:01 [NET] Multiple connections from same source IP",
])
# Example threat description loaded into the threat input by the assistant
# panel's "Sample" button.
SAMPLE_THREAT = "Multiple failed login attempts detected from IP 192.168.1.100, followed by encoded PowerShell execution and suspicious outbound network connections to known malicious IP addresses. Lateral movement indicators present."
# Main Dashboard Interface
# Layout: header, system-status row, a two-column grid (Task 1 detection panel,
# Task 2 analyst-assistant panel), footer, then the event wiring. Event
# listeners are registered inside the Blocks context.
with gr.Blocks(title="SOC LLM Dashboard", theme=gr.themes.Soft(), css=professional_css) as demo:
    # Compact Header
    gr.HTML(
        "\n"
        '<div class="dashboard-header">\n'
        '<div class="header-title">🛡️ SOC LLM Dashboard</div>\n'
        '<div class="header-subtitle">Professional Security Operations Center • GPT-OSS-20B Powered Detection & Analysis</div>\n'
        "</div>\n"
    )

    # System Status Bar
    with gr.Row():
        system_status = gr.Textbox(
            value="🔄 Initializing GPT-OSS-20B...",
            label="System Status",
            interactive=False,
            elem_classes=["status-indicator", "status-warning"],
            scale=2,
        )
        gr.HTML('<div style="width: 20px;"></div>')  # Spacer

    # Main Dashboard Grid
    with gr.Row(equal_height=False, elem_classes=["dashboard-grid"]):
        # ================== TASK 1: DETECTION PANEL ==================
        with gr.Column(scale=1, elem_classes=["task-panel"]):
            gr.HTML('<div class="task-header">📊 TASK 1: AI THREAT DETECTION</div>')

            # Detection Controls
            gr.HTML('<div class="control-label">Detection Sensitivity</div>')
            detect_sensitivity = gr.Radio(
                choices=["High", "Medium", "Low"],
                value="Medium",
                interactive=True,
                elem_classes=["compact-input"],
            )
            with gr.Row():
                detect_btn = gr.Button("🔍 AI Detect", elem_classes=["primary-btn"], scale=2)
                sample_logs_btn = gr.Button("📝 Sample", elem_classes=["secondary-btn"], scale=1)

            # Log Input
            gr.HTML('<div class="result-header">Security Logs Input</div>')
            log_input = gr.Textbox(
                placeholder="Paste security logs here for AI-powered analysis...",
                lines=6,
                elem_classes=["compact-input", "detection-input"],
                interactive=True,
                show_label=False,
            )

            # Detection Results
            gr.HTML('<div class="result-header">AI Detection Results</div>')
            detection_output = gr.Textbox(
                lines=8,
                elem_classes=["compact-output"],
                interactive=False,
                placeholder="GPT-OSS-20B detection results will appear here...",
                show_label=False,
            )
            detection_status = gr.Textbox(
                label="Status",
                elem_classes=["status-indicator", "status-success"],
                interactive=False,
                show_label=False,
            )

        # ================== TASK 2: ASSISTANT PANEL ==================
        with gr.Column(scale=1, elem_classes=["task-panel"]):
            gr.HTML('<div class="task-header">🤖 TASK 2: AI ANALYST ASSISTANT</div>')

            # Assistant Controls
            gr.HTML('<div class="control-label">Analyst Level</div>')
            analyst_level = gr.Radio(
                choices=["L1", "L2", "L3"],
                value="L2",
                interactive=True,
                elem_classes=["compact-input"],
            )
            with gr.Row():
                analyze_btn = gr.Button("🚀 AI Analyze", elem_classes=["primary-btn"], scale=2)
                sample_threat_btn = gr.Button("📝 Sample", elem_classes=["secondary-btn"], scale=1)

            # Threat Input
            gr.HTML('<div class="result-header">Threat Description</div>')
            threat_input = gr.Textbox(
                placeholder="Describe the security threat for AI analysis...",
                lines=6,
                elem_classes=["compact-input"],
                interactive=True,
                show_label=False,
            )

            # Analysis Results
            gr.HTML('<div class="result-header">AI Analysis & Recommendations</div>')
            analysis_output = gr.Textbox(
                lines=8,
                elem_classes=["compact-output"],
                interactive=False,
                placeholder="GPT-OSS-20B analysis results will appear here...",
                show_label=False,
            )
            analysis_status = gr.Textbox(
                label="Status",
                elem_classes=["status-indicator", "status-success"],
                interactive=False,
                show_label=False,
            )

    # Quick Info Footer
    gr.HTML(
        "\n"
        '<div style="text-align: center; padding: 12px; color: rgba(255,255,255,0.8); font-size: 11px; margin-top: 10px;">\n'
        "<strong>Research Project:</strong> LLM-based SOC Assistant • <strong>Model:</strong> GPT-OSS-20B • <strong>Student:</strong> Abdullah Alanazi • <strong>Supervisor:</strong> Prof. Ali Shoker • <strong>Institution:</strong> KAUST\n"
        "</div>\n"
    )

    # ================== EVENT HANDLERS ==================
    # Detection handlers
    detect_btn.click(
        fn=detect_threats,
        inputs=[log_input, detect_sensitivity],
        outputs=[detection_output, detection_status],
    )
    sample_logs_btn.click(
        fn=lambda: SAMPLE_LOGS,
        outputs=[log_input],
    )

    # Assistant handlers
    analyze_btn.click(
        fn=analyze_threat,
        inputs=[threat_input, analyst_level],
        outputs=[analysis_output, analysis_status],
    )
    sample_threat_btn.click(
        fn=lambda: SAMPLE_THREAT,
        outputs=[threat_input],
    )

    # System initialization: runs load_model when the page loads and shows the
    # returned status string in the System Status box.
    demo.load(
        fn=load_model,
        outputs=[system_status],
    )
if __name__ == "__main__":
    # Start the dashboard server on port 7860.
    # NOTE(review): share=True requests a public share link and
    # server_name="0.0.0.0" binds all network interfaces; confirm both are
    # intended for a tool that displays security log contents.
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
    )