# l-operator-demo / app.py
# Joseph Pollack
# fixes titles
# 926f049 unverified
import gradio as gr
import torch
from PIL import Image, ImageDraw, ImageFont
import json
import os
from transformers import AutoProcessor, AutoModelForImageTextToText
from typing import List, Dict, Any
import logging
import spaces
# Markdown heading for the page; passed to gr.Blocks(title=...) and rendered
# again as the first Markdown element in create_demo().
title = """# L-Operator: 🤖Android📲Device🎮Control """
# Markdown blurb describing the model and the three-step usage flow; rendered
# in the left info column of the UI.
description = """
**Lightweight Multimodal Android Device Control Agent**
This demo showcases the L-Operator model, a fine-tuned multimodal AI agent based on LiquidAI/LFM2-VL-1.6B model,
optimized for Android device control through visual understanding and action generation.
## 🚀 How to Use
1. **Upload Screenshot**: Upload an Android device screenshot
2. **Describe Goal**: Enter what you want to accomplish
3. **Get Actions**: The model will generate JSON actions for Android device control
"""
# Markdown community/credits blurb; rendered in the right info column of the UI.
joinus = """
## Join us :
🌟TeamTonic🌟 is always making cool demos! Join our active builder's 🛠️community 👻 [![Join us on Discord](https://img.shields.io/discord/1109943800132010065?label=Discord&logo=discord&style=flat-square)](https://discord.gg/qdfnvSPcqP) On 🤗Huggingface:[MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Tonic-AI](https://github.com/tonic-ai) & contribute to🌟 [MultiTonic](https://github.com/MultiTonic)🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗
"""
# Logging setup: INFO level on the root logger, plus a module-level logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Model settings: Hub repository id and the device chosen at import time
MODEL_ID = "Tonic/l-operator"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Hugging Face access token, read from the environment (Spaces secrets)
import os
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    # Warn at import time; load_model() refuses to run without a token
    for _missing_token_message in (
        "HF_TOKEN not found in environment variables. Model access may be restricted.",
        "Please set HF_TOKEN in your environment variables or Spaces secrets.",
    ):
        logger.warning(_missing_token_message)
def create_annotated_image(image: Image.Image, x: int, y: int, action_type: str = "click") -> Image.Image:
    """Return a copy of ``image`` with the target point ``(x, y)`` highlighted.

    The copy gets a red box of up to 120x120 pixels centred on the point
    (clipped to the image bounds), a yellow crosshair and dot at the point
    itself, and a text label of the form ``"<ACTION_TYPE>: (x, y)"``.

    Args:
        image: Screenshot to annotate; it is not modified.
        x: Horizontal pixel coordinate of the target point.
        y: Vertical pixel coordinate of the target point.
        action_type: Action name shown (upper-cased) in the label.

    Returns:
        The annotated copy, or the original ``image`` unchanged if anything
        goes wrong while drawing (the error is logged).
    """
    try:
        # Coordinates come from parsed model output and may arrive as floats;
        # normalise them so the drawing calls and the label use whole pixels.
        x = int(round(x))
        y = int(round(y))

        # Work on a copy so the caller's image is left untouched
        annotated_image = image.copy()
        draw = ImageDraw.Draw(annotated_image)

        # Bounding box parameters
        box_size = 120
        box_color = (255, 0, 0)  # Red
        line_width = 4

        # Bounding box centred on the point, clipped to the image
        half_box = box_size // 2
        left = max(0, x - half_box)
        top = max(0, y - half_box)
        right = min(image.width, x + half_box)
        bottom = min(image.height, y + half_box)
        draw.rectangle([left, top, right, bottom], outline=box_color, width=line_width)

        # Corner indicators drawn over the box corners
        corner_size = 15
        # Top-left corner
        draw.line([left, top, left + corner_size, top], fill=box_color, width=line_width)
        draw.line([left, top, left, top + corner_size], fill=box_color, width=line_width)
        # Top-right corner
        draw.line([right - corner_size, top, right, top], fill=box_color, width=line_width)
        draw.line([right, top, right, top + corner_size], fill=box_color, width=line_width)
        # Bottom-left corner
        draw.line([left, bottom - corner_size, left, bottom], fill=box_color, width=line_width)
        draw.line([left, bottom, left + corner_size, bottom], fill=box_color, width=line_width)
        # Bottom-right corner
        draw.line([right - corner_size, bottom, right, bottom], fill=box_color, width=line_width)
        draw.line([right, bottom - corner_size, right, bottom], fill=box_color, width=line_width)

        # Crosshair at the exact point
        crosshair_size = 15
        crosshair_color = (255, 255, 0)  # Yellow for contrast against the red box
        draw.line([x - crosshair_size, y, x + crosshair_size, y], fill=crosshair_color, width=3)
        draw.line([x, y - crosshair_size, x, y + crosshair_size], fill=crosshair_color, width=3)

        # Small filled circle at the centre
        circle_radius = 4
        draw.ellipse(
            [x - circle_radius, y - circle_radius, x + circle_radius, y + circle_radius],
            fill=crosshair_color, outline=box_color, width=2,
        )

        # Text label; a failure to load the font is handled by the outer except
        font = ImageFont.load_default()
        label_text = f"{action_type.upper()}: ({x}, {y})"
        text_bbox = draw.textbbox((0, 0), label_text, font=font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]

        # Prefer placing the label above the box. The previous code clamped
        # text_y with max(5, ...) before testing `text_y < 5`, so the
        # "place it below" fallback could never run.
        text_x = max(5, left)
        text_y = top - text_height - 10
        if text_y < 5:
            # Not enough room above: put the label below the box, kept on-image
            text_y = max(5, min(image.height - text_height - 5, bottom + 10))

        # Label background
        # NOTE(review): the alpha component (180) presumably has no blending
        # effect when drawing directly on an RGB image — confirm.
        draw.rectangle(
            [text_x - 4, text_y - 4, text_x + text_width + 4, text_y + text_height + 4],
            fill=(0, 0, 0, 180),
        )
        # Label text
        draw.text((text_x, text_y), label_text, fill=(255, 255, 255), font=font)
        return annotated_image
    except Exception as e:
        logger.error(f"Error creating annotated image: {str(e)}")
        return image  # Return original image if annotation fails
def parse_action_response(response: str) -> tuple:
    """Decode a model response and report whether it is a click with coordinates.

    Returns a ``(payload, has_coordinates)`` pair. ``payload`` is the decoded
    JSON object when ``response`` (ignoring leading whitespace) starts with
    ``{`` and parses successfully; otherwise it is ``response`` itself.
    ``has_coordinates`` is True only for a decoded object whose
    ``action_type`` is ``"click"`` and which has both ``x`` and ``y`` keys.
    """
    try:
        # Anything that does not look like a JSON object is passed through
        if not response.strip().startswith('{'):
            return response, False
        payload = json.loads(response)
        is_click_with_xy = (
            payload.get('action_type') == 'click'
            and {'x', 'y'} <= payload.keys()
        )
        return payload, is_click_with_xy
    except json.JSONDecodeError:
        # Looked like JSON but was not valid: hand back the raw text
        return response, False
    except Exception as e:
        logger.error(f"Error parsing action response: {str(e)}")
        return response, False
class LOperatorDemo:
    """Holds the L-Operator model and processor and runs action generation.

    Attributes are populated by :meth:`load_model`; until it succeeds
    ``is_loaded`` stays False and :meth:`generate_action` returns an error
    string instead of running the model.
    """

    def __init__(self):
        self.model = None  # set by load_model()
        self.processor = None  # set by load_model()
        self.is_loaded = False  # flipped to True only after a successful load

    def load_model(self):
        """Load the L-Operator model and processor and report the outcome.

        Returns:
            A human-readable status string: a success message including the
            load time, or a message starting with "❌" when ``HF_TOKEN`` is
            missing or loading raised. Exceptions are logged, not propagated.
        """
        try:
            import time
            start_time = time.time()
            logger.info(f"Loading model {MODEL_ID} on device {DEVICE}")
            # Refuse to load without a token
            if not HF_TOKEN:
                return "❌ HF_TOKEN not found. Please set HF_TOKEN in Spaces secrets."
            # Load model with progress logging
            logger.info("Downloading and loading model weights...")
            self.model = AutoModelForImageTextToText.from_pretrained(
                MODEL_ID,
                device_map="auto",
                torch_dtype=torch.bfloat16 if DEVICE == "cuda" else torch.float32,
                trust_remote_code=True
            )
            # Load processor
            logger.info("Loading processor...")
            self.processor = AutoProcessor.from_pretrained(
                MODEL_ID,
                trust_remote_code=True
            )
            if DEVICE == "cpu":
                self.model = self.model.to(DEVICE)
            self.is_loaded = True
            load_time = time.time() - start_time
            logger.info(f"Model loaded successfully in {load_time:.1f} seconds")
            return f"✅ Model loaded successfully in {load_time:.1f} seconds"
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            return f"❌ Error loading model: {str(e)} - This may be a custom model requiring special handling"

    @spaces.GPU(duration=120)  # 2 minutes for action generation
    def generate_action(self, image: Image.Image, goal: str, instruction: str) -> str:
        """Generate an action for a screenshot, goal and step instruction.

        Args:
            image: Screenshot; converted to RGB if it is in another mode.
            goal: Overall goal text inserted into the prompt.
            instruction: Step instruction text inserted into the prompt.

        Returns:
            The model output. If the output parses as JSON it is re-serialised
            with two-space indentation; otherwise the decoded text is returned
            as-is. On failure (or if the model is not loaded) a string
            starting with "❌" is returned instead of raising.
        """
        if not self.is_loaded:
            return "❌ Model not loaded. Please load the model first."
        try:
            # Convert image to RGB if needed
            if image.mode != "RGB":
                image = image.convert("RGB")
            # Build conversation using the EXACT same format as training
            user_text = (
                f"Goal: {goal}\n"
                f"Step: {instruction}\n"
                "Respond with a JSON action containing relevant keys (e.g., action_type, x, y, text, app_name, direction)."
            )
            conversation = [
                {
                    "role": "system",
                    "content": [
                        {"type": "text", "text": "You are a helpful multimodal assistant by Liquid AI."}
                    ]
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image},
                        {"type": "text", "text": user_text}
                    ]
                }
            ]
            logger.info("Processing conversation with processor...")
            # Process inputs using the same method as training
            inputs = self.processor.apply_chat_template(
                conversation,
                add_generation_prompt=True,
                return_tensors="pt",
                return_dict=True,
                tokenize=True,
            )
            logger.info(f"Processor output keys: {list(inputs.keys())}")
            # Move tensor inputs to the model's device
            for key, value in inputs.items():
                if isinstance(value, torch.Tensor):
                    inputs[key] = value.to(self.model.device)
            logger.info(f"Inputs shape: {inputs['input_ids'].shape}, device: {inputs['input_ids'].device}")
            # Generate response
            logger.info("Generating response...")
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=128,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.processor.tokenizer.eos_token_id
                )
            logger.info("Decoding response...")
            # Decode only the newly generated tokens (skip the prompt prefix)
            response = self.processor.tokenizer.decode(
                outputs[0][inputs['input_ids'].shape[1]:],
                skip_special_tokens=True
            )
            # Pretty-print when the output is valid JSON; otherwise return the
            # raw text. Only a decode failure is expected here, so catch that
            # specifically rather than using a bare except.
            try:
                parsed_response = json.loads(response)
                return json.dumps(parsed_response, indent=2)
            except json.JSONDecodeError:
                return response
        except Exception as e:
            logger.error(f"Error generating action: {str(e)}")
            return f"❌ Error generating action: {str(e)}"
# Initialize demo
# Module-level LOperatorDemo used by process_input() and by the startup loader
# in create_demo(). Constructing it does not load the model; that happens in
# LOperatorDemo.load_model().
demo_instance = LOperatorDemo()
def process_input(image, goal, step_instructions):
    """Validate the UI inputs, run the model and build the annotated preview.

    Args:
        image: A PIL image, a path to an image file, or an object with a
            ``name`` attribute pointing at an image file.
        goal: Overall goal text; must be non-empty after stripping.
        step_instructions: Step instruction text; must be non-empty after
            stripping.

    Returns:
        A ``(text, annotated_image)`` pair. ``text`` is the model response or
        a message starting with "❌" describing what went wrong.
        ``annotated_image`` is an annotated copy of the screenshot when the
        response is a click action with ``x``/``y`` values, otherwise None.
    """
    if image is None:
        return "❌ Please upload an Android screenshot image.", None
    # Text values may be None as well as empty; treat both as missing instead
    # of failing on None.strip().
    if not goal or not goal.strip():
        return "❌ Please provide a goal.", None
    if not step_instructions or not step_instructions.strip():
        return "❌ Please provide step instructions.", None
    if not demo_instance.is_loaded:
        return "❌ Model not loaded. Please wait for it to load automatically.", None
    try:
        # Handle different image formats
        if hasattr(image, 'mode'):  # PIL Image object
            pil_image = image
        elif isinstance(image, str) and os.path.exists(image):
            # Handle file path (from examples)
            pil_image = Image.open(image)
        elif hasattr(image, 'name') and os.path.exists(image.name):
            # Handle Gradio file object
            pil_image = Image.open(image.name)
        else:
            return "❌ Invalid image format. Please upload a valid image.", None
        # Convert image to RGB if needed
        if pil_image.mode != "RGB":
            pil_image = pil_image.convert("RGB")
        # Generate action using goal and step instructions
        response = demo_instance.generate_action(pil_image, goal, step_instructions)
        # Parse the response to check for coordinates
        action_data, has_coordinates = parse_action_response(response)
        # If coordinates are found, create annotated image
        annotated_image = None
        if has_coordinates and isinstance(action_data, dict):
            x = action_data.get('x')
            y = action_data.get('y')
            action_type = action_data.get('action_type', 'click')
            # Keys may be present but null in the model output
            if x is not None and y is not None:
                annotated_image = create_annotated_image(pil_image, x, y, action_type)
                logger.info(f"Created annotated image for coordinates ({x}, {y})")
        return response, annotated_image
    except Exception as e:
        logger.error(f"Error processing input: {str(e)}")
        return f"❌ Error: {str(e)}", None
def update_annotated_image_visibility(response, annotated_image):
    """Show the annotated-image component only when there is an image to show.

    ``response`` is accepted but not used. Returns a ``gr.update`` that makes
    the component visible with ``annotated_image`` as its value, or hides it
    and clears its value when ``annotated_image`` is None.
    """
    has_image = annotated_image is not None
    # When there is no image, annotated_image is None, so the value is cleared
    return gr.update(visible=has_image, value=annotated_image)
def load_example_episodes():
    """Load the bundled example episodes from ``extracted_episodes_duckdb/``.

    For each configured episode, reads ``metadata.json`` and the selected
    screenshot. Episodes whose files are missing are skipped silently;
    episodes that fail to load are skipped with a warning.

    Returns:
        A list of ``[pil_image, goal_text, step_instruction]`` entries, one
        per successfully loaded episode (possibly empty).
    """
    # Episode directory -> number of the screenshot to show for it
    episode_screenshots = {
        "episode_13": 3,  # Cruise deals app
        "episode_53": 5,  # Pinterest sustainability
        "episode_73": 3,  # Moon phases app
        "episode_16730": 4,  # Weather app forecast
        "episode_17562": 3,  # Ticktick reminder app
        "episode_19565": 4,  # New episode
        "episode_19649": 2,  # New episode
        "episode_5590": 3,  # New episode
        "episode_4712": 2,  # New episode
        "episode_3731": 2,  # New episode
        "episode_2080": 2,  # New episode
        "episode_1993": 2  # New episode
    }
    examples = []
    for episode_dir, screenshot_num in episode_screenshots.items():
        try:
            metadata_path = f"extracted_episodes_duckdb/{episode_dir}/metadata.json"
            image_path = f"extracted_episodes_duckdb/{episode_dir}/screenshots/screenshot_{screenshot_num}.png"
            # Skip episodes that are not fully present on disk
            if not (os.path.exists(metadata_path) and os.path.exists(image_path)):
                continue
            logger.info(f"Loading example from {episode_dir} using screenshot_{screenshot_num}.png")
            # Explicit encoding so goal text decodes the same on every platform
            with open(metadata_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)
            # Image.open is lazy and keeps the file open; read the pixel data
            # now and release the file handle when the with-block exits.
            with Image.open(image_path) as pil_image:
                pil_image.load()
            episode_num = episode_dir.split('_')[1]
            goal_text = metadata.get('goal', f'Episode {episode_num} example')
            # Step instruction for this screenshot: screenshot numbers are
            # treated as 1-based indices into step_instructions.
            step_instructions = metadata.get('step_instructions', [])
            step_instruction = ""
            if step_instructions and screenshot_num <= len(step_instructions):
                step_instruction = step_instructions[screenshot_num - 1]
            logger.info(f"Episode {episode_num} goal: {goal_text}")
            logger.info(f"Episode {episode_num} step instruction: {step_instruction}")
            examples.append([
                pil_image,  # Use PIL Image object directly
                goal_text,  # Use the goal text from metadata
                step_instruction  # Use the step instruction for this screenshot
            ])
            logger.info(f"Successfully loaded example for Episode {episode_num}")
        except Exception as e:
            # Best effort: one bad episode must not prevent the others loading
            logger.warning(f"Could not load example for {episode_dir}: {str(e)}")
            continue
    logger.info(f"Loaded {len(examples)} examples using PIL")
    return examples
# Create Gradio interface
def create_demo():
    """Build the Gradio Blocks interface for the demo and return it.

    The layout has a header and info row, an input column (screenshot, goal,
    step instruction, generate button), an output column (annotated
    screenshot, JSON action text), a grid of example episodes with "load"
    buttons, and a footer. Model loading is registered to run on page load.

    Returns:
        The constructed ``gr.Blocks`` object; the caller launches it.
    """
    with gr.Blocks(
        title=title,
        theme=gr.themes.Monochrome(),
        css="""
.gradio-container {
max-width: 1200px !important;
}
.output-container {
min-height: 200px;
}
.annotated-image-container {
border: 2px solid #e0e0e0;
border-radius: 8px;
padding: 10px;
margin-top: 10px;
}
"""
    ) as demo:
        # Header section
        gr.Markdown(title)
        # Info section
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(description)
            with gr.Column(scale=1):
                gr.Markdown(joinus)
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📱 Upload Screenshot")
                image_input = gr.Image(
                    label="Android Screenshot",
                    type="pil",
                    height=400
                )
                gr.Markdown("### 🎯 Goal")
                goal_input = gr.Textbox(
                    label="What would you like to accomplish?",
                    placeholder="e.g., Open the Settings app and navigate to Display settings",
                    lines=3
                )
                gr.Markdown("### 📝 Step Instructions")
                step_instructions_input = gr.Textbox(
                    label="Specific step instruction for this screenshot",
                    placeholder="e.g., Tap on the Settings icon to open the app",
                    lines=2
                )
                # Process button
                process_btn = gr.Button("🚀 Generate Action", variant="primary", size="lg")
            with gr.Column(scale=1):
                gr.Markdown("### 🎯 Annotated Screenshot")
                # Hidden until a click action with coordinates is produced
                annotated_image_output = gr.Image(
                    label="Click Location Highlighted",
                    height=400,
                    visible=False,
                    interactive=False,
                    elem_classes=["annotated-image-container"]
                )
                gr.Markdown("### 📊 Generated Action")
                output_text = gr.Textbox(
                    label="JSON Action Output",
                    lines=15,
                    max_lines=20,
                    interactive=False,
                    elem_classes=["output-container"]
                )
        # Connect the process button: generate first, then show or hide the
        # annotated image depending on whether one was produced.
        process_btn.click(
            fn=process_input,
            inputs=[image_input, goal_input, step_instructions_input],
            outputs=[output_text, annotated_image_output]
        ).then(
            fn=update_annotated_image_visibility,
            inputs=[output_text, annotated_image_output],
            outputs=annotated_image_output
        )
        # Load examples
        gr.Markdown("### 📚 Example Episodes")
        try:
            examples = load_example_episodes()
            if examples:
                # Organize examples in a grid layout (3 columns)
                for row_start in range(0, len(examples), 3):
                    with gr.Row():
                        for i in range(row_start, min(row_start + 3, len(examples))):
                            image, goal, step_instruction = examples[i]
                            with gr.Column(scale=1):
                                # Display number is the position in the list,
                                # not the episode id from the directory name
                                episode_num = i + 1
                                gr.Markdown(f"**Episode {episode_num}**")
                                example_image = gr.Image(
                                    value=image,
                                    label=f"Example {episode_num}",
                                    height=150,
                                    interactive=False
                                )
                                example_goal = gr.Textbox(
                                    value=goal,
                                    label="Goal",
                                    lines=3,
                                    interactive=False
                                )
                                example_step_instruction = gr.Textbox(
                                    value=step_instruction,
                                    label="Step Instruction",
                                    lines=2,
                                    interactive=False
                                )
                                # Button copies this example into the input
                                # widgets, then clears any previous output
                                load_example_btn = gr.Button(f"Load Example {episode_num}", size="sm")
                                load_example_btn.click(
                                    fn=lambda img, g, s: (img, g, s),
                                    inputs=[example_image, example_goal, example_step_instruction],
                                    outputs=[image_input, goal_input, step_instructions_input]
                                ).then(
                                    fn=lambda: (None, gr.update(visible=False)),
                                    outputs=[output_text, annotated_image_output]
                                )
        except Exception as e:
            logger.warning(f"Failed to load examples: {str(e)}")
            gr.Markdown("❌ Failed to load examples. Please upload your own screenshot.")

        # Load model automatically on startup
        def load_model_on_startup():
            """Load the model once, without surfacing status in the UI."""
            if demo_instance.is_loaded:
                return
            logger.info("Loading L-Operator model automatically...")
            try:
                status = demo_instance.load_model()
            except Exception as e:
                logger.error(f"Failed to load model: {str(e)}")
                return
            # load_model() reports failures through its return value rather
            # than by raising, so check the flag before claiming success.
            if demo_instance.is_loaded:
                logger.info("Model loaded successfully in background")
            else:
                logger.error(f"Failed to load model: {status}")

        # Load model automatically on page load
        demo.load(fn=load_model_on_startup)
        gr.Markdown("""
---
**Made with ❤️ by Tonic** | [Model on Hugging Face](https://huggingface.co/Tonic/l-android-control)
""")
    return demo
# Create and launch the demo with optimized settings
if __name__ == "__main__":
    # Only these two launch options are active. Settings previously listed
    # here but left disabled: server_name, server_port, share, debug,
    # max_threads, quiet.
    launch_options = {
        "show_error": True,
        "ssr_mode": False,
    }
    try:
        logger.info("Creating Gradio demo interface...")
        demo = create_demo()
        logger.info("Launching Gradio server...")
        demo.launch(**launch_options)
    except Exception as e:
        # Log for the Space logs, then re-raise so the process exits with the error
        logger.error(f"Failed to launch Gradio app: {str(e)}")
        raise