# Codettev3 / app.py
# Uploaded by Raiff1982 ("Upload 20 files", commit 19a14ad, verified) - 11.2 kB
import asyncio
import copy
import json
import logging
import os
import random
import re
import sqlite3
import time
import tkinter as tk
from collections import deque
from threading import Thread, Lock
from tkinter import scrolledtext, messagebox
from typing import Dict, List, Optional, Any

import numpy as np
import psutil
from cryptography.fernet import Fernet
from openai import AsyncOpenAI
from sklearn.ensemble import IsolationForest
from werkzeug.security import generate_password_hash, check_password_hash
# Configure logging first: the client-initialisation error path below logs
# through `logger`, so the logger has to exist before that code runs.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize async OpenAI client.
# If construction fails for any reason (presumably e.g. a missing API key --
# confirm against the client library), the failure is logged and `aclient`
# is left as None so the module still imports; callers must check for None.
try:
    aclient = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
except Exception as e:
    logger.error(f"Failed to initialize OpenAI client: {e}")
    aclient = None
class EnhancedAIConfig:
    """Configuration manager that overlays a JSON file on built-in defaults.

    Attributes:
        config: The effective configuration dictionary (a private copy of the
            defaults, updated with whatever the configuration file supplies).
        encryption: The Fernet instance returned by ``_init_encryption``.
    """

    _DEFAULTS = {
        "model": "gpt-4-turbo",
        "safety_thresholds": {
            "memory": 85,
            "cpu": 90,
            "response_time": 2.0,
        },
        "defense_strategies": ["evasion", "adaptability", "barrier"],
        "cognitive_modes": ["scientific", "creative", "emotional"],
    }

    def __init__(self, config_path: str = "ai_config.json"):
        """Load and validate the configuration stored at *config_path*.

        Args:
            config_path: Path of the JSON configuration file. A missing or
                unparsable file silently falls back to the defaults.

        Raises:
            ValueError: If the loaded configuration is malformed.
        """
        self.config = self._load_config(config_path)
        self._validate()
        self.encryption = self._init_encryption()

    def _load_config(self, path: str) -> Dict:
        """Return the defaults merged with the JSON document at *path*.

        A missing file or invalid JSON yields a fresh copy of the defaults.
        """
        try:
            with open(path, 'r', encoding="utf-8") as f:
                user_config = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # Hand out a copy so callers can never mutate the class defaults.
            return copy.deepcopy(self._DEFAULTS)
        return self._merge_configs(user_config)

    def _merge_configs(self, user_config: Dict) -> Dict:
        """Return a new dict: the defaults overridden by *user_config*.

        Dict values are merged one level deep into the matching default
        dict; every other value (or a dict with no dict default to merge
        into) replaces the default outright.

        Raises:
            ValueError: If *user_config* is not a dict.
        """
        if not isinstance(user_config, dict):
            raise ValueError("Configuration file must contain a JSON object")
        # Deep copy: a shallow copy would share the nested dicts with
        # _DEFAULTS, and the update() below would then rewrite the defaults
        # for every later instance.
        merged = copy.deepcopy(self._DEFAULTS)
        for key, value in user_config.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key].update(value)
            else:
                merged[key] = value
        return merged

    def _validate(self):
        """Check that ``cognitive_modes`` is a list (or tuple) of strings.

        Raises:
            ValueError: If the entry is missing, is not a list/tuple, or
                contains a non-string item.
        """
        modes = self.config.get("cognitive_modes")
        # A bare string must be rejected explicitly: iterating it yields
        # one-character strings, which would pass the per-item check.
        if not isinstance(modes, (list, tuple)) or not all(
                isinstance(mode, str) for mode in modes):
            raise ValueError("Invalid cognitive mode configuration")

    def _init_encryption(self) -> "Fernet":
        """Create the Fernet instance stored on ``self.encryption``.

        NOTE(review): the key is generated fresh for each instance and is not
        persisted, so anything encrypted with it cannot be decrypted by a
        later process. Nothing in this file encrypts data yet; add key
        storage before relying on this for data at rest.
        """
        return Fernet(Fernet.generate_key())
class SecureDatabase:
    """SQLite store for user accounts and the interactions table.

    Every statement takes ``self.lock`` and runs on its own short-lived
    connection, which is committed (or rolled back) and then closed.
    """

    def __init__(self, db_path: str = "ai_system.db"):
        """Remember *db_path* and create the tables if they do not exist."""
        self.db_path = db_path
        self.lock = Lock()
        self._init_db()

    def _execute(self, sql: str, params: tuple = ()) -> list:
        """Run one statement in its own transaction and return its rows.

        Args:
            sql: The SQL text, using ``?`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            All rows produced by the statement (empty for non-queries).

        Raises:
            sqlite3.Error: Propagated unchanged after the rollback.
        """
        with self.lock:
            conn = sqlite3.connect(self.db_path)
            try:
                # `with conn` commits on success and rolls back on error, but
                # it does not close the connection -- hence the finally.
                with conn:
                    return conn.execute(sql, params).fetchall()
            finally:
                conn.close()

    def _init_db(self):
        """Create the ``users`` and ``interactions`` tables when missing."""
        self._execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE,
                password_hash TEXT
            )""")
        self._execute("""
            CREATE TABLE IF NOT EXISTS interactions (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                query TEXT,
                response TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(user_id) REFERENCES users(id)
            )""")

    def create_user(self, username: str, password: str):
        """Insert a user row holding a hash of *password*.

        Raises:
            sqlite3.IntegrityError: If *username* is already taken (the
                column is declared UNIQUE).
        """
        self._execute(
            "INSERT INTO users (username, password_hash) VALUES (?, ?)",
            (username, generate_password_hash(password)))

    def authenticate(self, username: str, password: str) -> bool:
        """Return True only if *username* exists and *password* matches."""
        rows = self._execute(
            "SELECT password_hash FROM users WHERE username = ?", (username,))
        if not rows:
            # Unknown user: return a real bool rather than None.
            return False
        return check_password_hash(rows[0][0], password)
class DefenseSystem:
    """Text post-processor that applies a configurable chain of filters.

    ``STRATEGIES`` maps each strategy name to a one-argument transform:

    * ``evasion`` replaces every standalone run of exactly four digits
      with ``****``.
    * ``adaptability`` appends a "[System optimized response]" line.
    * ``barrier`` replaces each occurrence of "malicious" with "safe".
    """

    STRATEGIES = {
        "evasion": lambda text: re.sub(r'\b\d{4}\b', '****', text),
        "adaptability": lambda text: text + "\n[System optimized response]",
        "barrier": lambda text: text.replace("malicious", "safe"),
    }

    def __init__(self, strategies: List[str]):
        """Select the transforms named in *strategies*, keeping their order.

        Names that are not keys of ``STRATEGIES`` are ignored.
        """
        self.active_strategies = []
        for name in strategies:
            transform = self.STRATEGIES.get(name)
            if transform is not None:
                self.active_strategies.append(transform)

    def apply_defenses(self, text: str) -> str:
        """Return *text* after passing it through each active transform in turn."""
        filtered = text
        for transform in self.active_strategies:
            filtered = transform(filtered)
        return filtered
class CognitiveProcessor:
    """Produces one templated sentence per enabled perspective.

    ``MODES`` maps a mode name to a formatter that embeds the query in a
    fixed sentence; no analysis of the query itself takes place.
    """

    MODES = {
        "scientific": lambda query: f"Scientific Analysis: {query} demonstrates fundamental principles",
        "creative": lambda query: f"Creative Insight: {query} suggests innovative approaches",
        "emotional": lambda query: f"Emotional Interpretation: {query} conveys hopeful intent",
    }

    def __init__(self, modes: List[str]):
        """Enable the formatters named in *modes*, keeping their order.

        Names that are not keys of ``MODES`` are ignored.
        """
        self.active_modes = []
        for name in modes:
            formatter = self.MODES.get(name)
            if formatter is not None:
                self.active_modes.append(formatter)

    def generate_insights(self, query: str) -> List[str]:
        """Return the sentence from every active formatter for *query*."""
        insights = []
        for formatter in self.active_modes:
            insights.append(formatter(query))
        return insights
class HealthMonitor:
    """Samples host metrics into a bounded history and refits a model on it.

    The history keeps at most 100 samples. Once it holds more than 50, an
    IsolationForest is refitted on every new sample. The fitted model is not
    queried anywhere in this class.
    """

    def __init__(self):
        """Create the lock, the model and the empty 100-sample history."""
        self.lock = Lock()
        self.model = IsolationForest(n_estimators=100)
        self.metrics = deque(maxlen=100)

    async def check_status(self) -> Dict:
        """Take one sample, record it and return it.

        Returns:
            A dict with ``memory`` and ``cpu`` (percentages reported by
            psutil) and ``response_time`` (seconds from ``_measure_latency``).
        """
        memory_pct = psutil.virtual_memory().percent
        cpu_pct = psutil.cpu_percent()
        latency = await self._measure_latency()
        status = {
            "memory": memory_pct,
            "cpu": cpu_pct,
            "response_time": latency,
        }
        # Append and refit under the lock so the history is not modified
        # while it is being copied into the training array.
        with self.lock:
            self.metrics.append(status)
            self._detect_anomalies()
        return status

    async def _measure_latency(self) -> float:
        """Return the wall-clock seconds a 0.1 s ``asyncio.sleep`` took."""
        began = time.monotonic()
        await asyncio.sleep(0.1)
        elapsed = time.monotonic() - began
        return elapsed

    def _detect_anomalies(self):
        """Refit the model on the history once it exceeds 50 samples."""
        if len(self.metrics) <= 50:
            return
        rows = [
            [sample["memory"], sample["cpu"], sample["response_time"]]
            for sample in self.metrics
        ]
        self.model.fit(np.array(rows))
class AICoreSystem:
    """Wires configuration, storage, defenses, insights and health checks
    together and answers queries through the OpenAI chat API."""

    def __init__(self):
        """Build every subsystem from the default-path configuration."""
        self.config = EnhancedAIConfig()
        self.db = SecureDatabase()
        self.defense = DefenseSystem(self.config.config["defense_strategies"])
        self.cognition = CognitiveProcessor(self.config.config["cognitive_modes"])
        self.health = HealthMonitor()
        self.running = True

    async def process_query(self, query: str, user: str) -> Dict:
        """Answer *query* and return the result as a dictionary.

        Args:
            query: The user's question; None, empty or whitespace-only
                input is rejected.
            user: Name of the requesting user (currently unused here).

        Returns:
            On success a dict with ``response`` (the filtered model answer),
            ``insights``, ``health`` and ``security`` (the number of
            configured defense strategies). On failure a dict with a single
            ``error`` key. This method does not raise for processing errors.
        """
        try:
            # Security check: treat None like an empty string instead of
            # letting it fall through to the generic error path.
            if not query or not query.strip():
                return {"error": "Empty query"}
            # Generate response
            response = await self._generate_openai_response(query)
            # Apply security measures
            secured_response = self.defense.apply_defenses(response)
            # Add cognitive insights
            insights = self.cognition.generate_insights(query)
            # Get system health
            health_status = await self.health.check_status()
            return {
                "response": secured_response,
                "insights": insights,
                "health": health_status,
                "security": len(self.config.config["defense_strategies"])
            }
        except Exception as e:
            # Top-level boundary: keep the traceback in the log, but hand the
            # caller only a generic message.
            logger.exception(f"Processing error: {e}")
            return {"error": "System error occurred"}

    async def _generate_openai_response(self, query: str) -> str:
        """Send *query* as a single user message and return the reply text.

        Raises:
            RuntimeError: If the module-level OpenAI client is None.
        """
        if aclient is None:
            raise RuntimeError("OpenAI client is not initialized")
        response = await aclient.chat.completions.create(
            model=self.config.config["model"],
            messages=[{"role": "user", "content": query}],
            max_tokens=2000
        )
        # The message content may be None; normalise to "" so the string
        # filters applied downstream always receive text.
        return response.choices[0].message.content or ""
class AIApplication(tk.Tk):
    """Tkinter front end that runs queries on a background asyncio loop.

    The asyncio loop lives in a daemon thread. All widget access happens on
    the Tk thread: queries are submitted to the loop and the resulting
    future is polled with ``after`` until it completes.
    """

    # Delay between checks for a finished background query, in milliseconds.
    _POLL_INTERVAL_MS = 100

    def __init__(self):
        """Create the core system, build the widgets and start the loop thread."""
        super().__init__()
        self.ai = AICoreSystem()
        self.title("Advanced AI Assistant")
        self._init_ui()
        self._start_event_loop()

    def _init_ui(self):
        """Initialize user interface components"""
        self.geometry("800x600")
        # Authentication Frame
        self.auth_frame = tk.Frame(self)
        self.username = tk.Entry(self.auth_frame, width=30)
        self.password = tk.Entry(self.auth_frame, show="*", width=30)
        # The entries must be placed with grid() like the buttons next to
        # them; a widget that is never given to a geometry manager is not
        # displayed, which would leave no way to type credentials.
        self.username.grid(row=0, column=0)
        self.password.grid(row=0, column=1)
        tk.Button(self.auth_frame, text="Login", command=self._login).grid(row=0, column=2)
        tk.Button(self.auth_frame, text="Register", command=self._register).grid(row=0, column=3)
        self.auth_frame.pack(pady=10)
        # Query Interface
        self.query_entry = tk.Entry(self, width=80)
        self.query_entry.pack(pady=10)
        tk.Button(self, text="Submit", command=self._submit_query).pack()
        # Response Display
        self.response_area = scrolledtext.ScrolledText(self, width=100, height=25)
        self.response_area.pack(pady=10)
        # Status Bar
        self.status = tk.Label(self, text="System Ready", bd=1, relief=tk.SUNKEN)
        self.status.pack(side=tk.BOTTOM, fill=tk.X)

    def _start_event_loop(self):
        """Initialize async event processing"""
        self.loop = asyncio.new_event_loop()
        Thread(target=self._run_async_tasks, daemon=True).start()

    def _run_async_tasks(self):
        """Run async tasks in background thread"""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def _login(self):
        """Handle user login"""
        username = self.username.get()
        password = self.password.get()
        if self.ai.db.authenticate(username, password):
            self.status.config(text=f"Logged in as {username}")
        else:
            messagebox.showerror("Error", "Invalid credentials")

    def _register(self):
        """Handle user registration"""
        username = self.username.get()
        password = self.password.get()
        # Refuse blank credentials rather than creating an account that
        # anyone could log into with empty fields.
        if not username or not password:
            messagebox.showerror("Error", "Username and password are required")
            return
        try:
            self.ai.db.create_user(username, password)
            messagebox.showinfo("Success", "Registration complete")
        except sqlite3.IntegrityError:
            messagebox.showerror("Error", "Username already exists")

    def _submit_query(self):
        """Handle query submission"""
        query = self.query_entry.get()
        if not query:
            return
        # Read widget state here, on the Tk thread, and pass plain values to
        # the coroutine so the background thread never touches a widget.
        user = self.username.get()
        future = asyncio.run_coroutine_threadsafe(
            self.ai.process_query(query, user), self.loop)
        self.after(self._POLL_INTERVAL_MS, self._poll_result, future)

    def _poll_result(self, future):
        """Display the query result once *future* is done; otherwise re-arm."""
        if not future.done():
            self.after(self._POLL_INTERVAL_MS, self._poll_result, future)
            return
        try:
            result = future.result()
        except Exception as exc:
            # E.g. the future was cancelled; surface it like any other error.
            result = {"error": str(exc)}
        self._show_result(result)

    def _show_result(self, result):
        """Render a ``process_query`` result dict in the response area."""
        if "error" in result:
            # Show failures explicitly instead of an empty "Response:" line.
            self.response_area.insert(tk.END, f"Error: {result['error']}\n\n")
            self.status.config(text="Request failed")
            return
        self.response_area.insert(tk.END, f"Response: {result.get('response', '')}\n\n")
        self.status.config(text=f"Security Level: {result.get('security', 0)}")

    def on_closing(self):
        """Clean shutdown handler"""
        self.ai.running = False
        # loop.stop() must be scheduled onto the loop's own thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.destroy()
if __name__ == "__main__":
    # Script entry point: build the main window and run the Tk event loop.
    window = AIApplication()
    # Route the window manager's close request through the shutdown handler
    # so the background asyncio loop is stopped before the window goes away.
    window.protocol("WM_DELETE_WINDOW", window.on_closing)
    window.mainloop()