import os import json import time import requests import gradio as gr # Read secrets and sanitize URL ENDPOINT_URL = (os.environ.get("ENDPOINT_URL") or "https://erxvjreo1onxvdf7.us-east4.gcp.endpoints.huggingface.cloud").strip().rstrip("/") HF_TOKEN = (os.environ.get("HF_TOKEN") or "").strip() # Debug logging print(f"πŸš€ DEBUG: ENDPOINT_URL set to: {ENDPOINT_URL}") print(f"πŸš€ DEBUG: HF_TOKEN present: {'Yes' if HF_TOKEN else 'No'}") if not ENDPOINT_URL: raise RuntimeError("Missing ENDPOINT_URL Space secret") HEADERS = { "Content-Type": "application/json", "Accept": "application/json", } if HF_TOKEN: HEADERS["Authorization"] = f"Bearer {HF_TOKEN}" SYSTEM_PROMPT_DEFAULT = """You are a helpful AI assistant for Isaac Sim 5.0, Isaac Lab 2.1, and Omniverse Kit 107.3 robotics development. You specialize in NVIDIA robotics development, computer vision, sensor integration, and simulation workflows. CRITICAL API GUIDANCE - Isaac Sim 5.0 Extension System: πŸ“¦ CORE EXTENSIONS (isaacsim.*): βœ… isaacsim.core.api - World, SimulationContext βœ… isaacsim.core.prims - Articulation, RigidPrim, XFormPrim βœ… isaacsim.core.api.objects - DynamicCuboid, VisualCuboid βœ… isaacsim.core.utils.stage - add_reference_to_stage βœ… isaacsim.storage.native - get_assets_root_path βœ… isaacsim.sensors.camera - Camera APIs βœ… isaacsim.sensors.physics - Contact, Effort, IMU sensors βœ… isaacsim.robot.manipulators - Manipulator APIs βœ… isaacsim.replicator.* - Synthetic data generation πŸ”§ USD/PHYSICS (pxr/omni): βœ… from pxr import UsdPhysics, PhysxSchema, Gf, UsdGeom βœ… import omni.usd - USD stage operations βœ… import omni.graph.core as og - OmniGraph βœ… import carb - Logging framework 🎯 CORRECT PATTERNS: Basic Setup: ```python from isaacsim import SimulationApp simulation_app = SimulationApp({"headless": False}) from isaacsim.core.api import World from isaacsim.core.prims import Articulation from isaacsim.storage.native import get_assets_root_path ``` Robot Loading: ```python from isaacsim.core.utils.stage import add_reference_to_stage asset_path = get_assets_root_path() + "" add_reference_to_stage(usd_path=asset_path, prim_path="/World/Robot") robot = Articulation(prim_paths_expr="/World/Robot") ``` Sensors: ```python from isaacsim.sensors.camera import Camera from isaacsim.sensors.physics import ContactSensor, IMUSensor ``` USD Operations: ```python from pxr import UsdPhysics, UsdGeom, Gf import omni.usd stage = omni.usd.get_context().get_stage() ``` Always provide complete, executable Isaac Sim 5.0 code with proper extension imports.""" DEFAULT_MAX_NEW_TOKENS = 1024 DEFAULT_MAX_INPUT_TOKENS = 2048 def to_single_turn(messages): lines = [] for m in messages: role = m.get("role", "user").capitalize() lines.append(f"{role}: {m.get('content','')}") lines.append("Assistant:") return "\n".join(lines) def call_endpoint(messages, parameters): start = time.time() # Debug logging print(f"πŸ” DEBUG: Calling endpoint: {ENDPOINT_URL}") print(f"πŸ” DEBUG: Headers: {HEADERS}") # Prefer single-turn first (matches your handler expectations) payload_inputs = {"inputs": to_single_turn(messages), "parameters": parameters} print(f"πŸ” DEBUG: Payload: {payload_inputs}") resp = requests.post(ENDPOINT_URL, headers=HEADERS, json=payload_inputs, timeout=120) latency = time.time() - start print(f"πŸ” DEBUG: Response status: {resp.status_code}") print(f"πŸ” DEBUG: Response body: {resp.text}") if resp.status_code == 200: data = resp.json() text = data.get("generated_text") if isinstance(data, dict) else str(data) return text or "", latency # Fallback to messages for servers that support chat print(f"πŸ” DEBUG: First attempt failed, trying messages format...") resp2 = requests.post( ENDPOINT_URL, headers=HEADERS, json={"messages": messages, "parameters": parameters}, timeout=120, ) latency = time.time() - start print(f"πŸ” DEBUG: Fallback response status: {resp2.status_code}") print(f"πŸ” DEBUG: Fallback response body: {resp2.text}") if resp2.status_code == 200: data = resp2.json() text = data.get("generated_text") if isinstance(data, dict) else str(data) return text or "", latency return f"HTTP {resp.status_code}/{resp2.status_code}: {resp.text or resp2.text}", latency def build_messages(chat_history, user_input, system_prompt): messages = [] if system_prompt and system_prompt.strip(): messages.append({"role": "system", "content": system_prompt.strip()}) else: messages.append({"role": "system", "content": SYSTEM_PROMPT_DEFAULT}) for u, b in chat_history: if u: messages.append({"role": "user", "content": u}) if b: messages.append({"role": "assistant", "content": b}) if user_input: messages.append({"role": "user", "content": user_input}) return messages def trim_history(chat_history, max_turns=4): return chat_history[-max_turns:] def to_chatbot_messages(chat_history): msgs = [] for u, a in chat_history: if u: msgs.append({"role": "user", "content": u}) if a: msgs.append({"role": "assistant", "content": a}) return msgs def respond(user_input, chat_history, temperature, top_p, max_new_tokens, max_input_tokens, system_prompt): if not user_input: return gr.update(value=""), chat_history, to_chatbot_messages(chat_history), gr.update(value="") chat_history = trim_history(chat_history, max_turns=4) params = { "max_new_tokens": int(max_new_tokens), "temperature": float(temperature), "top_p": float(top_p), "max_input_tokens": int(max_input_tokens), } messages = build_messages(chat_history, user_input, system_prompt) # Show the user message immediately chat_history = chat_history + [(user_input, None)] reply, latency = call_endpoint(messages, params) chat_history[-1] = (user_input, reply) # Clear input, update state, update chatbot (messages format), update latency return "", chat_history, to_chatbot_messages(chat_history), f"{latency:.2f}s" def new_chat(): return [], [], "" custom_css = """ #app {max-width: 980px; margin: 0 auto;} footer {visibility: hidden;} .gradio-container {font-size: 14px;} #controls .label-wrap {min-width: 160px;} """ with gr.Blocks(title="Qwen2.5‑Coder‑7B‑Instruct‑Omni1.1 (Isaac Sim Robotics Assistant)", css=custom_css) as demo: gr.Markdown("### Qwen2.5‑Coder‑7B‑Instruct‑Omni1.1\nChat with your Isaac Sim 5.0 robotics development assistant. This Space calls the TomBombadyl/Qwen2.5-Coder-7B-Instruct-Omni1.1 Inference Endpoint powered by NVIDIA L4 GPU.") # Chat at the top (messages format to avoid deprecation) chatbot = gr.Chatbot(height=520, show_copy_button=True, type="messages") # Input row with gr.Row(): user_input = gr.Textbox(placeholder="Ask about Isaac Sim robotics, computer vision, sensors, simulation...", lines=2, scale=8) send_btn = gr.Button("Send", variant="primary", scale=1) new_btn = gr.Button("New chat", scale=1) # Right-aligned utility row with gr.Row(): latency_lbl = gr.Label(value="", label="Latency") # Advanced settings (collapsed) with gr.Accordion("Advanced settings", open=False): with gr.Row(elem_id="controls"): temperature = gr.Slider(0.0, 1.5, value=0.2, step=0.05, label="temperature") top_p = gr.Slider(0.1, 1.0, value=0.7, step=0.01, label="top_p") max_new_tokens = gr.Slider(16, 1024, value=DEFAULT_MAX_NEW_TOKENS, step=128, label="max_new_tokens") max_input_tokens = gr.Slider(256, 8192, value=DEFAULT_MAX_INPUT_TOKENS, step=256, label="max_input_tokens") system_prompt = gr.Textbox( value=SYSTEM_PROMPT_DEFAULT, label="System prompt", lines=3, placeholder="Optional system instruction for the assistant", ) chat_state = gr.State([]) # still store as list of (user, assistant) tuples # Return chatbot directly so responses render immediately send_btn.click( fn=respond, inputs=[user_input, chat_state, temperature, top_p, max_new_tokens, max_input_tokens, system_prompt], outputs=[user_input, chat_state, chatbot, latency_lbl], ) user_input.submit( fn=respond, inputs=[user_input, chat_state, temperature, top_p, max_new_tokens, max_input_tokens, system_prompt], outputs=[user_input, chat_state, chatbot, latency_lbl], ) # New chat resets state and chatbot new_btn.click(fn=new_chat, outputs=[chat_state, chatbot, latency_lbl]) # Enable queuing with defaults (avoid unsupported keyword args on older Gradio) demo.queue() if __name__ == "__main__": demo.launch()