import logging, faulthandler, sys, time, tempfile, os, requests, zipfile, io, gc, threading, psutil, json, configargparse faulthandler.enable() logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("jax_ik_server") def _excepthook(t, v, tb): logger.exception("Uncaught exception", exc_info=(t, v, tb)) sys.excepthook = _excepthook # Environment (CPU only) os.environ['JAX_PLATFORMS'] = 'cpu' os.environ['CUDA_VISIBLE_DEVICES'] = '' import jax jax.config.update("jax_default_device", "cpu") import numpy as np from fastapi import FastAPI, Request, Response from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from starlette.responses import FileResponse, JSONResponse from jax_ik.helper import deform_mesh, load_mesh_data_from_gltf, load_mesh_data_from_urdf from jax_ik.hand_specification import HandSpecification from jax_ik.smplx_statics import left_arm_bounds_dict, right_arm_bounds_dict, complete_full_body_bounds_dict from jax_ik.ik import InverseKinematicsSolver from jax_ik.objectives import ( BoneZeroRotationObj, CombinedDerivativeObj, DistanceObjTraj, SphereCollisionPenaltyObjTraj, ) def download_and_setup_files(): os.makedirs("files", exist_ok=True) # Pepper pepper_dir = "files/pepper_description-master" if not os.path.isdir(pepper_dir): logger.info("Downloading Pepper model...") try: r = requests.get("https://uni-bielefeld.sciebo.de/s/98Sy5s143XgNntb/download", stream=True); r.raise_for_status() with zipfile.ZipFile(io.BytesIO(r.content)) as z: z.extractall("files/") except Exception as e: logger.warning(f"Pepper download failed: {e}") # SMPLX smplx_file = "files/smplx.glb" if not os.path.isfile(smplx_file): logger.info("Downloading SMPLX model...") try: r = requests.get("https://uni-bielefeld.sciebo.de/s/B5StwQdiR4DW5mc/download"); r.raise_for_status() open(smplx_file, "wb").write(r.content) except Exception as e: logger.warning(f"SMPLX download failed: {e}") class IKServer: def __init__(self, args): self.args = args self.solve_lock = threading.Lock() self.process = psutil.Process(os.getpid()) # Caches self.solver_cache = {} self.urdf_solver_cache = {} self.max_cache_size = 5 self.cache_access_order = [] self.urdf_cache_access_order = [] # Animation buffers self.animation_frames_agent = [] self.animation_frames_urdf = [] # FastAPI self.app = FastAPI() self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] ) os.makedirs("static", exist_ok=True) os.makedirs("files", exist_ok=True) self.app.mount("/static", StaticFiles(directory="static"), name="static") self.app.mount("/files", StaticFiles(directory="files"), name="files") # Init models (once) self._init_agent() self._setup_agent_objectives() self._init_urdf() self._setup_urdf_objectives() # Cleanup tracking self.last_cleanup_time = time.time() self.cleanup_interval = 30 self.agent_solve_counter = 0 # added self.urdf_solve_counter = 0 # added self._register_routes() logger.info("Server ready.") # NEW warmup thread to JIT both solvers early if getattr(self.args, 'warmup', True): threading.Thread(target=self._warmup_all, daemon=True).start() # ---- Warmup helpers (NEW) ---- def _warmup_agent(self): try: mand = [DistanceObjTraj(target_points=np.array([0.0,0.2,0.35]), bone_name=self.current_end_effector, use_head=True, weight=1.0)] opt = [BoneZeroRotationObj(weight=0.01)] _ = self.solver.solve(initial_rotations=self.initial_rotations, learning_rate=self.args.learning_rate, mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt), ik_points=1, verbose=False) logger.info("Agent warmup complete") except Exception as e: logger.warning(f"Agent warmup failed: {e}") def _warmup_urdf(self): try: mand = [DistanceObjTraj(target_points=np.array([0.3,0.3,0.35]), bone_name=self.urdf_current_end_effector, use_head=True, weight=1.0)] opt = [BoneZeroRotationObj(weight=0.01)] _ = self.urdf_solver.solve(initial_rotations=self.urdf_initial_rotations, learning_rate=self.args.learning_rate, mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt), ik_points=1, verbose=False) logger.info("URDF warmup complete") except Exception as e: logger.warning(f"URDF warmup failed: {e}") def _warmup_all(self): t0 = time.time() logger.info("Starting background warmup...") self._warmup_agent() self._warmup_urdf() logger.info(f"Warmup finished in {time.time()-t0:.2f}s") # ---------- Cache ---------- def _evict_lru(self, is_urdf=False): cache = self.urdf_solver_cache if is_urdf else self.solver_cache order = self.urdf_cache_access_order if is_urdf else self.cache_access_order if not order: return key = order.pop(0) cache.pop(key, None) gc.collect() def _cache_key(self, bones, num_steps): return tuple(sorted(bones)) + (int(num_steps),) def _get_solver(self, bones, is_urdf=False, num_steps=None): if num_steps is None: num_steps = self.args.num_steps key = self._cache_key(bones, num_steps) cache = self.urdf_solver_cache if is_urdf else self.solver_cache order = self.urdf_cache_access_order if is_urdf else self.cache_access_order if key in cache: if key in order: order.remove(key) order.append(key) return cache[key] if len(cache) >= self.max_cache_size: self._evict_lru(is_urdf) if is_urdf: solver = InverseKinematicsSolver( model_file=self.urdf_file, controlled_bones=bones, bounds=None, threshold=self.args.threshold, num_steps=int(num_steps), compute_sdf=False, ) else: bounds = [] for b in bones: if b in self.bounds_dict: lower, upper = self.bounds_dict[b] bounds.extend(list(zip(lower, upper))) else: bounds.extend([(-90, 90)] * 3) solver = InverseKinematicsSolver( model_file=self.args.gltf_file, controlled_bones=bones, bounds=bounds, threshold=self.args.threshold, num_steps=int(num_steps), compute_sdf=False, ) cache[key] = solver order.append(key) return solver # ---------- Initialization ---------- def _init_agent(self): self.current_num_steps = self.args.num_steps # NEW basic = InverseKinematicsSolver( model_file=self.args.gltf_file, controlled_bones=["left_collar"], bounds=[(-90,90)]*3, threshold=self.args.threshold, num_steps=self.current_num_steps, compute_sdf=False, ) self.available_bones = basic.fk_solver.bone_names self.bounds_dict = complete_full_body_bounds_dict if self.args.hand == "left": self.default_controlled_bones = list(left_arm_bounds_dict.keys()) self.default_end_effector = "left_index3" else: self.default_controlled_bones = list(right_arm_bounds_dict.keys()) self.default_end_effector = "right_index3" self.selectable_bones = [b for b in self.available_bones if b in self.bounds_dict] self.current_controlled_bones = self.default_controlled_bones.copy() self.current_end_effector = self.default_end_effector self.solver = self._get_solver(self.current_controlled_bones, is_urdf=False, num_steps=self.current_num_steps) self.initial_rotations = np.zeros(len(self.solver.controlled_bones) * 3, dtype=np.float32) self.best_angles = self.initial_rotations.copy() self.mesh_data = load_mesh_data_from_gltf(self.args.gltf_file, self.solver.fk_solver) self.animation_frames_agent = self._frames_from_angles([self.initial_rotations], False) def _init_urdf(self): self.urdf_current_num_steps = self.args.num_steps # NEW self.urdf_file = "files/pepper_description-master/urdf/pepper.urdf" basic = InverseKinematicsSolver( model_file=self.urdf_file, controlled_bones=["LShoulder"], bounds=None, threshold=self.args.threshold, num_steps=self.urdf_current_num_steps, compute_sdf=False, ) self.urdf_available_bones = basic.fk_solver.bone_names self.urdf_default_controlled_bones = ["LShoulder","LBicep","LElbow","LForeArm","l_wrist"] self.urdf_default_end_effector = "LFinger13_link" self.urdf_selectable_bones = list(self.urdf_available_bones) self.urdf_current_controlled_bones = self.urdf_default_controlled_bones.copy() self.urdf_current_end_effector = self.urdf_default_end_effector self.urdf_solver = self._get_solver(self.urdf_current_controlled_bones, is_urdf=True, num_steps=self.urdf_current_num_steps) self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones) * 3, dtype=np.float32) self.urdf_best_angles = self.urdf_initial_rotations.copy() self.urdf_mesh_data = load_mesh_data_from_urdf(self.urdf_file, self.urdf_solver.fk_solver) self.animation_frames_urdf = self._frames_from_angles([self.urdf_initial_rotations], True) # ---------- Objectives ---------- def _setup_agent_objectives(self): self.distance_obj = DistanceObjTraj( target_points=np.array([0.0,0.2,0.35]), bone_name=self.current_end_effector, use_head=True, weight=1.0, ) self.collision_obj = SphereCollisionPenaltyObjTraj( {"center":[0.1,0.0,0.35],"radius":0.1}, min_clearance=0.0, weight=1.0, ) def _setup_urdf_objectives(self): self.urdf_distance_obj = DistanceObjTraj( target_points=np.array([0.3,0.3,0.35]), bone_name=self.urdf_current_end_effector, use_head=True, weight=1.0, ) self.urdf_collision_obj = SphereCollisionPenaltyObjTraj( {"center":[0.2,0.0,0.35],"radius":0.1}, min_clearance=0.0, weight=1.0, ) # ---------- Configuration ---------- def configure_agent(self, bones, eff, num_steps=None): if num_steps is None: num_steps = self.current_num_steps if not bones: bones = self.default_controlled_bones if eff not in self.available_bones: eff = self.default_end_effector changed = ( bones != self.current_controlled_bones or eff != self.current_end_effector or int(num_steps) != int(self.current_num_steps) ) if bones != self.current_controlled_bones or int(num_steps) != int(self.current_num_steps): self.current_controlled_bones = bones self.current_num_steps = int(num_steps) self.solver = self._get_solver(bones, is_urdf=False, num_steps=self.current_num_steps) self.initial_rotations = np.zeros(len(self.solver.controlled_bones)*3, dtype=np.float32) self.best_angles = self.initial_rotations.copy() if eff != self.current_end_effector: self.current_end_effector = eff if changed: self._setup_agent_objectives() return {"controlled_bones": self.current_controlled_bones, "end_effector": self.current_end_effector, "num_steps": self.current_num_steps} def configure_urdf(self, bones, eff, num_steps=None): if num_steps is None: num_steps = self.urdf_current_num_steps if not bones: bones = self.urdf_default_controlled_bones if eff not in self.urdf_available_bones: eff = self.urdf_default_end_effector changed = ( bones != self.urdf_current_controlled_bones or eff != self.urdf_current_end_effector or int(num_steps) != int(self.urdf_current_num_steps) ) if bones != self.urdf_current_controlled_bones or int(num_steps) != int(self.urdf_current_num_steps): self.urdf_current_controlled_bones = bones self.urdf_current_num_steps = int(num_steps) self.urdf_solver = self._get_solver(bones, is_urdf=True, num_steps=self.urdf_current_num_steps) self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones)*3, dtype=np.float32) self.urdf_best_angles = self.urdf_initial_rotations.copy() if eff != self.urdf_current_end_effector: self.urdf_current_end_effector = eff if changed: self._setup_urdf_objectives() return {"controlled_bones": self.urdf_current_controlled_bones, "end_effector": self.urdf_current_end_effector, "num_steps": self.urdf_current_num_steps} # ---------- Objectives build ---------- def _build_agent_objectives(self, payload): tgt = np.array(payload.get("target",[0.0,0.2,0.35])) self.distance_obj.update_params({"bone_name": self.current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))}) self.collision_obj.update_params({"weight": float(payload.get("collision_weight",1.0))}) subpoints = int(payload.get("subpoints",1)) mandatory, optional = [], [] if payload.get("distance_enabled", True): mandatory.append(self.distance_obj) if payload.get("collision_enabled", False): optional.append(self.collision_obj) if payload.get("bone_zero_enabled", True): optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05)))) if payload.get("derivative_enabled", True) and subpoints > 1: optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3)) elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True): optional.append(BoneZeroRotationObj(weight=0.01)) # Hand spec hand_shape = payload.get("hand_shape","None") hand_position = payload.get("hand_position","None") params = { "is_pointing": hand_shape=="Pointing", "is_shaping": hand_shape=="Shaping", "is_flat": hand_shape=="Flat", "look_forward": hand_position=="Look Forward", "look_45_up": hand_position=="Look 45° Up", "look_45_down": hand_position=="Look 45° Down", "look_up": hand_position=="Look Up", "look_down": hand_position=="Look Down", "look_45_x_downwards": hand_position=="Look 45° X Downwards", "look_45_x_upwards": hand_position=="Look 45° X Upwards", "look_x_inward": hand_position=="Look X Inward", "look_to_body": hand_position=="Look to Body", "arm_down": hand_position=="Arm Down", "arm_45_down": hand_position=="Arm 45° Down", "arm_flat": hand_position=="Arm Flat", } if any(params.values()): spec = HandSpecification(**params) optional.extend(spec.get_objectives( left_hand=self.args.hand=="left", controlled_bones=self.current_controlled_bones, full_trajectory=subpoints>1, last_position=True, weight=0.5, )) return mandatory, optional, subpoints def _build_urdf_objectives(self, payload): tgt = np.array(payload.get("target",[0.3,0.3,0.35])) self.urdf_distance_obj.update_params({"bone_name": self.urdf_current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))}) self.urdf_collision_obj.update_params({"weight": float(payload.get("collision_weight",1.0))}) subpoints = int(payload.get("subpoints",1)) mandatory, optional = [], [] if payload.get("distance_enabled", True): mandatory.append(self.urdf_distance_obj) if payload.get("collision_enabled", False): optional.append(self.urdf_collision_obj) if payload.get("bone_zero_enabled", True): optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05)))) if payload.get("derivative_enabled", True) and subpoints > 1: optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3)) elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True): optional.append(BoneZeroRotationObj(weight=0.01)) return mandatory, optional, subpoints # ---------- Solving ---------- def _frames_from_angles(self, angles_seq, is_urdf): frames = [] for ang in angles_seq: if is_urdf: verts = deform_mesh(ang, self.urdf_solver.fk_solver, self.urdf_mesh_data) faces = self.urdf_mesh_data["faces"] else: verts = deform_mesh(ang, self.solver.fk_solver, self.mesh_data) faces = self.mesh_data["faces"] frames.append({"vertices": verts.tolist(), "faces": faces.tolist()}) return frames def solve_agent(self, payload, last_only=False): mand, opt, sub = self._build_agent_objectives(payload) start = time.time() best_angles, obj_val, steps = self.solver.solve( initial_rotations=self.initial_rotations, learning_rate=self.args.learning_rate, mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt), ik_points=sub, verbose=False, ) self.best_angles = best_angles[-1].copy() self.initial_rotations = self.best_angles.copy() if last_only: self.animation_frames_agent = self._frames_from_angles([best_angles[-1]], False) else: self.animation_frames_agent = self._frames_from_angles(best_angles, False) self.agent_solve_counter += 1 return { "solve_time": time.time()-start, "iterations": steps, "objective": obj_val, "frames": len(self.animation_frames_agent), "solve_id": self.agent_solve_counter, } def solve_urdf(self, payload, last_only=False): mand, opt, sub = self._build_urdf_objectives(payload) start = time.time() best_angles, obj_val, steps = self.urdf_solver.solve( initial_rotations=self.urdf_initial_rotations, learning_rate=self.args.learning_rate, mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt), ik_points=sub, verbose=False, ) self.urdf_best_angles = best_angles[-1].copy() self.urdf_initial_rotations = self.urdf_best_angles.copy() if last_only: self.animation_frames_urdf = self._frames_from_angles([best_angles[-1]], True) else: self.animation_frames_urdf = self._frames_from_angles(best_angles, True) self.urdf_solve_counter += 1 return { "solve_time": time.time()-start, "iterations": steps, "objective": obj_val, "frames": len(self.animation_frames_urdf), "solve_id": self.urdf_solve_counter, } # ---------- Housekeeping ---------- def _cleanup(self): now = time.time() if now - self.last_cleanup_time < self.cleanup_interval: return gc.collect() self.last_cleanup_time = now # ---------- API ---------- def _register_routes(self): @self.app.get("/") def index(): return FileResponse("static/index.html") @self.app.get("/threejs_viewer") def legacy(): return FileResponse("static/index.html") @self.app.get("/animation") def animation(request: Request): model = request.query_params.get("model","agent").lower() data = self.animation_frames_urdf if model == "pepper" else self.animation_frames_agent return Response(content=json.dumps(data), media_type="application/json") @self.app.get("/config") def cfg(model: str = "agent"): if model.lower() == "pepper": return { "model":"pepper", "available_bones": self.urdf_available_bones, "selectable_bones": self.urdf_selectable_bones, "default_controlled_bones": self.urdf_current_controlled_bones, "default_end_effector": self.urdf_current_end_effector, "end_effector_choices": self.urdf_available_bones, "hand_shapes": [], "hand_positions": [], "max_subpoints": 20, # added "default_num_steps": self.urdf_current_num_steps, # NEW } return { "model":"agent", "available_bones": self.available_bones, "selectable_bones": self.selectable_bones, "default_controlled_bones": self.current_controlled_bones, "default_end_effector": self.current_end_effector, "end_effector_choices": self.available_bones, "hand_shapes":["None","Pointing","Shaping","Flat"], "hand_positions":[ "None","Look Forward","Look 45° Up","Look 45° Down","Look Up","Look Down", "Look 45° X Downwards","Look 45° X Upwards","Look X Inward","Look to Body", "Arm Down","Arm 45° Down","Arm Flat" ], "max_subpoints": 20, # added "default_num_steps": self.current_num_steps, # NEW } @self.app.post("/configure") async def configure(request: Request): payload = await request.json() model = payload.get("model","agent").lower() num_steps = int(payload.get("num_steps", self.args.num_steps)) if model == "pepper": cfg = self.configure_urdf(payload.get("controlled_bones", []), payload.get("end_effector"), num_steps=num_steps) else: cfg = self.configure_agent(payload.get("controlled_bones", []), payload.get("end_effector"), num_steps=num_steps) return JSONResponse({"status":"ok","config":cfg}) @self.app.post("/solve") async def solve(request: Request): payload = await request.json() model = payload.get("model","agent").lower() return_mode = payload.get("frames_mode", "auto") num_steps = int(payload.get("num_steps", self.args.num_steps)) # NEW subpoints = int(payload.get("subpoints",1)) last_only = (return_mode != 'all' and subpoints == 1) self._cleanup() with self.solve_lock: try: if model == "pepper": self.configure_urdf(payload.get("controlled_bones", []), payload.get("end_effector", self.urdf_current_end_effector), num_steps=num_steps) result = self.solve_urdf(payload, last_only=last_only); result["model"]="pepper"; result["num_steps"] = num_steps if last_only: frames = list(self.animation_frames_urdf) # only one frame elif return_mode == "all" or (return_mode == "auto" and subpoints > 1): frames = list(self.animation_frames_urdf) else: frames = [self.animation_frames_urdf[-1]] result["frames_data"] = frames else: self.configure_agent(payload.get("controlled_bones", []), payload.get("end_effector", self.current_end_effector), num_steps=num_steps) result = self.solve_agent(payload, last_only=last_only); result["model"]="agent"; result["num_steps"] = num_steps if last_only: frames = list(self.animation_frames_agent) elif return_mode == "all" or (return_mode == "auto" and subpoints > 1): frames = list(self.animation_frames_agent) else: frames = [self.animation_frames_agent[-1]] result["frames_data"] = frames return JSONResponse({"status":"ok","result":result}) except Exception as e: logger.exception("Solve failed") return JSONResponse({"status":"error","message":str(e)}, status_code=500) @self.app.get("/health") def health(): return { "status":"ok", "agent_frames": len(self.animation_frames_agent), "urdf_frames": len(self.animation_frames_urdf), "cache_agent": len(self.solver_cache), "cache_urdf": len(self.urdf_solver_cache), } @self.app.get("/favicon.ico") def favicon(): return Response(status_code=204) # ---------- Main ---------- def main(): parser = configargparse.ArgumentParser(description="Inverse Kinematics Solver - Three.js Web UI", default_config_files=["config.ini"]) parser.add("--gltf_file", type=str, default="files/smplx.glb") parser.add("--hand", type=str, choices=["left","right"], default="left") parser.add("--threshold", type=float, default=0.0001) parser.add("--num_steps", type=int, default=100) parser.add("--learning_rate", type=float, default=0.2) parser.add("--subpoints", type=int, default=1) parser.add("--api_port", type=int, default=17861) parser.add("--warmup", action='store_true', default=True, help="Enable background JIT warmup") args = parser.parse_args() download_and_setup_files() srv = IKServer(args) import uvicorn uvicorn.run(srv.app, host="0.0.0.0", port=args.api_port) if __name__ == "__main__": main()