# app.py - 올바른 API 처리가 포함된 최종 버전 import gradio as gr import sqlite3 import json import uuid from datetime import datetime import pandas as pd import os import threading # 데이터베이스 파일 경로 DB_FILE = "tracking_data.db" # 데이터베이스 초기화 def init_database(): """SQLite 데이터베이스 초기화 및 테이블 생성""" conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() # 방문자 테이블 생성 cursor.execute(""" CREATE TABLE IF NOT EXISTS visitors ( device_id TEXT PRIMARY KEY, first_seen TIMESTAMP NOT NULL, last_seen TIMESTAMP NOT NULL, user_agent TEXT, platform TEXT, language TEXT, screen_resolution TEXT, cpu_cores INTEGER, device_memory REAL, gpu_info TEXT, timezone TEXT ) """) # 이벤트 테이블 생성 cursor.execute(""" CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, site_id TEXT NOT NULL, event_type TEXT NOT NULL, event_data TEXT, page_url TEXT, page_title TEXT, referrer TEXT, client_timestamp TIMESTAMP, server_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (device_id) REFERENCES visitors(device_id) ) """) # 인덱스 생성 (쿼리 성능 향상) cursor.execute("CREATE INDEX IF NOT EXISTS idx_device_id ON events(device_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON events(server_timestamp)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON events(event_type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_site_id ON events(site_id)") conn.commit() conn.close() print(f"✅ 데이터베이스 초기화 완료: {DB_FILE}") # 앱 시작 시 DB 초기화 init_database() # 스레드 안전성을 위한 락 db_lock = threading.Lock() def process_tracking_data(data_json): """추적 데이터를 받아서 SQLite DB에 저장""" with db_lock: conn = None try: # 로깅 추가 print(f"[API] Received data: {data_json[:200] if isinstance(data_json, str) else data_json}") # JSON 파싱 data = json.loads(data_json) if isinstance(data_json, str) else data_json # 데이터베이스 연결 conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() # 필수 필드 확인 device_id = data.get('deviceId', 'unknown') site_id = data.get('siteId', 'unknown') # 1. 방문자 정보 업데이트 (UPSERT) cursor.execute(""" SELECT device_id FROM visitors WHERE device_id = ? """, (device_id,)) if cursor.fetchone() is None: # 신규 방문자 삽입 cursor.execute(""" INSERT INTO visitors ( device_id, first_seen, last_seen, user_agent, platform, language, screen_resolution, cpu_cores, device_memory, gpu_info, timezone ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( device_id, datetime.now(), datetime.now(), data.get('userAgent', ''), data.get('platform', ''), data.get('language', ''), data.get('screenResolution', ''), data.get('cpuCores', 0), data.get('deviceMemory', 0), data.get('gpuInfo', ''), data.get('timezone', '') )) print(f"🆕 신규 방문자 등록: {device_id}") else: # 기존 방문자 업데이트 cursor.execute(""" UPDATE visitors SET last_seen = ?, user_agent = ?, platform = ?, language = ?, screen_resolution = ?, cpu_cores = ?, device_memory = ?, gpu_info = ?, timezone = ? WHERE device_id = ? """, ( datetime.now(), data.get('userAgent', ''), data.get('platform', ''), data.get('language', ''), data.get('screenResolution', ''), data.get('cpuCores', 0), data.get('deviceMemory', 0), data.get('gpuInfo', ''), data.get('timezone', ''), device_id )) # 2. 이벤트 저장 event_data = data.get('eventData', {}) cursor.execute(""" INSERT INTO events ( device_id, site_id, event_type, event_data, page_url, page_title, referrer, client_timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( device_id, site_id, data.get('eventType', 'pageview'), json.dumps(event_data), data.get('pageUrl', ''), data.get('pageTitle', ''), data.get('referrer', ''), data.get('timestamp', datetime.now().isoformat()) )) tracking_id = cursor.lastrowid conn.commit() print(f"✅ 이벤트 저장: ID={tracking_id}, Type={data.get('eventType')}, Device={device_id}") return { "status": "success", "message": "Data tracked successfully", "trackingId": tracking_id, "deviceId": device_id } except Exception as e: if conn: conn.rollback() print(f"❌ 오류 발생: {str(e)}") return {"status": "error", "message": str(e)} finally: if conn: conn.close() def get_statistics(): """DB에서 통계 데이터 조회""" conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() try: # 총 방문자 수 cursor.execute("SELECT COUNT(*) FROM visitors") total_visitors = cursor.fetchone()[0] # 오늘 방문자 수 cursor.execute(""" SELECT COUNT(DISTINCT device_id) FROM events WHERE DATE(server_timestamp) = DATE('now', 'localtime') """) today_visitors = cursor.fetchone()[0] # 총 이벤트 수 cursor.execute("SELECT COUNT(*) FROM events") total_events = cursor.fetchone()[0] # 이벤트 타입별 통계 cursor.execute(""" SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type ORDER BY count DESC """) event_types = cursor.fetchall() # 사이트별 통계 cursor.execute(""" SELECT site_id, COUNT(*) as count FROM events GROUP BY site_id ORDER BY count DESC LIMIT 5 """) site_stats = cursor.fetchall() # 최근 이벤트 cursor.execute(""" SELECT e.server_timestamp, e.event_type, e.device_id, e.page_url, e.site_id FROM events e ORDER BY e.id DESC LIMIT 15 """) recent_events = cursor.fetchall() # 시간대별 통계 cursor.execute(""" SELECT strftime('%H', server_timestamp) as hour, COUNT(*) as count FROM events WHERE DATE(server_timestamp) = DATE('now', 'localtime') GROUP BY hour ORDER BY hour """) hourly_stats = cursor.fetchall() return { 'total_visitors': total_visitors, 'today_visitors': today_visitors, 'total_events': total_events, 'event_types': event_types, 'site_stats': site_stats, 'recent_events': recent_events, 'hourly_stats': hourly_stats } finally: conn.close() def view_statistics(): """통계 데이터 포맷팅하여 표시""" stats = get_statistics() summary = f""" ### 📊 방문자 통계 - **총 방문자**: {stats['total_visitors']}명 - **오늘 방문자**: {stats['today_visitors']}명 - **총 이벤트**: {stats['total_events']}회 - **마지막 업데이트**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ### 📈 이벤트 타입별 통계 """ for event_type, count in stats['event_types']: summary += f"\n- **{event_type}**: {count:,}회" if stats['site_stats']: summary += "\n\n### 🌐 사이트별 통계 (상위 5개)" for site_id, count in stats['site_stats']: summary += f"\n- **{site_id}**: {count:,}회" if stats['hourly_stats']: summary += "\n\n### ⏰ 오늘 시간대별 활동" for hour, count in stats['hourly_stats']: summary += f"\n- **{hour}시**: {count}회" summary += "\n\n### 📍 최근 이벤트 (최대 15개)" for event in stats['recent_events']: timestamp, event_type, device_id, page_url, site_id = event summary += f"\n- {timestamp} | {event_type} | {device_id[:8]}... | {site_id} | {page_url[:40]}..." return summary def get_recent_data(): """최근 데이터를 DataFrame으로 반환""" conn = sqlite3.connect(DB_FILE) try: query = """ SELECT e.server_timestamp as 'Timestamp', e.device_id as 'Device ID', e.event_type as 'Event Type', e.page_url as 'Page URL', v.platform as 'Platform', v.language as 'Language', e.site_id as 'Site ID' FROM events e JOIN visitors v ON e.device_id = v.device_id ORDER BY e.id DESC LIMIT 200 """ df = pd.read_sql_query(query, conn) return df finally: conn.close() def export_data(start_date, end_date): """데이터를 CSV로 내보내기""" conn = sqlite3.connect(DB_FILE) try: query = """ SELECT e.id, e.server_timestamp, e.client_timestamp, e.device_id, e.site_id, e.event_type, e.event_data, e.page_url, e.page_title, e.referrer, v.user_agent, v.platform, v.language, v.screen_resolution, v.cpu_cores, v.device_memory, v.gpu_info, v.timezone, v.first_seen, v.last_seen FROM events e JOIN visitors v ON e.device_id = v.device_id WHERE DATE(e.server_timestamp) BETWEEN ? AND ? ORDER BY e.id DESC """ df = pd.read_sql_query(query, conn, params=(start_date, end_date)) # CSV 파일로 저장 filename = f"tracking_export_{start_date}_to_{end_date}.csv" df.to_csv(filename, index=False) return filename, f"✅ 데이터를 {filename} 파일로 내보냈습니다. ({len(df):,}개 레코드)" except Exception as e: return None, f"❌ 내보내기 실패: {str(e)}" finally: conn.close() def test_tracking(): """테스트 데이터 생성""" test_data = { "siteId": "test-site", "deviceId": f"TEST_{str(uuid.uuid4())[:8]}", "eventType": "test_event", "eventData": {"test": True, "timestamp": datetime.now().isoformat()}, "pageUrl": "https://test.example.com/page", "pageTitle": "Test Page", "referrer": "https://google.com", "userAgent": "Test Browser/1.0", "screenResolution": "1920x1080", "platform": "Test Platform", "language": "ko-KR", "timezone": "Asia/Seoul", "cpuCores": 8, "deviceMemory": 16, "gpuInfo": "Test GPU", "timestamp": datetime.now().isoformat() } result = process_tracking_data(test_data) return f"테스트 결과:\n{json.dumps(result, indent=2)}\n\n테스트 데이터:\n{json.dumps(test_data, indent=2)}" def get_db_info(): """데이터베이스 정보 조회""" conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() try: # DB 파일 크기 db_size = os.path.getsize(DB_FILE) / (1024 * 1024) # MB # 테이블 정보 cursor.execute(""" SELECT name, sql FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' """) tables = cursor.fetchall() # 레코드 수 cursor.execute("SELECT COUNT(*) FROM visitors") visitor_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM events") event_count = cursor.fetchone()[0] info = f""" ### 💾 데이터베이스 정보 - **DB 파일**: {DB_FILE} - **파일 크기**: {db_size:.2f} MB - **방문자 수**: {visitor_count:,}개 - **이벤트 수**: {event_count:,}개 ### 📋 테이블 구조 """ for table_name, sql in tables: info += f"\n**{table_name}**\n```sql\n{sql}\n```\n" return info finally: conn.close() # 추적 스크립트 생성 함수 def generate_tracking_script(site_id, server_url): """외부 사이트에 삽입할 추적 스크립트 생성""" script = f""" """ return script.strip() def create_site_id(server_url): """새로운 추적 사이트 ID 생성""" if not server_url: server_url = "https://your-space-name.hf.space" site_id = str(uuid.uuid4())[:8] return site_id, generate_tracking_script(site_id, server_url) # CSS 스타일 custom_css = """ .gradio-container { font-family: 'Arial', sans-serif; } .tracking-script { background: #f5f5f5; border: 1px solid #ddd; border-radius: 5px; padding: 10px; font-family: monospace; font-size: 12px; } .db-info { background: #e3f2fd; padding: 15px; border-radius: 5px; margin: 10px 0; } """ # Gradio UI with gr.Blocks(title="방문자 추적 관리 시스템", css=custom_css, theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🔍 방문자 추적 관리 시스템 ### 외부 웹사이트에 추적 스크립트를 삽입하여 방문자를 모니터링합니다. > ✅ **SQLite 데이터베이스에 모든 데이터가 영구 저장됩니다.** """) # 숨겨진 API 처리 (올바른 방식) with gr.Column(visible=False): api_input = gr.Textbox(label="API Input") api_output = gr.JSON(label="API Output") api_button = gr.Button("Process Tracking Data") # Button click에 api_name 지정 api_button.click( fn=process_tracking_data, inputs=api_input, outputs=api_output, api_name="process_tracking" ) with gr.Tab("🔧 추적 스크립트 생성"): gr.Markdown("### 새로운 추적 스크립트 생성") with gr.Row(): with gr.Column(): server_url_input = gr.Textbox( label="추적 서버 URL", placeholder="https://your-space-name.hf.space", info="이 Hugging Face Space의 URL을 입력하세요" ) gr.Markdown(""" ℹ️ **서버 URL 찾기**: 1. 이 Space의 URL을 확인하세요 2. 형식: `https://username-spacename.hf.space` 3. 예시: `https://seawolf2357-evcook.hf.space` """) create_btn = gr.Button("🔧 새 추적 코드 생성", variant="primary") with gr.Row(): site_id_output = gr.Textbox(label="사이트 ID", interactive=False) script_output = gr.Code( label="HTML에 삽입할 추적 스크립트", language="html", interactive=True, elem_classes="tracking-script" ) create_btn.click( fn=create_site_id, inputs=[server_url_input], outputs=[site_id_output, script_output], api_name="create_site_id" ) with gr.Tab("📊 통계 대시보드"): gr.Markdown("### 📊 실시간 방문자 통계") with gr.Row(): refresh_btn = gr.Button("🔄 통계 새로고침", variant="secondary") test_btn = gr.Button("🧪 테스트 데이터 생성", variant="secondary") db_info_btn = gr.Button("💾 DB 정보 보기", variant="secondary") stats_output = gr.Markdown() test_output = gr.Textbox(label="테스트 결과", visible=False) db_info_output = gr.Markdown(visible=False, elem_classes="db-info") # 페이지 로드 시 통계 표시 demo.load(fn=view_statistics, outputs=stats_output, api_name="view_statistics") refresh_btn.click(fn=view_statistics, outputs=stats_output, api_name="view_statistics_1") def test_and_refresh(): test_result = test_tracking() stats = view_statistics() return stats, gr.update(visible=True, value=test_result) test_btn.click( fn=test_and_refresh, outputs=[stats_output, test_output], api_name="test_and_refresh" ) def show_db_info(): info = get_db_info() return gr.update(visible=True, value=info) db_info_btn.click( fn=show_db_info, outputs=db_info_output, api_name="show_db_info" ) with gr.Tab("📁 데이터 뷰어"): gr.Markdown("### 📁 최근 추적 데이터") data_refresh_btn = gr.Button("🔄 데이터 새로고침") data_output = gr.Dataframe( headers=["Timestamp", "Device ID", "Event Type", "Page URL", "Platform", "Language", "Site ID"], label="최근 이벤트 (최대 200개)" ) demo.load(fn=get_recent_data, outputs=data_output, api_name="get_recent_data") data_refresh_btn.click(fn=get_recent_data, outputs=data_output, api_name="get_recent_data_1") gr.Markdown("### 📥 데이터 내보내기") with gr.Row(): start_date = gr.Textbox( label="시작 날짜", placeholder="2025-01-01", value=datetime.now().strftime('%Y-%m-%d') ) end_date = gr.Textbox( label="종료 날짜", placeholder="2025-01-31", value=datetime.now().strftime('%Y-%m-%d') ) export_btn = gr.Button("💾 CSV로 내보내기", variant="primary") export_output = gr.Textbox(label="내보내기 결과") export_btn.click( fn=export_data, inputs=[start_date, end_date], outputs=[gr.File(label="다운로드 파일"), export_output], api_name="export_data" ) with gr.Tab("❓ 도움말"): gr.Markdown(""" ### 🚀 빠른 시작 가이드 1. **이 Space의 URL 확인** - 브라우저 주소창에서 URL 복사 - 예: `https://username-spacename.hf.space` 2. **추적 스크립트 생성** - "추적 스크립트 생성" 탭으로 이동 - URL 입력 후 스크립트 생성 3. **웹사이트에 설치** ```html ``` 4. **추적 확인** - "통계 대시보드" 탭에서 실시간 모니터링 ### 💾 데이터 저장 - **SQLite DB 파일**: `tracking_data.db` - **자동 저장**: 모든 이벤트가 실시간으로 DB에 저장 - **영구 보관**: 서버 재시작 후에도 데이터 유지 ### 📡 API 사용법 ```javascript // Gradio Client API 사용 fetch('https://your-space.hf.space/call/process_tracking', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({ data: [JSON.stringify({ siteId: 'your-site-id', deviceId: 'device-id', eventType: 'pageview', // ... 기타 데이터 })] }) }); ``` """) # 앱 실행 if __name__ == "__main__": demo.queue().launch() # requirements.txt 내용: """ gradio==4.44.1 pandas==2.2.3 """