Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import logging, faulthandler, sys, time, tempfile, os, requests, zipfile, io, gc, threading, psutil, json, configargparse | |
faulthandler.enable() | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger("jax_ik_server") | |
def _excepthook(t, v, tb): | |
logger.exception("Uncaught exception", exc_info=(t, v, tb)) | |
sys.excepthook = _excepthook | |
# Environment (CPU only) | |
os.environ['JAX_PLATFORMS'] = 'cpu' | |
os.environ['CUDA_VISIBLE_DEVICES'] = '' | |
import jax | |
jax.config.update("jax_default_device", "cpu") | |
import numpy as np | |
from fastapi import FastAPI, Request, Response | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.middleware.cors import CORSMiddleware | |
from starlette.responses import FileResponse, JSONResponse | |
from jax_ik.helper import deform_mesh, load_mesh_data_from_gltf, load_mesh_data_from_urdf | |
from jax_ik.hand_specification import HandSpecification | |
from jax_ik.smplx_statics import left_arm_bounds_dict, right_arm_bounds_dict, complete_full_body_bounds_dict | |
from jax_ik.ik import InverseKinematicsSolver | |
from jax_ik.objectives import ( | |
BoneZeroRotationObj, | |
CombinedDerivativeObj, | |
DistanceObjTraj, | |
SphereCollisionPenaltyObjTraj, | |
) | |
def download_and_setup_files(): | |
os.makedirs("files", exist_ok=True) | |
# Pepper | |
pepper_dir = "files/pepper_description-master" | |
if not os.path.isdir(pepper_dir): | |
logger.info("Downloading Pepper model...") | |
try: | |
r = requests.get("https://uni-bielefeld.sciebo.de/s/98Sy5s143XgNntb/download", stream=True); r.raise_for_status() | |
with zipfile.ZipFile(io.BytesIO(r.content)) as z: z.extractall("files/") | |
except Exception as e: | |
logger.warning(f"Pepper download failed: {e}") | |
# SMPLX | |
smplx_file = "files/smplx.glb" | |
if not os.path.isfile(smplx_file): | |
logger.info("Downloading SMPLX model...") | |
try: | |
r = requests.get("https://uni-bielefeld.sciebo.de/s/B5StwQdiR4DW5mc/download"); r.raise_for_status() | |
open(smplx_file, "wb").write(r.content) | |
except Exception as e: | |
logger.warning(f"SMPLX download failed: {e}") | |
class IKServer: | |
def __init__(self, args): | |
self.args = args | |
self.solve_lock = threading.Lock() | |
self.process = psutil.Process(os.getpid()) | |
# Caches | |
self.solver_cache = {} | |
self.urdf_solver_cache = {} | |
self.max_cache_size = 5 | |
self.cache_access_order = [] | |
self.urdf_cache_access_order = [] | |
# Animation buffers | |
self.animation_frames_agent = [] | |
self.animation_frames_urdf = [] | |
# FastAPI | |
self.app = FastAPI() | |
self.app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], allow_credentials=True, | |
allow_methods=["*"], allow_headers=["*"] | |
) | |
os.makedirs("static", exist_ok=True) | |
os.makedirs("files", exist_ok=True) | |
self.app.mount("/static", StaticFiles(directory="static"), name="static") | |
self.app.mount("/files", StaticFiles(directory="files"), name="files") | |
# Init models (once) | |
self._init_agent() | |
self._setup_agent_objectives() | |
self._init_urdf() | |
self._setup_urdf_objectives() | |
# Cleanup tracking | |
self.last_cleanup_time = time.time() | |
self.cleanup_interval = 30 | |
self.agent_solve_counter = 0 # added | |
self.urdf_solve_counter = 0 # added | |
self._register_routes() | |
logger.info("Server ready.") | |
# ---------- Cache ---------- | |
def _evict_lru(self, is_urdf=False): | |
cache = self.urdf_solver_cache if is_urdf else self.solver_cache | |
order = self.urdf_cache_access_order if is_urdf else self.cache_access_order | |
if not order: return | |
key = order.pop(0) | |
cache.pop(key, None) | |
gc.collect() | |
def _cache_key(self, bones): return tuple(sorted(bones)) | |
def _get_solver(self, bones, is_urdf=False): | |
key = self._cache_key(bones) | |
cache = self.urdf_solver_cache if is_urdf else self.solver_cache | |
order = self.urdf_cache_access_order if is_urdf else self.cache_access_order | |
if key in cache: | |
if key in order: order.remove(key) | |
order.append(key) | |
return cache[key] | |
if len(cache) >= self.max_cache_size: | |
self._evict_lru(is_urdf) | |
if is_urdf: | |
solver = InverseKinematicsSolver( | |
model_file=self.urdf_file, | |
controlled_bones=bones, | |
bounds=None, | |
threshold=self.args.threshold, | |
num_steps=self.args.num_steps, | |
compute_sdf=False, | |
) | |
else: | |
bounds = [] | |
for b in bones: | |
if b in self.bounds_dict: | |
lower, upper = self.bounds_dict[b] | |
bounds.extend(list(zip(lower, upper))) | |
else: | |
bounds.extend([(-90, 90)] * 3) | |
solver = InverseKinematicsSolver( | |
model_file=self.args.gltf_file, | |
controlled_bones=bones, | |
bounds=bounds, | |
threshold=self.args.threshold, | |
num_steps=self.args.num_steps, | |
compute_sdf=False, | |
) | |
cache[key] = solver | |
order.append(key) | |
return solver | |
# ---------- Initialization ---------- | |
def _init_agent(self): | |
basic = InverseKinematicsSolver( | |
model_file=self.args.gltf_file, | |
controlled_bones=["left_collar"], | |
bounds=[(-90,90)]*3, | |
threshold=self.args.threshold, | |
num_steps=self.args.num_steps, | |
compute_sdf=False, | |
) | |
self.available_bones = basic.fk_solver.bone_names | |
self.bounds_dict = complete_full_body_bounds_dict | |
if self.args.hand == "left": | |
self.default_controlled_bones = list(left_arm_bounds_dict.keys()) | |
self.default_end_effector = "left_index3" | |
else: | |
self.default_controlled_bones = list(right_arm_bounds_dict.keys()) | |
self.default_end_effector = "right_index3" | |
self.selectable_bones = [b for b in self.available_bones if b in self.bounds_dict] | |
self.current_controlled_bones = self.default_controlled_bones.copy() | |
self.current_end_effector = self.default_end_effector | |
self.solver = self._get_solver(self.current_controlled_bones, is_urdf=False) | |
self.initial_rotations = np.zeros(len(self.solver.controlled_bones) * 3, dtype=np.float32) | |
self.best_angles = self.initial_rotations.copy() | |
self.mesh_data = load_mesh_data_from_gltf(self.args.gltf_file, self.solver.fk_solver) | |
self.animation_frames_agent = self._frames_from_angles([self.initial_rotations], False) | |
def _init_urdf(self): | |
self.urdf_file = "files/pepper_description-master/urdf/pepper.urdf" | |
basic = InverseKinematicsSolver( | |
model_file=self.urdf_file, | |
controlled_bones=["LShoulder"], | |
bounds=None, | |
threshold=self.args.threshold, | |
num_steps=self.args.num_steps, | |
compute_sdf=False, | |
) | |
self.urdf_available_bones = basic.fk_solver.bone_names | |
self.urdf_default_controlled_bones = ["LShoulder","LBicep","LElbow","LForeArm","l_wrist"] | |
self.urdf_default_end_effector = "LFinger13_link" | |
self.urdf_selectable_bones = list(self.urdf_available_bones) | |
self.urdf_current_controlled_bones = self.urdf_default_controlled_bones.copy() | |
self.urdf_current_end_effector = self.urdf_default_end_effector | |
self.urdf_solver = self._get_solver(self.urdf_current_controlled_bones, is_urdf=True) | |
self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones) * 3, dtype=np.float32) | |
self.urdf_best_angles = self.urdf_initial_rotations.copy() | |
self.urdf_mesh_data = load_mesh_data_from_urdf(self.urdf_file, self.urdf_solver.fk_solver) | |
self.animation_frames_urdf = self._frames_from_angles([self.urdf_initial_rotations], True) | |
# ---------- Objectives ---------- | |
def _setup_agent_objectives(self): | |
self.distance_obj = DistanceObjTraj( | |
target_points=np.array([0.0,0.2,0.35]), | |
bone_name=self.current_end_effector, | |
use_head=True, | |
weight=1.0, | |
) | |
self.collision_obj = SphereCollisionPenaltyObjTraj( | |
{"center":[0.1,0.0,0.35],"radius":0.1}, | |
min_clearance=0.0, | |
weight=1.0, | |
) | |
def _setup_urdf_objectives(self): | |
self.urdf_distance_obj = DistanceObjTraj( | |
target_points=np.array([0.3,0.3,0.35]), | |
bone_name=self.urdf_current_end_effector, | |
use_head=True, | |
weight=1.0, | |
) | |
self.urdf_collision_obj = SphereCollisionPenaltyObjTraj( | |
{"center":[0.2,0.0,0.35],"radius":0.1}, | |
min_clearance=0.0, | |
weight=1.0, | |
) | |
# ---------- Configuration ---------- | |
def configure_agent(self, bones, eff): | |
if not bones: bones = self.default_controlled_bones | |
if eff not in self.available_bones: eff = self.default_end_effector | |
changed = bones != self.current_controlled_bones or eff != self.current_end_effector | |
if bones != self.current_controlled_bones: | |
self.current_controlled_bones = bones | |
self.solver = self._get_solver(bones, is_urdf=False) | |
self.initial_rotations = np.zeros(len(self.solver.controlled_bones)*3, dtype=np.float32) | |
self.best_angles = self.initial_rotations.copy() | |
if eff != self.current_end_effector: | |
self.current_end_effector = eff | |
if changed: | |
self._setup_agent_objectives() | |
return {"controlled_bones": self.current_controlled_bones, "end_effector": self.current_end_effector} | |
def configure_urdf(self, bones, eff): | |
if not bones: bones = self.urdf_default_controlled_bones | |
if eff not in self.urdf_available_bones: eff = self.urdf_default_end_effector | |
changed = bones != self.urdf_current_controlled_bones or eff != self.urdf_current_end_effector | |
if bones != self.urdf_current_controlled_bones: | |
self.urdf_current_controlled_bones = bones | |
self.urdf_solver = self._get_solver(bones, is_urdf=True) | |
self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones)*3, dtype=np.float32) | |
self.urdf_best_angles = self.urdf_initial_rotations.copy() | |
if eff != self.urdf_current_end_effector: | |
self.urdf_current_end_effector = eff | |
if changed: | |
self._setup_urdf_objectives() | |
return {"controlled_bones": self.urdf_current_controlled_bones, "end_effector": self.urdf_current_end_effector} | |
# ---------- Objectives build ---------- | |
def _build_agent_objectives(self, payload): | |
tgt = np.array(payload.get("target",[0.0,0.2,0.35])) | |
self.distance_obj.update_params({"bone_name": self.current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))}) | |
self.collision_obj.update_params({"weight": float(payload.get("collision_weight",1.0))}) | |
subpoints = int(payload.get("subpoints",1)) | |
mandatory, optional = [], [] | |
if payload.get("distance_enabled", True): mandatory.append(self.distance_obj) | |
if payload.get("collision_enabled", False): optional.append(self.collision_obj) | |
if payload.get("bone_zero_enabled", True): | |
optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05)))) | |
if payload.get("derivative_enabled", True) and subpoints > 1: | |
optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3)) | |
elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True): | |
optional.append(BoneZeroRotationObj(weight=0.01)) | |
# Hand spec | |
hand_shape = payload.get("hand_shape","None") | |
hand_position = payload.get("hand_position","None") | |
params = { | |
"is_pointing": hand_shape=="Pointing", | |
"is_shaping": hand_shape=="Shaping", | |
"is_flat": hand_shape=="Flat", | |
"look_forward": hand_position=="Look Forward", | |
"look_45_up": hand_position=="Look 45° Up", | |
"look_45_down": hand_position=="Look 45° Down", | |
"look_up": hand_position=="Look Up", | |
"look_down": hand_position=="Look Down", | |
"look_45_x_downwards": hand_position=="Look 45° X Downwards", | |
"look_45_x_upwards": hand_position=="Look 45° X Upwards", | |
"look_x_inward": hand_position=="Look X Inward", | |
"look_to_body": hand_position=="Look to Body", | |
"arm_down": hand_position=="Arm Down", | |
"arm_45_down": hand_position=="Arm 45° Down", | |
"arm_flat": hand_position=="Arm Flat", | |
} | |
if any(params.values()): | |
spec = HandSpecification(**params) | |
optional.extend(spec.get_objectives( | |
left_hand=self.args.hand=="left", | |
controlled_bones=self.current_controlled_bones, | |
full_trajectory=subpoints>1, | |
last_position=True, | |
weight=0.5, | |
)) | |
return mandatory, optional, subpoints | |
def _build_urdf_objectives(self, payload): | |
tgt = np.array(payload.get("target",[0.3,0.3,0.35])) | |
self.urdf_distance_obj.update_params({"bone_name": self.urdf_current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))}) | |
self.urdf_collision_obj.update_params({"weight": float(payload.get("collision_weight",1.0))}) | |
subpoints = int(payload.get("subpoints",1)) | |
mandatory, optional = [], [] | |
if payload.get("distance_enabled", True): mandatory.append(self.urdf_distance_obj) | |
if payload.get("collision_enabled", False): optional.append(self.urdf_collision_obj) | |
if payload.get("bone_zero_enabled", True): | |
optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05)))) | |
if payload.get("derivative_enabled", True) and subpoints > 1: | |
optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3)) | |
elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True): | |
optional.append(BoneZeroRotationObj(weight=0.01)) | |
return mandatory, optional, subpoints | |
# ---------- Solving ---------- | |
def _frames_from_angles(self, angles_seq, is_urdf): | |
frames = [] | |
for ang in angles_seq: | |
if is_urdf: | |
verts = deform_mesh(ang, self.urdf_solver.fk_solver, self.urdf_mesh_data) | |
faces = self.urdf_mesh_data["faces"] | |
else: | |
verts = deform_mesh(ang, self.solver.fk_solver, self.mesh_data) | |
faces = self.mesh_data["faces"] | |
frames.append({"vertices": verts.tolist(), "faces": faces.tolist()}) | |
return frames | |
def solve_agent(self, payload): | |
mand, opt, sub = self._build_agent_objectives(payload) | |
start = time.time() | |
best_angles, obj_val, steps = self.solver.solve( | |
initial_rotations=self.initial_rotations, | |
learning_rate=self.args.learning_rate, | |
mandatory_objective_functions=tuple(mand), | |
optional_objective_functions=tuple(opt), | |
ik_points=sub, | |
verbose=False, | |
) | |
self.best_angles = best_angles[-1].copy() | |
self.initial_rotations = self.best_angles.copy() | |
self.animation_frames_agent = self._frames_from_angles(best_angles, False) | |
print(f"Solved in {time.time()-start:.2f}s over {steps} steps. Obj: {obj_val:.6f}") | |
self.agent_solve_counter += 1 # added | |
return { | |
"solve_time": time.time()-start, | |
"iterations": steps, | |
"objective": obj_val, | |
"frames": len(self.animation_frames_agent), | |
"solve_id": self.agent_solve_counter, # added | |
} | |
def solve_urdf(self, payload): | |
mand, opt, sub = self._build_urdf_objectives(payload) | |
start = time.time() | |
best_angles, obj_val, steps = self.urdf_solver.solve( | |
initial_rotations=self.urdf_initial_rotations, | |
learning_rate=self.args.learning_rate, | |
mandatory_objective_functions=tuple(mand), | |
optional_objective_functions=tuple(opt), | |
ik_points=sub, | |
verbose=False, | |
) | |
self.urdf_best_angles = best_angles[-1].copy() | |
self.urdf_initial_rotations = self.urdf_best_angles.copy() | |
self.animation_frames_urdf = self._frames_from_angles(best_angles, True) | |
print( | |
f"Solved in {time.time() - start:.2f}s over {steps} steps. Obj: {obj_val:.6f}" | |
) | |
self.urdf_solve_counter += 1 # added | |
return { | |
"solve_time": time.time()-start, | |
"iterations": steps, | |
"objective": obj_val, | |
"frames": len(self.animation_frames_urdf), | |
"solve_id": self.urdf_solve_counter, # added | |
} | |
# ---------- Housekeeping ---------- | |
def _cleanup(self): | |
now = time.time() | |
if now - self.last_cleanup_time < self.cleanup_interval: return | |
gc.collect() | |
self.last_cleanup_time = now | |
# ---------- API ---------- | |
def _register_routes(self): | |
def index(): return FileResponse("static/index.html") | |
def legacy(): return FileResponse("static/index.html") | |
def animation(request: Request): | |
model = request.query_params.get("model","agent").lower() | |
data = self.animation_frames_urdf if model == "pepper" else self.animation_frames_agent | |
return Response(content=json.dumps(data), media_type="application/json") | |
def cfg(model: str = "agent"): | |
if model.lower() == "pepper": | |
return { | |
"model":"pepper", | |
"available_bones": self.urdf_available_bones, | |
"selectable_bones": self.urdf_selectable_bones, | |
"default_controlled_bones": self.urdf_current_controlled_bones, | |
"default_end_effector": self.urdf_current_end_effector, | |
"end_effector_choices": self.urdf_available_bones, | |
"hand_shapes": [], | |
"hand_positions": [], | |
"max_subpoints": 20, # added | |
} | |
return { | |
"model":"agent", | |
"available_bones": self.available_bones, | |
"selectable_bones": self.selectable_bones, | |
"default_controlled_bones": self.current_controlled_bones, | |
"default_end_effector": self.current_end_effector, | |
"end_effector_choices": self.available_bones, | |
"hand_shapes":["None","Pointing","Shaping","Flat"], | |
"hand_positions":[ | |
"None","Look Forward","Look 45° Up","Look 45° Down","Look Up","Look Down", | |
"Look 45° X Downwards","Look 45° X Upwards","Look X Inward","Look to Body", | |
"Arm Down","Arm 45° Down","Arm Flat" | |
], | |
"max_subpoints": 20, # added | |
} | |
async def configure(request: Request): | |
payload = await request.json() | |
model = payload.get("model","agent").lower() | |
if model == "pepper": | |
cfg = self.configure_urdf(payload.get("controlled_bones", []), payload.get("end_effector")) | |
else: | |
cfg = self.configure_agent(payload.get("controlled_bones", []), payload.get("end_effector")) | |
return JSONResponse({"status":"ok","config":cfg}) | |
async def solve(request: Request): | |
payload = await request.json() | |
model = payload.get("model","agent").lower() | |
return_mode = payload.get("frames_mode", "auto") | |
self._cleanup() | |
with self.solve_lock: | |
try: | |
if model == "pepper": | |
self.configure_urdf(payload.get("controlled_bones", []), | |
payload.get("end_effector", self.urdf_current_end_effector)) | |
result = self.solve_urdf(payload); result["model"]="pepper" | |
if return_mode == "all" or (return_mode == "auto" and payload.get("subpoints",1) > 1): | |
frames = list(self.animation_frames_urdf) # ensure fresh list | |
else: | |
frames = [self.animation_frames_urdf[-1].copy()] if hasattr(self.animation_frames_urdf[-1],'copy') else [dict(self.animation_frames_urdf[-1])] | |
result["frames_data"] = frames | |
else: | |
self.configure_agent(payload.get("controlled_bones", []), | |
payload.get("end_effector", self.current_end_effector)) | |
result = self.solve_agent(payload); result["model"]="agent" | |
if return_mode == "all" or (return_mode == "auto" and payload.get("subpoints",1) > 1): | |
frames = list(self.animation_frames_agent) | |
else: | |
frames = [self.animation_frames_agent[-1].copy()] if hasattr(self.animation_frames_agent[-1],'copy') else [dict(self.animation_frames_agent[-1])] | |
result["frames_data"] = frames | |
return JSONResponse({"status":"ok","result":result}) | |
except Exception as e: | |
logger.exception("Solve failed") | |
return JSONResponse({"status":"error","message":str(e)}, status_code=500) | |
def health(): | |
return { | |
"status":"ok", | |
"agent_frames": len(self.animation_frames_agent), | |
"urdf_frames": len(self.animation_frames_urdf), | |
"cache_agent": len(self.solver_cache), | |
"cache_urdf": len(self.urdf_solver_cache), | |
} | |
def favicon(): return Response(status_code=204) | |
# ---------- Main ---------- | |
def main(): | |
parser = configargparse.ArgumentParser(description="Inverse Kinematics Solver - Three.js Web UI", default_config_files=["config.ini"]) | |
parser.add("--gltf_file", type=str, default="files/smplx.glb") | |
parser.add("--hand", type=str, choices=["left","right"], default="left") | |
parser.add("--threshold", type=float, default=0.0001) | |
parser.add("--num_steps", type=int, default=100) | |
parser.add("--learning_rate", type=float, default=0.2) | |
parser.add("--subpoints", type=int, default=1) | |
parser.add("--api_port", type=int, default=17861) | |
args = parser.parse_args() | |
download_and_setup_files() | |
srv = IKServer(args) | |
import uvicorn | |
uvicorn.run(srv.app, host="0.0.0.0", port=args.api_port) | |
if __name__ == "__main__": | |
main() | |