# analyzer.py import os, time, zipfile, numpy as np, cv2, math, pathlib from datetime import datetime from collections import deque, Counter import mediapipe as mp # -------- Paths -------- OUTPUT_DIR = "/tmp/form_analysis" os.makedirs(OUTPUT_DIR, exist_ok=True) # -------- Styles & constants -------- mp_pose = mp.solutions.pose FONT=cv2.FONT_HERSHEY_SIMPLEX; LINE=cv2.LINE_AA COLOR_TEXT=(255,255,255); COLOR_BOX=(30,30,30); COLOR_BORDER=(80,80,80) COLOR_BONE=(255,220,180); COLOR_BONE_SHADOW=(35,35,35) COLOR_JOINT=(255,80,80); COLOR_JOINT_EDGE=(255,255,255) COLOR_GREEN=(80,190,80); COLOR_AMBER=(80,170,255); COLOR_RED=(40,60,255) COLOR_CYAN=(255,255,0) # ankle line + stride label border (BGR) TIMES="x"; VIS_THRESH=0.60 LABEL_WINDOW_SEC=1.5 MIN_DROPOUT_HOLD_SEC=0.07 # -------- Drawing helpers -------- def measure_box(text, fs=0.9, th=2, pad_x=12, pad_y=10): (w,h),_=cv2.getTextSize(text,FONT,fs,th) return w+2*pad_x, h+2*pad_y def draw_box(img, text, xy, border_color=COLOR_BORDER, fs=0.9, th=2, alpha=0.82): x,y=xy; w,h=measure_box(text,fs,th) overlay=img.copy() cv2.rectangle(overlay,(x,y),(x+w,y+h),COLOR_BOX,-1) cv2.addWeighted(overlay,alpha,img,1-alpha,0,img) cv2.rectangle(img,(x,y),(x+w,y+h),border_color,2,LINE) (tw,thh),_=cv2.getTextSize(text,FONT,fs,th) cv2.putText(img,text,(x+12,y+10+thh),FONT,fs,COLOR_TEXT,th,LINE) return (x,y,w,h) def draw_bottom_labels(img, items, gap=10, margin=16, fs=0.9): H,W=img.shape[:2] x=margin; y=H-margin row_h=0 rows=[[]]; widths=[margin] for text,color in items: w,h=measure_box(text,fs) if widths[-1]+w+margin>W and rows[-1]: rows.append([]); widths.append(margin) rows[-1].append((text,color,w,h)) widths[-1]+=w+gap row_h=max(row_h,h) for row in rows[::-1]: x=margin; y-=row_h+4 for text,color,w,h in row: draw_box(img,text,(x,y),border_color=color,fs=fs) x+=w+gap # -------- Skeleton (raw for zero lag; slight head stabilization) -------- POSE_EDGES = list(mp_pose.POSE_CONNECTIONS) HEAD_IDS = [ mp_pose.PoseLandmark.NOSE.value, mp_pose.PoseLandmark.LEFT_EYE.value, mp_pose.PoseLandmark.RIGHT_EYE.value, mp_pose.PoseLandmark.LEFT_EAR.value, mp_pose.PoseLandmark.RIGHT_EAR.value, mp_pose.PoseLandmark.MOUTH_LEFT.value, mp_pose.PoseLandmark.MOUTH_RIGHT.value ] def draw_skeleton(img, lm_raw, W, H, head_override=None, visibility_thresh=0.5): t_line = max(2, W//360); t_shadow = t_line+2 r_joint = max(3, W//190); r_edge = r_joint+2 pts={} for i,p in enumerate(lm_raw): if p.visibility180 else ang)) def sev_label(angle): if angle<30 or angle>160: return "Fix",COLOR_RED if angle<50 or angle>140: return "Improve",COLOR_AMBER return "Good",COLOR_GREEN def confidence_word(avgv): return "High" if avgv>=0.85 else ("Medium" if avgv>=0.7 else "Low") class LowPass: def __init__(self): self.y=None def filt(self,x,a): self.y=x if self.y is None else (a*x+(1-a)*self.y); return self.y def alpha_from_cutoff(cutoff,freq): if cutoff<=0: return 1.0 tau=1.0/(2*np.pi*cutoff); return 1.0/(1.0+tau*freq) class OneEuro: def __init__(self,freq,min_cutoff=1.3,beta=0.25,d_cutoff=1.0): self.freq=freq; self.min_cut=min_cutoff; self.beta=beta; self.d_cut=d_cutoff self.dx=LowPass(); self.x=LowPass(); self.last=None def filt(self,x): if self.last is None: self.last=x dx=(x-self.last)*self.freq edx=self.dx.filt(dx, alpha_from_cutoff(self.d_cut, self.freq)) cutoff=self.min_cut + self.beta*abs(edx) out=self.x.filt(x, alpha_from_cutoff(cutoff, self.freq)) self.last=out; return out class LandmarkSmootherForMetrics: def __init__(self,n_points=33,freq=30.0): self.fx=[OneEuro(freq,1.3,0.25,1.0) for _ in range(n_points)] self.fy=[OneEuro(freq,1.3,0.25,1.0) for _ in range(n_points)] self.fv=[OneEuro(freq,1.5,0.05,1.0) for _ in range(n_points)] def update(self, raw): arr=np.array([[p.x,p.y,p.visibility] for p in raw],dtype=np.float32) class P: pass out=[] for i,(x,y,v) in enumerate(arr): p=P(); p.x=float(self.fx[i].filt(x)); p.y=float(self.fy[i].filt(y)) p.z=0.0; p.visibility=float(self.fv[i].filt(v)); out.append(p) return out class HeadStabilizer: def __init__(self, n_points=33, freq=30.0): self.fx=[OneEuro(freq,1.0,0.15,1.0) for _ in range(n_points)] self.fy=[OneEuro(freq,1.0,0.15,1.0) for _ in range(n_points)] def override(self, raw, use_ids): out={} for i in use_ids: rx,ry = raw[i].x, raw[i].y sx = float(self.fx[i].filt(rx)) sy = float(self.fy[i].filt(ry)) out[i] = (0.7*rx + 0.3*sx, 0.7*ry + 0.3*sy) return out class LandmarkMemory: def __init__(self, hold_frames=2): self.last=None; self.hold=0; self.hold_frames=hold_frames def put(self,lm): self.last=lm; self.hold=self.hold_frames def get(self): if self.hold>0: self.hold-=1; return self.last return None # -------- Video writer -------- def make_writer(base_no_ext, fps, size): W,H=size path=f"{base_no_ext}.mp4" four=cv2.VideoWriter_fourcc(*'mp4v') vw=cv2.VideoWriter(path,four,fps,(W,H)) if vw.isOpened(): return vw,path vw.release() path=f"{base_no_ext}.avi" four=cv2.VideoWriter_fourcc(*'MJPG') vw=cv2.VideoWriter(path,four,fps,(W,H)) if vw.isOpened(): return vw,path raise RuntimeError("VideoWriter failed for both MP4 and AVI") # -------- Simple coach report -------- def compose_simple_report(meta, m): H=[] H.append("Running Form Report") H.append("") H.append(f"

Running Form Report

Created: {meta['timestamp']} • Confidence: {m['conf_word']}
") H.append("

1) Introduction

") H.append("

2) What we saw

") H.append("PostureArms") H.append("Legs & StrideBalance") fixes=[] if m['stride_flag']=='Overstride': fixes.append(("Overstriding","Land closer under the hips to reduce braking.")) if m['stride_flag']=='Short': fixes.append(("Short steps","Push the ground back and let the leg travel behind.")) if m['fwd_lean_pct'] and m['fwd_lean_pct']>20: fixes.append(("Big forward lean","Stand tall; hinge gently from the ankles, not the waist.")) if m['sym_knee_flag']=='Uneven' or m['sym_hip_flag']=='Uneven': fixes.append(("Left–right mismatch","Aim for even steps and level hips.")) if not fixes: fixes=[("Keep it steady","You’re on track—focus on relaxed rhythm and tall posture.")] H.append("

3) Top fixes

") cues=["Tall and relaxed","Land under hips","Push the ground back","Quick, light steps"] H.append("

4) Cues to remember

") H.append("

5) Do these each week

") H.append("Drills (2–3×/week)Strength & mobility (2–3×/week)") H.append("

6) Simple week plan

") H.append("

7) Keep going

") H.append("
Educational guidance only; not medical advice.
") return "\n".join(H) # -------- Core processing -------- def process_video(video_path, out_prefix, units_metric, height_val, weight_val, slow_factor): # height to meters if units_metric: height_m=float(height_val)/100.0 else: height_m=float(height_val)*2.54/100.0 cap=cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError(f"Could not open video: {video_path}") total_frames=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0 fps=cap.get(cv2.CAP_PROP_FPS) or 30.0 W=int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H=int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) norm_writer, norm_path = make_writer(f"{out_prefix}_normal", fps, (W,H)) slow_fps=max(1.0, fps/max(1e-6, slow_factor)) slow_writer, slow_path = make_writer(f"{out_prefix}_slow", slow_fps, (W,H)) timestamp=datetime.now().strftime("%Y-%m-%d %H:%M") # rolling windows (for stable labels & confidence) win = max(12, int(fps*LABEL_WINDOW_SEC)) label_hist = {k: deque(maxlen=win) for k in ["Elbow_R","Elbow_L","Knee_R","Knee_L","Hip_R","Hip_L"]} conf_hist = deque(maxlen=win) stride_hist= deque(maxlen=win) ratio_hist = deque(maxlen=win) fwd_flags = [] smoother = LandmarkSmootherForMetrics(n_points=33, freq=fps) headstab = HeadStabilizer(n_points=33, freq=fps) memory = LandmarkMemory(hold_frames=max(1,int(fps*MIN_DROPOUT_HOLD_SEC))) pose = mp_pose.Pose(static_image_mode=False, model_complexity=2, smooth_landmarks=False, min_detection_confidence=0.6, min_tracking_confidence=0.6) frame_idx=0; start=time.time() try: while True: ok,frame=cap.read() if not ok: break frame_idx+=1 rgb=cv2.cvtColor(frame,cv2.COLOR_BGR2RGB) res=pose.process(rgb) lm_raw=None if res.pose_landmarks: lm_raw = res.pose_landmarks.landmark memory.put(lm_raw) else: lm_raw = memory.get() annotated=frame.copy() top_right_text=None if lm_raw: lm_smooth = smoother.update(lm_raw) def P_raw(i): return [lm_raw[i].x*W, lm_raw[i].y*H] def V_raw(i): return lm_raw[i].visibility def P(i): return [lm_smooth[i].x*W, lm_smooth[i].y*H] def V(i): return lm_smooth[i].visibility idxs=[mp_pose.PoseLandmark.RIGHT_SHOULDER.value, mp_pose.PoseLandmark.RIGHT_ELBOW.value, mp_pose.PoseLandmark.RIGHT_WRIST.value, mp_pose.PoseLandmark.LEFT_SHOULDER.value, mp_pose.PoseLandmark.LEFT_ELBOW.value, mp_pose.PoseLandmark.LEFT_WRIST.value, mp_pose.PoseLandmark.RIGHT_HIP.value, mp_pose.PoseLandmark.RIGHT_KNEE.value, mp_pose.PoseLandmark.RIGHT_ANKLE.value, mp_pose.PoseLandmark.LEFT_HIP.value, mp_pose.PoseLandmark.LEFT_KNEE.value, mp_pose.PoseLandmark.LEFT_ANKLE.value] conf_frame=float(np.mean([V_raw(i) for i in idxs])) conf_hist.append(conf_frame) conf_avg=float(np.mean(conf_hist)) top_right_text=f"Confidence: {confidence_word(conf_avg)}" R_SH,R_EL,R_WR = mp_pose.PoseLandmark.RIGHT_SHOULDER.value, mp_pose.PoseLandmark.RIGHT_ELBOW.value, mp_pose.PoseLandmark.RIGHT_WRIST.value L_SH,L_EL,L_WR = mp_pose.PoseLandmark.LEFT_SHOULDER.value, mp_pose.PoseLandmark.LEFT_ELBOW.value, mp_pose.PoseLandmark.LEFT_WRIST.value R_HP,R_KN,R_AN = mp_pose.PoseLandmark.RIGHT_HIP.value, mp_pose.PoseLandmark.RIGHT_KNEE.value, mp_pose.PoseLandmark.RIGHT_ANKLE.value L_HP,L_KN,L_AN = mp_pose.PoseLandmark.LEFT_HIP.value, mp_pose.PoseLandmark.LEFT_KNEE.value, mp_pose.PoseLandmark.LEFT_ANKLE.value def label_for(A,B,C): if min(V(A),V(B),V(C)) < VIS_THRESH: return None ang=calculate_angle(P(A),P(B),P(C)) lbl,_=sev_label(ang); return lbl frame_labels={ "Elbow_R": label_for(R_SH,R_EL,R_WR), "Elbow_L": label_for(L_SH,L_EL,L_WR), "Knee_R": label_for(R_HP,R_KN,R_AN), "Knee_L": label_for(L_HP,L_KN,L_AN), "Hip_R": label_for(R_SH,R_HP,R_KN), "Hip_L": label_for(L_SH,L_HP,L_KN) } for k,v in frame_labels.items(): if v: label_hist[k].append(v) # ---- STRIDE (per-frame estimate for overlay) ---- r_sh_raw, r_an_raw, l_an_raw = P_raw(R_SH), P_raw(R_AN), P_raw(L_AN) body_px = np.linalg.norm(np.array(r_sh_raw)-np.array(r_an_raw)) stride_m=0.0 if body_px>1e-6: px_per_m = body_px / max(1e-6, height_m) r_an, l_an = P(R_AN), P(L_AN) ankle_dist_px=float(np.linalg.norm(np.array(r_an)-np.array(l_an))) stride_m=ankle_dist_px/px_per_m stride_hist.append(max(0.0,stride_m)) ratio_hist.append((stride_m/height_m) if height_m>1e-6 else 0.0) # forward-lean % sh_mid = ((lm_smooth[R_SH].x + lm_smooth[L_SH].x)/2.0, (lm_smooth[R_SH].y + lm_smooth[L_SH].y)/2.0) hp_mid = ((lm_smooth[R_HP].x + lm_smooth[L_HP].x)/2.0, (lm_smooth[R_HP].y + lm_smooth[L_HP].y)/2.0) dx = (sh_mid[0]-hp_mid[0]); dy = (sh_mid[1]-hp_mid[1]) angle_from_vertical = abs(math.degrees(math.atan2(abs(dx), abs(dy)+1e-6))) fwd_flags.append(1 if angle_from_vertical>15 else 0) # ==== DRAWING ==== head_override = headstab.override(lm_raw, HEAD_IDS) draw_skeleton(annotated, lm_raw, W, H, head_override=head_override, visibility_thresh=VIS_THRESH) if (lm_raw[R_AN].visibility>VIS_THRESH) and (lm_raw[L_AN].visibility>VIS_THRESH): ra=(int(lm_raw[R_AN].x*W), int(lm_raw[R_AN].y*H)) la=(int(lm_raw[L_AN].x*W), int(lm_raw[L_AN].y*H)) cv2.line(annotated, ra, la, COLOR_CYAN, max(2, W//400), LINE) def maj(tag): dq=label_hist[tag] if not dq: return "—" c=Counter(dq); order={"Good":3,"Improve":2,"Fix":1} return sorted(c.items(), key=lambda kv:(kv[1], order.get(kv[0],0)), reverse=True)[0][0] items=[(f"Stride length: {stride_m:.2f} m (est.)", COLOR_CYAN)] for tag in ["Hip_R","Hip_L","Knee_R","Knee_L","Elbow_R","Elbow_L"]: lbl=maj(tag); col={"Good":COLOR_GREEN,"Improve":COLOR_AMBER,"Fix":COLOR_RED}.get(lbl, COLOR_BORDER) items.append((f"{tag.replace('_',' ')}: {lbl}", col)) draw_bottom_labels(annotated, items, fs=0.9) draw_box(annotated,"Real-Time (Simple)",(18,18),fs=0.9,th=2) if top_right_text: w,_=measure_box(top_right_text,fs=0.7,th=2) draw_box(annotated, top_right_text, (max(18, W-w-18), 18), fs=0.7, th=2) norm_writer.write(annotated) sm=annotated.copy() draw_box(sm,f"Slow Motion ({slow_factor}{TIMES})",(18,18),fs=0.9,th=2) slow_writer.write(sm) finally: cap.release(); norm_writer.release(); slow_writer.release() stride_avg = float(np.mean(stride_hist)) if stride_hist else 0.0 ratio_avg = float(np.mean(ratio_hist)) if ratio_hist else 0.0 if ratio_avg < 0.55: stride_flag="Short" elif ratio_avg > 1.00: stride_flag="Overstride" else: stride_flag="Good" def balance(histL, histR): mapv={"Good":0,"Improve":5,"Fix":10} l=np.mean([mapv.get(x,0) for x in histL]) if histL else 0 r=np.mean([mapv.get(x,0) for x in histR]) if histR else 0 diff=abs(r-l); flag="Even" if diff<3 else ("Slight" if diff<7 else "Uneven") return diff,flag kd,kflag=balance(label_hist["Knee_L"], label_hist["Knee_R"]) hd,hflag=balance(label_hist["Hip_L"], label_hist["Hip_R"]) conf_avg = float(np.mean(conf_hist)) if conf_hist else 0.8 fwd_pct = (100.0*sum(fwd_flags)/len(fwd_flags)) if fwd_flags else None metrics = { "stride_avg_m": stride_avg, "stride_ratio_avg": ratio_avg, "stride_flag": stride_flag, "sym_knee_diff": kd, "sym_knee_flag": kflag, "sym_hip_diff": hd, "sym_hip_flag": hflag, "fwd_lean_pct": fwd_pct, "conf_word": confidence_word(conf_avg), } html_report = compose_simple_report({"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M")}, metrics) html_path = f"{out_prefix}_report.html" with open(html_path,"w") as f: f.write(html_report) return {"normal":norm_path,"slow":slow_path,"report":html_path} def make_zip(outs, bundle_name="outputs"): files3=[outs['normal'], outs['slow'], outs['report']] files3=[p for p in files3 if os.path.exists(p) and os.path.getsize(p)>128] if not files3: return None zip_path=os.path.join(OUTPUT_DIR, f"{bundle_name}_outputs.zip") with zipfile.ZipFile(zip_path,'w',compression=zipfile.ZIP_DEFLATED) as z: for p in files3: z.write(p, arcname=os.path.basename(p)) return zip_path