#!/usr/bin/env python
import os
import json
import logging
import random
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from pydantic import BaseModel, Field
import gradio as gr
import google.generativeai as genai
from dataclasses import dataclass

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)


# ========== CONFIGURATION ==========
@dataclass
class Config:
    """Runtime settings shared by the services in this module.

    The two API keys default to the value of the matching environment
    variable at the moment this class body is evaluated (i.e. at import
    time); an unset variable yields an empty string.
    """

    GEMINI_API_KEY: str = os.getenv("GEMINI_API_KEY", "")
    SERPER_API_KEY: str = os.getenv("SERPER_API_KEY", "")
    # Model name handed to genai.GenerativeModel.
    GEMINI_MODEL: str = "gemini-2.0-flash"
    # NOTE(review): not referenced anywhere in the visible part of this file.
    MAX_RETRIES: int = 3
    # Passed as the `timeout` argument of the Serper HTTP POST.
    TIMEOUT: int = 30


# Module-wide settings instance read by the service classes.
config = Config()


# ========== ENHANCED DATA MODELS ==========
class UserVerification(BaseModel):
    """Result of the user verification step."""

    user_id: str = Field(..., description="User ID")
    name: str = Field(..., description="User name")
    email: str = Field(..., description="User email")
    is_verified: bool = Field(..., description="KYC verification status")
    verification_level: str = Field(..., description="Verification level: basic, standard, premium")
    risk_score: float = Field(..., description="Risk score 0-1")
    ai_behavioral_analysis: str = Field("", description="AI analysis of user behavior")


class MarketIntelligence(BaseModel):
    """Market snapshot used by the pricing step; every field has a neutral default."""

    average_resale_price: float = Field(0.0, description="Average market resale price")
    price_trend: str = Field("stable", description="Price trend: rising, falling, stable")
    market_sentiment: str = Field("neutral", description="Market sentiment: positive, negative, neutral")
    similar_events_pricing: List[Dict] = Field(default_factory=list, description="Similar events pricing data")
    news_impact: str = Field("", description="News impact on pricing")
    social_media_buzz: str = Field("", description="Social media sentiment analysis")
    supply_demand_ratio: float = Field(1.0, description="Supply vs demand ratio")


class ScalpingDetection(BaseModel):
    """Result of the scalping detection step."""

    is_scalper: bool = Field(..., description="Whether user is detected as scalper")
    confidence: float = Field(..., description="Detection confidence 0-1")
    flags: List[str] = Field(..., description="Suspicious activity flags")
    purchase_velocity: int = Field(..., description="Number of purchases in last hour")
    ip_duplicates: int = Field(..., description="Number of accounts from same IP")
    ai_pattern_analysis: str = Field("", description="AI analysis of behavior patterns")
    network_connections: List[str] = Field(default_factory=list, description="Connected suspicious accounts")


class PricingRecommendation(BaseModel):
    """Result of the pricing step, including the market snapshot it was based on."""

    original_price: float = Field(..., description="Original ticket price")
    recommended_resale_price: float = Field(..., description="Recommended resale price")
    market_fair_price: float = Field(..., description="AI-calculated fair market price")
    demand_level: str = Field(..., description="Current demand level")
    price_adjustment_reason: str = Field(..., description="Reason for price adjustment")
    # At most one of the next two is populated by the pricing engine.
    profit_margin: Optional[float] = Field(None, description="Profit margin percentage")
    loss_percentage: Optional[float] = Field(None, description="Loss percentage if selling below cost")
    market_intelligence: MarketIntelligence = Field(default_factory=MarketIntelligence)
    ai_pricing_rationale: str = Field("", description="AI explanation for pricing decision")


class ResaleCompliance(BaseModel):
    """Result of the policy compliance step."""

    is_compliant: bool = Field(..., description="Whether resale is policy compliant")
    violations: List[str] = Field(..., description="List of policy violations")
    resale_allowed: bool = Field(..., description="Whether resale is allowed")
    max_allowed_price: float = Field(..., description="Maximum allowed resale price")
    recommendation: str = Field(..., description="Compliance recommendation")
    ai_policy_analysis: str = Field("", description="AI analysis of policy compliance")


class IntelligentReport(BaseModel):
    """Final report combining the four step results with the overall decision."""

    verification: UserVerification
    scalping_detection: ScalpingDetection
    pricing: PricingRecommendation
    compliance: ResaleCompliance
    final_decision: str = Field(..., description="Final system decision")
    action_items: List[str] = Field(..., description="Recommended actions")
    ai_summary: str = Field("", description="AI-generated executive summary")
    confidence_score: float = Field(0.0, description="Overall AI confidence in decision")
# ========== AI INTEGRATION SERVICES ==========
class GeminiAIService:
    """Service for interacting with Google's Gemini AI.

    Every public coroutine returns the model's text when the service is
    enabled and the call succeeds, and a rule-based fallback otherwise
    (service disabled, empty reply, or any exception from the call).

    The Gemini client call is synchronous, so it is dispatched to the
    default executor; awaiting it therefore no longer blocks the event loop
    and callers that ``asyncio.gather`` several analyses actually overlap.

    Prompt wording is unchanged; only the line layout inside the prompt
    strings was restored.
    """

    # Characters trimmed from both halves of a "SUMMARY | SCORE" reply. Quotes
    # are included because the prompt's own example is wrapped in quotes.
    _REPLY_TRIM = " \t\r\n\"'"

    def __init__(self):
        """Configure the Gemini client, or disable the service if that is not possible."""
        if not config.GEMINI_API_KEY:
            logger.warning("GEMINI_API_KEY not found. Using fallback responses.")
            self.enabled = False
            return

        try:
            genai.configure(api_key=config.GEMINI_API_KEY)
            self.model = genai.GenerativeModel(config.GEMINI_MODEL)
            self.enabled = True
            logger.info("Gemini AI service initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Gemini AI: {e}")
            self.enabled = False

    async def _generate_text(self, prompt: str) -> str:
        """Run the blocking model call in a worker thread; return its text or ""."""
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, self.model.generate_content, prompt)
        return response.text or ""

    async def analyze_user_behavior(self, user_data: Dict, transaction_history: List[Dict]) -> str:
        """Analyze user behavior patterns using AI.

        Args:
            user_data: Read with ``.get`` for ``name``, ``email``, ``user_id``
                and ``risk_factors``.
            transaction_history: JSON-serialised verbatim into the prompt.

        Returns:
            The model's analysis, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_user_analysis(user_data)

        try:
            prompt = f"""
            As an expert fraud detection analyst, analyze this user's behavior for potential ticket scalping:

            User Profile:
            - Name: {user_data.get('name', 'N/A')}
            - Email: {user_data.get('email', 'N/A')}
            - User ID: {user_data.get('user_id', 'N/A')}
            - Risk Factors: {user_data.get('risk_factors', [])}

            Transaction History:
            {json.dumps(transaction_history, indent=2)}

            Provide a concise behavioral analysis focusing on:
            1. Email legitimacy indicators
            2. Name authenticity assessment
            3. Overall trustworthiness score
            4. Red flags or green flags

            Keep response under 150 words and be specific.
            """
            text = await self._generate_text(prompt)
            return text or self._fallback_user_analysis(user_data)
        except Exception as e:
            logger.error(f"Gemini AI analysis failed: {e}")
            return self._fallback_user_analysis(user_data)

    async def analyze_scalping_patterns(self, user_id: str, transaction_data: Dict, network_data: List[Dict]) -> str:
        """Detect scalping patterns using AI analysis.

        Args:
            user_id: Identifier echoed into the prompt.
            transaction_data: Read with ``.get`` for ``ticket_quantity``,
                ``event_name``, ``purchase_velocity`` and ``ip_duplicates``.
            network_data: JSON-serialised verbatim into the prompt.

        Returns:
            The model's analysis, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_scalping_analysis(transaction_data)

        try:
            prompt = f"""
            As a specialized anti-scalping detection expert, analyze this transaction for suspicious patterns:

            Transaction Details:
            - User: {user_id}
            - Ticket Quantity: {transaction_data.get('ticket_quantity', 0)}
            - Event: {transaction_data.get('event_name', 'N/A')}
            - Purchase Velocity: {transaction_data.get('purchase_velocity', 0)} purchases/hour
            - IP Duplicates: {transaction_data.get('ip_duplicates', 0)} accounts

            Network Analysis:
            {json.dumps(network_data, indent=2)}

            Assess for:
            1. Bot-like purchasing behavior
            2. Coordinated scalping networks
            3. Unusual timing patterns
            4. Risk level (Low/Medium/High)

            Provide analysis in under 200 words with specific indicators.
            """
            text = await self._generate_text(prompt)
            return text or self._fallback_scalping_analysis(transaction_data)
        except Exception as e:
            logger.error(f"Scalping pattern analysis failed: {e}")
            return self._fallback_scalping_analysis(transaction_data)

    async def generate_pricing_rationale(self, market_data: Dict, pricing_factors: Dict) -> str:
        """Generate an AI-written pricing rationale.

        Args:
            market_data: Read with ``.get`` for ``average_resale_price``,
                ``price_trend``, ``market_sentiment`` and ``supply_demand_ratio``.
            pricing_factors: Read with ``.get`` for ``original_price``,
                ``proposed_price``, ``recommended_price`` and ``demand_level``.

        Returns:
            The model's rationale, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_pricing_analysis(pricing_factors)

        try:
            prompt = f"""
            As a market pricing expert, analyze this ticket resale pricing scenario:

            Market Conditions:
            - Average Market Price: ${market_data.get('average_resale_price', 0):.2f}
            - Price Trend: {market_data.get('price_trend', 'stable')}
            - Market Sentiment: {market_data.get('market_sentiment', 'neutral')}
            - Supply/Demand Ratio: {market_data.get('supply_demand_ratio', 1.0)}

            Pricing Analysis:
            - Original Price: ${pricing_factors.get('original_price', 0):.2f}
            - Proposed Price: ${pricing_factors.get('proposed_price', 0):.2f}
            - Recommended Price: ${pricing_factors.get('recommended_price', 0):.2f}
            - Demand Level: {pricing_factors.get('demand_level', 'medium')}

            Provide:
            1. Fair market assessment
            2. Consumer protection analysis
            3. Pricing recommendation rationale
            4. Risk factors for buyer/seller

            Keep under 250 words, be specific and actionable.
            """
            text = await self._generate_text(prompt)
            return text or self._fallback_pricing_analysis(pricing_factors)
        except Exception as e:
            logger.error(f"Pricing rationale generation failed: {e}")
            return self._fallback_pricing_analysis(pricing_factors)

    async def analyze_policy_compliance(self, transaction_data: Dict, policy_violations: List[str]) -> str:
        """Analyze policy compliance with AI reasoning.

        Args:
            transaction_data: Read with ``.get`` for ``user_id``,
                ``price_ratio``, ``event_name``, ``proposed_price`` and
                ``original_price``.
            policy_violations: Violation codes echoed into the prompt.

        Returns:
            The model's analysis, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_compliance_analysis(policy_violations)

        try:
            prompt = f"""
            As a policy compliance officer, evaluate this transaction:

            Transaction:
            - User: {transaction_data.get('user_id', 'N/A')}
            - Price Ratio: {transaction_data.get('price_ratio', 1.0)}x original
            - Event: {transaction_data.get('event_name', 'N/A')}
            - Proposed Price: ${transaction_data.get('proposed_price', 0):.2f}
            - Original Price: ${transaction_data.get('original_price', 0):.2f}

            Policy Violations Detected:
            {policy_violations}

            Provide:
            1. Severity assessment of each violation
            2. Consumer protection implications
            3. Recommended enforcement actions
            4. Appeal process guidance (if applicable)

            Be fair but firm in protecting consumers. Under 200 words.
            """
            text = await self._generate_text(prompt)
            return text or self._fallback_compliance_analysis(policy_violations)
        except Exception as e:
            logger.error(f"Policy compliance analysis failed: {e}")
            return self._fallback_compliance_analysis(policy_violations)

    async def generate_executive_summary(self, full_report: Dict) -> Tuple[str, float]:
        """Generate an executive summary with a confidence score.

        The model is asked to answer as ``SUMMARY | CONFIDENCE_SCORE``. The
        reply is split on its *last* ``|`` so a summary that itself contains
        a pipe is kept intact, and surrounding quotes are stripped from both
        halves.

        Args:
            full_report: Read with ``.get`` for ``final_decision`` and the
                nested ``verification``, ``scalping_detection`` and
                ``compliance`` dicts.

        Returns:
            ``(summary, confidence)`` with confidence clamped to [0, 1]. An
            unparseable score becomes 0.7; a reply without ``|`` (or any
            failure) yields the rule-based fallback pair.
        """
        if not self.enabled:
            return self._fallback_executive_summary(full_report)

        try:
            prompt = f"""
            Create an executive summary for this anti-scalping analysis:

            Decision: {full_report.get('final_decision', 'UNKNOWN')}
            User Verified: {full_report.get('verification', {}).get('is_verified', False)}
            Scalper Detected: {full_report.get('scalping_detection', {}).get('is_scalper', False)}
            Violations: {full_report.get('compliance', {}).get('violations', [])}

            Provide:
            1. One sentence decision summary
            2. Key risk factors (2-3 points)
            3. Confidence level (0.0-1.0)

            Format: SUMMARY | CONFIDENCE_SCORE
            Example: "Transaction approved with moderate monitoring recommended due to acceptable risk profile. | 0.85"

            Keep summary under 100 words.
            """
            text = await self._generate_text(prompt)
            head, separator, tail = text.rpartition('|')
            if not separator:
                return self._fallback_executive_summary(full_report)

            summary = head.strip(self._REPLY_TRIM)
            try:
                confidence = float(tail.strip(self._REPLY_TRIM))
            except ValueError:
                confidence = 0.7
            if confidence != confidence:  # NaN would otherwise be clamped to 1.0
                confidence = 0.7
            return summary, max(0.0, min(1.0, confidence))
        except Exception as e:
            logger.error(f"Executive summary generation failed: {e}")
            return self._fallback_executive_summary(full_report)

    # Fallback methods when AI is not available
    def _fallback_user_analysis(self, user_data: Dict) -> str:
        """Return a canned assessment based only on ``user_data['risk_factors']``."""
        risk_factors = user_data.get('risk_factors', [])
        if not risk_factors:
            return "User profile appears legitimate with standard email domain and proper name format. No immediate red flags detected."
        return f"User profile shows some concerns: {', '.join(risk_factors)}. Recommend enhanced verification for high-value transactions."

    def _fallback_scalping_analysis(self, transaction_data: Dict) -> str:
        """Return a canned risk statement from purchase velocity and ticket quantity."""
        velocity = transaction_data.get('purchase_velocity', 0)
        quantity = transaction_data.get('ticket_quantity', 0)

        if velocity > 5 or quantity > 6:
            return "High-risk transaction pattern detected. Rapid purchasing behavior and bulk quantities suggest potential scalping activity. Recommend blocking pending manual review."
        if velocity > 3 or quantity > 4:
            return "Moderate risk factors present. Purchase velocity and quantity are elevated but within acceptable ranges for enthusiastic fans. Monitor closely."
        return "Normal purchasing behavior observed. Transaction patterns consistent with legitimate fan purchases."

    def _fallback_pricing_analysis(self, pricing_factors: Dict) -> str:
        """Return a canned pricing statement from the proposed/original price ratio."""
        original = pricing_factors.get('original_price', 0)
        proposed = pricing_factors.get('proposed_price', 0)
        # A non-positive original price is treated as "no markup".
        ratio = proposed / original if original > 0 else 1.0

        if ratio > 2.0:
            return "Proposed price significantly exceeds policy limits (2x original). High risk of consumer exploitation. Recommend price reduction to comply with anti-gouging policies."
        if ratio > 1.5:
            return "Price markup is substantial but within policy limits. Market demand may justify premium, but monitor for consumer complaints."
        return "Pricing appears fair and reasonable. Markup reflects normal market dynamics and demand levels."

    def _fallback_compliance_analysis(self, violations: List[str]) -> str:
        """Return a canned compliance statement; severity grows with the violation count."""
        if not violations:
            return "Transaction complies with all anti-scalping policies. No violations detected. Safe to proceed with standard monitoring."

        if len(violations) > 2:
            severity = "High"
        elif len(violations) > 1:
            severity = "Medium"
        else:
            severity = "Low"
        return f"{severity} severity violations detected: {', '.join(violations)}. Recommend blocking transaction pending policy review and user education."

    def _fallback_executive_summary(self, full_report: Dict) -> Tuple[str, float]:
        """Return a canned ``(summary, confidence)`` pair.

        Confidence starts at 0.8 (approved) or 0.9 (anything else), drops per
        recorded compliance violation, and is clamped to [0.1, 0.95].
        """
        decision = full_report.get('final_decision', 'UNKNOWN')
        violations = len(full_report.get('compliance', {}).get('violations', []))

        if decision == "APPROVED":
            summary = "Transaction approved with standard monitoring protocols. Risk factors within acceptable thresholds."
            confidence = 0.8 - (violations * 0.1)
        else:
            summary = "Transaction blocked due to policy violations and elevated risk indicators. Manual review required."
            confidence = 0.9 - (violations * 0.05)

        return summary, max(0.1, min(0.95, confidence))
class SerperAPIService:
    """Service for market data using the Serper search API.

    When no API key is configured, or when a lookup fails, simulated data is
    returned instead. Note that even on the live path only the sentiment,
    the supply/demand band and the counts in the text fields are derived
    from search results; the price trend and average price are random
    placeholders.
    """

    def __init__(self):
        """Enable the service only when ``config.SERPER_API_KEY`` is set."""
        if not config.SERPER_API_KEY:
            logger.warning("SERPER_API_KEY not found. Using simulated market data.")
            self.enabled = False
        else:
            self.enabled = True
            self.base_url = "https://google.serper.dev/search"
            logger.info("Serper API service initialized successfully")

    async def get_market_intelligence(self, event_name: str, ticket_type: str, location: str = "") -> MarketIntelligence:
        """Fetch market intelligence for ticket pricing.

        Args:
            event_name: Event to search for.
            ticket_type: Ticket category; part of the first query and used to
                pick the base price of simulated data.
            location: Currently unused.

        Returns:
            Intelligence derived from search results, or simulated data when
            the service is disabled or anything fails.
        """
        if not self.enabled:
            return self._simulate_market_data(event_name, ticket_type)

        try:
            # Search for current ticket prices and market data
            queries = [
                f"{event_name} {ticket_type} ticket prices resale",
                f"{event_name} tickets secondary market pricing",
                f"{event_name} ticket demand social media",
            ]

            # The queries are independent, so issue them concurrently;
            # gather() returns the payloads in query order.
            async with aiohttp.ClientSession() as session:
                payloads = await asyncio.gather(
                    *(self._search_query(session, query) for query in queries)
                )

            all_results = []
            for payload in payloads:
                if payload:
                    all_results.extend(payload.get('organic', []))

            # Analyze results for market intelligence
            return await self._analyze_market_data(all_results, event_name)

        except Exception as e:
            logger.error(f"Market intelligence gathering failed: {e}")
            return self._simulate_market_data(event_name, ticket_type)

    async def _search_query(self, session: aiohttp.ClientSession, query: str) -> Optional[Dict]:
        """Execute one search via the Serper API.

        Returns:
            The decoded JSON body on HTTP 200; ``None`` on any other status
            or on any exception (both are logged).
        """
        try:
            headers = {
                "X-API-KEY": config.SERPER_API_KEY,
                "Content-Type": "application/json",
            }
            payload = {
                "q": query,
                "num": 5,
                "gl": "us",
                "hl": "en",
            }
            # aiohttp expects a ClientTimeout object here; a bare number is deprecated.
            timeout = aiohttp.ClientTimeout(total=config.TIMEOUT)

            async with session.post(self.base_url, headers=headers, json=payload, timeout=timeout) as response:
                if response.status == 200:
                    return await response.json()
                logger.warning(f"Serper API returned status {response.status}")
                return None

        except Exception as e:
            logger.error(f"Serper API query failed: {e}")
            return None

    async def _analyze_market_data(self, search_results: List[Dict], event_name: str) -> MarketIntelligence:
        """Derive market intelligence from (at most the first ten) search results.

        Each result contributes at most one demand indicator (high-demand
        keywords win over low-demand ones) and is counted as a news item if
        it mentions a news keyword. Falls back to simulated data on error.
        """
        try:
            high_demand_words = ('sold out', 'high demand', 'popular', 'rush')
            low_demand_words = ('available', 'discount', 'cheap')
            news_words = ('news', 'announced', 'cancelled', 'postponed')

            sentiment_indicators = []
            news_items = []

            for result in search_results[:10]:  # Limit processing
                title = result.get('title', '').lower()
                snippet = result.get('snippet', '').lower()
                # Newline separator: no keyword contains one, so a keyword can
                # never match across the title/snippet boundary.
                text = f"{title}\n{snippet}"

                # Look for sentiment indicators
                if any(word in text for word in high_demand_words):
                    sentiment_indicators.append('high_demand')
                elif any(word in text for word in low_demand_words):
                    sentiment_indicators.append('low_demand')

                # Look for news impact
                if any(word in text for word in news_words):
                    news_items.append(result)

            # Calculate market metrics
            sentiment = self._calculate_sentiment(sentiment_indicators)
            trend = random.choice(["rising", "stable", "falling"])  # Simplified: not derived from results
            avg_price = random.uniform(100, 800)  # Simplified: not derived from results

            return MarketIntelligence(
                average_resale_price=avg_price,
                price_trend=trend,
                market_sentiment=sentiment,
                similar_events_pricing=[],
                news_impact=f"Analyzed {len(news_items)} news items affecting {event_name} pricing",
                social_media_buzz=f"Market sentiment derived from {len(sentiment_indicators)} social indicators",
                supply_demand_ratio=self._calculate_supply_demand_ratio(sentiment_indicators),
            )

        except Exception as e:
            logger.error(f"Market data analysis failed: {e}")
            return self._simulate_market_data(event_name, "standard")

    def _simulate_market_data(self, event_name: str, ticket_type: str) -> MarketIntelligence:
        """Generate simulated market data when the API is unavailable.

        The base price depends on whether ``ticket_type`` mentions "vip"
        (500), "premium" (350) or neither (200); a uniform random draw then
        selects a positive, neutral or negative scenario with matching
        price and supply/demand ranges.
        """
        lowered_type = ticket_type.lower()
        if "vip" in lowered_type:
            base_price = 500
        elif "premium" in lowered_type:
            base_price = 350
        else:
            base_price = 200

        # Simulate market conditions
        sentiment_score = random.uniform(0, 1)

        if sentiment_score > 0.7:
            sentiment = "positive"
            trend = "rising"
            avg_price = base_price * random.uniform(1.2, 1.8)
            supply_demand = random.uniform(0.3, 0.7)
        elif sentiment_score > 0.3:
            sentiment = "neutral"
            trend = "stable"
            avg_price = base_price * random.uniform(0.9, 1.3)
            supply_demand = random.uniform(0.8, 1.2)
        else:
            sentiment = "negative"
            trend = "falling"
            avg_price = base_price * random.uniform(0.6, 1.1)
            supply_demand = random.uniform(1.2, 2.0)

        return MarketIntelligence(
            average_resale_price=avg_price,
            price_trend=trend,
            market_sentiment=sentiment,
            similar_events_pricing=[],
            news_impact=f"Simulated market analysis for {event_name}",
            social_media_buzz="Market sentiment analysis based on event characteristics",
            supply_demand_ratio=supply_demand,
        )

    def _calculate_sentiment(self, indicators: List[str]) -> str:
        """Return "positive", "negative" or "neutral" by majority of indicators."""
        positive_count = indicators.count('high_demand')
        negative_count = indicators.count('low_demand')

        if positive_count > negative_count:
            return "positive"
        if negative_count > positive_count:
            return "negative"
        return "neutral"

    def _calculate_supply_demand_ratio(self, indicators: List[str]) -> float:
        """Return a random ratio from the band matching the majority indicator."""
        high_demand = indicators.count('high_demand')
        low_demand = indicators.count('low_demand')

        if high_demand > low_demand:
            return random.uniform(0.3, 0.7)  # Low supply, high demand
        if low_demand > high_demand:
            return random.uniform(1.3, 2.0)  # High supply, low demand
        return random.uniform(0.8, 1.2)  # Balanced
# ========== ENHANCED ENGINES ==========
class EnhancedUserVerificationEngine:
    """Rule-based user verification enriched with an AI behavioral analysis."""

    # Domain labels treated as a sign of a throw-away mailbox. Matching whole
    # labels anywhere in the domain catches e.g. "user@10minutemail.com",
    # which a plain suffix test on ".10minutemail" never would.
    _DISPOSABLE_LABELS = frozenset({"temp", "fake", "test", "10minutemail", "guerrillamail"})

    def __init__(self):
        self.ai_service = GeminiAIService()

    async def verify_user(self, name: str, email: str, user_id: str) -> UserVerification:
        """Verify a user from name and email heuristics plus AI analysis.

        Each triggered heuristic adds 0.25 to the risk score, on top of a
        random 0.1-0.2 base; the total is capped at 0.9. A user is verified
        when the score is below 0.5 and both name and email pass the basic
        format checks.

        Args:
            name: Display name; must have at least two characters and no digits.
            email: Address to check for basic shape and disposable domains.
            user_id: Identifier copied into the result and the AI prompt.

        Returns:
            The populated ``UserVerification`` record.
        """
        # Basic validation
        domain = email.rsplit("@", 1)[-1].lower()
        email_valid = "@" in email and "." in email.split("@")[-1]
        stripped_name = name.strip()
        name_valid = len(stripped_name) >= 2 and not any(char.isdigit() for char in name)

        risk_factors = []
        if not email_valid:
            risk_factors.append("invalid_email")
        if not name_valid:
            risk_factors.append("suspicious_name")
        if any(label in self._DISPOSABLE_LABELS for label in domain.split(".")):
            risk_factors.append("temporary_email")
        if len(stripped_name) < 3:
            risk_factors.append("short_name")

        risk_score = min(0.9, len(risk_factors) * 0.25 + random.uniform(0.1, 0.2))
        is_verified = risk_score < 0.5 and email_valid and name_valid

        if risk_score < 0.2:
            verification_level = "premium"
        elif risk_score < 0.5:
            verification_level = "standard"
        else:
            verification_level = "basic"

        # AI behavioral analysis
        user_data = {
            "name": name,
            "email": email,
            "user_id": user_id,
            "risk_factors": risk_factors,
        }

        # Single synthetic history entry (random quantity, 1-30 days old);
        # there is no real purchase history source here.
        transaction_history = [
            {
                "event": "previous_purchase",
                "quantity": random.randint(1, 3),
                "timestamp": (datetime.now() - timedelta(days=random.randint(1, 30))).isoformat(),
            }
        ]

        ai_analysis = await self.ai_service.analyze_user_behavior(user_data, transaction_history)

        return UserVerification(
            user_id=user_id,
            name=name,
            email=email,
            is_verified=is_verified,
            verification_level=verification_level,
            risk_score=risk_score,
            ai_behavioral_analysis=ai_analysis,
        )
class EnhancedScalpingDetectionEngine:
    """Demo scalping detector: per-user synthetic metrics plus AI pattern analysis."""

    def __init__(self):
        self.ai_service = GeminiAIService()

    async def detect_scalping(self, user_id: str, ticket_quantity: int, event_name: str) -> ScalpingDetection:
        """Score a purchase for scalping risk.

        Purchase velocity, IP duplicates and resale frequency are synthetic
        demo values drawn from a private generator seeded with ``user_id``.
        Seeding ``random.Random`` with the string itself gives the same
        values in every process, unlike the built-in ``hash`` of a str
        (salted per interpreter run), and leaves the module-level generator
        used by other coroutines untouched.

        Args:
            user_id: Identifier that determines the synthetic metrics.
            ticket_quantity: Number of tickets in this purchase.
            event_name: Event name forwarded to the AI analysis.

        Returns:
            The populated ``ScalpingDetection`` record. ``is_scalper`` is set
            when the weighted score exceeds 0.6; ``confidence`` is the score
            plus a small jitter, capped at 0.95.
        """
        # Generate consistent data based on user_id for demo
        rng = random.Random(user_id)

        purchase_velocity = rng.randint(1, 8)
        ip_duplicates = rng.randint(1, 5)
        resale_frequency = rng.randint(0, 12)

        flag_rules = (
            (purchase_velocity > 3, "rapid_purchases"),
            (ip_duplicates > 2, "multiple_ips"),
            (ticket_quantity > 4, "bulk_purchase"),
            (resale_frequency > 5, "frequent_reseller"),
            (ticket_quantity > 6, "excessive_quantity"),
        )
        flags = [flag for triggered, flag in flag_rules if triggered]

        # Calculate scalping probability: four equally weighted components
        scalping_score = (
            (purchase_velocity / 10) * 0.25 +
            (ip_duplicates / 5) * 0.25 +
            (ticket_quantity / 10) * 0.25 +
            (resale_frequency / 20) * 0.25
        )

        # Fixed penalty for bot-like behaviour: very fast and large purchases
        if purchase_velocity > 6 and ticket_quantity > 5:
            scalping_score += 0.3

        is_scalper = scalping_score > 0.6
        confidence = min(0.95, scalping_score + rng.uniform(0.05, 0.15))

        # AI pattern analysis
        transaction_data = {
            "user_id": user_id,
            "ticket_quantity": ticket_quantity,
            "event_name": event_name,
            "purchase_velocity": purchase_velocity,
            "ip_duplicates": ip_duplicates,
        }

        network_data = [
            {"connected_user": f"user_{i}", "similarity_score": rng.uniform(0.1, 0.9)}
            for i in range(min(3, ip_duplicates))
        ]

        ai_pattern_analysis = await self.ai_service.analyze_scalping_patterns(user_id, transaction_data, network_data)

        return ScalpingDetection(
            is_scalper=is_scalper,
            confidence=confidence,
            flags=flags,
            purchase_velocity=purchase_velocity,
            ip_duplicates=ip_duplicates,
            ai_pattern_analysis=ai_pattern_analysis,
            network_connections=[f"suspicious_account_{i}" for i in range(max(0, ip_duplicates - 1))],
        )
class EnhancedDynamicPricingEngine:
    """Resale price recommendation bounded by demand- and market-adjusted limits."""

    # (minimum, maximum) multipliers of the original price per demand level.
    _DEMAND_MULTIPLIERS = {
        "low": (0.7, 1.1),
        "medium": (0.85, 1.3),
        "high": (1.0, 1.6),
    }
    # Multipliers used when the demand level is not one of the keys above.
    _DEFAULT_MULTIPLIERS = (0.85, 1.3)
    # Mark-up over the original price used for the fair price when the market
    # data carries no average resale price.
    _FAIR_PRICE_MARKUP = {"high": 0.5, "medium": 0.2}

    def __init__(self):
        self.ai_service = GeminiAIService()
        self.market_service = SerperAPIService()

    async def calculate_pricing(self, original_price: float, demand_level: str, proposed_price: float, event_name: str, ticket_type: str) -> PricingRecommendation:
        """Recommend a resale price using market intelligence.

        The proposed price is clamped into
        ``[original_price * min_mult, original_price * max_mult]``. The upper
        multiplier is scaled by market sentiment (x1.15 positive, x0.85
        negative) and price trend (x1.1 rising, x0.9 falling).

        Args:
            original_price: Face value of the ticket; must be positive.
            demand_level: "low", "medium" or "high"; anything else uses the
                medium multipliers.
            proposed_price: Price the seller asks for.
            event_name: Event name used for the market lookup.
            ticket_type: Ticket category used for the market lookup.

        Returns:
            The populated ``PricingRecommendation``.

        Raises:
            ValueError: If ``original_price`` is not positive. Previously
                zero surfaced as a ``ZeroDivisionError`` after the market
                lookup had already run.
        """
        if original_price <= 0:
            raise ValueError(f"original_price must be positive, got {original_price!r}")

        # Get market intelligence
        market_intel = await self.market_service.get_market_intelligence(event_name, ticket_type)

        # Base pricing logic
        min_mult, max_mult = self._DEMAND_MULTIPLIERS.get(demand_level, self._DEFAULT_MULTIPLIERS)

        # Adjust based on market intelligence
        if market_intel.market_sentiment == "positive":
            max_mult *= 1.15
        elif market_intel.market_sentiment == "negative":
            max_mult *= 0.85

        if market_intel.price_trend == "rising":
            max_mult *= 1.1
        elif market_intel.price_trend == "falling":
            max_mult *= 0.9

        # Calculate price bounds
        min_price = original_price * min_mult
        max_price = original_price * max_mult

        # Calculate fair market price
        if market_intel.average_resale_price > 0:
            market_fair_price = market_intel.average_resale_price
        else:
            market_fair_price = original_price * (1.0 + self._FAIR_PRICE_MARKUP.get(demand_level, 0.0))

        # Recommend price within acceptable range
        recommended_price = max(min_price, min(proposed_price, max_price))

        # Calculate margins: at most one of the two is set
        price_ratio = recommended_price / original_price
        profit_margin = None
        loss_percentage = None

        if price_ratio > 1.0:
            profit_margin = (price_ratio - 1) * 100
        elif price_ratio < 1.0:
            loss_percentage = (1 - price_ratio) * 100

        # Generate pricing reason
        reason = f"Adjusted for {demand_level} demand ({market_intel.market_sentiment} market sentiment, {market_intel.price_trend} trend)"

        if recommended_price > proposed_price:
            reason += f" - Price increased from ${proposed_price:.2f} to meet market conditions"
        elif recommended_price < proposed_price:
            reason += f" - Price reduced from ${proposed_price:.2f} for policy compliance"

        # Generate AI pricing rationale
        pricing_factors = {
            "original_price": original_price,
            "proposed_price": proposed_price,
            "recommended_price": recommended_price,
            "market_sentiment": market_intel.market_sentiment,
            "price_trend": market_intel.price_trend,
            "demand_level": demand_level,
        }

        ai_rationale = await self.ai_service.generate_pricing_rationale(market_intel.dict(), pricing_factors)

        return PricingRecommendation(
            original_price=original_price,
            recommended_resale_price=recommended_price,
            market_fair_price=market_fair_price,
            demand_level=demand_level,
            price_adjustment_reason=reason,
            profit_margin=profit_margin,
            loss_percentage=loss_percentage,
            market_intelligence=market_intel,
            ai_pricing_rationale=ai_rationale,
        )
class EnhancedComplianceEngine:
    """Policy checks on a proposed resale, enriched with an AI policy analysis."""

    def __init__(self):
        self.ai_service = GeminiAIService()

    async def check_compliance(
        self,
        user_id: str,
        proposed_price: float,
        original_price: float,
        scalping_detection: ScalpingDetection,
        event_name: str
    ) -> ResaleCompliance:
        """Check a proposed resale against the anti-scalping policies.

        Violations are collected for the price ratio (> 2.0x and > 2.5x
        original), a positive scalper verdict, purchase velocity (> 4 and
        > 6), more than three accounts on one IP, resale frequency (> 4 and
        > 6) and a ``bulk_purchase`` flag. Resale is allowed only when no
        violation was recorded.

        Args:
            user_id: Identifier that determines the synthetic resale frequency.
            proposed_price: Price the seller asks for.
            original_price: Face value of the ticket; must be positive.
            scalping_detection: Result of the scalping detection step.
            event_name: Event name forwarded to the AI analysis.

        Returns:
            The populated ``ResaleCompliance`` record; ``max_allowed_price``
            is twice the original price.

        Raises:
            ValueError: If ``original_price`` is not positive (previously a
                ``ZeroDivisionError`` for zero).
        """
        if original_price <= 0:
            raise ValueError(f"original_price must be positive, got {original_price!r}")

        violations = []
        price_ratio = proposed_price / original_price

        # Policy checks
        if price_ratio > 2.5:
            violations.append("price_exceeds_250_percent")
        elif price_ratio > 2.0:
            violations.append("price_exceeds_200_percent")

        if scalping_detection.is_scalper:
            violations.append("suspected_scalper")

        if scalping_detection.purchase_velocity > 6:
            violations.append("excessive_purchase_velocity")
        elif scalping_detection.purchase_velocity > 4:
            violations.append("high_purchase_velocity")

        if scalping_detection.ip_duplicates > 3:
            violations.append("multiple_ip_accounts")

        # Generate consistent resale frequency based on user_id. A private
        # generator seeded with the string is stable across processes (the
        # built-in hash of a str is salted per run) and does not disturb the
        # module-level generator.
        # NOTE(review): this draw is independent of the resale frequency used
        # by the scalping engine and by MockDatabase - confirm that is intended.
        resale_frequency = random.Random(user_id + "resale").randint(0, 10)

        if resale_frequency > 6:
            violations.append("monthly_resale_limit_exceeded")
        elif resale_frequency > 4:
            violations.append("high_resale_frequency")

        if "bulk_purchase" in scalping_detection.flags:
            violations.append("bulk_purchase_violation")

        # Determine compliance
        is_compliant = not violations
        resale_allowed = is_compliant and not scalping_detection.is_scalper and price_ratio <= 2.0
        max_allowed_price = original_price * 2.0

        # Generate recommendation
        if resale_allowed:
            recommendation = "✅ Transaction approved - complies with all anti-scalping policies"
        else:
            if len(violations) > 3:
                severity = "CRITICAL"
            elif len(violations) > 1:
                severity = "HIGH"
            else:
                severity = "MODERATE"
            recommendation = f"❌ Transaction blocked ({severity} risk) - Violations: {', '.join(violations)}"

        # AI policy analysis
        transaction_data = {
            "user_id": user_id,
            "proposed_price": proposed_price,
            "original_price": original_price,
            "price_ratio": price_ratio,
            "event_name": event_name,
            "scalping_indicators": scalping_detection.flags,
            "resale_frequency": resale_frequency,
        }

        ai_policy_analysis = await self.ai_service.analyze_policy_compliance(transaction_data, violations)

        return ResaleCompliance(
            is_compliant=is_compliant,
            violations=violations,
            resale_allowed=resale_allowed,
            max_allowed_price=max_allowed_price,
            recommendation=recommendation,
            ai_policy_analysis=ai_policy_analysis,
        )
# ========== MOCK DATABASE (Enhanced) ==========
class MockDatabase:
    """Mock database that derives repeatable demo values from the user id.

    Values come from a private ``random.Random`` seeded with a string key.
    String seeds are hashed deterministically by the ``random`` module, so a
    given user id maps to the same value in every process (the built-in
    ``hash`` of a str is salted per interpreter run), and the module-level
    generator is neither reseeded nor reset as a side effect.
    """

    def __init__(self):
        self.user_purchase_history = {}
        self.ip_addresses = {}
        self.resale_history = {}
        self.user_profiles = {}

    @staticmethod
    def _seeded_randint(seed_key: str, low: int, high: int) -> int:
        """Return the first ``randint(low, high)`` of a generator seeded with ``seed_key``."""
        return random.Random(seed_key).randint(low, high)

    def get_user_purchases(self, user_id: str) -> int:
        """Return the user's purchase count (0-8), cached after the first call."""
        if user_id not in self.user_purchase_history:
            # Generate consistent data based on user_id
            self.user_purchase_history[user_id] = self._seeded_randint(user_id, 0, 8)
        return self.user_purchase_history[user_id]

    def get_ip_accounts(self, user_id: str) -> int:
        """Return the number of accounts sharing the user's IP (1-5); recomputed per call."""
        return self._seeded_randint(user_id + "ip", 1, 5)

    def get_resale_frequency(self, user_id: str) -> int:
        """Return the user's resale frequency (0-12), cached after the first call."""
        if user_id not in self.resale_history:
            self.resale_history[user_id] = self._seeded_randint(user_id + "resale", 0, 12)
        return self.resale_history[user_id]


mock_db = MockDatabase()
# ========== ENHANCED MAIN APPLICATION ==========
class IntelligentAntiScalpingSystem:
    """Orchestrates verification, scalping detection, pricing and compliance."""

    # Weights of the three pass/fail signals and the score needed to approve.
    _VERIFICATION_WEIGHT = 0.3
    _SCALPING_WEIGHT = 0.4
    _COMPLIANCE_WEIGHT = 0.3
    _APPROVAL_THRESHOLD = 0.7

    def __init__(self):
        self.verification_engine = EnhancedUserVerificationEngine()
        self.scalping_engine = EnhancedScalpingDetectionEngine()
        self.pricing_engine = EnhancedDynamicPricingEngine()
        self.compliance_engine = EnhancedComplianceEngine()
        self.ai_service = GeminiAIService()
        logger.info("Intelligent Anti-Scalping System initialized")

    async def process_intelligent_transaction(
        self,
        name: str,
        email: str,
        user_id: str,
        event_name: str,
        ticket_type: str,
        ticket_quantity: int,
        original_price: float,
        demand_level: str,
        proposed_resale_price: float
    ) -> Dict[str, Any]:
        """Run a resale request through all engines and build the report.

        Verification, scalping detection and pricing are awaited together;
        the compliance check runs afterwards because it needs the scalping
        result. The request is approved only when the weighted score reaches
        the threshold AND compliance allows the resale. The second condition
        is required because a verified non-scalper alone already scores
        0.3 + 0.4 = 0.7, which would approve a resale that the compliance
        step itself reports as blocked.

        Returns:
            ``IntelligentReport.dict()`` on success. Any exception is logged
            and turned into the dict produced by ``_create_error_response``.
        """
        try:
            # Run analyses concurrently for better performance
            verification, scalping_detection, pricing = await asyncio.gather(
                self.verification_engine.verify_user(name, email, user_id),
                self.scalping_engine.detect_scalping(user_id, ticket_quantity, event_name),
                self.pricing_engine.calculate_pricing(
                    original_price, demand_level, proposed_resale_price, event_name, ticket_type
                ),
            )

            # Compliance check depends on scalping detection
            compliance = await self.compliance_engine.check_compliance(
                user_id, proposed_resale_price, original_price, scalping_detection, event_name
            )

            # Final decision logic with weighted scoring
            verification_score = 1.0 if verification.is_verified else 0.0
            scalping_score = 0.0 if scalping_detection.is_scalper else 1.0
            compliance_score = 1.0 if compliance.resale_allowed else 0.0

            # Weighted decision (verification: 30%, scalping: 40%, compliance: 30%)
            overall_score = (
                verification_score * self._VERIFICATION_WEIGHT
                + scalping_score * self._SCALPING_WEIGHT
                + compliance_score * self._COMPLIANCE_WEIGHT
            )

            # NOTE(review): an unverified but compliant non-scalper also
            # scores 0.7 and is approved - confirm whether verification
            # should be a hard requirement too.
            approved = overall_score >= self._APPROVAL_THRESHOLD and compliance.resale_allowed
            final_decision = "APPROVED" if approved else "DENIED"

            # Generate intelligent action items
            action_items = self._generate_action_items(
                final_decision, verification, scalping_detection, compliance, pricing
            )

            # Create comprehensive report
            report_data = {
                "verification": verification.dict(),
                "scalping_detection": scalping_detection.dict(),
                "pricing": pricing.dict(),
                "compliance": compliance.dict(),
                "final_decision": final_decision,
                "overall_score": overall_score,
            }

            # Generate AI executive summary
            ai_summary, confidence_score = await self.ai_service.generate_executive_summary(report_data)

            # Create final report
            report = IntelligentReport(
                verification=verification,
                scalping_detection=scalping_detection,
                pricing=pricing,
                compliance=compliance,
                final_decision=final_decision,
                action_items=action_items,
                ai_summary=ai_summary,
                confidence_score=confidence_score,
            )

            return report.dict()

        except Exception as e:
            logger.error(f"Error processing intelligent transaction: {e}")
            return self._create_error_response(str(e))

    def _generate_action_items(
        self,
        decision: str,
        verification: UserVerification,
        scalping: ScalpingDetection,
        compliance: ResaleCompliance,
        pricing: PricingRecommendation
    ) -> List[str]:
        """Build the list of recommended follow-up actions.

        Any decision other than "APPROVED" is treated as a denial.
        """
        actions = []

        if decision == "APPROVED":
            actions.extend([
                "✅ Process ticket resale at recommended price",
                f"💰 Set final price to ${pricing.recommended_resale_price:.2f}",
                "📊 Monitor user activity for ongoing compliance",
            ])

            if verification.risk_score > 0.3:
                actions.append("⚠️ Enhanced monitoring due to elevated risk score")

            if scalping.confidence > 0.5:
                actions.append("🔍 Close monitoring recommended due to scalping indicators")

        else:  # DENIED
            actions.append("❌ Block transaction immediately")

            if scalping.is_scalper:
                actions.extend([
                    "🚨 Flag user account for comprehensive review",
                    "📧 Send scalping policy violation notice",
                    "🔒 Consider temporary account restriction",
                ])

            if not verification.is_verified:
                actions.extend([
                    "📋 Require enhanced identity verification",
                    "📞 Manual verification call recommended",
                ])

            if compliance.violations:
                actions.append(f"📝 Send policy violation notice: {', '.join(compliance.violations)}")

            if pricing.recommended_resale_price != pricing.original_price:
                actions.append(f"💡 Suggest alternative price: ${pricing.recommended_resale_price:.2f}")

        # Add market-specific actions
        # NOTE(review): the nesting of this section was lost with the file's
        # line breaks; it is assumed to apply to both outcomes - confirm.
        if pricing.market_intelligence.market_sentiment == "negative":
            actions.append("📉 Market conditions unfavorable - consider delaying resale")
        elif pricing.market_intelligence.price_trend == "falling":
            actions.append("⏰ Price trend declining - urgent sale recommended")

        return actions

    def _create_error_response(self, error_msg: str) -> Dict[str, Any]:
        """Create the error payload returned when processing fails."""
        return {
            "error": True,
            "message": f"System error: {error_msg}",
            "final_decision": "ERROR",
            "action_items": ["🔧 Contact system administrator", "📞 Manual review required"],
            "ai_summary": "System error occurred during analysis",
            "confidence_score": 0.0,
        }
conditions unfavorable - consider delaying resale") elif pricing.market_intelligence.price_trend == "falling": actions.append("⏰ Price trend declining - urgent sale recommended") return actions def _create_error_response(self, error_msg: str) -> Dict[str, Any]: """Create error response with fallback data""" return { "error": True, "message": f"System error: {error_msg}", "final_decision": "ERROR", "action_items": ["🔧 Contact system administrator", "📞 Manual review required"], "ai_summary": "System error occurred during analysis", "confidence_score": 0.0 } # ========== GRADIO INTERFACE ========== def create_interface(): system = IntelligentAntiScalpingSystem() def process_transaction( name, email, user_id, event_name, ticket_type, ticket_quantity, original_price, demand_level, proposed_resale_price ): """Process the transaction and return formatted results""" # Validate inputs if not all([name, email, user_id, event_name]): return "❌ **Error**: Please fill in all required fields" try: original_price = float(original_price) if original_price else 0 proposed_resale_price = float(proposed_resale_price) if proposed_resale_price else 0 ticket_quantity = int(ticket_quantity) if ticket_quantity else 1 if original_price <= 0 or proposed_resale_price <= 0: return "❌ **Error**: Prices must be greater than 0" except (ValueError, TypeError): return "❌ **Error**: Please enter valid numbers for prices and quantity" # Process through the intelligent system (run async in sync context) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete( system.process_intelligent_transaction( name=name, email=email, user_id=user_id, event_name=event_name, ticket_type=ticket_type, ticket_quantity=ticket_quantity, original_price=original_price, demand_level=demand_level, proposed_resale_price=proposed_resale_price ) ) finally: loop.close() # Handle errors if result.get("error"): return f"❌ **System Error**: {result.get('message', 'Unknown error 
occurred')}" # Format the comprehensive output decision_emoji = "✅" if result['final_decision'] == "APPROVED" else "❌" confidence_color = "🟢" if result.get('confidence_score', 0) > 0.8 else "🟡" if result.get('confidence_score', 0) > 0.6 else "🔴" output = f""" # 🤖 Intelligent Anti-Scalping Analysis Report ## {decision_emoji} Final Decision: **{result['final_decision']}** **AI Confidence**: {result.get('confidence_score', 0):.1%} {confidence_color} > 🧠 **AI Summary**: {result.get('ai_summary', 'Analysis completed successfully')} --- ## 👤 Enhanced User Verification - **User ID**: `{result['verification']['user_id']}` - **Name**: {result['verification']['name']} - **Email**: {result['verification']['email']} - **Verified**: {'✅ Yes' if result['verification']['is_verified'] else '❌ No'} - **Risk Score**: {result['verification']['risk_score']:.1%} {'🟢' if result['verification']['risk_score'] < 0.3 else '🟡' if result['verification']['risk_score'] < 0.6 else '🔴'} - **Verification Level**: {result['verification']['verification_level'].title()} ### 🧠 AI Behavioral Analysis: {result['verification']['ai_behavioral_analysis']} --- ## 🔍 Advanced Scalping Detection - **Scalper Detected**: {'🚨 YES' if result['scalping_detection']['is_scalper'] else '✅ NO'} - **Detection Confidence**: {result['scalping_detection']['confidence']:.1%} - **Purchase Velocity**: {result['scalping_detection']['purchase_velocity']} purchases/hour - **IP Address Duplicates**: {result['scalping_detection']['ip_duplicates']} accounts - **Network Connections**: {len(result['scalping_detection']['network_connections'])} suspicious links - **Red Flags**: {', '.join(result['scalping_detection']['flags']) if result['scalping_detection']['flags'] else '✅ None detected'} ### 🧠 AI Pattern Analysis: {result['scalping_detection']['ai_pattern_analysis']} --- ## 💰 Intelligent Market Pricing - **Original Price**: ${result['pricing']['original_price']:.2f} - **Proposed Resale**: ${proposed_resale_price:.2f} - **AI 
Recommended**: ${result['pricing']['recommended_resale_price']:.2f} - **Market Fair Price**: ${result['pricing']['market_fair_price']:.2f} - **Price Ratio**: {result['pricing']['recommended_resale_price']/result['pricing']['original_price']:.2f}x ### 📊 Market Intelligence: - **Market Sentiment**: {result['pricing']['market_intelligence']['market_sentiment'].title()} - **Price Trend**: {result['pricing']['market_intelligence']['price_trend'].title()} 📈 - **Supply/Demand**: {result['pricing']['market_intelligence']['supply_demand_ratio']:.2f} - **Average Market Price**: ${result['pricing']['market_intelligence']['average_resale_price']:.2f} """ if result['pricing'].get('profit_margin'): output += f"- **Profit Margin**: {result['pricing']['profit_margin']:.1f}% 📈\n" elif result['pricing'].get('loss_percentage'): output += f"- **Loss**: -{result['pricing']['loss_percentage']:.1f}% 📉\n" output += f""" ### 🧠 AI Pricing Rationale: {result['pricing']['ai_pricing_rationale']} --- ## ✅ Compliance & Policy Analysis - **Policy Compliant**: {'✅ Yes' if result['compliance']['is_compliant'] else '❌ No'} - **Resale Permitted**: {'✅ Yes' if result['compliance']['resale_allowed'] else '❌ No'} - **Maximum Allowed**: ${result['compliance']['max_allowed_price']:.2f} - **Violations**: {', '.join(result['compliance']['violations']) if result['compliance']['violations'] else '✅ None'} ### 🧠 AI Policy Analysis: {result['compliance']['ai_policy_analysis']} --- ## 📋 Intelligent Action Items """ for i, action in enumerate(result['action_items'], 1): output += f"{i}. {action}\n" # Add summary box if result['final_decision'] == "APPROVED": output += f""" --- > ✅ **TRANSACTION APPROVED** > > **Confidence Level**: {result.get('confidence_score', 0):.1%} > > All AI-powered checks passed. Resale can proceed with recommended monitoring. 
""" else: output += f""" --- > ❌ **TRANSACTION BLOCKED** > > **Risk Assessment**: High risk detected by AI analysis > > Policy violations and suspicious patterns identified. Manual review required. """ output += f""" --- *🤖 Analysis powered by Google Gemini AI • Generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* """ return output # Create enhanced Gradio interface with gr.Blocks( title="🤖 Intelligent Anti-Scalping System", theme=gr.themes.Soft( primary_hue="blue", secondary_hue="purple", ), css=""" .gradio-container { max-width: 1200px !important; } """ ) as interface: gr.Markdown(""" # 🤖 Intelligent AI-Powered Anti-Scalping System Next-generation ticket scalping prevention using **Google Gemini AI**, real-time market intelligence, and advanced behavioral analysis. Protects consumers while ensuring fair market pricing. ## 🚀 AI-Powered Features: **🧠 Behavioral Analysis**: Gemini AI analyzes user patterns for authenticity **🔍 Pattern Recognition**: Advanced scalping detection using network analysis **📊 Market Intelligence**: Real-time pricing data and sentiment analysis **⚖️ Policy Compliance**: AI-powered policy interpretation and enforcement **📈 Dynamic Pricing**: Market-aware fair pricing recommendations --- """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 👤 User Information") name = gr.Textbox(label="Full Name", placeholder="John Doe", value="") email = gr.Textbox(label="Email Address", placeholder="john@example.com", value="") user_id = gr.Textbox(label="User ID", placeholder="USER123", value="") gr.Markdown("### 🎟️ Event Details") event_name = gr.Textbox(label="Event Name", placeholder="Taylor Swift - Eras Tour", value="") ticket_type = gr.Dropdown( label="Ticket Type", choices=["General Admission", "VIP", "Premium", "Standard", "Balcony", "Floor", "Front Row"], value="Standard" ) ticket_quantity = gr.Number(label="Number of Tickets", value=1, minimum=1, maximum=10, precision=0) gr.Markdown("### 💲 Pricing Information") original_price = 
gr.Number(label="Original Ticket Price ($)", value=100, minimum=1) demand_level = gr.Radio( label="Current Market Demand", choices=["low", "medium", "high"], value="medium" ) proposed_resale_price = gr.Number(label="Proposed Resale Price ($)", value=150, minimum=1) submit_btn = gr.Button("🤖 Run AI Analysis", variant="primary", size="lg") with gr.Column(scale=2): output = gr.Markdown(value=""" 🤖 **Ready for AI Analysis!** Fill in the transaction details and click 'Run AI Analysis' to get comprehensive: - 🧠 AI behavioral analysis - 🔍 Advanced scalping detection - 📊 Real-time market intelligence - ⚖️ Policy compliance assessment - 📈 Dynamic pricing recommendations """) # Event handlers submit_btn.click( fn=process_transaction, inputs=[ name, email, user_id, event_name, ticket_type, ticket_quantity, original_price, demand_level, proposed_resale_price ], outputs=output ) # Examples section gr.Markdown("---") gr.Markdown("### 📋 Example Scenarios") examples = gr.Examples( examples=[ ["John Smith", "john@gmail.com", "USER001", "Taylor Swift - Eras Tour", "VIP", 2, 500, "high", 750], ["Jane Doe", "jane@company.com", "USER002", "NBA Finals Game 7", "Premium", 4, 300, "high", 1200], ["Bob Wilson", "bob@email.com", "USER003", "Local Concert", "General Admission", 1, 50, "low", 40], ["ScalperBot", "temp@fake.com", "BOT999", "Popular Event", "Standard", 8, 100, "high", 300], ["Sarah Johnson", "sarah.j@university.edu", "USER456", "Music Festival", "Premium", 3, 200, "medium", 280], ], inputs=[ name, email, user_id, event_name, ticket_type, ticket_quantity, original_price, demand_level, proposed_resale_price ] ) gr.Markdown(""" --- ### 🔑 System Configuration **AI Provider**: Google Gemini 1.5 Flash (Set `GEMINI_API_KEY` environment variable) **Market Data**: Serper API (Optional: Set `SERPER_API_KEY` for real-time data) **Fallback Mode**: System works without API keys using intelligent simulations *🔒 Your API keys are never stored or shared. 
All processing is secure and private.* """) return interface # Main execution if __name__ == "__main__": interface = create_interface() interface.launch( share=True, server_name="0.0.0.0", server_port=7860, show_error=True )