# -*- coding: utf-8 -*- """SERENITYAI Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1LV3l6IWVK64-7RI2C7wEiW9r7ghx9d-o """ # %% Cell 1 - Model Initialization with Checkpoint Saving # %% Cell 1 - Model Initialization with Checkpoint Saving import torch from unsloth import FastLanguageModel import os # Configuration model_name = "unsloth/llama-3-8B-bnb-4bit" max_seq_length = 2048 dtype = torch.float32 # ✅ Change to float32 for CPU checkpoint_dir = "./serenity_checkpoints/initial_checkpoint" os.makedirs(checkpoint_dir, exist_ok=True) # Load model with optimized configuration for CPU model, tokenizer = FastLanguageModel.from_pretrained( model_name=model_name, max_seq_length=max_seq_length, dtype=dtype, load_in_4bit=False, # ✅ Disable 4-bit quantization for CPU device_map="cpu", # ✅ Force CPU usage rope_scaling={"type": "dynamic", "factor": 2.0}, ) # Apply LoRA configuration model = FastLanguageModel.get_peft_model( model, r=64, target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"], lora_alpha=64, lora_dropout=0.1, bias="none", use_gradient_checkpointing="unsloth", random_state=3407, max_seq_length=max_seq_length, use_rslora=True, loftq_config={}, ) # %% Cell 2 - Save Initial Checkpoint # Save full model configuration model.save_pretrained(checkpoint_dir) tokenizer.save_pretrained(checkpoint_dir) # Save special formats model.save_pretrained_merged( os.path.join(checkpoint_dir, "merged_16bit"), tokenizer, save_method="merged_16bit", push_to_hub=False ) model.save_pretrained_merged( os.path.join(checkpoint_dir, "4bit_lora"), tokenizer, save_method="lora", push_to_hub=False ) print(f""" Checkpoint saved with structure: {checkpoint_dir} ├── config.json ├── generation_config.json ├── pytorch_model.bin ├── special_tokens_map.json ├── tokenizer_config.json ├── tokenizer.model ├── merged_16bit │ └── (16-bit merged model files) └── 4bit_lora └── (4-bit LoRA adapter files) """) # %% Cell 3 - Verification Load Test def load_from_checkpoint(checkpoint_path): return FastLanguageModel.from_pretrained( model_name=checkpoint_path, max_seq_length=max_seq_length, dtype=dtype, load_in_4bit=False, # ✅ Ensure 4-bit is off for CPU device_map="cpu", ) # Test loading loaded_model, loaded_tokenizer = load_from_checkpoint(checkpoint_dir) print("Checkpoint loaded successfully!") # Example inference prompt = "User: How can I preserve my mental energy throughout the day?\nAI:" inputs = loaded_tokenizer(prompt, return_tensors="pt").to("cpu") # ✅ Move to CPU outputs = loaded_model.generate(**inputs, max_new_tokens=100) print(loaded_tokenizer.decode(outputs[0], skip_special_tokens=True)) import os import json import datetime import uuid import numpy as np from typing import List, Dict, Any, Optional, Union from cryptography.fernet import Fernet # For encryption from transformers import LlamaTokenizer from langchain.embeddings import HuggingFaceEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores import Chroma from langchain.document_loaders import TextLoader, DirectoryLoader from langchain.schema import Document from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import EmbeddingsFilter from sentence_transformers import CrossEncoder # For re-ranking from chromadb.config import Settings # Importing Settings # Configuration CHROMA_PERSIST_DIR = os.environ.get("CHROMA_PERSIST_DIR", "./chroma_db") CONVERSATIONS_DIR = os.environ.get("CONVERSATIONS_DIR", "./conversations") KNOWLEDGE_DIR = os.environ.get("KNOWLEDGE_DIR", "./knowledge") EMBEDDINGS_MODEL = os.environ.get("EMBEDDINGS_MODEL", "sentence-transformers/all-mpnet-base-v2") RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2" TOP_K_RETRIEVAL = 10 TOP_K_RERANK = 3 ENCRYPTION_KEY = Fernet.generate_key() # Store securely in production class AdvancedSerenityMemorySystem: def __init__(self, tokenizer=None): """Initialize advanced memory system with enhanced features""" # Initialize embedding model with GPU support self.embeddings = HuggingFaceEmbeddings( model_name=EMBEDDINGS_MODEL, model_kwargs={"device": "cuda"}, encode_kwargs={"normalize_embeddings": True} ) # Initialize cross-encoder for re-ranking self.reranker = CrossEncoder(RERANKER_MODEL, device="cuda") # Initialize text splitter with dialog-aware chunking self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=512, chunk_overlap=64, separators=["\n\n", "\n", "(Assistant):", "(User):", ". ", " ", ""] ) # Initialize ChromaDB with advanced configuration self._init_vector_stores() # Initialize encryption self.cipher = Fernet(ENCRYPTION_KEY) # Store tokenizer for context length management self.tokenizer = tokenizer # Load knowledge base self._initialize_knowledge_base() def _init_vector_stores(self): # Fixed indentation """Initialize vector stores with optimized settings""" # Create a ChromaDB Settings object chroma_settings = Settings( anonymized_telemetry=False, allow_reset=True, # Adapt optimizer_config to the new structure # optimizer_config={"hnsw": {"space": "cosine"}}, persist_directory=CHROMA_PERSIST_DIR, ) self.conversation_db = Chroma( collection_name="conversations", embedding_function=self.embeddings, client_settings=chroma_settings # Use Settings object ) self.knowledge_db = Chroma( collection_name="knowledge", embedding_function=self.embeddings, client_settings=chroma_settings # Use Settings object ) def _initialize_knowledge_base(self): """Enhanced knowledge base initialization with validation""" if self.knowledge_db._collection.count() == 0: try: # Load from directory or create default if os.path.exists(KNOWLEDGE_DIR): loader = DirectoryLoader( KNOWLEDGE_DIR, glob="**/*.txt", loader_cls=TextLoader, recursive=True ) documents = loader.load() # Advanced document cleaning cleaned_docs = self._clean_documents(documents) # Process and chunk documents chunks = self.text_splitter.split_documents(cleaned_docs) self.knowledge_db.add_documents(chunks) print(f"Initialized knowledge base with {len(chunks)} chunks") # Create default knowledge if empty if self.knowledge_db._collection.count() == 0: self._create_default_knowledge() except Exception as e: print(f"Knowledge base init error: {str(e)}") raise def _clean_documents(self, documents: List[Document]) -> List[Document]: """Advanced document cleaning pipeline""" cleaned = [] for doc in documents: try: # Remove special characters and normalize whitespace content = " ".join(doc.page_content.split()) content = content.replace("\x00", "") # Remove null bytes # Remove sensitive patterns (simple PII detection) content = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[REDACTED]", content) # Update document content cleaned.append(Document( page_content=content, metadata=doc.metadata )) except Exception as e: print(f"Document cleaning error: {str(e)}") return cleaned def _create_default_knowledge(self): """Enhanced default knowledge creation""" # Define default knowledge content here default_knowledge = [ """Serenity AI is designed to provide support and guidance for mental health and emotional well-being. It is not a substitute for professional medical advice. If you are experiencing a crisis or need immediate help, please contact a qualified mental health professional or emergency services.""", """Anxiety is a common human emotion characterized by feelings of worry, nervousness, or unease, typically about an event or something with an uncertain outcome. It can manifest as physical symptoms like a racing heart, shortness of breath, or sweating.""", # Add more default knowledge entries as needed ] try: # Split the default knowledge into chunks chunks = self.text_splitter.create_documents(default_knowledge, metadatas=[{'source': 'default'}] * len(default_knowledge)) # Add with batch processing self.knowledge_db.add_documents(chunks, batch_size=32) except Exception as e: print(f"Default knowledge error: {str(e)}") raise def store_conversation(self, user_id: str, conversation: List[Dict[str, str]]): """Enhanced conversation storage with security features""" try: # Anonymize user ID anonymized_id = self._hash_user_id(user_id) # Encrypt conversation content encrypted_conv = self._encrypt_data(json.dumps(conversation)) # Create document with enhanced metadata doc = Document( page_content=encrypted_conv, metadata={ "user_hash": anonymized_id, "timestamp": datetime.datetime.now().isoformat(), "message_count": len(conversation), "language": "en", # Add language detection "sentiment": self._analyze_sentiment(conversation) } ) # Store in vector DB self.conversation_db.add_documents([doc]) # Securely store raw data self._secure_store_raw(anonymized_id, conversation) return True except Exception as e: print(f"Conversation storage error: {str(e)}") return False def _hash_user_id(self, user_id: str) -> str: """Pseudonymize user IDs""" return hashlib.sha256(user_id.encode()).hexdigest() def _encrypt_data(self, data: str) -> str: """Encrypt sensitive data""" return self.cipher.encrypt(data.encode()).decode() def _decrypt_data(self, encrypted_data: str) -> str: """Decrypt encrypted data""" return self.cipher.decrypt(encrypted_data.encode()).decode() def _secure_store_raw(self, user_hash: str, conversation: list): """Secure storage with encryption and access control""" try: file_path = os.path.join( CONVERSATIONS_DIR, f"{user_hash}_{uuid.uuid4()}.enc" ) with open(file_path, "w") as f: f.write(self._encrypt_data(json.dumps(conversation))) except Exception as e: print(f"Secure storage error: {str(e)}") def retrieve_relevant_context(self, query: str, user_hash: str = None) -> str: """Advanced hybrid retrieval with re-ranking""" try: # Step 1: Initial retrieval knowledge_results = self.knowledge_db.max_marginal_relevance_search( query, k=TOP_K_RETRIEVAL ) conversation_results = [] if user_hash: conversation_results = self.conversation_db.similarity_search( query, k=TOP_K_RETRIEVAL, filter={"user_hash": user_hash} ) # Step 2: Re-ranking with cross-encoder combined_results = knowledge_results + conversation_results pairs = [(query, doc.page_content) for doc in combined_results] scores = self.reranker.predict(pairs) # Sort documents by reranking scores sorted_docs = [doc for _, doc in sorted( zip(scores, combined_results), reverse=True )][:TOP_K_RERANK] # Step 3: Context compression context = self._compress_context(sorted_docs, query) # Step 4: Token length validation if self.tokenizer: context = self._truncate_to_token_limit(context, 512) # Adjust based on model return context except Exception as e: print(f"Retrieval error: {str(e)}") return "" def _compress_context(self, documents: List[Document], query: str) -> str: """Dynamic context compression based on relevance""" context = [] for doc in documents: content = self._decrypt_data(doc.page_content) if doc.metadata.get("encrypted", False) else doc.page_content context.append( f"Source: {doc.metadata.get('title', 'Conversation')}\n" f"Relevance: {doc.metadata.get('score', 0.0):.2f}\n" f"Content: {content}\n" ) return "\n".join(context) def _truncate_to_token_limit(self, text: str, max_tokens: int) -> str: """Ensure context fits within model's token limit""" tokens = self.tokenizer.encode(text) if len(tokens) > max_tokens: truncated = self.tokenizer.decode(tokens[:max_tokens]) return truncated + "\n[Context truncated due to length]" return text def enhance_prompt_with_rag(self, user_message: str, user_hash: str) -> str: """Advanced prompt engineering for Llama-3""" context = self.retrieve_relevant_context(user_message, user_hash) return f"""<|begin_of_text|> <|start_header_id|>system<|end_header_id|> You are Serenity AI, an advanced mental health assistant. Use this context: {context} Guidelines: 1. Combine therapeutic techniques with practical advice 2. Maintain strict ethical boundaries 3. Use evidence-based interventions 4. Detect and escalate crisis situations 5. Adapt communication style to user's emotional state<|eot_id|> <|start_header_id|>user<|end_header_id|> {user_message}<|eot_id|> <|start_header_id|>assistant<|end_header_id|> """ def real_time_knowledge_update(self, new_info: Dict[str, str]): """Handle dynamic knowledge updates""" try: # Validate input if not self._validate_knowledge_entry(new_info): return False # Create document doc = Document( page_content=new_info["content"], metadata={ "title": new_info["title"], "source": "dynamic_update", "timestamp": datetime.datetime.now().isoformat() } ) # Process and add to knowledge base chunks = self.text_splitter.split_documents([doc]) self.knowledge_db.add_documents(chunks) # Update vector index self.knowledge_db._collection.create_index() return True except Exception as e: print(f"Knowledge update error: {str(e)}") return False def _validate_knowledge_entry(self, entry: Dict) -> bool: """Validate new knowledge entries""" required = ["title", "content"] return all(key in entry for key in required) and len(entry["content"]) > 50 def analyze_conversation_patterns(self, user_hash: str): """Advanced conversation analysis""" try: # Retrieve user's conversation history docs = self.conversation_db.get( where={"user_hash": user_hash}, include=["metadatas", "documents"] ) # Perform sentiment analysis over time sentiments = [doc["sentiment"] for doc in docs["metadatas"]] # Build emotional profile profile = { "sentiment_trend": np.mean(sentiments), "common_topics": self._extract_topics(docs["documents"]), "crisis_signals": self._detect_crisis_signals(docs["documents"]) } return profile except Exception as e: print(f"Analysis error: {str(e)}") return {} def _extract_topics(self, documents: List[str]) -> List[str]: """Simple topic extraction (implement with proper NLP model)""" # Placeholder - integrate with LDA or BERTopic return ["general anxiety", "relationships", "work stress"] def _detect_crisis_signals(self, documents: List[str]) -> List[str]: """Crisis detection logic""" # Placeholder - implement with regex or ML model crisis_keywords = ["suicide", "self harm", "kill myself"] return [doc for doc in documents if any(kw in doc.lower() for kw in crisis_keywords)] # Usage Example memory_system = AdvancedSerenityMemorySystem(tokenizer=tokenizer) import os import uuid import logging import torch import gradio as gr from groq import Groq from datetime import datetime from pathlib import Path from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline # --- Configuration --- class Config: GROQ_API_KEY = "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" # Replace with your key WHISPER_MODEL = "distil-whisper/distil-small.en" # Using distil-whisper instead GROQ_MODEL = "llama-3.1-8b-instant" PORT = 7820 APP_NAME = "Serenity Mental Health Assistant" APP_DESCRIPTION = "Your compassionate AI companion for mental wellness support" # Color scheme COLORS = { "primary": "#6366f1", "secondary": "#3b82f6", "tertiary": "#8b5cf6", "accent": "#10b981", "light_bg": "#f0f7ff", "dark_bg": "#1e293b", "success": "#22c55e", "warning": "#f59e0b", "error": "#ef4444", "text_light": "#f8fafc", "text_dark": "#334155" } # --- Service Initialization --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("SerenityAI") # Initialize services try: # Set up device and dtype device = "cuda:0" if torch.cuda.is_available() else "cpu" torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 # Load distil-whisper model asr_model = AutoModelForSpeechSeq2Seq.from_pretrained( Config.WHISPER_MODEL, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True ) asr_model.to(device) # Load processor and create pipeline processor = AutoProcessor.from_pretrained(Config.WHISPER_MODEL) asr_pipeline = pipeline( "automatic-speech-recognition", model=asr_model, tokenizer=processor.tokenizer, feature_extractor=processor.feature_extractor, max_new_tokens=400, torch_dtype=torch_dtype, device=device, ) logger.info(f"Loaded Distil-Whisper on {device}") except Exception as e: logger.error(f"ASR model init failed: {e}") asr_pipeline = None try: groq_client = Groq(api_key=Config.GROQ_API_KEY) if Config.GROQ_API_KEY else None logger.info("Groq client initialized") if groq_client else logger.error("Groq client not initialized") except Exception as e: logger.error(f"Groq init failed: {e}") groq_client = None # --- Core Functions --- def process_message(message, history, system_prompt): """Process messages for both text and voice chat""" if not groq_client: return "Error: AI service unavailable. Check API key and connection." try: messages = [{"role": "system", "content": system_prompt}] # Add history to messages for user, ai in history: messages.append({"role": "user", "content": user}) messages.append({"role": "assistant", "content": ai}) # Add current message messages.append({"role": "user", "content": message}) # Call Groq API response = groq_client.chat.completions.create( model=Config.GROQ_MODEL, messages=messages, temperature=0.7, max_tokens=1200 ) return response.choices[0].message.content except Exception as e: logger.error(f"API Error: {e}") return f"Service error: {str(e)}" def transcribe_audio(audio_path): """Transcribe audio using the ASR pipeline""" if not audio_path: logger.error("No audio file received") return "Error: No audio file received" if not asr_pipeline: logger.error("Speech recognition service unavailable") return "Error: Speech recognition service unavailable" try: logger.info(f"Transcribing audio from {audio_path}") # Validate if the file exists and is readable if not os.path.exists(audio_path): logger.error(f"Audio file not found: {audio_path}") return "Error: Audio file not found" # Use the distil-whisper pipeline for transcription result = asr_pipeline(audio_path) transcription = result["text"].strip() logger.info(f"Transcription result: {transcription}") return transcription except Exception as e: logger.error(f"Transcription failed: {e}") return f"Transcription error: {str(e)}" # --- Usage Analytics --- def get_analytics_data(): # Placeholder for real analytics - you can expand this return { "sessions": 127, "messages": 1432, "voice_interactions": 89, "avg_session_length": "4m 32s", "sentiment_positive": 68, "sentiment_neutral": 24, "sentiment_negative": 8 } def format_analytics(): data = get_analytics_data() colors = Config.COLORS # Create more advanced analytics with visual elements analytics_html = f"""
{data['sessions']}
Active Engagements
{data['messages']}
Conversations
{data['voice_interactions']}
Audio Messages
{data['avg_session_length']}
Engagement Time
{Config.APP_DESCRIPTION}
Important: Serenity is not a replacement for professional mental health services. In case of emergency, please contact your local crisis helpline.
National Suicide Prevention Lifeline: 988
Your conversations are private and secure
help@serenity-assistant.com
Serenity Mental Health Assistant is designed for support, not to replace professional mental health services.
© 2025 Serenity AI. All rights reserved.
Speak freely with Serenity using voice interactions. Just click the microphone, share what's on your mind, and I'll be here to listen and respond.