import os
import threading
import time
from pathlib import Path

import gradio as gr
from huggingface_hub import hf_hub_download, login, list_repo_files

# Try to import llama-cpp-python; fall back to install instructions if it is missing
try:
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False
    print("llama-cpp-python not installed. Please install it with: pip install llama-cpp-python")

# Global variables for the model
model = None
model_loaded = False

# Default system prompt
DEFAULT_SYSTEM_PROMPT = """You are MMed-Llama-Alpaca, a helpful AI assistant specialized in medical and healthcare topics. You provide accurate, evidence-based information while being empathetic and understanding.

Important guidelines:
- Always remind users that your responses are for educational purposes only
- Encourage users to consult healthcare professionals for medical advice
- Be thorough but clear in your explanations
- If unsure about medical information, acknowledge limitations
- Maintain a professional yet caring tone"""

# HuggingFace repository information
HF_REPO_ID = "Axcel1/MMed-llama-alpaca-Q4_K_M-GGUF"
HF_FILENAME = "mmed-llama-alpaca-q4_k_m.gguf"

hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)


def find_gguf_file(directory="."):
    """Find GGUF files in the specified directory"""
    gguf_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.gguf'):
                gguf_files.append(os.path.join(root, file))
    return gguf_files


def get_repo_gguf_files(repo_id=HF_REPO_ID):
    """Get all GGUF files from the HuggingFace repository"""
    try:
        print(f"Fetching file list from {repo_id}...")
        files = list_repo_files(repo_id=repo_id, token=hf_token)
        gguf_files = [f for f in files if f.endswith('.gguf')]
        print(f"Found {len(gguf_files)} GGUF files in repository")
        return gguf_files, None
    except Exception as e:
        error_msg = f"Error fetching repository files: {str(e)}"
        print(error_msg)
        return [], error_msg


def download_model_from_hf(repo_id=HF_REPO_ID, filename=HF_FILENAME):
    """Download a GGUF model from the HuggingFace Hub"""
    try:
        print(f"Downloading model from {repo_id}/{filename}...")
        gguf_path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            cache_dir="./models",
            resume_download=True  # Resume partial downloads
        )
        print(f"Model downloaded to: {gguf_path}")
        return gguf_path, None
    except Exception as e:
        error_msg = f"Error downloading model: {str(e)}"
        print(error_msg)
        return None, error_msg


def get_optimal_settings():
    """Get optimal CPU threads and GPU layers automatically"""
    # Auto-detect CPU threads (use all available cores)
    n_threads = os.cpu_count()

    # For Hugging Face Spaces, limit threads to avoid resource issues
    if n_threads and n_threads > 4:
        n_threads = 4

    # Auto-detect GPU layers (try to use the GPU if available)
    n_gpu_layers = 0
    try:
        # Try to detect whether an NVIDIA GPU is present
        import subprocess
        result = subprocess.run(['nvidia-smi'], capture_output=True, text=True)
        if result.returncode == 0:
            # NVIDIA GPU detected, offload more layers
            n_gpu_layers = 35  # Good default for Llama-3-8B
    except Exception:
        # No GPU or CUDA not available
        n_gpu_layers = 0

    return n_threads, n_gpu_layers


def load_model_from_gguf(gguf_path=None, filename=None, n_ctx=2048, use_hf_download=True):
    """Load the model from a GGUF file with automatic optimization"""
    global model, model_loaded

    if not LLAMA_CPP_AVAILABLE:
        return False, "llama-cpp-python not installed. Please install it with: pip install llama-cpp-python"
    try:
        # If no path was provided, try different approaches
        if gguf_path is None:
            if use_hf_download:
                # Use the specified filename or the default
                selected_filename = filename if filename else HF_FILENAME
                # Try to download from HuggingFace first
                gguf_path, error = download_model_from_hf(filename=selected_filename)
                if error:
                    return False, f"āŒ Failed to download from HuggingFace: {error}"
            else:
                # Try to find local GGUF files
                gguf_files = find_gguf_file()
                if not gguf_files:
                    return False, "No GGUF files found in the repository"
                gguf_path = gguf_files[0]  # Use the first one found
                print(f"Found local GGUF file: {gguf_path}")

        # Check that the file exists
        if not os.path.exists(gguf_path):
            return False, f"GGUF file not found: {gguf_path}"

        print(f"Loading model from: {gguf_path}")

        # Get optimal settings automatically
        n_threads, n_gpu_layers = get_optimal_settings()
        print(f"Auto-detected settings: {n_threads} CPU threads, {n_gpu_layers} GPU layers")

        # Load the model with settings optimized for Hugging Face Spaces
        model = Llama(
            model_path=gguf_path,
            n_ctx=n_ctx,                # Context window (configurable)
            n_threads=n_threads,        # CPU threads (limited for Spaces)
            n_gpu_layers=n_gpu_layers,  # Number of layers to offload to the GPU
            verbose=False,
            chat_format="llama-3",      # Use the Llama-3 chat format
            n_batch=256,                # Smaller batch size for Spaces
            use_mlock=False,            # Disabled for Spaces compatibility
            use_mmap=True,              # Use memory mapping
        )

        model_loaded = True
        selected_filename = filename if filename else os.path.basename(gguf_path)
        print("Model loaded successfully!")
        return True, (
            f"āœ… Model loaded successfully: {selected_filename}\n"
            f"šŸ“Š Context: {n_ctx} tokens\n"
            f"šŸ–„ļø CPU Threads: {n_threads}\n"
            f"šŸŽ® GPU Layers: {n_gpu_layers}\n"
            f"šŸ“¦ Source: {HF_REPO_ID}"
        )

    except Exception as e:
        model_loaded = False
        error_msg = f"Error loading model: {str(e)}"
        print(error_msg)
        return False, f"āŒ {error_msg}"


def generate_response_stream(message, history, system_prompt, max_tokens=512, temperature=0.7, top_p=0.9, repeat_penalty=1.1):
    """Generate a response from the model with streaming"""
    global model, model_loaded

    if not model_loaded or model is None:
        yield "Error: Model not loaded. Please load the model first."
        return
    try:
        # Format the conversation history for Llama-3
        conversation = []

        # Add the system prompt if provided
        if system_prompt and system_prompt.strip():
            conversation.append({"role": "system", "content": system_prompt.strip()})

        # Add the conversation history
        for human, assistant in history:
            conversation.append({"role": "user", "content": human})
            if assistant:  # Only add if an assistant response exists
                conversation.append({"role": "assistant", "content": assistant})

        # Add the current message
        conversation.append({"role": "user", "content": message})

        # Generate the response with streaming
        response = ""
        stream = model.create_chat_completion(
            messages=conversation,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            repeat_penalty=repeat_penalty,
            stream=True,
            stop=["<|eot_id|>", "<|end_of_text|>"]
        )

        for chunk in stream:
            if chunk['choices'][0]['delta'].get('content'):
                new_text = chunk['choices'][0]['delta']['content']
                response += new_text
                yield response

    except Exception as e:
        yield f"Error generating response: {str(e)}"


def chat_interface(message, history, system_prompt, max_tokens, temperature, top_p, repeat_penalty):
    """Main chat interface function"""
    # This function is a generator, so early exits must yield a value before returning;
    # a plain `return history, ""` inside a generator would be ignored by Gradio.
    if not message.strip():
        yield history, ""
        return

    if not model_loaded:
        history = history + [(message, "Please load the model first using the 'Load Model' button.")]
        yield history, ""
        return

    # Add the user message to the history
    history = history + [(message, "")]

    # Generate the response
    for response in generate_response_stream(message, history[:-1], system_prompt, max_tokens, temperature, top_p, repeat_penalty):
        history[-1] = (message, response)
        yield history, ""


def clear_chat():
    """Clear the chat history"""
    return [], ""


def reset_system_prompt():
    """Reset the system prompt to the default"""
    return DEFAULT_SYSTEM_PROMPT


def load_model_interface(context_size, selected_model):
    """Interface function to load the model with a configurable context size"""
    success, message = load_model_from_gguf(
        gguf_path=None,
        filename=selected_model,
        n_ctx=int(context_size),
        use_hf_download=True
    )
    return message


def refresh_model_list():
    """Refresh the list of available GGUF models from the repository"""
    gguf_files, error = get_repo_gguf_files()

    if error:
        return gr.Dropdown(choices=["Error loading models"], value="Error loading models")

    if not gguf_files:
        return gr.Dropdown(choices=["No GGUF files found"], value="No GGUF files found")

    # Default to the original file if it exists, otherwise the first one found
    default_value = HF_FILENAME if HF_FILENAME in gguf_files else gguf_files[0]
    return gr.Dropdown(choices=gguf_files, value=default_value)


def get_available_gguf_files():
    """Get the list of available local GGUF files"""
    gguf_files = find_gguf_file()
    if not gguf_files:
        return ["No local GGUF files found"]
    return [os.path.basename(f) for f in gguf_files]


def check_model_availability():
    """Check whether a model is available locally or needs to be downloaded"""
    local_files = find_gguf_file()
    if local_files:
        return f"Local GGUF files found: {len(local_files)}"
    else:
        return "No local GGUF files found. Will download from HuggingFace."
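
# Illustrative usage of the streaming helper (a sketch, not executed anywhere in
# this app): assuming a model has already been loaded via load_model_from_gguf(),
# the generator can be driven directly, e.g.
#
#     for partial in generate_response_stream("What is hypertension?", [], DEFAULT_SYSTEM_PROMPT):
#         print(partial)
#
# The example question is arbitrary; each yielded value is the response so far.
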
# Create the Gradio interface
def create_interface():
    # Check for available models
    availability_status = check_model_availability()

    # Get the initial list of GGUF files from the repository
    gguf_files, error = get_repo_gguf_files()
    if error or not gguf_files:
        initial_choices = ["Error loading models" if error else "No GGUF files found"]
        initial_value = initial_choices[0]
    else:
        initial_choices = gguf_files
        initial_value = HF_FILENAME if HF_FILENAME in gguf_files else gguf_files[0]

    with gr.Blocks(title="MMed-Llama-Alpaca GGUF Chatbot", theme=gr.themes.Soft()) as demo:
        gr.HTML("""
            <div style="text-align: center;">
                <h1>šŸ¦™ MMed-Llama-Alpaca Chatbot</h1>
                <p>Chat with the MMed-Llama-Alpaca model (Q4_K_M quantized) for medical assistance!</p>
                <p><b>āš ļø This is for educational purposes only. Always consult healthcare professionals for medical advice.</b></p>
            </div>
        """)

        with gr.Row():
            with gr.Column(scale=4):
                # System prompt configuration
                gr.HTML("<h3>šŸŽÆ System Prompt Configuration</h3>")

                with gr.Row():
                    system_prompt = gr.Textbox(
                        label="System Prompt",
                        value=DEFAULT_SYSTEM_PROMPT,
                        placeholder="Enter system prompt to define the AI's behavior and role...",
                        lines=4,
                        max_lines=15,
                        scale=4,
                        autoscroll=True,
                    )
                    # with gr.Column(scale=1):
                    #     reset_prompt_btn = gr.Button("Reset to Default", variant="secondary", size="sm")
                # gr.HTML("<p>The system prompt defines how the AI should behave and respond. Changes apply to new conversations.</p>")

                # Chat interface
                chatbot = gr.Chatbot(
                    height=400,
                    show_copy_button=True,
                    bubble_full_width=False,
                    show_label=False,
                    placeholder="Ask anything"
                )

                with gr.Row():
                    msg = gr.Textbox(
                        placeholder="Type your medical question here...",
                        container=False,
                        scale=7,
                        show_label=False
                    )
                    submit_btn = gr.Button("Send", variant="primary", scale=1)
                    clear_btn = gr.Button("Clear", variant="secondary", scale=1)

            with gr.Column(scale=1):
                # Model loading section
                gr.HTML("<h3>šŸ”§ Model Control</h3>")

                # Model selection dropdown
                model_dropdown = gr.Dropdown(
                    choices=initial_choices,
                    value=initial_value,
                    label="Select GGUF Model",
                    info="Choose from available models in the repository",
                    interactive=True
                )

                # Context size (limited for Spaces)
                context_size = gr.Slider(
                    minimum=512,
                    maximum=8192,
                    value=2048,
                    step=256,
                    label="Context Size",
                    info="Token context window (requires model reload)"
                )

                load_btn = gr.Button("Load Model", variant="primary", size="lg")

                model_status = gr.Textbox(
                    label="Status",
                    value=(
                        f"Model not loaded.\n{availability_status}\n"
                        "āš™ļø Auto-optimized: CPU threads & GPU layers auto-detected\n"
                        "šŸ“ Context size can be configured above"
                    ),
                    interactive=False,
                    max_lines=10
                )

                # Generation parameters
āš™ļø Generation Settings

") max_tokens = gr.Slider( minimum=50, maximum=1024, value=512, step=50, label="Max Tokens", info="Maximum response length" ) temperature = gr.Slider( minimum=0.1, maximum=2.0, value=0.7, step=0.1, label="Temperature", info="Creativity (higher = more creative)" ) top_p = gr.Slider( minimum=0.1, maximum=1.0, value=0.9, step=0.1, label="Top-p", info="Nucleus sampling" ) repeat_penalty = gr.Slider( minimum=1.0, maximum=1.5, value=1.1, step=0.1, label="Repeat Penalty", info="Penalize repetition" ) # Information section gr.HTML("""

ā„¹ļø About

Model: MMed-Llama-Alpaca

Quantization: Q4_K_M

Format: GGUF (optimized)

Backend: llama-cpp-python

Features: CPU/GPU support, streaming, system prompts

Specialty: Medical assistance

Auto-Optimization: CPU threads & GPU layers detected automatically

""") if not LLAMA_CPP_AVAILABLE: gr.HTML("""

āš ļø Missing Dependency

Install llama-cpp-python:
pip install llama-cpp-python

""") # Event handlers load_btn.click( load_model_interface, inputs=[context_size, model_dropdown], outputs=model_status ) submit_btn.click( chat_interface, inputs=[msg, chatbot, system_prompt, max_tokens, temperature, top_p, repeat_penalty], outputs=[chatbot, msg] ) msg.submit( chat_interface, inputs=[msg, chatbot, system_prompt, max_tokens, temperature, top_p, repeat_penalty], outputs=[chatbot, msg] ) clear_btn.click( clear_chat, outputs=[chatbot, msg] ) # reset_prompt_btn.click( # reset_system_prompt, # outputs=system_prompt # ) return demo if __name__ == "__main__": # Create and launch the interface demo = create_interface() # Launch with settings optimized for Hugging Face Spaces demo.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=False, show_error=True, quiet=False )