import gradio as gr
import torch
from PIL import Image, ImageDraw, ImageFont
import json
import os
from transformers import AutoProcessor, AutoModelForImageTextToText
from typing import List, Dict, Any
import logging
import spaces
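# `spaces` is imported for the @spaces.GPU decorator, which on Hugging Face ZeroGPU
# Spaces requests a GPU only for the duration of the decorated call.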
title = """# L-Operator: 🤖Android📲Device🎮Control """ | |
description = """ | |
**Lightweight Multimodal Android Device Control Agent** | |
This demo showcases the L-Operator model, a fine-tuned multimodal AI agent based on LiquidAI/LFM2-VL-1.6B model, | |
optimized for Android device control through visual understanding and action generation. | |
## 🚀 How to Use | |
1. **Upload Screenshot**: Upload an Android device screenshot | |
2. **Describe Goal**: Enter what you want to accomplish | |
3. **Get Actions**: The model will generate JSON actions for Android device control | |
""" | |
joinus = """ | |
## Join us : | |
🌟TeamTonic🌟 is always making cool demos! Join our active builder's 🛠️community 👻 [](https://discord.gg/qdfnvSPcqP) On 🤗Huggingface:[MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Tonic-AI](https://github.com/tonic-ai) & contribute to🌟 [MultiTonic](https://github.com/MultiTonic)🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗 | |
""" | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Model configuration
MODEL_ID = "Tonic/l-operator"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Get Hugging Face token from environment variable (Spaces secrets)
HF_TOKEN = os.getenv("HF_TOKEN")
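# When HF_TOKEN is set, huggingface_hub picks it up from the environment automatically,
# so the from_pretrained() calls below can authenticate to gated or private repositories
# without an explicit token argument.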
if not HF_TOKEN:
    logger.warning("HF_TOKEN not found in environment variables. Model access may be restricted.")
    logger.warning("Please set HF_TOKEN in your environment variables or Spaces secrets.")
def create_annotated_image(image: Image.Image, x: int, y: int, action_type: str = "click") -> Image.Image:
    """Create an image with a bounding box around the specified coordinates"""
    try:
        # Create a copy of the original image
        annotated_image = image.copy()
        draw = ImageDraw.Draw(annotated_image)

        # Define bounding box parameters - make it generous as requested
        box_size = 120  # Increased size for more generous bounding box
        box_color = (255, 0, 0)  # Red color
        line_width = 4  # Thicker line for better visibility

        # Calculate bounding box coordinates
        left = max(0, x - box_size // 2)
        top = max(0, y - box_size // 2)
        right = min(image.width, x + box_size // 2)
        bottom = min(image.height, y + box_size // 2)

        # Draw the bounding box with rounded corners effect
        draw.rectangle([left, top, right, bottom], outline=box_color, width=line_width)

        # Draw corner indicators for better visibility
        corner_size = 15
        # Top-left corner
        draw.line([left, top, left + corner_size, top], fill=box_color, width=line_width)
        draw.line([left, top, left, top + corner_size], fill=box_color, width=line_width)
        # Top-right corner
        draw.line([right - corner_size, top, right, top], fill=box_color, width=line_width)
        draw.line([right, top, right, top + corner_size], fill=box_color, width=line_width)
        # Bottom-left corner
        draw.line([left, bottom - corner_size, left, bottom], fill=box_color, width=line_width)
        draw.line([left, bottom, left + corner_size, bottom], fill=box_color, width=line_width)
        # Bottom-right corner
        draw.line([right - corner_size, bottom, right, bottom], fill=box_color, width=line_width)
        draw.line([right, bottom - corner_size, right, bottom], fill=box_color, width=line_width)

        # Draw a crosshair at the exact point
        crosshair_size = 15
        crosshair_color = (255, 255, 0)  # Yellow crosshair for contrast
        draw.line([x - crosshair_size, y, x + crosshair_size, y], fill=crosshair_color, width=3)
        draw.line([x, y - crosshair_size, x, y + crosshair_size], fill=crosshair_color, width=3)

        # Add a small circle at the center
        circle_radius = 4
        draw.ellipse([x - circle_radius, y - circle_radius, x + circle_radius, y + circle_radius],
                     fill=crosshair_color, outline=box_color, width=2)

        # Add text label with better positioning
        font = ImageFont.load_default()
label_text = f"{action_type.upper()}: ({x}, {y})" | |
text_bbox = draw.textbbox((0, 0), label_text, font=font) | |
text_width = text_bbox[2] - text_bbox[0] | |
text_height = text_bbox[3] - text_bbox[1] | |
# Position text above the bounding box, but ensure it's visible | |
text_x = max(5, left) | |
text_y = max(5, top - text_height - 10) | |
# If text would go off the top, position it below the box | |
if text_y < 5: | |
text_y = min(image.height - text_height - 5, bottom + 10) | |
# Draw text background with better contrast | |
draw.rectangle([text_x - 4, text_y - 4, text_x + text_width + 4, text_y + text_height + 4], | |
fill=(0, 0, 0, 180)) | |
# Draw text | |
draw.text((text_x, text_y), label_text, fill=(255, 255, 255), font=font) | |
return annotated_image | |
except Exception as e: | |
logger.error(f"Error creating annotated image: {str(e)}") | |
return image # Return original image if annotation fails | |
def parse_action_response(response: str) -> tuple:
    """Parse the action response and extract coordinates if present"""
    try:
        # Try to parse as JSON
        if response.strip().startswith('{'):
            action_data = json.loads(response)
            # Check if it's a click action with coordinates
            if (action_data.get('action_type') == 'click' and
                    'x' in action_data and 'y' in action_data):
                return action_data, True
            else:
                return action_data, False
        else:
            return response, False
    except json.JSONDecodeError:
        return response, False
    except Exception as e:
        logger.error(f"Error parsing action response: {str(e)}")
        return response, False
class LOperatorDemo:
    def __init__(self):
        self.model = None
        self.processor = None
        self.is_loaded = False

    def load_model(self):
        """Load the L-Operator model and processor with timeout handling"""
        try:
            import time
            start_time = time.time()
            logger.info(f"Loading model {MODEL_ID} on device {DEVICE}")

            # Check if token is available
            if not HF_TOKEN:
                return "❌ HF_TOKEN not found. Please set HF_TOKEN in Spaces secrets."

            # Load model with progress logging
            logger.info("Downloading and loading model weights...")
            self.model = AutoModelForImageTextToText.from_pretrained(
                MODEL_ID,
                device_map="auto",
                torch_dtype=torch.bfloat16 if DEVICE == "cuda" else torch.float32,
                trust_remote_code=True
            )

            # Load processor
            logger.info("Loading processor...")
            self.processor = AutoProcessor.from_pretrained(
                MODEL_ID,
                trust_remote_code=True
            )

            if DEVICE == "cpu":
                self.model = self.model.to(DEVICE)

            self.is_loaded = True
            load_time = time.time() - start_time
            logger.info(f"Model loaded successfully in {load_time:.1f} seconds")
            return f"✅ Model loaded successfully in {load_time:.1f} seconds"
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            return f"❌ Error loading model: {str(e)} - This may be a custom model requiring special handling"
    @spaces.GPU(duration=120)  # 2 minutes for action generation
    def generate_action(self, image: Image.Image, goal: str, instruction: str) -> str:
        """Generate action based on image and text inputs using the same format as training"""
        if not self.is_loaded:
            return "❌ Model not loaded. Please load the model first."

        try:
            # Convert image to RGB if needed
            if image.mode != "RGB":
                image = image.convert("RGB")

            # Build conversation using the EXACT same format as training
            user_text = (
                f"Goal: {goal}\n"
                f"Step: {instruction}\n"
                "Respond with a JSON action containing relevant keys (e.g., action_type, x, y, text, app_name, direction)."
            )
            conversation = [
                {
                    "role": "system",
                    "content": [
                        {"type": "text", "text": "You are a helpful multimodal assistant by Liquid AI."}
                    ]
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image},
                        {"type": "text", "text": user_text}
                    ]
                }
            ]

            logger.info("Processing conversation with processor...")
            # Process inputs using the same method as training
            inputs = self.processor.apply_chat_template(
                conversation,
                add_generation_prompt=True,
                return_tensors="pt",
                return_dict=True,
                tokenize=True,
            )
            logger.info(f"Processor output keys: {list(inputs.keys())}")

            # Move inputs to device
            for key, value in inputs.items():
                if isinstance(value, torch.Tensor):
                    inputs[key] = value.to(self.model.device)
            logger.info(f"Inputs shape: {inputs['input_ids'].shape}, device: {inputs['input_ids'].device}")

            # Generate response
            logger.info("Generating response...")
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=128,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.processor.tokenizer.eos_token_id
                )

            logger.info("Decoding response...")
            # Decode only the newly generated tokens (slice off the prompt portion of the sequence)
            response = self.processor.tokenizer.decode(
                outputs[0][inputs['input_ids'].shape[1]:],
                skip_special_tokens=True
            )
            # Try to parse as JSON for better formatting
            try:
                parsed_response = json.loads(response)
                return json.dumps(parsed_response, indent=2)
            except json.JSONDecodeError:
                return response
        except Exception as e:
            logger.error(f"Error generating action: {str(e)}")
            return f"❌ Error generating action: {str(e)}"
# Initialize demo
demo_instance = LOperatorDemo()
def process_input(image, goal, step_instructions):
    """Process the input and generate action"""
    if image is None:
        return "❌ Please upload an Android screenshot image.", None
    if not goal.strip():
        return "❌ Please provide a goal.", None
    if not step_instructions.strip():
        return "❌ Please provide step instructions.", None
    if not demo_instance.is_loaded:
        return "❌ Model not loaded. Please wait for it to load automatically.", None

    try:
        # Handle different image formats
        pil_image = None
        if hasattr(image, 'mode'):  # PIL Image object
            pil_image = image
        elif isinstance(image, str) and os.path.exists(image):
            # Handle file path (from examples)
            pil_image = Image.open(image)
        elif hasattr(image, 'name') and os.path.exists(image.name):
            # Handle Gradio file object
            pil_image = Image.open(image.name)
        else:
            return "❌ Invalid image format. Please upload a valid image.", None

        if pil_image is None:
            return "❌ Failed to process image. Please try again.", None

        # Convert image to RGB if needed
        if pil_image.mode != "RGB":
            pil_image = pil_image.convert("RGB")

        # Generate action using goal and step instructions
        response = demo_instance.generate_action(pil_image, goal, step_instructions)

        # Parse the response to check for coordinates
        action_data, has_coordinates = parse_action_response(response)

        # If coordinates are found, create annotated image
        annotated_image = None
        if has_coordinates and isinstance(action_data, dict):
            x = action_data.get('x')
            y = action_data.get('y')
            action_type = action_data.get('action_type', 'click')
            if x is not None and y is not None:
                annotated_image = create_annotated_image(pil_image, x, y, action_type)
                logger.info(f"Created annotated image for coordinates ({x}, {y})")

        return response, annotated_image
    except Exception as e:
        logger.error(f"Error processing input: {str(e)}")
        return f"❌ Error: {str(e)}", None
def update_annotated_image_visibility(response, annotated_image):
    """Update the visibility of the annotated image based on whether coordinates are present"""
    if annotated_image is not None:
        return gr.update(visible=True, value=annotated_image)
    else:
        return gr.update(visible=False, value=None)
def load_example_episodes():
    """Load example episodes using PIL to load images directly"""
    examples = []
    try:
        # Updated to include all 12 episodes with appropriate screenshot selections
        episode_screenshots = {
            "episode_13": 3,      # Cruise deals app
            "episode_53": 5,      # Pinterest sustainability
            "episode_73": 3,      # Moon phases app
            "episode_16730": 4,   # Weather app forecast
            "episode_17562": 3,   # Ticktick reminder app
            "episode_19565": 4,   # New episode
            "episode_19649": 2,   # New episode
            "episode_5590": 3,    # New episode
            "episode_4712": 2,    # New episode
            "episode_3731": 2,    # New episode
            "episode_2080": 2,    # New episode
            "episode_1993": 2     # New episode
        }
        for episode_dir, screenshot_num in episode_screenshots.items():
            try:
                metadata_path = f"extracted_episodes_duckdb/{episode_dir}/metadata.json"
                image_path = f"extracted_episodes_duckdb/{episode_dir}/screenshots/screenshot_{screenshot_num}.png"

                # Check if both files exist
                if os.path.exists(metadata_path) and os.path.exists(image_path):
                    logger.info(f"Loading example from {episode_dir} using screenshot_{screenshot_num}.png")
                    with open(metadata_path, "r") as f:
                        metadata = json.load(f)

                    # Load image directly with PIL
                    pil_image = Image.open(image_path)

                    episode_num = episode_dir.split('_')[1]
                    goal_text = metadata.get('goal', f'Episode {episode_num} example')

                    # Get step instruction for the corresponding screenshot
                    step_instructions = metadata.get('step_instructions', [])
                    step_instruction = ""
                    if step_instructions and screenshot_num <= len(step_instructions):
                        step_instruction = step_instructions[screenshot_num - 1]

                    logger.info(f"Episode {episode_num} goal: {goal_text}")
                    logger.info(f"Episode {episode_num} step instruction: {step_instruction}")

                    examples.append([
                        pil_image,        # Use PIL Image object directly
                        goal_text,        # Use the goal text from metadata
                        step_instruction  # Use the step instruction for this screenshot
                    ])
                    logger.info(f"Successfully loaded example for Episode {episode_num}")
            except Exception as e:
                logger.warning(f"Could not load example for {episode_dir}: {str(e)}")
                continue
    except Exception as e:
        logger.error(f"Error loading examples: {str(e)}")
        examples = []

    logger.info(f"Loaded {len(examples)} examples using PIL")
    return examples
# Create Gradio interface
def create_demo():
    """Create the Gradio demo interface using Blocks"""
    with gr.Blocks(
        title=title,
        theme=gr.themes.Monochrome(),
        css="""
        .gradio-container {
            max-width: 1200px !important;
        }
        .output-container {
            min-height: 200px;
        }
        .annotated-image-container {
            border: 2px solid #e0e0e0;
            border-radius: 8px;
            padding: 10px;
            margin-top: 10px;
        }
        """
    ) as demo:
        # Header section
        gr.Markdown(title)

        # Info section
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(description)
            with gr.Column(scale=1):
                gr.Markdown(joinus)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📱 Upload Screenshot")
                image_input = gr.Image(
                    label="Android Screenshot",
                    type="pil",
                    height=400
                )

                gr.Markdown("### 🎯 Goal")
                goal_input = gr.Textbox(
                    label="What would you like to accomplish?",
                    placeholder="e.g., Open the Settings app and navigate to Display settings",
                    lines=3
                )

                gr.Markdown("### 📝 Step Instructions")
                step_instructions_input = gr.Textbox(
                    label="Specific step instruction for this screenshot",
                    placeholder="e.g., Tap on the Settings icon to open the app",
                    lines=2
                )

                # Process button
                process_btn = gr.Button("🚀 Generate Action", variant="primary", size="lg")

            with gr.Column(scale=1):
                gr.Markdown("### 🎯 Annotated Screenshot")
                annotated_image_output = gr.Image(
                    label="Click Location Highlighted",
                    height=400,
                    visible=False,
                    interactive=False,
                    elem_classes=["annotated-image-container"]
                )

                gr.Markdown("### 📊 Generated Action")
                output_text = gr.Textbox(
                    label="JSON Action Output",
                    lines=15,
                    max_lines=20,
                    interactive=False,
                    elem_classes=["output-container"]
                )

        # Connect the process button
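        # The click event runs in two chained steps: process_input produces the JSON text
        # and (optionally) an annotated image; the .then() callback then shows or hides
        # the annotated-image component depending on whether an image was produced.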
        process_btn.click(
            fn=process_input,
            inputs=[image_input, goal_input, step_instructions_input],
            outputs=[output_text, annotated_image_output]
        ).then(
            fn=update_annotated_image_visibility,
            inputs=[output_text, annotated_image_output],
            outputs=annotated_image_output
        )
        # Load examples
        gr.Markdown("### 📚 Example Episodes")
        try:
            examples = load_example_episodes()
            if examples:
                # Organize examples in a grid layout (3 columns)
                for row_start in range(0, len(examples), 3):
                    with gr.Row():
                        for i in range(row_start, min(row_start + 3, len(examples))):
                            image, goal, step_instruction = examples[i]
                            with gr.Column(scale=1):
                                episode_num = i + 1
                                gr.Markdown(f"**Episode {episode_num}**")
                                example_image = gr.Image(
                                    value=image,
                                    label=f"Example {episode_num}",
                                    height=150,
                                    interactive=False
                                )
                                example_goal = gr.Textbox(
                                    value=goal,
                                    label="Goal",
                                    lines=3,
                                    interactive=False
                                )
                                example_step_instruction = gr.Textbox(
                                    value=step_instruction,
                                    label="Step Instruction",
                                    lines=2,
                                    interactive=False
                                )

                                # Create a button to load this example
                                load_example_btn = gr.Button(f"Load Example {episode_num}", size="sm")
                                load_example_btn.click(
                                    fn=lambda img, g, s: (img, g, s),
                                    inputs=[example_image, example_goal, example_step_instruction],
                                    outputs=[image_input, goal_input, step_instructions_input]
                                ).then(
                                    fn=lambda: (None, gr.update(visible=False)),
                                    outputs=[output_text, annotated_image_output]
                                )
        except Exception as e:
            logger.warning(f"Failed to load examples: {str(e)}")
            gr.Markdown("❌ Failed to load examples. Please upload your own screenshot.")
        # Load model automatically on startup
        def load_model_on_startup():
            """Load model automatically without user feedback"""
            if not demo_instance.is_loaded:
                logger.info("Loading L-Operator model automatically...")
                try:
                    demo_instance.load_model()
                    logger.info("Model loaded successfully in background")
                except Exception as e:
                    logger.error(f"Failed to load model: {str(e)}")

        # Load model automatically on page load
        demo.load(fn=load_model_on_startup)
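        # Note: Blocks.load runs each time the page is opened in a browser, so the model
        # is loaded lazily on first visit (guarded by is_loaded) rather than at import time.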
gr.Markdown(""" | |
--- | |
**Made with ❤️ by Tonic** | [Model on Hugging Face](https://huggingface.co/Tonic/l-android-control) | |
""") | |
return demo | |
# Create and launch the demo with optimized settings
if __name__ == "__main__":
    try:
        logger.info("Creating Gradio demo interface...")
        demo = create_demo()
        logger.info("Launching Gradio server...")
        demo.launch(
            # server_name="0.0.0.0",
            # server_port=7860,
            # share=False,
            # debug=False,  # Disable debug to reduce startup time
            show_error=True,
            ssr_mode=False,
            # max_threads=2,  # Limit threads to prevent resource exhaustion
            # quiet=True  # Reduce startup logging noise
        )
    except Exception as e:
        logger.error(f"Failed to launch Gradio app: {str(e)}")
        raise