# ticket_scalping / app.py
# Hugging Face file-viewer residue captured with the source (not program code):
#   Masterverse's picture | Update app.py | 5d3018d verified | raw | history blame | 55.1 kB
#!/usr/bin/env python
import os
import json
import logging
import random
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from pydantic import BaseModel, Field
import gradio as gr
import google.generativeai as genai
from dataclasses import dataclass
# Setup logging: INFO and above, each record stamped with time and level.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the services and engines defined below.
logger = logging.getLogger(__name__)
# ========== CONFIGURATION ==========
@dataclass
class Config:
    """Runtime settings for the Gemini and Serper integrations.

    Both API keys are looked up in the process environment once, when this
    class body is evaluated; an unset variable yields an empty string.
    """

    GEMINI_API_KEY: str = os.environ.get("GEMINI_API_KEY", "")
    SERPER_API_KEY: str = os.environ.get("SERPER_API_KEY", "")
    # Model name handed to genai.GenerativeModel.
    GEMINI_MODEL: str = "gemini-2.0-flash"
    # NOTE(review): not referenced in the visible part of this file.
    MAX_RETRIES: int = 3
    # Timeout (seconds) applied to Serper HTTP requests.
    TIMEOUT: int = 30


# Shared settings instance read by the services below.
config = Config()
# ========== ENHANCED DATA MODELS ==========
class UserVerification(BaseModel):
    """Outcome of the identity checks run on the reselling user."""

    user_id: str = Field(default=..., description="User ID")
    name: str = Field(default=..., description="User name")
    email: str = Field(default=..., description="User email")
    is_verified: bool = Field(default=..., description="KYC verification status")
    verification_level: str = Field(
        default=...,
        description="Verification level: basic, standard, premium",
    )
    risk_score: float = Field(default=..., description="Risk score 0-1")
    ai_behavioral_analysis: str = Field(
        default="",
        description="AI analysis of user behavior",
    )
class MarketIntelligence(BaseModel):
    """Snapshot of resale-market conditions for an event; every field has a default."""

    average_resale_price: float = Field(
        default=0.0,
        description="Average market resale price",
    )
    price_trend: str = Field(
        default="stable",
        description="Price trend: rising, falling, stable",
    )
    market_sentiment: str = Field(
        default="neutral",
        description="Market sentiment: positive, negative, neutral",
    )
    similar_events_pricing: List[Dict] = Field(
        default_factory=list,
        description="Similar events pricing data",
    )
    news_impact: str = Field(default="", description="News impact on pricing")
    social_media_buzz: str = Field(
        default="",
        description="Social media sentiment analysis",
    )
    supply_demand_ratio: float = Field(
        default=1.0,
        description="Supply vs demand ratio",
    )
class ScalpingDetection(BaseModel):
    """Result of the scalping heuristics for one user and purchase."""

    is_scalper: bool = Field(
        default=...,
        description="Whether user is detected as scalper",
    )
    confidence: float = Field(default=..., description="Detection confidence 0-1")
    flags: List[str] = Field(default=..., description="Suspicious activity flags")
    purchase_velocity: int = Field(
        default=...,
        description="Number of purchases in last hour",
    )
    ip_duplicates: int = Field(
        default=...,
        description="Number of accounts from same IP",
    )
    ai_pattern_analysis: str = Field(
        default="",
        description="AI analysis of behavior patterns",
    )
    network_connections: List[str] = Field(
        default_factory=list,
        description="Connected suspicious accounts",
    )
class PricingRecommendation(BaseModel):
    """Recommended resale price together with the market data behind it."""

    original_price: float = Field(default=..., description="Original ticket price")
    recommended_resale_price: float = Field(
        default=...,
        description="Recommended resale price",
    )
    market_fair_price: float = Field(
        default=...,
        description="AI-calculated fair market price",
    )
    demand_level: str = Field(default=..., description="Current demand level")
    price_adjustment_reason: str = Field(
        default=...,
        description="Reason for price adjustment",
    )
    # At most one of the two percentages below is populated by the pricing engine.
    profit_margin: Optional[float] = Field(
        default=None,
        description="Profit margin percentage",
    )
    loss_percentage: Optional[float] = Field(
        default=None,
        description="Loss percentage if selling below cost",
    )
    market_intelligence: MarketIntelligence = Field(default_factory=MarketIntelligence)
    ai_pricing_rationale: str = Field(
        default="",
        description="AI explanation for pricing decision",
    )
class ResaleCompliance(BaseModel):
    """Policy verdict for a proposed resale."""

    is_compliant: bool = Field(
        default=...,
        description="Whether resale is policy compliant",
    )
    violations: List[str] = Field(default=..., description="List of policy violations")
    resale_allowed: bool = Field(default=..., description="Whether resale is allowed")
    max_allowed_price: float = Field(
        default=...,
        description="Maximum allowed resale price",
    )
    recommendation: str = Field(default=..., description="Compliance recommendation")
    ai_policy_analysis: str = Field(
        default="",
        description="AI analysis of policy compliance",
    )
class IntelligentReport(BaseModel):
    """Complete analysis result: the four sub-reports plus the final verdict."""

    # Sub-reports produced by the individual engines.
    verification: UserVerification
    scalping_detection: ScalpingDetection
    pricing: PricingRecommendation
    compliance: ResaleCompliance

    # Aggregated outcome.
    final_decision: str = Field(default=..., description="Final system decision")
    action_items: List[str] = Field(default=..., description="Recommended actions")
    ai_summary: str = Field(default="", description="AI-generated executive summary")
    confidence_score: float = Field(
        default=0.0,
        description="Overall AI confidence in decision",
    )
# ========== AI INTEGRATION SERVICES ==========
class GeminiAIService:
    """Service for interacting with Google's Gemini AI.

    Every public coroutine falls back to a rule-based answer when the
    service is disabled (no API key or failed initialisation), when the
    model returns no text, or when the request raises.
    """

    def __init__(self):
        """Configure the Gemini client; leave the service disabled on any failure."""
        # Define both attributes up front so they exist on every exit path.
        self.enabled = False
        self.model = None
        if not config.GEMINI_API_KEY:
            logger.warning("GEMINI_API_KEY not found. Using fallback responses.")
            return
        try:
            genai.configure(api_key=config.GEMINI_API_KEY)
            self.model = genai.GenerativeModel(config.GEMINI_MODEL)
            self.enabled = True
            logger.info("Gemini AI service initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Gemini AI: {e}")

    async def _generate_text(self, prompt: str) -> str:
        """Send ``prompt`` to the model without blocking the event loop.

        ``generate_content`` is a synchronous call, so it is handed to the
        loop's default executor; awaiting it lets other coroutines run in
        the meantime. Returns the response text, or ``""`` when the response
        carries none. Exceptions propagate to the caller.
        """
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, self.model.generate_content, prompt)
        return response.text or ""

    @staticmethod
    def _strip_wrapping(text: str) -> str:
        """Remove surrounding whitespace and quote characters from ``text``."""
        return text.strip().strip("\"'").strip()

    async def analyze_user_behavior(self, user_data: Dict, transaction_history: List[Dict]) -> str:
        """Analyze user behavior patterns using AI.

        Args:
            user_data: Profile dict; the keys ``name``, ``email``, ``user_id``
                and ``risk_factors`` are read, each optional.
            transaction_history: JSON-serialisable purchase records embedded
                verbatim in the prompt.

        Returns:
            The model's analysis, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_user_analysis(user_data)
        try:
            prompt = f"""
As an expert fraud detection analyst, analyze this user's behavior for potential ticket scalping:
User Profile:
- Name: {user_data.get('name', 'N/A')}
- Email: {user_data.get('email', 'N/A')}
- User ID: {user_data.get('user_id', 'N/A')}
- Risk Factors: {user_data.get('risk_factors', [])}
Transaction History: {json.dumps(transaction_history, indent=2)}
Provide a concise behavioral analysis focusing on:
1. Email legitimacy indicators
2. Name authenticity assessment
3. Overall trustworthiness score
4. Red flags or green flags
Keep response under 150 words and be specific.
"""
            text = await self._generate_text(prompt)
            return text if text else self._fallback_user_analysis(user_data)
        except Exception as e:
            logger.error(f"Gemini AI analysis failed: {e}")
            return self._fallback_user_analysis(user_data)

    async def analyze_scalping_patterns(self, user_id: str, transaction_data: Dict, network_data: List[Dict]) -> str:
        """Detect scalping patterns using AI analysis.

        Args:
            user_id: Identifier of the purchasing user.
            transaction_data: Dict read for ``ticket_quantity``, ``event_name``,
                ``purchase_velocity`` and ``ip_duplicates``.
            network_data: JSON-serialisable list embedded verbatim in the prompt.

        Returns:
            The model's analysis, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_scalping_analysis(transaction_data)
        try:
            prompt = f"""
As a specialized anti-scalping detection expert, analyze this transaction for suspicious patterns:
Transaction Details:
- User: {user_id}
- Ticket Quantity: {transaction_data.get('ticket_quantity', 0)}
- Event: {transaction_data.get('event_name', 'N/A')}
- Purchase Velocity: {transaction_data.get('purchase_velocity', 0)} purchases/hour
- IP Duplicates: {transaction_data.get('ip_duplicates', 0)} accounts
Network Analysis: {json.dumps(network_data, indent=2)}
Assess for:
1. Bot-like purchasing behavior
2. Coordinated scalping networks
3. Unusual timing patterns
4. Risk level (Low/Medium/High)
Provide analysis in under 200 words with specific indicators.
"""
            text = await self._generate_text(prompt)
            return text if text else self._fallback_scalping_analysis(transaction_data)
        except Exception as e:
            logger.error(f"Scalping pattern analysis failed: {e}")
            return self._fallback_scalping_analysis(transaction_data)

    async def generate_pricing_rationale(self, market_data: Dict, pricing_factors: Dict) -> str:
        """Generate an AI-written rationale for a pricing recommendation.

        Args:
            market_data: Dict read for ``average_resale_price``, ``price_trend``,
                ``market_sentiment`` and ``supply_demand_ratio``.
            pricing_factors: Dict read for ``original_price``, ``proposed_price``,
                ``recommended_price`` and ``demand_level``.

        Returns:
            The model's rationale, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_pricing_analysis(pricing_factors)
        try:
            prompt = f"""
As a market pricing expert, analyze this ticket resale pricing scenario:
Market Conditions:
- Average Market Price: ${market_data.get('average_resale_price', 0):.2f}
- Price Trend: {market_data.get('price_trend', 'stable')}
- Market Sentiment: {market_data.get('market_sentiment', 'neutral')}
- Supply/Demand Ratio: {market_data.get('supply_demand_ratio', 1.0)}
Pricing Analysis:
- Original Price: ${pricing_factors.get('original_price', 0):.2f}
- Proposed Price: ${pricing_factors.get('proposed_price', 0):.2f}
- Recommended Price: ${pricing_factors.get('recommended_price', 0):.2f}
- Demand Level: {pricing_factors.get('demand_level', 'medium')}
Provide:
1. Fair market assessment
2. Consumer protection analysis
3. Pricing recommendation rationale
4. Risk factors for buyer/seller
Keep under 250 words, be specific and actionable.
"""
            text = await self._generate_text(prompt)
            return text if text else self._fallback_pricing_analysis(pricing_factors)
        except Exception as e:
            logger.error(f"Pricing rationale generation failed: {e}")
            return self._fallback_pricing_analysis(pricing_factors)

    async def analyze_policy_compliance(self, transaction_data: Dict, policy_violations: List[str]) -> str:
        """Analyze policy compliance with AI reasoning.

        Args:
            transaction_data: Dict read for ``user_id``, ``price_ratio``,
                ``event_name``, ``proposed_price`` and ``original_price``.
            policy_violations: Violation codes already detected by the caller.

        Returns:
            The model's analysis, or the rule-based fallback text.
        """
        if not self.enabled:
            return self._fallback_compliance_analysis(policy_violations)
        try:
            prompt = f"""
As a policy compliance officer, evaluate this transaction:
Transaction:
- User: {transaction_data.get('user_id', 'N/A')}
- Price Ratio: {transaction_data.get('price_ratio', 1.0)}x original
- Event: {transaction_data.get('event_name', 'N/A')}
- Proposed Price: ${transaction_data.get('proposed_price', 0):.2f}
- Original Price: ${transaction_data.get('original_price', 0):.2f}
Policy Violations Detected: {policy_violations}
Provide:
1. Severity assessment of each violation
2. Consumer protection implications
3. Recommended enforcement actions
4. Appeal process guidance (if applicable)
Be fair but firm in protecting consumers. Under 200 words.
"""
            text = await self._generate_text(prompt)
            return text if text else self._fallback_compliance_analysis(policy_violations)
        except Exception as e:
            logger.error(f"Policy compliance analysis failed: {e}")
            return self._fallback_compliance_analysis(policy_violations)

    async def generate_executive_summary(self, full_report: Dict) -> Tuple[str, float]:
        """Generate an executive summary with a confidence score.

        Args:
            full_report: Report dict read for ``final_decision`` and the nested
                ``verification``, ``scalping_detection`` and ``compliance`` dicts.

        Returns:
            ``(summary, confidence)`` with confidence clamped to ``[0.0, 1.0]``.
            A reply whose score cannot be parsed gets confidence ``0.7``; a
            reply with no ``|`` separator yields the rule-based fallback.
        """
        if not self.enabled:
            return self._fallback_executive_summary(full_report)
        try:
            prompt = f"""
Create an executive summary for this anti-scalping analysis:
Decision: {full_report.get('final_decision', 'UNKNOWN')}
User Verified: {full_report.get('verification', {}).get('is_verified', False)}
Scalper Detected: {full_report.get('scalping_detection', {}).get('is_scalper', False)}
Violations: {full_report.get('compliance', {}).get('violations', [])}
Provide:
1. One sentence decision summary
2. Key risk factors (2-3 points)
3. Confidence level (0.0-1.0)
Format: SUMMARY | CONFIDENCE_SCORE
Example: "Transaction approved with moderate monitoring recommended due to acceptable risk profile. | 0.85"
Keep summary under 100 words.
"""
            text = await self._generate_text(prompt)
            if "|" not in text:
                return self._fallback_executive_summary(full_report)
            # Split on the LAST pipe so a summary that itself contains "|" is
            # kept whole and the score is always the trailing segment.
            summary_part, _, confidence_part = text.rpartition("|")
            # The prompt's example is quoted, so the model may quote its reply.
            summary = self._strip_wrapping(summary_part)
            try:
                confidence = float(self._strip_wrapping(confidence_part))
            except ValueError:
                confidence = 0.7
            else:
                confidence = max(0.0, min(1.0, confidence))  # Ensure 0-1 range
            return summary, confidence
        except Exception as e:
            logger.error(f"Executive summary generation failed: {e}")
            return self._fallback_executive_summary(full_report)

    # Fallback methods when AI is not available
    def _fallback_user_analysis(self, user_data: Dict) -> str:
        """Return a canned user assessment based on ``user_data['risk_factors']``."""
        risk_factors = user_data.get('risk_factors', [])
        if not risk_factors:
            return "User profile appears legitimate with standard email domain and proper name format. No immediate red flags detected."
        return f"User profile shows some concerns: {', '.join(risk_factors)}. Recommend enhanced verification for high-value transactions."

    def _fallback_scalping_analysis(self, transaction_data: Dict) -> str:
        """Return a canned risk statement from purchase velocity and ticket quantity."""
        velocity = transaction_data.get('purchase_velocity', 0)
        quantity = transaction_data.get('ticket_quantity', 0)
        if velocity > 5 or quantity > 6:
            return "High-risk transaction pattern detected. Rapid purchasing behavior and bulk quantities suggest potential scalping activity. Recommend blocking pending manual review."
        if velocity > 3 or quantity > 4:
            return "Moderate risk factors present. Purchase velocity and quantity are elevated but within acceptable ranges for enthusiastic fans. Monitor closely."
        return "Normal purchasing behavior observed. Transaction patterns consistent with legitimate fan purchases."

    def _fallback_pricing_analysis(self, pricing_factors: Dict) -> str:
        """Return a canned pricing statement from the proposed/original price ratio."""
        original = pricing_factors.get('original_price', 0)
        proposed = pricing_factors.get('proposed_price', 0)
        # A non-positive original price is treated as "no markup".
        ratio = proposed / original if original > 0 else 1.0
        if ratio > 2.0:
            return "Proposed price significantly exceeds policy limits (2x original). High risk of consumer exploitation. Recommend price reduction to comply with anti-gouging policies."
        if ratio > 1.5:
            return "Price markup is substantial but within policy limits. Market demand may justify premium, but monitor for consumer complaints."
        return "Pricing appears fair and reasonable. Markup reflects normal market dynamics and demand levels."

    def _fallback_compliance_analysis(self, violations: List[str]) -> str:
        """Return a canned compliance statement; severity scales with the violation count."""
        if not violations:
            return "Transaction complies with all anti-scalping policies. No violations detected. Safe to proceed with standard monitoring."
        severity = "High" if len(violations) > 2 else "Medium" if len(violations) > 1 else "Low"
        return f"{severity} severity violations detected: {', '.join(violations)}. Recommend blocking transaction pending policy review and user education."

    def _fallback_executive_summary(self, full_report: Dict) -> Tuple[str, float]:
        """Return a canned ``(summary, confidence)``; confidence is kept within 0.1-0.95."""
        decision = full_report.get('final_decision', 'UNKNOWN')
        violations = len(full_report.get('compliance', {}).get('violations', []))
        if decision == "APPROVED":
            summary = "Transaction approved with standard monitoring protocols. Risk factors within acceptable thresholds."
            confidence = 0.8 - (violations * 0.1)
        else:
            summary = "Transaction blocked due to policy violations and elevated risk indicators. Manual review required."
            confidence = 0.9 - (violations * 0.05)
        return summary, max(0.1, min(0.95, confidence))
class SerperAPIService:
    """Service for real-time market data using Serper API.

    Without an API key, or when a lookup fails, the service returns
    randomly simulated market data instead.
    """

    # Keywords searched for in the lower-cased title+snippet of each result.
    _HIGH_DEMAND_WORDS = ('sold out', 'high demand', 'popular', 'rush')
    _LOW_DEMAND_WORDS = ('available', 'discount', 'cheap')
    _NEWS_WORDS = ('news', 'announced', 'cancelled', 'postponed')

    def __init__(self):
        """Enable the service only when ``config.SERPER_API_KEY`` is set."""
        # Always defined, so the attribute exists even when disabled.
        self.base_url = "https://google.serper.dev/search"
        if not config.SERPER_API_KEY:
            logger.warning("SERPER_API_KEY not found. Using simulated market data.")
            self.enabled = False
        else:
            self.enabled = True
            logger.info("Serper API service initialized successfully")

    async def get_market_intelligence(self, event_name: str, ticket_type: str, location: str = "") -> MarketIntelligence:
        """Fetch market intelligence for ticket pricing.

        Args:
            event_name: Event to search for.
            ticket_type: Ticket category; also selects the simulated base price.
            location: Accepted for interface compatibility; currently unused.

        Returns:
            Intelligence derived from search results, or simulated data when
            the service is disabled or the lookup raises.
        """
        if not self.enabled:
            return self._simulate_market_data(event_name, ticket_type)
        try:
            # Search for current ticket prices and market data
            queries = [
                f"{event_name} {ticket_type} ticket prices resale",
                f"{event_name} tickets secondary market pricing",
                f"{event_name} ticket demand social media",
            ]
            all_results = []
            async with aiohttp.ClientSession() as session:
                for query in queries:
                    result = await self._search_query(session, query)
                    if result:
                        # "organic" may be absent or null in a response.
                        all_results.extend(result.get('organic') or [])
            # Analyze results for market intelligence
            return await self._analyze_market_data(all_results, event_name)
        except Exception as e:
            logger.error(f"Market intelligence gathering failed: {e}")
            return self._simulate_market_data(event_name, ticket_type)

    async def _search_query(self, session: aiohttp.ClientSession, query: str) -> Optional[Dict]:
        """Execute one search via the Serper API.

        Returns the decoded JSON body on HTTP 200, otherwise ``None``
        (non-200 status or any exception; both are logged).
        """
        try:
            headers = {
                "X-API-KEY": config.SERPER_API_KEY,
                "Content-Type": "application/json",
            }
            payload = {
                "q": query,
                "num": 5,
                "gl": "us",
                "hl": "en",
            }
            # aiohttp expects a ClientTimeout object; a bare number is deprecated.
            timeout = aiohttp.ClientTimeout(total=config.TIMEOUT)
            async with session.post(self.base_url, headers=headers, json=payload, timeout=timeout) as response:
                if response.status == 200:
                    return await response.json()
                logger.warning(f"Serper API returned status {response.status}")
                return None
        except Exception as e:
            logger.error(f"Serper API query failed: {e}")
            return None

    async def _analyze_market_data(self, search_results: List[Dict], event_name: str) -> MarketIntelligence:
        """Derive market intelligence from search results.

        Only the first 10 results are inspected. Sentiment and the
        supply/demand ratio come from keyword matches; the price trend and
        average price are random placeholders ("Simplified").
        """
        try:
            sentiment_indicators = []
            news_items = []
            for result in search_results[:10]:  # Limit processing
                # Missing or null fields count as empty text.
                title = (result.get('title') or '').lower()
                snippet = (result.get('snippet') or '').lower()
                text = title + snippet
                # Look for sentiment indicators
                if any(word in text for word in self._HIGH_DEMAND_WORDS):
                    sentiment_indicators.append('high_demand')
                elif any(word in text for word in self._LOW_DEMAND_WORDS):
                    sentiment_indicators.append('low_demand')
                # Look for news impact
                if any(word in text for word in self._NEWS_WORDS):
                    news_items.append(result)
            # Calculate market metrics
            sentiment = self._calculate_sentiment(sentiment_indicators)
            trend = random.choice(["rising", "stable", "falling"])  # Simplified
            avg_price = random.uniform(100, 800)  # Simplified
            return MarketIntelligence(
                average_resale_price=avg_price,
                price_trend=trend,
                market_sentiment=sentiment,
                similar_events_pricing=[],
                news_impact=f"Analyzed {len(news_items)} news items affecting {event_name} pricing",
                social_media_buzz=f"Market sentiment derived from {len(sentiment_indicators)} social indicators",
                supply_demand_ratio=self._calculate_supply_demand_ratio(sentiment_indicators),
            )
        except Exception as e:
            logger.error(f"Market data analysis failed: {e}")
            return self._simulate_market_data(event_name, "standard")

    def _simulate_market_data(self, event_name: str, ticket_type: str) -> MarketIntelligence:
        """Generate simulated market data when the API is unavailable.

        The base price is 500 for "vip" ticket types, 350 for "premium",
        otherwise 200; a random draw then picks a positive, neutral or
        negative scenario with matching trend, price and ratio ranges.
        """
        base_price = 200
        if "vip" in ticket_type.lower():
            base_price = 500
        elif "premium" in ticket_type.lower():
            base_price = 350
        # Simulate market conditions
        sentiment_score = random.uniform(0, 1)
        if sentiment_score > 0.7:
            sentiment = "positive"
            trend = "rising"
            avg_price = base_price * random.uniform(1.2, 1.8)
            supply_demand = random.uniform(0.3, 0.7)
        elif sentiment_score > 0.3:
            sentiment = "neutral"
            trend = "stable"
            avg_price = base_price * random.uniform(0.9, 1.3)
            supply_demand = random.uniform(0.8, 1.2)
        else:
            sentiment = "negative"
            trend = "falling"
            avg_price = base_price * random.uniform(0.6, 1.1)
            supply_demand = random.uniform(1.2, 2.0)
        return MarketIntelligence(
            average_resale_price=avg_price,
            price_trend=trend,
            market_sentiment=sentiment,
            similar_events_pricing=[],
            news_impact=f"Simulated market analysis for {event_name}",
            social_media_buzz="Market sentiment analysis based on event characteristics",
            supply_demand_ratio=supply_demand,
        )

    def _calculate_sentiment(self, indicators: List[str]) -> str:
        """Return "positive", "negative" or "neutral" by majority of indicators."""
        if not indicators:
            return "neutral"
        positive_count = sum(1 for i in indicators if i == 'high_demand')
        negative_count = sum(1 for i in indicators if i == 'low_demand')
        if positive_count > negative_count:
            return "positive"
        if negative_count > positive_count:
            return "negative"
        return "neutral"

    def _calculate_supply_demand_ratio(self, indicators: List[str]) -> float:
        """Return a random supply/demand ratio from the range matching the majority indicator."""
        high_demand = sum(1 for i in indicators if i == 'high_demand')
        low_demand = sum(1 for i in indicators if i == 'low_demand')
        if high_demand > low_demand:
            return random.uniform(0.3, 0.7)  # Low supply, high demand
        if low_demand > high_demand:
            return random.uniform(1.3, 2.0)  # High supply, low demand
        return random.uniform(0.8, 1.2)  # Balanced
# ========== ENHANCED ENGINES ==========
class EnhancedUserVerificationEngine:
    """Rule-based identity screening, annotated with an AI behaviour analysis."""

    def __init__(self):
        self.ai_service = GeminiAIService()

    async def verify_user(self, name: str, email: str, user_id: str) -> UserVerification:
        """Screen a user's name and email and return the verification verdict.

        The risk score is 0.25 per triggered risk factor plus a random
        0.1-0.2 jitter, capped at 0.9. A user is verified when the score is
        below 0.5 and both the email and the name pass basic validation.
        """
        trimmed_name = name.strip()
        email_domain = email.split("@")[-1]

        # Basic validation
        email_valid = "@" in email and "." in email_domain
        name_valid = len(trimmed_name) >= 2 and not any(ch.isdigit() for ch in name)

        # Each pair is (condition, risk-factor label); order fixes the output order.
        checks = (
            (not email_valid, "invalid_email"),
            (not name_valid, "suspicious_name"),
            (
                email.endswith(('.temp', '.fake', '.test', '.10minutemail', '.guerrillamail')),
                "temporary_email",
            ),
            (len(trimmed_name) < 3, "short_name"),
        )
        risk_factors = [label for triggered, label in checks if triggered]

        risk_score = min(0.9, len(risk_factors) * 0.25 + random.uniform(0.1, 0.2))
        is_verified = risk_score < 0.5 and email_valid and name_valid
        if risk_score < 0.2:
            verification_level = "premium"
        elif risk_score < 0.5:
            verification_level = "standard"
        else:
            verification_level = "basic"

        # AI behavioral analysis over the profile and one simulated past purchase.
        profile = {
            "name": name,
            "email": email,
            "user_id": user_id,
            "risk_factors": risk_factors,
        }
        simulated_history = [
            {
                "event": "previous_purchase",
                "quantity": random.randint(1, 3),
                "timestamp": (datetime.now() - timedelta(days=random.randint(1, 30))).isoformat(),
            }
        ]
        ai_analysis = await self.ai_service.analyze_user_behavior(profile, simulated_history)

        return UserVerification(
            user_id=user_id,
            name=name,
            email=email,
            is_verified=is_verified,
            verification_level=verification_level,
            risk_score=risk_score,
            ai_behavioral_analysis=ai_analysis,
        )
class EnhancedScalpingDetectionEngine:
    """Scores a purchase for scalping risk from simulated activity metrics plus AI commentary."""

    def __init__(self):
        self.ai_service = GeminiAIService()

    async def detect_scalping(self, user_id: str, ticket_quantity: int, event_name: str) -> ScalpingDetection:
        """Enhanced scalping detection with AI pattern analysis.

        Purchase velocity, IP duplicates and resale frequency are demo
        values drawn from a generator seeded by ``user_id``, so the same
        user always gets the same metrics.

        Args:
            user_id: Identifier of the purchasing user; seeds the demo data.
            ticket_quantity: Number of tickets in this transaction.
            event_name: Event name, passed on to the AI analysis.

        Returns:
            The detection verdict; ``is_scalper`` is true when the weighted
            score exceeds 0.6, and ``confidence`` lies in ``[0.0, 0.95]``.
        """
        # A private generator seeded with a string is reproducible across
        # processes (unlike hash(str), which is salted per interpreter run)
        # and leaves the shared module-level random state untouched while
        # this coroutine is suspended at the await below.
        rng = random.Random(f"{user_id}:scalping")
        purchase_velocity = rng.randint(1, 8)
        ip_duplicates = rng.randint(1, 5)
        resale_frequency = rng.randint(0, 12)

        flags = []
        if purchase_velocity > 3:
            flags.append("rapid_purchases")
        if ip_duplicates > 2:
            flags.append("multiple_ips")
        if ticket_quantity > 4:
            flags.append("bulk_purchase")
        if resale_frequency > 5:
            flags.append("frequent_reseller")
        if ticket_quantity > 6:
            flags.append("excessive_quantity")

        # Calculate scalping probability: four equally weighted signals.
        scalping_score = (
            (purchase_velocity / 10) * 0.25 +
            (ip_duplicates / 5) * 0.25 +
            (ticket_quantity / 10) * 0.25 +
            (resale_frequency / 20) * 0.25
        )
        # Fixed penalty when fast purchasing coincides with a large order
        # (bot-like behaviour).
        if purchase_velocity > 6 and ticket_quantity > 5:
            scalping_score += 0.3
        is_scalper = scalping_score > 0.6
        # Clamp on both sides so a nonsensical (negative) quantity cannot
        # produce a negative confidence.
        confidence = max(0.0, min(0.95, scalping_score + rng.uniform(0.05, 0.15)))

        # AI pattern analysis
        transaction_data = {
            "user_id": user_id,
            "ticket_quantity": ticket_quantity,
            "event_name": event_name,
            "purchase_velocity": purchase_velocity,
            "ip_duplicates": ip_duplicates,
        }
        network_data = [
            {"connected_user": f"user_{i}", "similarity_score": rng.uniform(0.1, 0.9)}
            for i in range(min(3, ip_duplicates))
        ]
        ai_pattern_analysis = await self.ai_service.analyze_scalping_patterns(user_id, transaction_data, network_data)

        return ScalpingDetection(
            is_scalper=is_scalper,
            confidence=confidence,
            flags=flags,
            purchase_velocity=purchase_velocity,
            ip_duplicates=ip_duplicates,
            ai_pattern_analysis=ai_pattern_analysis,
            network_connections=[f"suspicious_account_{i}" for i in range(max(0, ip_duplicates - 1))],
        )
class EnhancedDynamicPricingEngine:
    """Clamps a proposed resale price into a demand- and market-adjusted band."""

    def __init__(self):
        self.ai_service = GeminiAIService()
        self.market_service = SerperAPIService()

    async def calculate_pricing(self, original_price: float, demand_level: str, proposed_price: float, event_name: str, ticket_type: str) -> PricingRecommendation:
        """Enhanced pricing calculation with market intelligence.

        Args:
            original_price: Face value of the ticket; must be positive.
            demand_level: "low", "medium" or "high"; any other value uses
                the "medium" multipliers.
            proposed_price: Price the seller wants to charge.
            event_name: Event name used for the market lookup.
            ticket_type: Ticket category used for the market lookup.

        Returns:
            The recommendation, with the proposed price clamped into
            ``[original * min_mult, original * max_mult]``.

        Raises:
            ValueError: If ``original_price`` is not positive.
        """
        # Validate before any network call: a zero price would otherwise
        # surface later as an opaque ZeroDivisionError, and a negative one
        # would invert the price bounds.
        if original_price <= 0:
            raise ValueError(f"original_price must be positive, got {original_price!r}")

        # Get market intelligence
        market_intel = await self.market_service.get_market_intelligence(event_name, ticket_type)

        # Base pricing logic: (min, max) multiple of the original price.
        demand_multipliers = {
            "low": (0.7, 1.1),
            "medium": (0.85, 1.3),
            "high": (1.0, 1.6),
        }
        min_mult, max_mult = demand_multipliers.get(demand_level, (0.85, 1.3))

        # Adjust the ceiling based on market intelligence
        if market_intel.market_sentiment == "positive":
            max_mult *= 1.15
        elif market_intel.market_sentiment == "negative":
            max_mult *= 0.85
        if market_intel.price_trend == "rising":
            max_mult *= 1.1
        elif market_intel.price_trend == "falling":
            max_mult *= 0.9

        # Calculate price bounds
        min_price = original_price * min_mult
        max_price = original_price * max_mult

        # Calculate fair market price: observed average when available,
        # otherwise a demand-based markup on the original price.
        if market_intel.average_resale_price > 0:
            market_fair_price = market_intel.average_resale_price
        else:
            market_fair_price = original_price * (1.0 + (0.5 if demand_level == "high" else 0.2 if demand_level == "medium" else 0.0))

        # Recommend price within acceptable range
        recommended_price = max(min_price, min(proposed_price, max_price))

        # Calculate margins; both stay None when the price is unchanged.
        price_ratio = recommended_price / original_price
        profit_margin = None
        loss_percentage = None
        if price_ratio > 1.0:
            profit_margin = (price_ratio - 1) * 100
        elif price_ratio < 1.0:
            loss_percentage = (1 - price_ratio) * 100

        # Generate pricing reason
        reason = f"Adjusted for {demand_level} demand ({market_intel.market_sentiment} market sentiment, {market_intel.price_trend} trend)"
        if recommended_price > proposed_price:
            reason += f" - Price increased from ${proposed_price:.2f} to meet market conditions"
        elif recommended_price < proposed_price:
            reason += f" - Price reduced from ${proposed_price:.2f} for policy compliance"

        # Generate AI pricing rationale
        pricing_factors = {
            "original_price": original_price,
            "proposed_price": proposed_price,
            "recommended_price": recommended_price,
            "market_sentiment": market_intel.market_sentiment,
            "price_trend": market_intel.price_trend,
            "demand_level": demand_level,
        }
        ai_rationale = await self.ai_service.generate_pricing_rationale(market_intel.dict(), pricing_factors)

        return PricingRecommendation(
            original_price=original_price,
            recommended_resale_price=recommended_price,
            market_fair_price=market_fair_price,
            demand_level=demand_level,
            price_adjustment_reason=reason,
            profit_margin=profit_margin,
            loss_percentage=loss_percentage,
            market_intelligence=market_intel,
            ai_pricing_rationale=ai_rationale,
        )
class EnhancedComplianceEngine:
    """Checks a proposed resale against the anti-scalping policy rules."""

    def __init__(self):
        self.ai_service = GeminiAIService()

    async def check_compliance(
        self,
        user_id: str,
        proposed_price: float,
        original_price: float,
        scalping_detection: ScalpingDetection,
        event_name: str
    ) -> ResaleCompliance:
        """Enhanced compliance checking with AI policy analysis.

        Args:
            user_id: Identifier of the seller; seeds the demo resale frequency.
            proposed_price: Price the seller wants to charge.
            original_price: Face value of the ticket; must be positive.
            scalping_detection: Result of the scalping detection step.
            event_name: Event name, passed on to the AI analysis.

        Returns:
            The compliance verdict; resale is allowed only with zero violations.

        Raises:
            ValueError: If ``original_price`` is not positive.
        """
        # Fail with a clear message instead of an opaque ZeroDivisionError.
        if original_price <= 0:
            raise ValueError(f"original_price must be positive, got {original_price!r}")

        violations = []
        price_ratio = proposed_price / original_price

        # Policy checks
        if price_ratio > 2.5:
            violations.append("price_exceeds_250_percent")
        elif price_ratio > 2.0:
            violations.append("price_exceeds_200_percent")
        if scalping_detection.is_scalper:
            violations.append("suspected_scalper")
        if scalping_detection.purchase_velocity > 6:
            violations.append("excessive_purchase_velocity")
        elif scalping_detection.purchase_velocity > 4:
            violations.append("high_purchase_velocity")
        if scalping_detection.ip_duplicates > 3:
            violations.append("multiple_ip_accounts")

        # Generate consistent resale frequency based on user_id. A private,
        # string-seeded generator is reproducible across processes (hash(str)
        # is salted per run) and does not reseed the shared global RNG.
        resale_frequency = random.Random(f"{user_id}resale").randint(0, 10)
        if resale_frequency > 6:
            violations.append("monthly_resale_limit_exceeded")
        elif resale_frequency > 4:
            violations.append("high_resale_frequency")
        if "bulk_purchase" in scalping_detection.flags:
            violations.append("bulk_purchase_violation")

        # Determine compliance
        is_compliant = len(violations) == 0
        resale_allowed = is_compliant and not scalping_detection.is_scalper and price_ratio <= 2.0
        max_allowed_price = original_price * 2.0

        # Generate recommendation
        if resale_allowed:
            recommendation = "✅ Transaction approved - complies with all anti-scalping policies"
        else:
            severity = "CRITICAL" if len(violations) > 3 else "HIGH" if len(violations) > 1 else "MODERATE"
            recommendation = f"❌ Transaction blocked ({severity} risk) - Violations: {', '.join(violations)}"

        # AI policy analysis
        transaction_data = {
            "user_id": user_id,
            "proposed_price": proposed_price,
            "original_price": original_price,
            "price_ratio": price_ratio,
            "event_name": event_name,
            "scalping_indicators": scalping_detection.flags,
            "resale_frequency": resale_frequency,
        }
        ai_policy_analysis = await self.ai_service.analyze_policy_compliance(transaction_data, violations)

        return ResaleCompliance(
            is_compliant=is_compliant,
            violations=violations,
            resale_allowed=resale_allowed,
            max_allowed_price=max_allowed_price,
            recommendation=recommendation,
            ai_policy_analysis=ai_policy_analysis,
        )
# ========== MOCK DATABASE (Enhanced) ==========
class MockDatabase:
    """Enhanced mock database with consistent data generation.

    Every value is derived from the user id through a private, string-seeded
    ``random.Random``. That keeps results reproducible across interpreter
    runs (``hash(str)`` is salted per process) and never disturbs the shared
    module-level random state.
    """

    def __init__(self):
        # Per-user caches; only the purchase and resale caches are filled
        # by the methods below.
        self.user_purchase_history = {}
        self.ip_addresses = {}
        self.resale_history = {}
        self.user_profiles = {}

    @staticmethod
    def _rng_for(user_id: str, salt: str = "") -> random.Random:
        """Return a generator deterministically seeded from ``user_id`` and ``salt``."""
        return random.Random(f"{user_id}{salt}")

    def get_user_purchases(self, user_id: str) -> int:
        """Return the user's simulated purchase count (0-8), cached per user."""
        if user_id not in self.user_purchase_history:
            self.user_purchase_history[user_id] = self._rng_for(user_id).randint(0, 8)
        return self.user_purchase_history[user_id]

    def get_ip_accounts(self, user_id: str) -> int:
        """Return the simulated number of accounts (1-5) sharing the user's IP."""
        return self._rng_for(user_id, "ip").randint(1, 5)

    def get_resale_frequency(self, user_id: str) -> int:
        """Return the user's simulated resale count (0-12), cached per user."""
        if user_id not in self.resale_history:
            self.resale_history[user_id] = self._rng_for(user_id, "resale").randint(0, 12)
        return self.resale_history[user_id]


mock_db = MockDatabase()
# ========== ENHANCED MAIN APPLICATION ==========
class IntelligentAntiScalpingSystem:
def __init__(self):
    """Create one instance of each analysis engine plus an AI client for summaries."""
    # Engines, in the order the pipeline uses them.
    self.verification_engine = EnhancedUserVerificationEngine()
    self.scalping_engine = EnhancedScalpingDetectionEngine()
    self.pricing_engine = EnhancedDynamicPricingEngine()
    self.compliance_engine = EnhancedComplianceEngine()
    # Separate client used only for the executive summary.
    self.ai_service = GeminiAIService()
    logger.info("Intelligent Anti-Scalping System initialized")
async def process_intelligent_transaction(
    self,
    name: str,
    email: str,
    user_id: str,
    event_name: str,
    ticket_type: str,
    ticket_quantity: int,
    original_price: float,
    demand_level: str,
    proposed_resale_price: float
) -> Dict[str, Any]:
    """Run a resale request through every engine and return the report as a dict.

    Verification, scalping detection and pricing are awaited together; the
    compliance check follows because it consumes the scalping result. Any
    exception is logged and converted into the error response dict.
    """
    try:
        # Core analyses, gathered in a single await.
        verification, scalping_detection, pricing = await asyncio.gather(
            self.verification_engine.verify_user(name, email, user_id),
            self.scalping_engine.detect_scalping(user_id, ticket_quantity, event_name),
            self.pricing_engine.calculate_pricing(
                original_price, demand_level, proposed_resale_price, event_name, ticket_type
            ),
        )

        # Compliance check depends on scalping detection
        compliance = await self.compliance_engine.check_compliance(
            user_id, proposed_resale_price, original_price, scalping_detection, event_name
        )

        # Weighted decision (verification: 30%, scalping: 40%, compliance: 30%).
        # With these weights the 0.7 threshold is only reached when the
        # scalping check passes together with at least one other check.
        overall_score = (
            (1.0 if verification.is_verified else 0.0) * 0.3
            + (0.0 if scalping_detection.is_scalper else 1.0) * 0.4
            + (1.0 if compliance.resale_allowed else 0.0) * 0.3
        )
        if overall_score >= 0.7:
            final_decision = "APPROVED"
        else:
            final_decision = "DENIED"

        # Generate intelligent action items
        action_items = self._generate_action_items(
            final_decision, verification, scalping_detection, compliance, pricing
        )

        # Plain-dict view of the analysis, consumed by the summary prompt.
        summary_input = {
            "verification": verification.dict(),
            "scalping_detection": scalping_detection.dict(),
            "pricing": pricing.dict(),
            "compliance": compliance.dict(),
            "final_decision": final_decision,
            "overall_score": overall_score,
        }
        ai_summary, confidence_score = await self.ai_service.generate_executive_summary(summary_input)

        # Create final report
        return IntelligentReport(
            verification=verification,
            scalping_detection=scalping_detection,
            pricing=pricing,
            compliance=compliance,
            final_decision=final_decision,
            action_items=action_items,
            ai_summary=ai_summary,
            confidence_score=confidence_score,
        ).dict()
    except Exception as e:
        logger.error(f"Error processing intelligent transaction: {e}")
        return self._create_error_response(str(e))
def _generate_action_items(
self,
decision: str,
verification: UserVerification,
scalping: ScalpingDetection,
compliance: ResaleCompliance,
pricing: PricingRecommendation
) -> List[str]:
"""Generate intelligent action items based on analysis results"""
actions = []
if decision == "APPROVED":
actions.extend([
"βœ… Process ticket resale at recommended price",
f"πŸ’° Set final price to ${pricing.recommended_resale_price:.2f}",
"πŸ“Š Monitor user activity for ongoing compliance"
])
if verification.risk_score > 0.3:
actions.append("⚠️ Enhanced monitoring due to elevated risk score")
if scalping.confidence > 0.5:
actions.append("πŸ” Close monitoring recommended due to scalping indicators")
else: # DENIED
actions.append("❌ Block transaction immediately")
if scalping.is_scalper:
actions.extend([
"🚨 Flag user account for comprehensive review",
"πŸ“§ Send scalping policy violation notice",
"πŸ”’ Consider temporary account restriction"
])
if not verification.is_verified:
actions.extend([
"πŸ“‹ Require enhanced identity verification",
"πŸ“ž Manual verification call recommended"
])
if compliance.violations:
actions.append(f"πŸ“ Send policy violation notice: {', '.join(compliance.violations)}")
if pricing.recommended_resale_price != pricing.original_price:
actions.append(f"πŸ’‘ Suggest alternative price: ${pricing.recommended_resale_price:.2f}")
# Add market-specific actions
if pricing.market_intelligence.market_sentiment == "negative":
actions.append("πŸ“‰ Market conditions unfavorable - consider delaying resale")
elif pricing.market_intelligence.price_trend == "falling":
actions.append("⏰ Price trend declining - urgent sale recommended")
return actions
def _create_error_response(self, error_msg: str) -> Dict[str, Any]:
"""Create error response with fallback data"""
return {
"error": True,
"message": f"System error: {error_msg}",
"final_decision": "ERROR",
"action_items": ["πŸ”§ Contact system administrator", "πŸ“ž Manual review required"],
"ai_summary": "System error occurred during analysis",
"confidence_score": 0.0
}
# ========== GRADIO INTERFACE ==========
def _format_transaction_report(result: Dict[str, Any], proposed_resale_price: float) -> str:
    """Render a successful analysis result as the Markdown report shown in the UI.

    Args:
        result: Report dict containing ``verification``, ``scalping_detection``,
            ``pricing``, ``compliance``, ``final_decision`` and ``action_items``;
            ``confidence_score`` and ``ai_summary`` are optional.
        proposed_resale_price: The resale price entered by the user.

    Returns:
        The report as a Markdown string.

    Raises:
        KeyError: If a required key is missing from ``result``.
    """
    verification = result["verification"]
    scalping = result["scalping_detection"]
    pricing = result["pricing"]
    market = pricing["market_intelligence"]
    compliance = result["compliance"]

    approved = result["final_decision"] == "APPROVED"
    confidence = result.get("confidence_score", 0)
    confidence_color = "🟢" if confidence > 0.8 else "🟡" if confidence > 0.6 else "🔴"
    risk_score = verification["risk_score"]
    risk_color = "🟢" if risk_score < 0.3 else "🟡" if risk_score < 0.6 else "🔴"

    original = pricing["original_price"]
    recommended = pricing["recommended_resale_price"]
    # Guard the ratio so a zero original price cannot crash report rendering.
    price_ratio = f"{recommended / original:.2f}x" if original else "n/a"
    # Pick the icon from the actual trend instead of always showing a rising chart.
    trend_icon = {"rising": "📈", "falling": "📉"}.get(market["price_trend"], "➡️")

    flags = ", ".join(scalping["flags"]) if scalping["flags"] else "✅ None detected"
    violations = ", ".join(compliance["violations"]) if compliance["violations"] else "✅ None"

    # Blank entries are deliberate: a "---" directly under paragraph text would
    # be parsed as a setext heading underline rather than a horizontal rule.
    lines = [
        "# 🤖 Intelligent Anti-Scalping Analysis Report",
        "",
        f"## {'✅' if approved else '❌'} Final Decision: **{result['final_decision']}**",
        "",
        f"**AI Confidence**: {confidence:.1%} {confidence_color}",
        "",
        f"> 🧠 **AI Summary**: {result.get('ai_summary', 'Analysis completed successfully')}",
        "",
        "---",
        "",
        "## 👤 Enhanced User Verification",
        "",
        f"- **User ID**: `{verification['user_id']}`",
        f"- **Name**: {verification['name']}",
        f"- **Email**: {verification['email']}",
        f"- **Verified**: {'✅ Yes' if verification['is_verified'] else '❌ No'}",
        f"- **Risk Score**: {risk_score:.1%} {risk_color}",
        f"- **Verification Level**: {verification['verification_level'].title()}",
        "",
        "### 🧠 AI Behavioral Analysis:",
        "",
        f"{verification['ai_behavioral_analysis']}",
        "",
        "---",
        "",
        "## 🔍 Advanced Scalping Detection",
        "",
        f"- **Scalper Detected**: {'🚨 YES' if scalping['is_scalper'] else '✅ NO'}",
        f"- **Detection Confidence**: {scalping['confidence']:.1%}",
        f"- **Purchase Velocity**: {scalping['purchase_velocity']} purchases/hour",
        f"- **IP Address Duplicates**: {scalping['ip_duplicates']} accounts",
        f"- **Network Connections**: {len(scalping['network_connections'])} suspicious links",
        f"- **Red Flags**: {flags}",
        "",
        "### 🧠 AI Pattern Analysis:",
        "",
        f"{scalping['ai_pattern_analysis']}",
        "",
        "---",
        "",
        "## 💰 Intelligent Market Pricing",
        "",
        f"- **Original Price**: ${original:.2f}",
        f"- **Proposed Resale**: ${proposed_resale_price:.2f}",
        f"- **AI Recommended**: ${recommended:.2f}",
        f"- **Market Fair Price**: ${pricing['market_fair_price']:.2f}",
        f"- **Price Ratio**: {price_ratio}",
        "",
        "### 📊 Market Intelligence:",
        "",
        f"- **Market Sentiment**: {market['market_sentiment'].title()}",
        f"- **Price Trend**: {market['price_trend'].title()} {trend_icon}",
        f"- **Supply/Demand**: {market['supply_demand_ratio']:.2f}",
        f"- **Average Market Price**: ${market['average_resale_price']:.2f}",
    ]

    if pricing.get("profit_margin"):
        lines.append(f"- **Profit Margin**: {pricing['profit_margin']:.1f}% 📈")
    elif pricing.get("loss_percentage"):
        lines.append(f"- **Loss**: -{pricing['loss_percentage']:.1f}% 📉")

    lines += [
        "",
        "### 🧠 AI Pricing Rationale:",
        "",
        f"{pricing['ai_pricing_rationale']}",
        "",
        "---",
        "",
        "## ✅ Compliance & Policy Analysis",
        "",
        f"- **Policy Compliant**: {'✅ Yes' if compliance['is_compliant'] else '❌ No'}",
        f"- **Resale Permitted**: {'✅ Yes' if compliance['resale_allowed'] else '❌ No'}",
        f"- **Maximum Allowed**: ${compliance['max_allowed_price']:.2f}",
        f"- **Violations**: {violations}",
        "",
        "### 🧠 AI Policy Analysis:",
        "",
        f"{compliance['ai_policy_analysis']}",
        "",
        "---",
        "",
        "## 📋 Intelligent Action Items",
        "",
    ]
    lines.extend(
        f"{number}. {action}"
        for number, action in enumerate(result["action_items"], 1)
    )

    # Closing summary box
    lines += ["", "---", ""]
    if approved:
        lines += [
            "> ✅ **TRANSACTION APPROVED**",
            ">",
            f"> **Confidence Level**: {confidence:.1%}",
            ">",
            "> All AI-powered checks passed. Resale can proceed with recommended monitoring.",
        ]
    else:
        lines += [
            "> ❌ **TRANSACTION BLOCKED**",
            ">",
            "> **Risk Assessment**: High risk detected by AI analysis",
            ">",
            "> Policy violations and suspicious patterns identified. Manual review required.",
        ]

    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    lines += [
        "",
        "---",
        "",
        f"*🤖 Analysis powered by Google Gemini AI • Generated at {generated_at}*",
        "",
    ]
    return "\n".join(lines)


def create_interface():
    """Build the Gradio Blocks UI for the anti-scalping analysis.

    Creates one ``IntelligentAntiScalpingSystem`` shared by all submissions,
    wires the input form to ``process_transaction`` and returns the (not yet
    launched) ``gr.Blocks`` object.
    """
    system = IntelligentAntiScalpingSystem()

    def process_transaction(
        name, email, user_id, event_name, ticket_type,
        ticket_quantity, original_price, demand_level, proposed_resale_price
    ):
        """Validate the form values, run the analysis and return Markdown.

        Returns an error message string when validation fails or the system
        reports an error; otherwise the formatted analysis report.
        """
        # Validate inputs
        if not all([name, email, user_id, event_name]):
            return "❌ **Error**: Please fill in all required fields"
        try:
            original_price = float(original_price) if original_price else 0
            proposed_resale_price = float(proposed_resale_price) if proposed_resale_price else 0
            ticket_quantity = int(ticket_quantity) if ticket_quantity else 1
        except (ValueError, TypeError, OverflowError):
            return "❌ **Error**: Please enter valid numbers for prices and quantity"
        if original_price <= 0 or proposed_resale_price <= 0:
            return "❌ **Error**: Prices must be greater than 0"
        if ticket_quantity < 1:
            return "❌ **Error**: Ticket quantity must be at least 1"

        # Run the async pipeline from this synchronous handler. asyncio.run
        # creates and fully tears down its own loop, so no closed loop is left
        # installed as the thread's current event loop. Like the manual loop it
        # replaces, it requires that no loop is already running in this thread.
        result = asyncio.run(
            system.process_intelligent_transaction(
                name=name,
                email=email,
                user_id=user_id,
                event_name=event_name,
                ticket_type=ticket_type,
                ticket_quantity=ticket_quantity,
                original_price=original_price,
                demand_level=demand_level,
                proposed_resale_price=proposed_resale_price,
            )
        )

        # Handle errors
        if result.get("error"):
            return f"❌ **System Error**: {result.get('message', 'Unknown error occurred')}"
        return _format_transaction_report(result, proposed_resale_price)

    # Markdown is assembled from unindented lines so that source indentation
    # can never turn the text into a Markdown code block.
    intro_md = "\n".join([
        "# 🤖 Intelligent AI-Powered Anti-Scalping System",
        "",
        "Next-generation ticket scalping prevention using **Google Gemini AI**, real-time market intelligence,",
        "and advanced behavioral analysis. Protects consumers while ensuring fair market pricing.",
        "",
        "## 🚀 AI-Powered Features:",
        "",
        "**🧠 Behavioral Analysis**: Gemini AI analyzes user patterns for authenticity",
        "",
        "**🔍 Pattern Recognition**: Advanced scalping detection using network analysis",
        "",
        "**📊 Market Intelligence**: Real-time pricing data and sentiment analysis",
        "",
        "**⚖️ Policy Compliance**: AI-powered policy interpretation and enforcement",
        "",
        "**📈 Dynamic Pricing**: Market-aware fair pricing recommendations",
        "",
        "---",
    ])
    placeholder_md = "\n".join([
        "🤖 **Ready for AI Analysis!**",
        "",
        "Fill in the transaction details and click 'Run AI Analysis' to get comprehensive:",
        "",
        "- 🧠 AI behavioral analysis",
        "- 🔍 Advanced scalping detection",
        "- 📊 Real-time market intelligence",
        "- ⚖️ Policy compliance assessment",
        "- 📈 Dynamic pricing recommendations",
    ])
    # The model label is read from config so it cannot drift from the model
    # that is actually configured.
    footer_md = "\n".join([
        "---",
        "",
        "### 🔑 System Configuration",
        "",
        f"**AI Provider**: Google Gemini, model `{config.GEMINI_MODEL}` (Set `GEMINI_API_KEY` environment variable)",
        "",
        "**Market Data**: Serper API (Optional: Set `SERPER_API_KEY` for real-time data)",
        "",
        "**Fallback Mode**: System works without API keys using intelligent simulations",
        "",
        "*🔒 Your API keys are never stored or shared. All processing is secure and private.*",
    ])

    # Create enhanced Gradio interface
    with gr.Blocks(
        title="🤖 Intelligent Anti-Scalping System",
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="purple",
        ),
        css=".gradio-container {\n    max-width: 1200px !important;\n}\n",
    ) as interface:
        gr.Markdown(intro_md)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 👤 User Information")
                name = gr.Textbox(label="Full Name", placeholder="John Doe", value="")
                email = gr.Textbox(label="Email Address", placeholder="john@example.com", value="")
                user_id = gr.Textbox(label="User ID", placeholder="USER123", value="")

                gr.Markdown("### 🎟️ Event Details")
                event_name = gr.Textbox(label="Event Name", placeholder="Taylor Swift - Eras Tour", value="")
                ticket_type = gr.Dropdown(
                    label="Ticket Type",
                    choices=["General Admission", "VIP", "Premium", "Standard", "Balcony", "Floor", "Front Row"],
                    value="Standard",
                )
                ticket_quantity = gr.Number(label="Number of Tickets", value=1, minimum=1, maximum=10, precision=0)

                gr.Markdown("### 💲 Pricing Information")
                original_price = gr.Number(label="Original Ticket Price ($)", value=100, minimum=1)
                demand_level = gr.Radio(
                    label="Current Market Demand",
                    choices=["low", "medium", "high"],
                    value="medium",
                )
                proposed_resale_price = gr.Number(label="Proposed Resale Price ($)", value=150, minimum=1)

                submit_btn = gr.Button("🤖 Run AI Analysis", variant="primary", size="lg")

            with gr.Column(scale=2):
                output = gr.Markdown(value=placeholder_md)

        # The same component list feeds both the click handler and the examples.
        form_inputs = [
            name, email, user_id, event_name, ticket_type,
            ticket_quantity, original_price, demand_level, proposed_resale_price,
        ]

        # Event handlers
        submit_btn.click(
            fn=process_transaction,
            inputs=form_inputs,
            outputs=output,
        )

        # Examples section
        gr.Markdown("---")
        gr.Markdown("### 📋 Example Scenarios")
        gr.Examples(
            examples=[
                ["John Smith", "john@gmail.com", "USER001", "Taylor Swift - Eras Tour", "VIP", 2, 500, "high", 750],
                ["Jane Doe", "jane@company.com", "USER002", "NBA Finals Game 7", "Premium", 4, 300, "high", 1200],
                ["Bob Wilson", "bob@email.com", "USER003", "Local Concert", "General Admission", 1, 50, "low", 40],
                ["ScalperBot", "temp@fake.com", "BOT999", "Popular Event", "Standard", 8, 100, "high", 300],
                ["Sarah Johnson", "sarah.j@university.edu", "USER456", "Music Festival", "Premium", 3, 200, "medium", 280],
            ],
            inputs=form_inputs,
        )

        gr.Markdown(footer_md)

    return interface
# Main execution
if __name__ == "__main__":
    # Script entry point: build the UI, then launch it with fixed settings
    # (host 0.0.0.0, port 7860, share link requested, show_error enabled).
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    interface = create_interface()
    interface.launch(**launch_options)