|
import os |
|
import json |
|
import time |
|
import requests |
|
import gradio as gr |
|
|
|
|
|
ENDPOINT_URL = (os.environ.get("ENDPOINT_URL") or "https://erxvjreo1onxvdf7.us-east4.gcp.endpoints.huggingface.cloud").strip().rstrip("/") |
|
HF_TOKEN = (os.environ.get("HF_TOKEN") or "").strip() |
|
|
|
|
|
print(f"🚀 DEBUG: ENDPOINT_URL set to: {ENDPOINT_URL}") |
|
print(f"🚀 DEBUG: HF_TOKEN present: {'Yes' if HF_TOKEN else 'No'}") |
|
|
|
if not ENDPOINT_URL: |
|
raise RuntimeError("Missing ENDPOINT_URL Space secret") |
|
|
|
HEADERS = { |
|
"Content-Type": "application/json", |
|
"Accept": "application/json", |
|
} |
|
if HF_TOKEN: |
|
HEADERS["Authorization"] = f"Bearer {HF_TOKEN}" |
|
|
|
SYSTEM_PROMPT_DEFAULT = """You are a helpful AI assistant for Isaac Sim 5.0, Isaac Lab 2.1, and Omniverse Kit 107.3 robotics development. You specialize in NVIDIA robotics development, computer vision, sensor integration, and simulation workflows. |
|
|
|
CRITICAL API GUIDANCE - Isaac Sim 5.0 Extension System: |
|
|
|
📦 CORE EXTENSIONS (isaacsim.*): |
|
✅ isaacsim.core.api - World, SimulationContext |
|
✅ isaacsim.core.prims - Articulation, RigidPrim, XFormPrim |
|
✅ isaacsim.core.api.objects - DynamicCuboid, VisualCuboid |
|
✅ isaacsim.core.utils.stage - add_reference_to_stage |
|
✅ isaacsim.storage.native - get_assets_root_path |
|
✅ isaacsim.sensors.camera - Camera APIs |
|
✅ isaacsim.sensors.physics - Contact, Effort, IMU sensors |
|
✅ isaacsim.robot.manipulators - Manipulator APIs |
|
✅ isaacsim.replicator.* - Synthetic data generation |
|
|
|
🔧 USD/PHYSICS (pxr/omni): |
|
✅ from pxr import UsdPhysics, PhysxSchema, Gf, UsdGeom |
|
✅ import omni.usd - USD stage operations |
|
✅ import omni.graph.core as og - OmniGraph |
|
✅ import carb - Logging framework |
|
|
|
🎯 CORRECT PATTERNS: |
|
|
|
Basic Setup: |
|
```python |
|
from isaacsim import SimulationApp |
|
simulation_app = SimulationApp({"headless": False}) |
|
|
|
from isaacsim.core.api import World |
|
from isaacsim.core.prims import Articulation |
|
from isaacsim.storage.native import get_assets_root_path |
|
``` |
|
|
|
Robot Loading: |
|
```python |
|
from isaacsim.core.utils.stage import add_reference_to_stage |
|
asset_path = get_assets_root_path() + "<path_to_asset>" |
|
add_reference_to_stage(usd_path=asset_path, prim_path="/World/Robot") |
|
robot = Articulation(prim_paths_expr="/World/Robot") |
|
``` |
|
|
|
Sensors: |
|
```python |
|
from isaacsim.sensors.camera import Camera |
|
from isaacsim.sensors.physics import ContactSensor, IMUSensor |
|
``` |
|
|
|
USD Operations: |
|
```python |
|
from pxr import UsdPhysics, UsdGeom, Gf |
|
import omni.usd |
|
stage = omni.usd.get_context().get_stage() |
|
``` |
|
|
|
Always provide complete, executable Isaac Sim 5.0 code with proper extension imports.""" |
|
|
|
DEFAULT_MAX_NEW_TOKENS = 1024 |
|
DEFAULT_MAX_INPUT_TOKENS = 2048 |
|
|
|
def to_single_turn(messages): |
|
lines = [] |
|
for m in messages: |
|
role = m.get("role", "user").capitalize() |
|
lines.append(f"{role}: {m.get('content','')}") |
|
lines.append("Assistant:") |
|
return "\n".join(lines) |
|
|
|
def call_endpoint(messages, parameters): |
|
start = time.time() |
|
|
|
|
|
print(f"🔍 DEBUG: Calling endpoint: {ENDPOINT_URL}") |
|
print(f"🔍 DEBUG: Headers: {HEADERS}") |
|
|
|
|
|
payload_inputs = {"inputs": to_single_turn(messages), "parameters": parameters} |
|
print(f"🔍 DEBUG: Payload: {payload_inputs}") |
|
|
|
resp = requests.post(ENDPOINT_URL, headers=HEADERS, json=payload_inputs, timeout=120) |
|
latency = time.time() - start |
|
|
|
print(f"🔍 DEBUG: Response status: {resp.status_code}") |
|
print(f"🔍 DEBUG: Response body: {resp.text}") |
|
|
|
if resp.status_code == 200: |
|
data = resp.json() |
|
text = data.get("generated_text") if isinstance(data, dict) else str(data) |
|
return text or "", latency |
|
|
|
|
|
print(f"🔍 DEBUG: First attempt failed, trying messages format...") |
|
resp2 = requests.post( |
|
ENDPOINT_URL, |
|
headers=HEADERS, |
|
json={"messages": messages, "parameters": parameters}, |
|
timeout=120, |
|
) |
|
latency = time.time() - start |
|
print(f"🔍 DEBUG: Fallback response status: {resp2.status_code}") |
|
print(f"🔍 DEBUG: Fallback response body: {resp2.text}") |
|
|
|
if resp2.status_code == 200: |
|
data = resp2.json() |
|
text = data.get("generated_text") if isinstance(data, dict) else str(data) |
|
return text or "", latency |
|
|
|
return f"HTTP {resp.status_code}/{resp2.status_code}: {resp.text or resp2.text}", latency |
|
|
|
def build_messages(chat_history, user_input, system_prompt): |
|
messages = [] |
|
if system_prompt and system_prompt.strip(): |
|
messages.append({"role": "system", "content": system_prompt.strip()}) |
|
else: |
|
messages.append({"role": "system", "content": SYSTEM_PROMPT_DEFAULT}) |
|
for u, b in chat_history: |
|
if u: |
|
messages.append({"role": "user", "content": u}) |
|
if b: |
|
messages.append({"role": "assistant", "content": b}) |
|
if user_input: |
|
messages.append({"role": "user", "content": user_input}) |
|
return messages |
|
|
|
def trim_history(chat_history, max_turns=4): |
|
return chat_history[-max_turns:] |
|
|
|
def to_chatbot_messages(chat_history): |
|
msgs = [] |
|
for u, a in chat_history: |
|
if u: |
|
msgs.append({"role": "user", "content": u}) |
|
if a: |
|
msgs.append({"role": "assistant", "content": a}) |
|
return msgs |
|
|
|
def respond(user_input, chat_history, temperature, top_p, max_new_tokens, max_input_tokens, system_prompt): |
|
if not user_input: |
|
return gr.update(value=""), chat_history, to_chatbot_messages(chat_history), gr.update(value="") |
|
|
|
chat_history = trim_history(chat_history, max_turns=4) |
|
|
|
params = { |
|
"max_new_tokens": int(max_new_tokens), |
|
"temperature": float(temperature), |
|
"top_p": float(top_p), |
|
"max_input_tokens": int(max_input_tokens), |
|
} |
|
messages = build_messages(chat_history, user_input, system_prompt) |
|
|
|
|
|
chat_history = chat_history + [(user_input, None)] |
|
reply, latency = call_endpoint(messages, params) |
|
chat_history[-1] = (user_input, reply) |
|
|
|
|
|
return "", chat_history, to_chatbot_messages(chat_history), f"{latency:.2f}s" |
|
|
|
def new_chat(): |
|
return [], [], "" |
|
|
|
custom_css = """ |
|
#app {max-width: 980px; margin: 0 auto;} |
|
footer {visibility: hidden;} |
|
.gradio-container {font-size: 14px;} |
|
#controls .label-wrap {min-width: 160px;} |
|
""" |
|
|
|
with gr.Blocks(title="Qwen2.5‑Coder‑7B‑Instruct‑Omni1.1 (Isaac Sim Robotics Assistant)", css=custom_css) as demo: |
|
gr.Markdown("### Qwen2.5‑Coder‑7B‑Instruct‑Omni1.1\nChat with your Isaac Sim 5.0 robotics development assistant. This Space calls the TomBombadyl/Qwen2.5-Coder-7B-Instruct-Omni1.1 Inference Endpoint powered by NVIDIA L4 GPU.") |
|
|
|
|
|
chatbot = gr.Chatbot(height=520, show_copy_button=True, type="messages") |
|
|
|
|
|
with gr.Row(): |
|
user_input = gr.Textbox(placeholder="Ask about Isaac Sim robotics, computer vision, sensors, simulation...", lines=2, scale=8) |
|
send_btn = gr.Button("Send", variant="primary", scale=1) |
|
new_btn = gr.Button("New chat", scale=1) |
|
|
|
|
|
with gr.Row(): |
|
latency_lbl = gr.Label(value="", label="Latency") |
|
|
|
|
|
with gr.Accordion("Advanced settings", open=False): |
|
with gr.Row(elem_id="controls"): |
|
temperature = gr.Slider(0.0, 1.5, value=0.2, step=0.05, label="temperature") |
|
top_p = gr.Slider(0.1, 1.0, value=0.7, step=0.01, label="top_p") |
|
max_new_tokens = gr.Slider(16, 1024, value=DEFAULT_MAX_NEW_TOKENS, step=128, label="max_new_tokens") |
|
max_input_tokens = gr.Slider(256, 8192, value=DEFAULT_MAX_INPUT_TOKENS, step=256, label="max_input_tokens") |
|
system_prompt = gr.Textbox( |
|
value=SYSTEM_PROMPT_DEFAULT, |
|
label="System prompt", |
|
lines=3, |
|
placeholder="Optional system instruction for the assistant", |
|
) |
|
|
|
chat_state = gr.State([]) |
|
|
|
|
|
send_btn.click( |
|
fn=respond, |
|
inputs=[user_input, chat_state, temperature, top_p, max_new_tokens, max_input_tokens, system_prompt], |
|
outputs=[user_input, chat_state, chatbot, latency_lbl], |
|
) |
|
user_input.submit( |
|
fn=respond, |
|
inputs=[user_input, chat_state, temperature, top_p, max_new_tokens, max_input_tokens, system_prompt], |
|
outputs=[user_input, chat_state, chatbot, latency_lbl], |
|
) |
|
|
|
|
|
new_btn.click(fn=new_chat, outputs=[chat_state, chatbot, latency_lbl]) |
|
|
|
|
|
demo.queue() |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |