"""Gradio demo for the L-Operator Android device-control model.

The app takes an Android screenshot plus a goal and a step instruction,
asks the model for a JSON action, and, when the action is a click with
coordinates, shows the screenshot annotated with the click location.
"""

import json
import logging
import os
import time
from typing import List, Dict, Any

import gradio as gr
import spaces
import torch
from PIL import Image, ImageDraw, ImageFont
from transformers import AutoProcessor, AutoModelForImageTextToText

title = """# L-Operator: 🤖Android📲Device🎮Control
"""

description = """
**Lightweight Multimodal Android Device Control Agent**

This demo showcases the L-Operator model, a fine-tuned multimodal AI agent based on LiquidAI/LFM2-VL-1.6B model, optimized for Android device control through visual understanding and action generation.

## 🚀 How to Use

1. **Upload Screenshot**: Upload an Android device screenshot
2. **Describe Goal**: Enter what you want to accomplish
3. **Get Actions**: The model will generate JSON actions for Android device control
"""

joinus = """
## Join us :
🌟TeamTonic🌟 is always making cool demos! Join our active builder's 🛠️community 👻 [![Join us on Discord](https://img.shields.io/discord/1109943800132010065?label=Discord&logo=discord&style=flat-square)](https://discord.gg/qdfnvSPcqP) On 🤗Huggingface:[MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Tonic-AI](https://github.com/tonic-ai) & contribute to🌟 [MultiTonic](https://github.com/MultiTonic)🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗
"""

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Model configuration
MODEL_ID = "Tonic/l-operator"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Get Hugging Face token from environment variable (Spaces secrets)
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    logger.warning("HF_TOKEN not found in environment variables. Model access may be restricted.")
    logger.warning("Please set HF_TOKEN in your environment variables or Spaces secrets.")


def create_annotated_image(image: Image.Image, x: int, y: int, action_type: str = "click") -> Image.Image:
    """Return a copy of ``image`` with the action location highlighted.

    A red box (with corner ticks), a yellow crosshair, a centre dot and a
    text label ``"<ACTION>: (x, y)"`` are drawn around the point.

    Args:
        image: Screenshot to annotate; it is not modified.
        x: Horizontal pixel coordinate of the action. Floats and numeric
            strings are accepted and rounded to the nearest pixel.
        y: Vertical pixel coordinate of the action (same coercion as ``x``).
        action_type: Action name used in the label (upper-cased).

    Returns:
        The annotated copy, or the original ``image`` if anything fails
        while drawing (the error is logged).
    """
    try:
        # The model may emit floats or numeric strings; drawing needs pixels.
        x = int(round(float(x)))
        y = int(round(float(y)))

        # Work on a copy so the caller's image stays untouched.
        annotated_image = image.copy()
        draw = ImageDraw.Draw(annotated_image)

        # Bounding box parameters - deliberately generous.
        box_size = 120
        box_color = (255, 0, 0)  # Red
        line_width = 4

        # Box around the point, clipped to the image.
        half = box_size // 2
        left = max(0, x - half)
        top = max(0, y - half)
        right = min(image.width, x + half)
        bottom = min(image.height, y + half)

        draw.rectangle([left, top, right, bottom], outline=box_color, width=line_width)

        # Corner indicators for better visibility.
        corner_size = 15
        corner_segments = [
            # Top-left corner
            [left, top, left + corner_size, top],
            [left, top, left, top + corner_size],
            # Top-right corner
            [right - corner_size, top, right, top],
            [right, top, right, top + corner_size],
            # Bottom-left corner
            [left, bottom - corner_size, left, bottom],
            [left, bottom, left + corner_size, bottom],
            # Bottom-right corner
            [right - corner_size, bottom, right, bottom],
            [right, bottom - corner_size, right, bottom],
        ]
        for segment in corner_segments:
            draw.line(segment, fill=box_color, width=line_width)

        # Crosshair at the exact point (yellow for contrast with the box).
        crosshair_size = 15
        crosshair_color = (255, 255, 0)
        draw.line([x - crosshair_size, y, x + crosshair_size, y], fill=crosshair_color, width=3)
        draw.line([x, y - crosshair_size, x, y + crosshair_size], fill=crosshair_color, width=3)

        # Small circle at the centre.
        circle_radius = 4
        draw.ellipse(
            [x - circle_radius, y - circle_radius, x + circle_radius, y + circle_radius],
            fill=crosshair_color,
            outline=box_color,
            width=2,
        )

        # Text label.
        font = ImageFont.load_default()
        label_text = f"{action_type.upper()}: ({x}, {y})"
        text_bbox = draw.textbbox((0, 0), label_text, font=font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]

        margin = 5
        # Keep the label inside the image horizontally.
        text_x = min(max(margin, left), max(margin, image.width - text_width - margin))

        # Prefer the space above the box; if the label would leave the top
        # of the image, put it below the box instead. (The decision must be
        # made on the unclamped value, otherwise the fallback never fires.)
        above_y = top - text_height - 10
        if above_y >= margin:
            text_y = above_y
        else:
            text_y = max(margin, min(image.height - text_height - margin, bottom + 10))

        # Text background for contrast, then the text itself.
        draw.rectangle(
            [text_x - 4, text_y - 4, text_x + text_width + 4, text_y + text_height + 4],
            fill=(0, 0, 0, 180),
        )
        draw.text((text_x, text_y), label_text, fill=(255, 255, 255), font=font)

        return annotated_image

    except Exception as e:
        logger.error(f"Error creating annotated image: {str(e)}")
        return image  # Return original image if annotation fails


def parse_action_response(response: str) -> tuple:
    """Parse a model response and report whether it is a click with coordinates.

    Args:
        response: Raw (or pretty-printed) text returned by the model.

    Returns:
        ``(action_data, has_coordinates)``. ``action_data`` is the decoded
        JSON object when ``response`` starts with ``{`` and is valid JSON,
        otherwise the original ``response``. ``has_coordinates`` is ``True``
        only for a decoded object whose ``action_type`` is ``"click"`` and
        which has both ``x`` and ``y`` keys.
    """
    try:
        # Only attempt JSON decoding on something that looks like an object.
        if not response.strip().startswith('{'):
            return response, False

        action_data = json.loads(response)
        is_click_with_point = (
            action_data.get('action_type') == 'click'
            and 'x' in action_data
            and 'y' in action_data
        )
        return action_data, is_click_with_point

    except json.JSONDecodeError:
        return response, False
    except Exception as e:
        logger.error(f"Error parsing action response: {str(e)}")
        return response, False


class LOperatorDemo:
    """Holds the L-Operator model and processor and runs action generation."""

    def __init__(self):
        self.model = None
        self.processor = None
        self.is_loaded = False

    def load_model(self):
        """Load the L-Operator model and processor.

        Returns:
            A human-readable status string: a success message with the load
            time, or an error message. Exceptions are caught and logged, so
            callers should check ``self.is_loaded`` to learn the outcome.
        """
        try:
            start_time = time.time()
            logger.info(f"Loading model {MODEL_ID} on device {DEVICE}")

            # Check if token is available
            if not HF_TOKEN:
                return "❌ HF_TOKEN not found. Please set HF_TOKEN in Spaces secrets."

            # Load model with progress logging
            logger.info("Downloading and loading model weights...")
            self.model = AutoModelForImageTextToText.from_pretrained(
                MODEL_ID,
                device_map="auto",
                torch_dtype=torch.bfloat16 if DEVICE == "cuda" else torch.float32,
                trust_remote_code=True
            )

            # Load processor
            logger.info("Loading processor...")
            self.processor = AutoProcessor.from_pretrained(
                MODEL_ID,
                trust_remote_code=True
            )

            if DEVICE == "cpu":
                self.model = self.model.to(DEVICE)

            self.is_loaded = True
            load_time = time.time() - start_time
            logger.info(f"Model loaded successfully in {load_time:.1f} seconds")
            return f"✅ Model loaded successfully in {load_time:.1f} seconds"

        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            return f"❌ Error loading model: {str(e)} - This may be a custom model requiring special handling"

    @spaces.GPU(duration=120)  # 2 minutes for action generation
    def generate_action(self, image: Image.Image, goal: str, instruction: str) -> str:
        """Generate an action for a screenshot, goal and step instruction.

        Args:
            image: Screenshot; converted to RGB if it is in another mode.
            goal: Overall goal text inserted into the prompt.
            instruction: Step instruction text inserted into the prompt.

        Returns:
            The model output. If it is valid JSON it is returned
            pretty-printed (2-space indent); otherwise the decoded text is
            returned unchanged. On failure, or when the model is not loaded,
            an error string starting with ``❌`` is returned.
        """
        if not self.is_loaded:
            return "❌ Model not loaded. Please load the model first."

        try:
            # Convert image to RGB if needed
            if image.mode != "RGB":
                image = image.convert("RGB")

            # Build conversation using the EXACT same format as training
            user_text = (
                f"Goal: {goal}\n"
                f"Step: {instruction}\n"
                "Respond with a JSON action containing relevant keys (e.g., action_type, x, y, text, app_name, direction)."
            )

            conversation = [
                {
                    "role": "system",
                    "content": [
                        {"type": "text", "text": "You are a helpful multimodal assistant by Liquid AI."}
                    ]
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image},
                        {"type": "text", "text": user_text}
                    ]
                }
            ]

            logger.info("Processing conversation with processor...")

            # Process inputs using the same method as training
            inputs = self.processor.apply_chat_template(
                conversation,
                add_generation_prompt=True,
                return_tensors="pt",
                return_dict=True,
                tokenize=True,
            )

            logger.info(f"Processor output keys: {list(inputs.keys())}")

            # Move inputs to device
            for key, value in inputs.items():
                if isinstance(value, torch.Tensor):
                    inputs[key] = value.to(self.model.device)

            logger.info(f"Inputs shape: {inputs['input_ids'].shape}, device: {inputs['input_ids'].device}")

            # Generate response
            logger.info("Generating response...")
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=128,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.processor.tokenizer.eos_token_id
                )

            logger.info("Decoding response...")

            # Decode only the newly generated tokens (skip the prompt).
            response = self.processor.tokenizer.decode(
                outputs[0][inputs['input_ids'].shape[1]:],
                skip_special_tokens=True
            )

            # Pretty-print when the output is valid JSON; otherwise pass the
            # raw text through. Only decoding failures are expected here.
            try:
                parsed_response = json.loads(response)
            except (json.JSONDecodeError, TypeError):
                return response
            return json.dumps(parsed_response, indent=2)

        except Exception as e:
            logger.error(f"Error generating action: {str(e)}")
            return f"❌ Error generating action: {str(e)}"


# Initialize demo
demo_instance = LOperatorDemo()


def process_input(image, goal, step_instructions):
    """Validate the form inputs, generate an action and annotate the image.

    Args:
        image: A PIL image, a path to an image file, or an object with a
            ``name`` attribute pointing at an image file.
        goal: Goal text; must be non-empty.
        step_instructions: Step instruction text; must be non-empty.

    Returns:
        ``(response, annotated_image)``. ``response`` is the model output or
        an error string starting with ``❌``. ``annotated_image`` is the
        annotated screenshot when the action is a click with coordinates,
        otherwise ``None``.
    """
    if image is None:
        return "❌ Please upload an Android screenshot image.", None

    # Textboxes can yield None as well as "", so guard before .strip().
    if not goal or not goal.strip():
        return "❌ Please provide a goal.", None

    if not step_instructions or not step_instructions.strip():
        return "❌ Please provide step instructions.", None

    if not demo_instance.is_loaded:
        return "❌ Model not loaded. Please wait for it to load automatically.", None

    try:
        # Handle different image formats
        pil_image = None
        if hasattr(image, 'mode'):  # PIL Image object
            pil_image = image
        elif isinstance(image, str) and os.path.exists(image):
            # Handle file path (from examples)
            pil_image = Image.open(image)
        elif hasattr(image, 'name') and os.path.exists(image.name):
            # Handle Gradio file object
            pil_image = Image.open(image.name)
        else:
            return "❌ Invalid image format. Please upload a valid image.", None

        if pil_image is None:
            return "❌ Failed to process image. Please try again.", None

        # Convert image to RGB if needed
        if pil_image.mode != "RGB":
            pil_image = pil_image.convert("RGB")

        # Generate action using goal and step instructions
        response = demo_instance.generate_action(pil_image, goal, step_instructions)

        # Parse the response to check for coordinates
        action_data, has_coordinates = parse_action_response(response)

        # If coordinates are found, create annotated image
        annotated_image = None
        if has_coordinates and isinstance(action_data, dict):
            x = action_data.get('x')
            y = action_data.get('y')
            action_type = action_data.get('action_type', 'click')

            if x is not None and y is not None:
                annotated_image = create_annotated_image(pil_image, x, y, action_type)
                logger.info(f"Created annotated image for coordinates ({x}, {y})")

        return response, annotated_image

    except Exception as e:
        logger.error(f"Error processing input: {str(e)}")
        return f"❌ Error: {str(e)}", None


def update_annotated_image_visibility(response, annotated_image):
    """Show the annotated image component only when there is an image.

    Args:
        response: Generated action text (unused; present because the event
            chain passes it).
        annotated_image: Current value of the annotated image component.

    Returns:
        A ``gr.update`` that makes the component visible with the image, or
        hidden with no value when ``annotated_image`` is ``None``.
    """
    if annotated_image is None:
        return gr.update(visible=False, value=None)
    return gr.update(visible=True, value=annotated_image)


def load_example_episodes():
    """Load example episodes from ``extracted_episodes_duckdb``.

    For each configured episode, reads ``metadata.json`` and the selected
    screenshot. Episodes with missing files are skipped silently; episodes
    that fail to load are skipped with a warning.

    Returns:
        A list of ``[pil_image, goal_text, step_instruction]`` entries
        (empty if nothing could be loaded).
    """
    examples = []

    try:
        # Episode directory -> 1-based screenshot number to show.
        episode_screenshots = {
            "episode_13": 3,      # Cruise deals app
            "episode_53": 5,      # Pinterest sustainability
            "episode_73": 3,      # Moon phases app
            "episode_16730": 4,   # Weather app forecast
            "episode_17562": 3,   # Ticktick reminder app
            "episode_19565": 4,   # New episode
            "episode_19649": 2,   # New episode
            "episode_5590": 3,    # New episode
            "episode_4712": 2,    # New episode
            "episode_3731": 2,    # New episode
            "episode_2080": 2,    # New episode
            "episode_1993": 2     # New episode
        }

        for episode_dir, screenshot_num in episode_screenshots.items():
            try:
                metadata_path = f"extracted_episodes_duckdb/{episode_dir}/metadata.json"
                image_path = f"extracted_episodes_duckdb/{episode_dir}/screenshots/screenshot_{screenshot_num}.png"

                # Both files are required for an example.
                if not (os.path.exists(metadata_path) and os.path.exists(image_path)):
                    continue

                logger.info(f"Loading example from {episode_dir} using screenshot_{screenshot_num}.png")

                # JSON is UTF-8; do not depend on the platform default.
                with open(metadata_path, "r", encoding="utf-8") as f:
                    metadata = json.load(f)

                # Load image directly with PIL
                pil_image = Image.open(image_path)

                episode_num = episode_dir.split('_')[1]
                goal_text = metadata.get('goal', f'Episode {episode_num} example')

                # Step instruction for the selected screenshot
                # (screenshot numbers are 1-based).
                step_instructions = metadata.get('step_instructions', [])
                step_instruction = ""
                if step_instructions and screenshot_num <= len(step_instructions):
                    step_instruction = step_instructions[screenshot_num - 1]

                logger.info(f"Episode {episode_num} goal: {goal_text}")
                logger.info(f"Episode {episode_num} step instruction: {step_instruction}")

                examples.append([
                    pil_image,        # Use PIL Image object directly
                    goal_text,        # Use the goal text from metadata
                    step_instruction  # Use the step instruction for this screenshot
                ])
                logger.info(f"Successfully loaded example for Episode {episode_num}")

            except Exception as e:
                logger.warning(f"Could not load example for {episode_dir}: {str(e)}")
                continue

    except Exception as e:
        logger.error(f"Error loading examples: {str(e)}")
        examples = []

    logger.info(f"Loaded {len(examples)} examples using PIL")
    return examples


# Create Gradio interface
def create_demo():
    """Create the Gradio demo interface using Blocks.

    Returns:
        The ``gr.Blocks`` app: input column (screenshot, goal, step
        instruction), output column (annotated screenshot, JSON action),
        an example grid, and a page-load hook that loads the model.
    """
    with gr.Blocks(
        title=title,
        theme=gr.themes.Monochrome(),
        css="""
        .gradio-container {
            max-width: 1200px !important;
        }
        .output-container {
            min-height: 200px;
        }
        .annotated-image-container {
            border: 2px solid #e0e0e0;
            border-radius: 8px;
            padding: 10px;
            margin-top: 10px;
        }
        """
    ) as demo:

        # Header section
        gr.Markdown(title)

        # Info section
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(description)
            with gr.Column(scale=1):
                gr.Markdown(joinus)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📱 Upload Screenshot")
                image_input = gr.Image(
                    label="Android Screenshot",
                    type="pil",
                    height=400
                )

                gr.Markdown("### 🎯 Goal")
                goal_input = gr.Textbox(
                    label="What would you like to accomplish?",
                    placeholder="e.g., Open the Settings app and navigate to Display settings",
                    lines=3
                )

                gr.Markdown("### 📝 Step Instructions")
                step_instructions_input = gr.Textbox(
                    label="Specific step instruction for this screenshot",
                    placeholder="e.g., Tap on the Settings icon to open the app",
                    lines=2
                )

                # Process button
                process_btn = gr.Button("🚀 Generate Action", variant="primary", size="lg")

            with gr.Column(scale=1):
                gr.Markdown("### 🎯 Annotated Screenshot")
                annotated_image_output = gr.Image(
                    label="Click Location Highlighted",
                    height=400,
                    visible=False,
                    interactive=False,
                    elem_classes=["annotated-image-container"]
                )

                gr.Markdown("### 📊 Generated Action")
                output_text = gr.Textbox(
                    label="JSON Action Output",
                    lines=15,
                    max_lines=20,
                    interactive=False,
                    elem_classes=["output-container"]
                )

        # Connect the process button: generate first, then toggle the
        # annotated image's visibility based on whether one was produced.
        process_btn.click(
            fn=process_input,
            inputs=[image_input, goal_input, step_instructions_input],
            outputs=[output_text, annotated_image_output]
        ).then(
            fn=update_annotated_image_visibility,
            inputs=[output_text, annotated_image_output],
            outputs=annotated_image_output
        )

        # Load examples
        gr.Markdown("### 📚 Example Episodes")

        try:
            examples = load_example_episodes()

            if examples:
                # Organize examples in a grid layout (3 columns)
                for row_start in range(0, len(examples), 3):
                    with gr.Row():
                        for i in range(row_start, min(row_start + 3, len(examples))):
                            image, goal, step_instruction = examples[i]
                            with gr.Column(scale=1):
                                episode_num = i + 1
                                gr.Markdown(f"**Episode {episode_num}**")
                                example_image = gr.Image(
                                    value=image,
                                    label=f"Example {episode_num}",
                                    height=150,
                                    interactive=False
                                )
                                example_goal = gr.Textbox(
                                    value=goal,
                                    label="Goal",
                                    lines=3,
                                    interactive=False
                                )
                                example_step_instruction = gr.Textbox(
                                    value=step_instruction,
                                    label="Step Instruction",
                                    lines=2,
                                    interactive=False
                                )

                                # Button that copies this example into the
                                # input form and clears any previous output.
                                load_example_btn = gr.Button(f"Load Example {episode_num}", size="sm")
                                load_example_btn.click(
                                    fn=lambda img, g, s: (img, g, s),
                                    inputs=[example_image, example_goal, example_step_instruction],
                                    outputs=[image_input, goal_input, step_instructions_input]
                                ).then(
                                    fn=lambda: (None, gr.update(visible=False)),
                                    outputs=[output_text, annotated_image_output]
                                )

        except Exception as e:
            logger.warning(f"Failed to load examples: {str(e)}")
            gr.Markdown("❌ Failed to load examples. Please upload your own screenshot.")

        # Load model automatically on startup
        def load_model_on_startup():
            """Load the model once, without user feedback; log the outcome."""
            if demo_instance.is_loaded:
                return

            logger.info("Loading L-Operator model automatically...")
            try:
                status = demo_instance.load_model()
            except Exception as e:
                logger.error(f"Failed to load model: {str(e)}")
                return

            # load_model() reports failures through its return string rather
            # than by raising, so success must be read from is_loaded.
            if demo_instance.is_loaded:
                logger.info("Model loaded successfully in background")
            else:
                logger.error(f"Failed to load model: {status}")

        # Load model automatically on page load
        demo.load(fn=load_model_on_startup)

        gr.Markdown("""
---

**Made with ❤️ by Tonic** | [Model on Hugging Face](https://huggingface.co/Tonic/l-android-control)
""")

    return demo


# Create and launch the demo with optimized settings
if __name__ == "__main__":
    try:
        logger.info("Creating Gradio demo interface...")
        demo = create_demo()

        logger.info("Launching Gradio server...")
        demo.launch(
            # server_name="0.0.0.0",
            # server_port=7860,
            # share=False,
            # debug=False,  # Disable debug to reduce startup time
            show_error=True,
            ssr_mode=False,
            # max_threads=2,  # Limit threads to prevent resource exhaustion
            # quiet=True  # Reduce startup logging noise
        )
    except Exception as e:
        logger.error(f"Failed to launch Gradio app: {str(e)}")
        raise