Spaces:
Running
Running
#!/usr/bin/env python | |
import os | |
import json | |
import logging | |
import random | |
import asyncio | |
import aiohttp | |
from datetime import datetime, timedelta | |
from typing import Dict, Any, List, Optional, Tuple | |
from pydantic import BaseModel, Field | |
import gradio as gr | |
import google.generativeai as genai | |
from dataclasses import dataclass | |
# Setup logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s [%(levelname)s] %(message)s", | |
) | |
logger = logging.getLogger(__name__) | |
# ========== CONFIGURATION ========== | |
class Config: | |
GEMINI_API_KEY: str = os.getenv("GEMINI_API_KEY", "") | |
SERPER_API_KEY: str = os.getenv("SERPER_API_KEY", "") | |
GEMINI_MODEL: str = "gemini-2.0-flash" | |
MAX_RETRIES: int = 3 | |
TIMEOUT: int = 30 | |
config = Config() | |
# ========== ENHANCED DATA MODELS ========== | |
class UserVerification(BaseModel): | |
user_id: str = Field(..., description="User ID") | |
name: str = Field(..., description="User name") | |
email: str = Field(..., description="User email") | |
is_verified: bool = Field(..., description="KYC verification status") | |
verification_level: str = Field(..., description="Verification level: basic, standard, premium") | |
risk_score: float = Field(..., description="Risk score 0-1") | |
ai_behavioral_analysis: str = Field("", description="AI analysis of user behavior") | |
class MarketIntelligence(BaseModel): | |
average_resale_price: float = Field(0.0, description="Average market resale price") | |
price_trend: str = Field("stable", description="Price trend: rising, falling, stable") | |
market_sentiment: str = Field("neutral", description="Market sentiment: positive, negative, neutral") | |
similar_events_pricing: List[Dict] = Field(default_factory=list, description="Similar events pricing data") | |
news_impact: str = Field("", description="News impact on pricing") | |
social_media_buzz: str = Field("", description="Social media sentiment analysis") | |
supply_demand_ratio: float = Field(1.0, description="Supply vs demand ratio") | |
class ScalpingDetection(BaseModel): | |
is_scalper: bool = Field(..., description="Whether user is detected as scalper") | |
confidence: float = Field(..., description="Detection confidence 0-1") | |
flags: List[str] = Field(..., description="Suspicious activity flags") | |
purchase_velocity: int = Field(..., description="Number of purchases in last hour") | |
ip_duplicates: int = Field(..., description="Number of accounts from same IP") | |
ai_pattern_analysis: str = Field("", description="AI analysis of behavior patterns") | |
network_connections: List[str] = Field(default_factory=list, description="Connected suspicious accounts") | |
class PricingRecommendation(BaseModel): | |
original_price: float = Field(..., description="Original ticket price") | |
recommended_resale_price: float = Field(..., description="Recommended resale price") | |
market_fair_price: float = Field(..., description="AI-calculated fair market price") | |
demand_level: str = Field(..., description="Current demand level") | |
price_adjustment_reason: str = Field(..., description="Reason for price adjustment") | |
profit_margin: Optional[float] = Field(None, description="Profit margin percentage") | |
loss_percentage: Optional[float] = Field(None, description="Loss percentage if selling below cost") | |
market_intelligence: MarketIntelligence = Field(default_factory=MarketIntelligence) | |
ai_pricing_rationale: str = Field("", description="AI explanation for pricing decision") | |
class ResaleCompliance(BaseModel): | |
is_compliant: bool = Field(..., description="Whether resale is policy compliant") | |
violations: List[str] = Field(..., description="List of policy violations") | |
resale_allowed: bool = Field(..., description="Whether resale is allowed") | |
max_allowed_price: float = Field(..., description="Maximum allowed resale price") | |
recommendation: str = Field(..., description="Compliance recommendation") | |
ai_policy_analysis: str = Field("", description="AI analysis of policy compliance") | |
class IntelligentReport(BaseModel): | |
verification: UserVerification | |
scalping_detection: ScalpingDetection | |
pricing: PricingRecommendation | |
compliance: ResaleCompliance | |
final_decision: str = Field(..., description="Final system decision") | |
action_items: List[str] = Field(..., description="Recommended actions") | |
ai_summary: str = Field("", description="AI-generated executive summary") | |
confidence_score: float = Field(0.0, description="Overall AI confidence in decision") | |
# ========== AI INTEGRATION SERVICES ========== | |
class GeminiAIService: | |
"""Service for interacting with Google's Gemini AI""" | |
def __init__(self): | |
if not config.GEMINI_API_KEY: | |
logger.warning("GEMINI_API_KEY not found. Using fallback responses.") | |
self.enabled = False | |
return | |
try: | |
genai.configure(api_key=config.GEMINI_API_KEY) | |
self.model = genai.GenerativeModel(config.GEMINI_MODEL) | |
self.enabled = True | |
logger.info("Gemini AI service initialized successfully") | |
except Exception as e: | |
logger.error(f"Failed to initialize Gemini AI: {e}") | |
self.enabled = False | |
async def analyze_user_behavior(self, user_data: Dict, transaction_history: List[Dict]) -> str: | |
"""Analyze user behavior patterns using AI""" | |
if not self.enabled: | |
return self._fallback_user_analysis(user_data) | |
try: | |
prompt = f""" | |
As an expert fraud detection analyst, analyze this user's behavior for potential ticket scalping: | |
User Profile: | |
- Name: {user_data.get('name', 'N/A')} | |
- Email: {user_data.get('email', 'N/A')} | |
- User ID: {user_data.get('user_id', 'N/A')} | |
- Risk Factors: {user_data.get('risk_factors', [])} | |
Transaction History: {json.dumps(transaction_history, indent=2)} | |
Provide a concise behavioral analysis focusing on: | |
1. Email legitimacy indicators | |
2. Name authenticity assessment | |
3. Overall trustworthiness score | |
4. Red flags or green flags | |
Keep response under 150 words and be specific. | |
""" | |
response = self.model.generate_content(prompt) | |
return response.text if response.text else self._fallback_user_analysis(user_data) | |
except Exception as e: | |
logger.error(f"Gemini AI analysis failed: {e}") | |
return self._fallback_user_analysis(user_data) | |
async def analyze_scalping_patterns(self, user_id: str, transaction_data: Dict, network_data: List[Dict]) -> str: | |
"""Detect scalping patterns using advanced AI analysis""" | |
if not self.enabled: | |
return self._fallback_scalping_analysis(transaction_data) | |
try: | |
prompt = f""" | |
As a specialized anti-scalping detection expert, analyze this transaction for suspicious patterns: | |
Transaction Details: | |
- User: {user_id} | |
- Ticket Quantity: {transaction_data.get('ticket_quantity', 0)} | |
- Event: {transaction_data.get('event_name', 'N/A')} | |
- Purchase Velocity: {transaction_data.get('purchase_velocity', 0)} purchases/hour | |
- IP Duplicates: {transaction_data.get('ip_duplicates', 0)} accounts | |
Network Analysis: {json.dumps(network_data, indent=2)} | |
Assess for: | |
1. Bot-like purchasing behavior | |
2. Coordinated scalping networks | |
3. Unusual timing patterns | |
4. Risk level (Low/Medium/High) | |
Provide analysis in under 200 words with specific indicators. | |
""" | |
response = self.model.generate_content(prompt) | |
return response.text if response.text else self._fallback_scalping_analysis(transaction_data) | |
except Exception as e: | |
logger.error(f"Scalping pattern analysis failed: {e}") | |
return self._fallback_scalping_analysis(transaction_data) | |
async def generate_pricing_rationale(self, market_data: Dict, pricing_factors: Dict) -> str: | |
"""Generate AI-powered pricing rationale""" | |
if not self.enabled: | |
return self._fallback_pricing_analysis(pricing_factors) | |
try: | |
prompt = f""" | |
As a market pricing expert, analyze this ticket resale pricing scenario: | |
Market Conditions: | |
- Average Market Price: ${market_data.get('average_resale_price', 0):.2f} | |
- Price Trend: {market_data.get('price_trend', 'stable')} | |
- Market Sentiment: {market_data.get('market_sentiment', 'neutral')} | |
- Supply/Demand Ratio: {market_data.get('supply_demand_ratio', 1.0)} | |
Pricing Analysis: | |
- Original Price: ${pricing_factors.get('original_price', 0):.2f} | |
- Proposed Price: ${pricing_factors.get('proposed_price', 0):.2f} | |
- Recommended Price: ${pricing_factors.get('recommended_price', 0):.2f} | |
- Demand Level: {pricing_factors.get('demand_level', 'medium')} | |
Provide: | |
1. Fair market assessment | |
2. Consumer protection analysis | |
3. Pricing recommendation rationale | |
4. Risk factors for buyer/seller | |
Keep under 250 words, be specific and actionable. | |
""" | |
response = self.model.generate_content(prompt) | |
return response.text if response.text else self._fallback_pricing_analysis(pricing_factors) | |
except Exception as e: | |
logger.error(f"Pricing rationale generation failed: {e}") | |
return self._fallback_pricing_analysis(pricing_factors) | |
async def analyze_policy_compliance(self, transaction_data: Dict, policy_violations: List[str]) -> str: | |
"""Analyze policy compliance with AI reasoning""" | |
if not self.enabled: | |
return self._fallback_compliance_analysis(policy_violations) | |
try: | |
prompt = f""" | |
As a policy compliance officer, evaluate this transaction: | |
Transaction: | |
- User: {transaction_data.get('user_id', 'N/A')} | |
- Price Ratio: {transaction_data.get('price_ratio', 1.0)}x original | |
- Event: {transaction_data.get('event_name', 'N/A')} | |
- Proposed Price: ${transaction_data.get('proposed_price', 0):.2f} | |
- Original Price: ${transaction_data.get('original_price', 0):.2f} | |
Policy Violations Detected: {policy_violations} | |
Provide: | |
1. Severity assessment of each violation | |
2. Consumer protection implications | |
3. Recommended enforcement actions | |
4. Appeal process guidance (if applicable) | |
Be fair but firm in protecting consumers. Under 200 words. | |
""" | |
response = self.model.generate_content(prompt) | |
return response.text if response.text else self._fallback_compliance_analysis(policy_violations) | |
except Exception as e: | |
logger.error(f"Policy compliance analysis failed: {e}") | |
return self._fallback_compliance_analysis(policy_violations) | |
async def generate_executive_summary(self, full_report: Dict) -> Tuple[str, float]: | |
"""Generate an executive summary with confidence score""" | |
if not self.enabled: | |
return self._fallback_executive_summary(full_report) | |
try: | |
prompt = f""" | |
Create an executive summary for this anti-scalping analysis: | |
Decision: {full_report.get('final_decision', 'UNKNOWN')} | |
User Verified: {full_report.get('verification', {}).get('is_verified', False)} | |
Scalper Detected: {full_report.get('scalping_detection', {}).get('is_scalper', False)} | |
Violations: {full_report.get('compliance', {}).get('violations', [])} | |
Provide: | |
1. One sentence decision summary | |
2. Key risk factors (2-3 points) | |
3. Confidence level (0.0-1.0) | |
Format: SUMMARY | CONFIDENCE_SCORE | |
Example: "Transaction approved with moderate monitoring recommended due to acceptable risk profile. | 0.85" | |
Keep summary under 100 words. | |
""" | |
response = self.model.generate_content(prompt) | |
if response.text and '|' in response.text: | |
parts = response.text.split('|') | |
summary = parts[0].strip() | |
try: | |
confidence = float(parts[1].strip()) | |
confidence = max(0.0, min(1.0, confidence)) # Ensure 0-1 range | |
except: | |
confidence = 0.7 | |
return summary, confidence | |
else: | |
return self._fallback_executive_summary(full_report) | |
except Exception as e: | |
logger.error(f"Executive summary generation failed: {e}") | |
return self._fallback_executive_summary(full_report) | |
# Fallback methods when AI is not available | |
def _fallback_user_analysis(self, user_data: Dict) -> str: | |
risk_factors = user_data.get('risk_factors', []) | |
if not risk_factors: | |
return "User profile appears legitimate with standard email domain and proper name format. No immediate red flags detected." | |
else: | |
return f"User profile shows some concerns: {', '.join(risk_factors)}. Recommend enhanced verification for high-value transactions." | |
def _fallback_scalping_analysis(self, transaction_data: Dict) -> str: | |
velocity = transaction_data.get('purchase_velocity', 0) | |
quantity = transaction_data.get('ticket_quantity', 0) | |
if velocity > 5 or quantity > 6: | |
return "High-risk transaction pattern detected. Rapid purchasing behavior and bulk quantities suggest potential scalping activity. Recommend blocking pending manual review." | |
elif velocity > 3 or quantity > 4: | |
return "Moderate risk factors present. Purchase velocity and quantity are elevated but within acceptable ranges for enthusiastic fans. Monitor closely." | |
else: | |
return "Normal purchasing behavior observed. Transaction patterns consistent with legitimate fan purchases." | |
def _fallback_pricing_analysis(self, pricing_factors: Dict) -> str: | |
original = pricing_factors.get('original_price', 0) | |
proposed = pricing_factors.get('proposed_price', 0) | |
ratio = proposed / original if original > 0 else 1.0 | |
if ratio > 2.0: | |
return "Proposed price significantly exceeds policy limits (2x original). High risk of consumer exploitation. Recommend price reduction to comply with anti-gouging policies." | |
elif ratio > 1.5: | |
return "Price markup is substantial but within policy limits. Market demand may justify premium, but monitor for consumer complaints." | |
else: | |
return "Pricing appears fair and reasonable. Markup reflects normal market dynamics and demand levels." | |
def _fallback_compliance_analysis(self, violations: List[str]) -> str: | |
if not violations: | |
return "Transaction complies with all anti-scalping policies. No violations detected. Safe to proceed with standard monitoring." | |
else: | |
severity = "High" if len(violations) > 2 else "Medium" if len(violations) > 1 else "Low" | |
return f"{severity} severity violations detected: {', '.join(violations)}. Recommend blocking transaction pending policy review and user education." | |
def _fallback_executive_summary(self, full_report: Dict) -> Tuple[str, float]: | |
decision = full_report.get('final_decision', 'UNKNOWN') | |
violations = len(full_report.get('compliance', {}).get('violations', [])) | |
if decision == "APPROVED": | |
summary = "Transaction approved with standard monitoring protocols. Risk factors within acceptable thresholds." | |
confidence = 0.8 - (violations * 0.1) | |
else: | |
summary = "Transaction blocked due to policy violations and elevated risk indicators. Manual review required." | |
confidence = 0.9 - (violations * 0.05) | |
return summary, max(0.1, min(0.95, confidence)) | |
class SerperAPIService: | |
"""Service for real-time market data using Serper API""" | |
def __init__(self): | |
if not config.SERPER_API_KEY: | |
logger.warning("SERPER_API_KEY not found. Using simulated market data.") | |
self.enabled = False | |
else: | |
self.enabled = True | |
self.base_url = "https://google.serper.dev/search" | |
logger.info("Serper API service initialized successfully") | |
async def get_market_intelligence(self, event_name: str, ticket_type: str, location: str = "") -> MarketIntelligence: | |
"""Fetch real-time market intelligence for ticket pricing""" | |
if not self.enabled: | |
return self._simulate_market_data(event_name, ticket_type) | |
try: | |
# Search for current ticket prices and market data | |
queries = [ | |
f"{event_name} {ticket_type} ticket prices resale", | |
f"{event_name} tickets secondary market pricing", | |
f"{event_name} ticket demand social media" | |
] | |
all_results = [] | |
async with aiohttp.ClientSession() as session: | |
for query in queries: | |
result = await self._search_query(session, query) | |
if result: | |
all_results.extend(result.get('organic', [])) | |
# Analyze results for market intelligence | |
return await self._analyze_market_data(all_results, event_name) | |
except Exception as e: | |
logger.error(f"Market intelligence gathering failed: {e}") | |
return self._simulate_market_data(event_name, ticket_type) | |
async def _search_query(self, session: aiohttp.ClientSession, query: str) -> Optional[Dict]: | |
"""Execute a search query via Serper API""" | |
try: | |
headers = { | |
"X-API-KEY": config.SERPER_API_KEY, | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"q": query, | |
"num": 5, | |
"gl": "us", | |
"hl": "en" | |
} | |
async with session.post(self.base_url, headers=headers, json=payload, timeout=config.TIMEOUT) as response: | |
if response.status == 200: | |
return await response.json() | |
else: | |
logger.warning(f"Serper API returned status {response.status}") | |
return None | |
except Exception as e: | |
logger.error(f"Serper API query failed: {e}") | |
return None | |
async def _analyze_market_data(self, search_results: List[Dict], event_name: str) -> MarketIntelligence: | |
"""Analyze search results to extract market intelligence""" | |
try: | |
sentiment_indicators = [] | |
news_items = [] | |
for result in search_results[:10]: # Limit processing | |
title = result.get('title', '').lower() | |
snippet = result.get('snippet', '').lower() | |
# Look for sentiment indicators | |
if any(word in title + snippet for word in ['sold out', 'high demand', 'popular', 'rush']): | |
sentiment_indicators.append('high_demand') | |
elif any(word in title + snippet for word in ['available', 'discount', 'cheap']): | |
sentiment_indicators.append('low_demand') | |
# Look for news impact | |
if any(word in title + snippet for word in ['news', 'announced', 'cancelled', 'postponed']): | |
news_items.append(result) | |
# Calculate market metrics | |
sentiment = self._calculate_sentiment(sentiment_indicators) | |
trend = random.choice(["rising", "stable", "falling"]) # Simplified | |
avg_price = random.uniform(100, 800) # Simplified | |
return MarketIntelligence( | |
average_resale_price=avg_price, | |
price_trend=trend, | |
market_sentiment=sentiment, | |
similar_events_pricing=[], | |
news_impact=f"Analyzed {len(news_items)} news items affecting {event_name} pricing", | |
social_media_buzz=f"Market sentiment derived from {len(sentiment_indicators)} social indicators", | |
supply_demand_ratio=self._calculate_supply_demand_ratio(sentiment_indicators) | |
) | |
except Exception as e: | |
logger.error(f"Market data analysis failed: {e}") | |
return self._simulate_market_data(event_name, "standard") | |
def _simulate_market_data(self, event_name: str, ticket_type: str) -> MarketIntelligence: | |
"""Generate simulated market data when API is unavailable""" | |
# Generate realistic market simulation based on event characteristics | |
base_price = 200 | |
if "vip" in ticket_type.lower(): | |
base_price = 500 | |
elif "premium" in ticket_type.lower(): | |
base_price = 350 | |
# Simulate market conditions | |
sentiment_score = random.uniform(0, 1) | |
if sentiment_score > 0.7: | |
sentiment = "positive" | |
trend = "rising" | |
avg_price = base_price * random.uniform(1.2, 1.8) | |
supply_demand = random.uniform(0.3, 0.7) | |
elif sentiment_score > 0.3: | |
sentiment = "neutral" | |
trend = "stable" | |
avg_price = base_price * random.uniform(0.9, 1.3) | |
supply_demand = random.uniform(0.8, 1.2) | |
else: | |
sentiment = "negative" | |
trend = "falling" | |
avg_price = base_price * random.uniform(0.6, 1.1) | |
supply_demand = random.uniform(1.2, 2.0) | |
return MarketIntelligence( | |
average_resale_price=avg_price, | |
price_trend=trend, | |
market_sentiment=sentiment, | |
similar_events_pricing=[], | |
news_impact=f"Simulated market analysis for {event_name}", | |
social_media_buzz="Market sentiment analysis based on event characteristics", | |
supply_demand_ratio=supply_demand | |
) | |
def _calculate_sentiment(self, indicators: List[str]) -> str: | |
"""Calculate overall market sentiment""" | |
if not indicators: | |
return "neutral" | |
positive_count = sum(1 for i in indicators if i == 'high_demand') | |
negative_count = sum(1 for i in indicators if i == 'low_demand') | |
if positive_count > negative_count: | |
return "positive" | |
elif negative_count > positive_count: | |
return "negative" | |
else: | |
return "neutral" | |
def _calculate_supply_demand_ratio(self, indicators: List[str]) -> float: | |
"""Calculate supply vs demand ratio""" | |
high_demand = sum(1 for i in indicators if i == 'high_demand') | |
low_demand = sum(1 for i in indicators if i == 'low_demand') | |
if high_demand > low_demand: | |
return random.uniform(0.3, 0.7) # Low supply, high demand | |
elif low_demand > high_demand: | |
return random.uniform(1.3, 2.0) # High supply, low demand | |
else: | |
return random.uniform(0.8, 1.2) # Balanced | |
# ========== ENHANCED ENGINES ========== | |
class EnhancedUserVerificationEngine: | |
def __init__(self): | |
self.ai_service = GeminiAIService() | |
async def verify_user(self, name: str, email: str, user_id: str) -> UserVerification: | |
"""Enhanced user verification with AI analysis""" | |
# Basic validation | |
email_valid = "@" in email and "." in email.split("@")[-1] | |
name_valid = len(name.strip()) >= 2 and not any(char.isdigit() for char in name) | |
risk_factors = [] | |
if not email_valid: | |
risk_factors.append("invalid_email") | |
if not name_valid: | |
risk_factors.append("suspicious_name") | |
if email.endswith(('.temp', '.fake', '.test', '.10minutemail', '.guerrillamail')): | |
risk_factors.append("temporary_email") | |
if len(name.strip()) < 3: | |
risk_factors.append("short_name") | |
risk_score = min(0.9, len(risk_factors) * 0.25 + random.uniform(0.1, 0.2)) | |
is_verified = risk_score < 0.5 and email_valid and name_valid | |
verification_level = "premium" if risk_score < 0.2 else "standard" if risk_score < 0.5 else "basic" | |
# AI behavioral analysis | |
user_data = { | |
"name": name, | |
"email": email, | |
"user_id": user_id, | |
"risk_factors": risk_factors | |
} | |
transaction_history = [ | |
{ | |
"event": "previous_purchase", | |
"quantity": random.randint(1, 3), | |
"timestamp": (datetime.now() - timedelta(days=random.randint(1, 30))).isoformat() | |
} | |
] | |
ai_analysis = await self.ai_service.analyze_user_behavior(user_data, transaction_history) | |
return UserVerification( | |
user_id=user_id, | |
name=name, | |
email=email, | |
is_verified=is_verified, | |
verification_level=verification_level, | |
risk_score=risk_score, | |
ai_behavioral_analysis=ai_analysis | |
) | |
class EnhancedScalpingDetectionEngine: | |
def __init__(self): | |
self.ai_service = GeminiAIService() | |
async def detect_scalping(self, user_id: str, ticket_quantity: int, event_name: str) -> ScalpingDetection: | |
"""Enhanced scalping detection with AI pattern analysis""" | |
# Generate consistent data based on user_id for demo | |
random.seed(hash(user_id) % 2147483647) | |
flags = [] | |
purchase_velocity = random.randint(1, 8) | |
ip_duplicates = random.randint(1, 5) | |
resale_frequency = random.randint(0, 12) | |
if purchase_velocity > 3: | |
flags.append("rapid_purchases") | |
if ip_duplicates > 2: | |
flags.append("multiple_ips") | |
if ticket_quantity > 4: | |
flags.append("bulk_purchase") | |
if resale_frequency > 5: | |
flags.append("frequent_reseller") | |
if ticket_quantity > 6: | |
flags.append("excessive_quantity") | |
# Calculate scalping probability | |
scalping_score = ( | |
(purchase_velocity / 10) * 0.25 + | |
(ip_duplicates / 5) * 0.25 + | |
(ticket_quantity / 10) * 0.25 + | |
(resale_frequency / 20) * 0.25 | |
) | |
# Add randomness for bot detection | |
if purchase_velocity > 6 and ticket_quantity > 5: | |
scalping_score += 0.3 | |
is_scalper = scalping_score > 0.6 | |
confidence = min(0.95, scalping_score + random.uniform(0.05, 0.15)) | |
# AI pattern analysis | |
transaction_data = { | |
"user_id": user_id, | |
"ticket_quantity": ticket_quantity, | |
"event_name": event_name, | |
"purchase_velocity": purchase_velocity, | |
"ip_duplicates": ip_duplicates | |
} | |
network_data = [ | |
{"connected_user": f"user_{i}", "similarity_score": random.uniform(0.1, 0.9)} | |
for i in range(min(3, ip_duplicates)) | |
] | |
ai_pattern_analysis = await self.ai_service.analyze_scalping_patterns(user_id, transaction_data, network_data) | |
# Reset random seed | |
random.seed() | |
return ScalpingDetection( | |
is_scalper=is_scalper, | |
confidence=confidence, | |
flags=flags, | |
purchase_velocity=purchase_velocity, | |
ip_duplicates=ip_duplicates, | |
ai_pattern_analysis=ai_pattern_analysis, | |
network_connections=[f"suspicious_account_{i}" for i in range(max(0, ip_duplicates - 1))] | |
) | |
class EnhancedDynamicPricingEngine: | |
def __init__(self): | |
self.ai_service = GeminiAIService() | |
self.market_service = SerperAPIService() | |
async def calculate_pricing(self, original_price: float, demand_level: str, proposed_price: float, event_name: str, ticket_type: str) -> PricingRecommendation: | |
"""Enhanced pricing calculation with real-time market intelligence""" | |
# Get market intelligence | |
market_intel = await self.market_service.get_market_intelligence(event_name, ticket_type) | |
# Base pricing logic | |
demand_multipliers = { | |
"low": (0.7, 1.1), | |
"medium": (0.85, 1.3), | |
"high": (1.0, 1.6) | |
} | |
min_mult, max_mult = demand_multipliers.get(demand_level, (0.85, 1.3)) | |
# Adjust based on market intelligence | |
if market_intel.market_sentiment == "positive": | |
max_mult *= 1.15 | |
elif market_intel.market_sentiment == "negative": | |
max_mult *= 0.85 | |
if market_intel.price_trend == "rising": | |
max_mult *= 1.1 | |
elif market_intel.price_trend == "falling": | |
max_mult *= 0.9 | |
# Calculate price bounds | |
min_price = original_price * min_mult | |
max_price = original_price * max_mult | |
# Calculate fair market price | |
if market_intel.average_resale_price > 0: | |
market_fair_price = market_intel.average_resale_price | |
else: | |
market_fair_price = original_price * (1.0 + (0.5 if demand_level == "high" else 0.2 if demand_level == "medium" else 0.0)) | |
# Recommend price within acceptable range | |
recommended_price = max(min_price, min(proposed_price, max_price)) | |
# Calculate margins | |
price_ratio = recommended_price / original_price | |
profit_margin = None | |
loss_percentage = None | |
if price_ratio > 1.0: | |
profit_margin = (price_ratio - 1) * 100 | |
elif price_ratio < 1.0: | |
loss_percentage = (1 - price_ratio) * 100 | |
# Generate pricing reason | |
reason = f"Adjusted for {demand_level} demand ({market_intel.market_sentiment} market sentiment, {market_intel.price_trend} trend)" | |
if recommended_price != proposed_price: | |
if recommended_price > proposed_price: | |
reason += f" - Price increased from ${proposed_price:.2f} to meet market conditions" | |
else: | |
reason += f" - Price reduced from ${proposed_price:.2f} for policy compliance" | |
# Generate AI pricing rationale | |
pricing_factors = { | |
"original_price": original_price, | |
"proposed_price": proposed_price, | |
"recommended_price": recommended_price, | |
"market_sentiment": market_intel.market_sentiment, | |
"price_trend": market_intel.price_trend, | |
"demand_level": demand_level | |
} | |
ai_rationale = await self.ai_service.generate_pricing_rationale(market_intel.dict(), pricing_factors) | |
return PricingRecommendation( | |
original_price=original_price, | |
recommended_resale_price=recommended_price, | |
market_fair_price=market_fair_price, | |
demand_level=demand_level, | |
price_adjustment_reason=reason, | |
profit_margin=profit_margin, | |
loss_percentage=loss_percentage, | |
market_intelligence=market_intel, | |
ai_pricing_rationale=ai_rationale | |
) | |
class EnhancedComplianceEngine: | |
def __init__(self): | |
self.ai_service = GeminiAIService() | |
async def check_compliance( | |
self, | |
user_id: str, | |
proposed_price: float, | |
original_price: float, | |
scalping_detection: ScalpingDetection, | |
event_name: str | |
) -> ResaleCompliance: | |
"""Enhanced compliance checking with AI policy analysis""" | |
violations = [] | |
price_ratio = proposed_price / original_price | |
# Policy checks | |
if price_ratio > 2.5: | |
violations.append("price_exceeds_250_percent") | |
elif price_ratio > 2.0: | |
violations.append("price_exceeds_200_percent") | |
if scalping_detection.is_scalper: | |
violations.append("suspected_scalper") | |
if scalping_detection.purchase_velocity > 6: | |
violations.append("excessive_purchase_velocity") | |
elif scalping_detection.purchase_velocity > 4: | |
violations.append("high_purchase_velocity") | |
if scalping_detection.ip_duplicates > 3: | |
violations.append("multiple_ip_accounts") | |
# Generate consistent resale frequency based on user_id | |
random.seed(hash(user_id + "resale") % 2147483647) | |
resale_frequency = random.randint(0, 10) | |
random.seed() | |
if resale_frequency > 6: | |
violations.append("monthly_resale_limit_exceeded") | |
elif resale_frequency > 4: | |
violations.append("high_resale_frequency") | |
if "bulk_purchase" in scalping_detection.flags: | |
violations.append("bulk_purchase_violation") | |
# Determine compliance | |
is_compliant = len(violations) == 0 | |
resale_allowed = is_compliant and not scalping_detection.is_scalper and price_ratio <= 2.0 | |
max_allowed_price = original_price * 2.0 | |
# Generate recommendation | |
if resale_allowed: | |
recommendation = "β Transaction approved - complies with all anti-scalping policies" | |
else: | |
severity = "CRITICAL" if len(violations) > 3 else "HIGH" if len(violations) > 1 else "MODERATE" | |
recommendation = f"β Transaction blocked ({severity} risk) - Violations: {', '.join(violations)}" | |
# AI policy analysis | |
transaction_data = { | |
"user_id": user_id, | |
"proposed_price": proposed_price, | |
"original_price": original_price, | |
"price_ratio": price_ratio, | |
"event_name": event_name, | |
"scalping_indicators": scalping_detection.flags, | |
"resale_frequency": resale_frequency | |
} | |
ai_policy_analysis = await self.ai_service.analyze_policy_compliance(transaction_data, violations) | |
return ResaleCompliance( | |
is_compliant=is_compliant, | |
violations=violations, | |
resale_allowed=resale_allowed, | |
max_allowed_price=max_allowed_price, | |
recommendation=recommendation, | |
ai_policy_analysis=ai_policy_analysis | |
) | |
# ========== MOCK DATABASE (Enhanced) ========== | |
class MockDatabase: | |
"""Enhanced mock database with consistent data generation""" | |
def __init__(self): | |
self.user_purchase_history = {} | |
self.ip_addresses = {} | |
self.resale_history = {} | |
self.user_profiles = {} | |
def get_user_purchases(self, user_id: str) -> int: | |
if user_id not in self.user_purchase_history: | |
# Generate consistent data based on user_id hash | |
random.seed(hash(user_id) % 2147483647) | |
self.user_purchase_history[user_id] = random.randint(0, 8) | |
random.seed() | |
return self.user_purchase_history[user_id] | |
def get_ip_accounts(self, user_id: str) -> int: | |
random.seed(hash(user_id + "ip") % 2147483647) | |
result = random.randint(1, 5) | |
random.seed() | |
return result | |
def get_resale_frequency(self, user_id: str) -> int: | |
if user_id not in self.resale_history: | |
random.seed(hash(user_id + "resale") % 2147483647) | |
self.resale_history[user_id] = random.randint(0, 12) | |
random.seed() | |
return self.resale_history[user_id] | |
mock_db = MockDatabase() | |
# ========== ENHANCED MAIN APPLICATION ========== | |
class IntelligentAntiScalpingSystem: | |
def __init__(self): | |
self.verification_engine = EnhancedUserVerificationEngine() | |
self.scalping_engine = EnhancedScalpingDetectionEngine() | |
self.pricing_engine = EnhancedDynamicPricingEngine() | |
self.compliance_engine = EnhancedComplianceEngine() | |
self.ai_service = GeminiAIService() | |
logger.info("Intelligent Anti-Scalping System initialized") | |
async def process_intelligent_transaction( | |
self, | |
name: str, | |
email: str, | |
user_id: str, | |
event_name: str, | |
ticket_type: str, | |
ticket_quantity: int, | |
original_price: float, | |
demand_level: str, | |
proposed_resale_price: float | |
) -> Dict[str, Any]: | |
"""Process transaction through enhanced AI-powered analysis""" | |
try: | |
# Run analyses concurrently for better performance | |
verification_task = self.verification_engine.verify_user(name, email, user_id) | |
scalping_task = self.scalping_engine.detect_scalping(user_id, ticket_quantity, event_name) | |
pricing_task = self.pricing_engine.calculate_pricing( | |
original_price, demand_level, proposed_resale_price, event_name, ticket_type | |
) | |
# Wait for core analyses to complete | |
verification, scalping_detection, pricing = await asyncio.gather( | |
verification_task, scalping_task, pricing_task | |
) | |
# Compliance check depends on scalping detection | |
compliance = await self.compliance_engine.check_compliance( | |
user_id, proposed_resale_price, original_price, scalping_detection, event_name | |
) | |
# Final decision logic with weighted scoring | |
verification_score = 1.0 if verification.is_verified else 0.0 | |
scalping_score = 0.0 if scalping_detection.is_scalper else 1.0 | |
compliance_score = 1.0 if compliance.resale_allowed else 0.0 | |
# Weighted decision (verification: 30%, scalping: 40%, compliance: 30%) | |
overall_score = (verification_score * 0.3 + scalping_score * 0.4 + compliance_score * 0.3) | |
final_decision = "APPROVED" if overall_score >= 0.7 else "DENIED" | |
# Generate intelligent action items | |
action_items = self._generate_action_items( | |
final_decision, verification, scalping_detection, compliance, pricing | |
) | |
# Create comprehensive report | |
report_data = { | |
"verification": verification.dict(), | |
"scalping_detection": scalping_detection.dict(), | |
"pricing": pricing.dict(), | |
"compliance": compliance.dict(), | |
"final_decision": final_decision, | |
"overall_score": overall_score | |
} | |
# Generate AI executive summary | |
ai_summary, confidence_score = await self.ai_service.generate_executive_summary(report_data) | |
# Create final report | |
report = IntelligentReport( | |
verification=verification, | |
scalping_detection=scalping_detection, | |
pricing=pricing, | |
compliance=compliance, | |
final_decision=final_decision, | |
action_items=action_items, | |
ai_summary=ai_summary, | |
confidence_score=confidence_score | |
) | |
return report.dict() | |
except Exception as e: | |
logger.error(f"Error processing intelligent transaction: {e}") | |
return self._create_error_response(str(e)) | |
def _generate_action_items( | |
self, | |
decision: str, | |
verification: UserVerification, | |
scalping: ScalpingDetection, | |
compliance: ResaleCompliance, | |
pricing: PricingRecommendation | |
) -> List[str]: | |
"""Generate intelligent action items based on analysis results""" | |
actions = [] | |
if decision == "APPROVED": | |
actions.extend([ | |
"β Process ticket resale at recommended price", | |
f"π° Set final price to ${pricing.recommended_resale_price:.2f}", | |
"π Monitor user activity for ongoing compliance" | |
]) | |
if verification.risk_score > 0.3: | |
actions.append("β οΈ Enhanced monitoring due to elevated risk score") | |
if scalping.confidence > 0.5: | |
actions.append("π Close monitoring recommended due to scalping indicators") | |
else: # DENIED | |
actions.append("β Block transaction immediately") | |
if scalping.is_scalper: | |
actions.extend([ | |
"π¨ Flag user account for comprehensive review", | |
"π§ Send scalping policy violation notice", | |
"π Consider temporary account restriction" | |
]) | |
if not verification.is_verified: | |
actions.extend([ | |
"π Require enhanced identity verification", | |
"π Manual verification call recommended" | |
]) | |
if compliance.violations: | |
actions.append(f"π Send policy violation notice: {', '.join(compliance.violations)}") | |
if pricing.recommended_resale_price != pricing.original_price: | |
actions.append(f"π‘ Suggest alternative price: ${pricing.recommended_resale_price:.2f}") | |
# Add market-specific actions | |
if pricing.market_intelligence.market_sentiment == "negative": | |
actions.append("π Market conditions unfavorable - consider delaying resale") | |
elif pricing.market_intelligence.price_trend == "falling": | |
actions.append("β° Price trend declining - urgent sale recommended") | |
return actions | |
def _create_error_response(self, error_msg: str) -> Dict[str, Any]: | |
"""Create error response with fallback data""" | |
return { | |
"error": True, | |
"message": f"System error: {error_msg}", | |
"final_decision": "ERROR", | |
"action_items": ["π§ Contact system administrator", "π Manual review required"], | |
"ai_summary": "System error occurred during analysis", | |
"confidence_score": 0.0 | |
} | |
# ========== GRADIO INTERFACE ========== | |
def create_interface(): | |
system = IntelligentAntiScalpingSystem() | |
def process_transaction( | |
name, email, user_id, event_name, ticket_type, | |
ticket_quantity, original_price, demand_level, proposed_resale_price | |
): | |
"""Process the transaction and return formatted results""" | |
# Validate inputs | |
if not all([name, email, user_id, event_name]): | |
return "β **Error**: Please fill in all required fields" | |
try: | |
original_price = float(original_price) if original_price else 0 | |
proposed_resale_price = float(proposed_resale_price) if proposed_resale_price else 0 | |
ticket_quantity = int(ticket_quantity) if ticket_quantity else 1 | |
if original_price <= 0 or proposed_resale_price <= 0: | |
return "β **Error**: Prices must be greater than 0" | |
except (ValueError, TypeError): | |
return "β **Error**: Please enter valid numbers for prices and quantity" | |
# Process through the intelligent system (run async in sync context) | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
try: | |
result = loop.run_until_complete( | |
system.process_intelligent_transaction( | |
name=name, | |
email=email, | |
user_id=user_id, | |
event_name=event_name, | |
ticket_type=ticket_type, | |
ticket_quantity=ticket_quantity, | |
original_price=original_price, | |
demand_level=demand_level, | |
proposed_resale_price=proposed_resale_price | |
) | |
) | |
finally: | |
loop.close() | |
# Handle errors | |
if result.get("error"): | |
return f"β **System Error**: {result.get('message', 'Unknown error occurred')}" | |
# Format the comprehensive output | |
decision_emoji = "β " if result['final_decision'] == "APPROVED" else "β" | |
confidence_color = "π’" if result.get('confidence_score', 0) > 0.8 else "π‘" if result.get('confidence_score', 0) > 0.6 else "π΄" | |
output = f""" | |
# π€ Intelligent Anti-Scalping Analysis Report | |
## {decision_emoji} Final Decision: **{result['final_decision']}** | |
**AI Confidence**: {result.get('confidence_score', 0):.1%} {confidence_color} | |
> π§ **AI Summary**: {result.get('ai_summary', 'Analysis completed successfully')} | |
--- | |
## π€ Enhanced User Verification | |
- **User ID**: `{result['verification']['user_id']}` | |
- **Name**: {result['verification']['name']} | |
- **Email**: {result['verification']['email']} | |
- **Verified**: {'β Yes' if result['verification']['is_verified'] else 'β No'} | |
- **Risk Score**: {result['verification']['risk_score']:.1%} {'π’' if result['verification']['risk_score'] < 0.3 else 'π‘' if result['verification']['risk_score'] < 0.6 else 'π΄'} | |
- **Verification Level**: {result['verification']['verification_level'].title()} | |
### π§ AI Behavioral Analysis: | |
{result['verification']['ai_behavioral_analysis']} | |
--- | |
## π Advanced Scalping Detection | |
- **Scalper Detected**: {'π¨ YES' if result['scalping_detection']['is_scalper'] else 'β NO'} | |
- **Detection Confidence**: {result['scalping_detection']['confidence']:.1%} | |
- **Purchase Velocity**: {result['scalping_detection']['purchase_velocity']} purchases/hour | |
- **IP Address Duplicates**: {result['scalping_detection']['ip_duplicates']} accounts | |
- **Network Connections**: {len(result['scalping_detection']['network_connections'])} suspicious links | |
- **Red Flags**: {', '.join(result['scalping_detection']['flags']) if result['scalping_detection']['flags'] else 'β None detected'} | |
### π§ AI Pattern Analysis: | |
{result['scalping_detection']['ai_pattern_analysis']} | |
--- | |
## π° Intelligent Market Pricing | |
- **Original Price**: ${result['pricing']['original_price']:.2f} | |
- **Proposed Resale**: ${proposed_resale_price:.2f} | |
- **AI Recommended**: ${result['pricing']['recommended_resale_price']:.2f} | |
- **Market Fair Price**: ${result['pricing']['market_fair_price']:.2f} | |
- **Price Ratio**: {result['pricing']['recommended_resale_price']/result['pricing']['original_price']:.2f}x | |
### π Market Intelligence: | |
- **Market Sentiment**: {result['pricing']['market_intelligence']['market_sentiment'].title()} | |
- **Price Trend**: {result['pricing']['market_intelligence']['price_trend'].title()} π | |
- **Supply/Demand**: {result['pricing']['market_intelligence']['supply_demand_ratio']:.2f} | |
- **Average Market Price**: ${result['pricing']['market_intelligence']['average_resale_price']:.2f} | |
""" | |
if result['pricing'].get('profit_margin'): | |
output += f"- **Profit Margin**: {result['pricing']['profit_margin']:.1f}% π\n" | |
elif result['pricing'].get('loss_percentage'): | |
output += f"- **Loss**: -{result['pricing']['loss_percentage']:.1f}% π\n" | |
output += f""" | |
### π§ AI Pricing Rationale: | |
{result['pricing']['ai_pricing_rationale']} | |
--- | |
## β Compliance & Policy Analysis | |
- **Policy Compliant**: {'β Yes' if result['compliance']['is_compliant'] else 'β No'} | |
- **Resale Permitted**: {'β Yes' if result['compliance']['resale_allowed'] else 'β No'} | |
- **Maximum Allowed**: ${result['compliance']['max_allowed_price']:.2f} | |
- **Violations**: {', '.join(result['compliance']['violations']) if result['compliance']['violations'] else 'β None'} | |
### π§ AI Policy Analysis: | |
{result['compliance']['ai_policy_analysis']} | |
--- | |
## π Intelligent Action Items | |
""" | |
for i, action in enumerate(result['action_items'], 1): | |
output += f"{i}. {action}\n" | |
# Add summary box | |
if result['final_decision'] == "APPROVED": | |
output += f""" | |
--- | |
> β **TRANSACTION APPROVED** | |
> | |
> **Confidence Level**: {result.get('confidence_score', 0):.1%} | |
> | |
> All AI-powered checks passed. Resale can proceed with recommended monitoring. | |
""" | |
else: | |
output += f""" | |
--- | |
> β **TRANSACTION BLOCKED** | |
> | |
> **Risk Assessment**: High risk detected by AI analysis | |
> | |
> Policy violations and suspicious patterns identified. Manual review required. | |
""" | |
output += f""" | |
--- | |
*π€ Analysis powered by Google Gemini AI β’ Generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* | |
""" | |
return output | |
# Create enhanced Gradio interface | |
with gr.Blocks( | |
title="π€ Intelligent Anti-Scalping System", | |
theme=gr.themes.Soft( | |
primary_hue="blue", | |
secondary_hue="purple", | |
), | |
css=""" | |
.gradio-container { | |
max-width: 1200px !important; | |
} | |
""" | |
) as interface: | |
gr.Markdown(""" | |
# π€ Intelligent AI-Powered Anti-Scalping System | |
Next-generation ticket scalping prevention using **Google Gemini AI**, real-time market intelligence, | |
and advanced behavioral analysis. Protects consumers while ensuring fair market pricing. | |
## π AI-Powered Features: | |
**π§ Behavioral Analysis**: Gemini AI analyzes user patterns for authenticity | |
**π Pattern Recognition**: Advanced scalping detection using network analysis | |
**π Market Intelligence**: Real-time pricing data and sentiment analysis | |
**βοΈ Policy Compliance**: AI-powered policy interpretation and enforcement | |
**π Dynamic Pricing**: Market-aware fair pricing recommendations | |
--- | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### π€ User Information") | |
name = gr.Textbox(label="Full Name", placeholder="John Doe", value="") | |
email = gr.Textbox(label="Email Address", placeholder="john@example.com", value="") | |
user_id = gr.Textbox(label="User ID", placeholder="USER123", value="") | |
gr.Markdown("### ποΈ Event Details") | |
event_name = gr.Textbox(label="Event Name", placeholder="Taylor Swift - Eras Tour", value="") | |
ticket_type = gr.Dropdown( | |
label="Ticket Type", | |
choices=["General Admission", "VIP", "Premium", "Standard", "Balcony", "Floor", "Front Row"], | |
value="Standard" | |
) | |
ticket_quantity = gr.Number(label="Number of Tickets", value=1, minimum=1, maximum=10, precision=0) | |
gr.Markdown("### π² Pricing Information") | |
original_price = gr.Number(label="Original Ticket Price ($)", value=100, minimum=1) | |
demand_level = gr.Radio( | |
label="Current Market Demand", | |
choices=["low", "medium", "high"], | |
value="medium" | |
) | |
proposed_resale_price = gr.Number(label="Proposed Resale Price ($)", value=150, minimum=1) | |
submit_btn = gr.Button("π€ Run AI Analysis", variant="primary", size="lg") | |
with gr.Column(scale=2): | |
output = gr.Markdown(value=""" | |
π€ **Ready for AI Analysis!** | |
Fill in the transaction details and click 'Run AI Analysis' to get comprehensive: | |
- π§ AI behavioral analysis | |
- π Advanced scalping detection | |
- π Real-time market intelligence | |
- βοΈ Policy compliance assessment | |
- π Dynamic pricing recommendations | |
""") | |
# Event handlers | |
submit_btn.click( | |
fn=process_transaction, | |
inputs=[ | |
name, email, user_id, event_name, ticket_type, | |
ticket_quantity, original_price, demand_level, proposed_resale_price | |
], | |
outputs=output | |
) | |
# Examples section | |
gr.Markdown("---") | |
gr.Markdown("### π Example Scenarios") | |
examples = gr.Examples( | |
examples=[ | |
["John Smith", "john@gmail.com", "USER001", "Taylor Swift - Eras Tour", "VIP", 2, 500, "high", 750], | |
["Jane Doe", "jane@company.com", "USER002", "NBA Finals Game 7", "Premium", 4, 300, "high", 1200], | |
["Bob Wilson", "bob@email.com", "USER003", "Local Concert", "General Admission", 1, 50, "low", 40], | |
["ScalperBot", "temp@fake.com", "BOT999", "Popular Event", "Standard", 8, 100, "high", 300], | |
["Sarah Johnson", "sarah.j@university.edu", "USER456", "Music Festival", "Premium", 3, 200, "medium", 280], | |
], | |
inputs=[ | |
name, email, user_id, event_name, ticket_type, | |
ticket_quantity, original_price, demand_level, proposed_resale_price | |
] | |
) | |
gr.Markdown(""" | |
--- | |
### π System Configuration | |
**AI Provider**: Google Gemini 1.5 Flash (Set `GEMINI_API_KEY` environment variable) | |
**Market Data**: Serper API (Optional: Set `SERPER_API_KEY` for real-time data) | |
**Fallback Mode**: System works without API keys using intelligent simulations | |
*π Your API keys are never stored or shared. All processing is secure and private.* | |
""") | |
return interface | |
# Main execution | |
if __name__ == "__main__": | |
interface = create_interface() | |
interface.launch( | |
share=True, | |
server_name="0.0.0.0", | |
server_port=7860, | |
show_error=True | |
) |