# running-form-analyze / analyzer.py
# tylersf's picture
# Update analyzer.py
# 83c80f5 verified
# analyzer.py
import os, time, zipfile, numpy as np, cv2, math, pathlib
from datetime import datetime
from collections import deque, Counter
import mediapipe as mp
# -------- Paths --------
# Directory that make_zip writes its archive into; created at import time.
OUTPUT_DIR = "/tmp/form_analysis"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# -------- Styles & constants --------
mp_pose = mp.solutions.pose

# Text rendering defaults shared by every overlay call.
FONT = cv2.FONT_HERSHEY_SIMPLEX
LINE = cv2.LINE_AA

# Label-box palette.
COLOR_TEXT = (255, 255, 255)
COLOR_BOX = (30, 30, 30)
COLOR_BORDER = (80, 80, 80)

# Skeleton palette: bones are drawn over a slightly thicker shadow stroke,
# joints over a slightly larger edge disc.
COLOR_BONE = (255, 220, 180)
COLOR_BONE_SHADOW = (35, 35, 35)
COLOR_JOINT = (255, 80, 80)
COLOR_JOINT_EDGE = (255, 255, 255)

# Severity colours used for the Good / Improve / Fix labels.
COLOR_GREEN = (80, 190, 80)
COLOR_AMBER = (80, 170, 255)
COLOR_RED = (40, 60, 255)

# Ankle line + stride label border (BGR).
COLOR_CYAN = (255, 255, 0)

# Suffix appended to the slow-motion factor in the on-frame badge.
TIMES = "x"
# Minimum landmark visibility for a joint to be drawn or scored.
VIS_THRESH = 0.60
# Length, in seconds, of the rolling window behind the on-frame labels.
LABEL_WINDOW_SEC = 1.5
# How long, in seconds, the last detected pose is reused after a dropout.
MIN_DROPOUT_HOLD_SEC = 0.07
# -------- Drawing helpers --------
def measure_box(text, fs=0.9, th=2, pad_x=12, pad_y=10):
    """Return the ``(width, height)`` of a padded label box for ``text``.

    ``fs`` and ``th`` are the font scale and stroke thickness passed to
    ``cv2.getTextSize``; ``pad_x`` / ``pad_y`` are added on both sides of the
    measured text.
    """
    text_size, _baseline = cv2.getTextSize(text, FONT, fs, th)
    text_w, text_h = text_size
    box_w = text_w + 2 * pad_x
    box_h = text_h + 2 * pad_y
    return box_w, box_h
def draw_box(img, text, xy, border_color=COLOR_BORDER, fs=0.9, th=2, alpha=0.82):
    """Draw a translucent label box with a border and ``text`` onto ``img``.

    ``img`` is modified in place. ``xy`` is the top-left corner of the box and
    ``alpha`` is the weight of the solid fill when it is blended with the
    frame. Returns the box as ``(x, y, w, h)``.
    """
    left, top = xy
    box_w, box_h = measure_box(text, fs, th)
    top_left = (left, top)
    bottom_right = (left + box_w, top + box_h)

    # Paint the solid fill on a copy, then blend it back so the frame still
    # shows through the box.
    filled = img.copy()
    cv2.rectangle(filled, top_left, bottom_right, COLOR_BOX, -1)
    cv2.addWeighted(filled, alpha, img, 1 - alpha, 0, img)

    cv2.rectangle(img, top_left, bottom_right, border_color, 2, LINE)

    # Only the text height is needed to place the baseline; the 12 / 10 offsets
    # equal measure_box's default padding.
    (_, text_h), _ = cv2.getTextSize(text, FONT, fs, th)
    baseline_origin = (left + 12, top + 10 + text_h)
    cv2.putText(img, text, baseline_origin, FONT, fs, COLOR_TEXT, th, LINE)
    return (left, top, box_w, box_h)
def draw_bottom_labels(img, items, gap=10, margin=16, fs=0.9):
    """Lay out ``(text, border_color)`` label boxes in rows along the bottom of ``img``.

    Boxes are packed left to right and wrap onto a new row when the next box
    would not fit within the frame width. Rows are stacked upwards from the
    bottom margin, so the last row ends up lowest. ``img`` is modified in
    place.
    """
    frame_h, frame_w = img.shape[:2]

    # Pass 1: greedy wrap, remembering each box's measured size.
    rows = [[]]
    used_w = margin
    tallest = 0
    for text, color in items:
        box_w, box_h = measure_box(text, fs)
        if rows[-1] and used_w + box_w + margin > frame_w:
            rows.append([])
            used_w = margin
        rows[-1].append((text, color, box_w, box_h))
        used_w += box_w + gap
        tallest = max(tallest, box_h)

    # Pass 2: draw from the bottom row upwards; every row uses the tallest
    # box height plus a 4 px gutter.
    top = frame_h - margin
    for row in reversed(rows):
        top -= tallest + 4
        left = margin
        for text, color, box_w, _box_h in row:
            draw_box(img, text, (left, top), border_color=color, fs=fs)
            left += box_w + gap
# -------- Skeleton (raw for zero lag; slight head stabilization) --------
# Landmark index pairs to connect when drawing bones.
POSE_EDGES = list(mp_pose.POSE_CONNECTIONS)

# Landmark indices treated as "head" points; process_video passes these to
# HeadStabilizer.override before drawing.
HEAD_IDS = [
    landmark.value
    for landmark in (
        mp_pose.PoseLandmark.NOSE,
        mp_pose.PoseLandmark.LEFT_EYE,
        mp_pose.PoseLandmark.RIGHT_EYE,
        mp_pose.PoseLandmark.LEFT_EAR,
        mp_pose.PoseLandmark.RIGHT_EAR,
        mp_pose.PoseLandmark.MOUTH_LEFT,
        mp_pose.PoseLandmark.MOUTH_RIGHT,
    )
]
def draw_skeleton(img, lm_raw, W, H, head_override=None, visibility_thresh=0.5):
    """Render the pose skeleton (bones first, then joints) onto ``img`` in place.

    ``lm_raw`` is iterated for objects exposing ``x``, ``y`` and
    ``visibility``; ``x`` / ``y`` are multiplied by ``W`` / ``H`` to obtain
    pixel positions. Landmarks below ``visibility_thresh`` are skipped, and so
    is any bone touching one. ``head_override`` may map a landmark index to a
    replacement ``(x, y)`` pair expressed in the same units as ``lm_raw``.
    """
    # Stroke widths and joint radii scale with the frame width.
    bone_w = max(2, W // 360)
    shadow_w = bone_w + 2
    joint_r = max(3, W // 190)
    ring_r = joint_r + 2

    overrides = head_override or {}
    pixels = {}
    for idx, landmark in enumerate(lm_raw):
        if landmark.visibility < visibility_thresh:
            continue
        if idx in overrides:
            replacement = overrides[idx]
            norm_x, norm_y = replacement[0], replacement[1]
        else:
            norm_x, norm_y = landmark.x, landmark.y
        pixels[idx] = (int(norm_x * W), int(norm_y * H))

    for start, end in POSE_EDGES:
        if start not in pixels or end not in pixels:
            continue
        p0, p1 = pixels[start], pixels[end]
        # Thicker dark stroke underneath gives the bone an outline.
        cv2.line(img, p0, p1, COLOR_BONE_SHADOW, shadow_w, LINE)
        cv2.line(img, p0, p1, COLOR_BONE, bone_w, LINE)

    for center in pixels.values():
        cv2.circle(img, center, ring_r, COLOR_JOINT_EDGE, -1, LINE)
        cv2.circle(img, center, joint_r, COLOR_JOINT, -1, LINE)
# -------- Math & smoothing --------
def calculate_angle(a, b, c):
    """Return the angle at vertex ``b`` formed by points ``a`` and ``c``.

    Only the first two coordinates of each point are used. The result is a
    whole number of degrees in the range 0..180.
    """
    first = np.array(a, float)
    vertex = np.array(b, float)
    last = np.array(c, float)

    heading_to_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])
    heading_to_last = np.arctan2(last[1] - vertex[1], last[0] - vertex[0])
    sweep = abs(np.degrees(heading_to_last - heading_to_first))

    # Fold reflex angles back so the interior angle is reported.
    if sweep > 180:
        sweep = 360 - sweep
    return int(round(sweep))
def sev_label(angle):
    """Classify a joint angle (degrees) as ``(label, color)``.

    Outside 30..160 is "Fix", outside 50..140 is "Improve", anything else is
    "Good".
    """
    # Checked from the widest band inwards; the first band the angle falls
    # outside of decides the verdict.
    bands = (
        (30, 160, ("Fix", COLOR_RED)),
        (50, 140, ("Improve", COLOR_AMBER)),
    )
    for low, high, verdict in bands:
        if angle < low or angle > high:
            return verdict
    return "Good", COLOR_GREEN
def confidence_word(avgv):
    """Map an average visibility score to "High" (>= 0.85), "Medium" (>= 0.7) or "Low"."""
    if avgv >= 0.85:
        return "High"
    if avgv >= 0.7:
        return "Medium"
    return "Low"
class LowPass:
    """First-order exponential smoother; ``y`` holds the last output (``None`` until first use)."""

    def __init__(self):
        self.y = None

    def filt(self, x, a):
        """Blend sample ``x`` into the state with weight ``a`` and return the new state.

        The very first sample is stored and returned unchanged.
        """
        if self.y is None:
            self.y = x
        else:
            self.y = a * x + (1 - a) * self.y
        return self.y
def alpha_from_cutoff(cutoff, freq):
    """Return the smoothing weight for a low-pass with the given cutoff at sample rate ``freq``.

    A non-positive ``cutoff`` yields 1.0.
    """
    if cutoff <= 0:
        return 1.0
    time_constant = 1.0 / (2 * np.pi * cutoff)
    return 1.0 / (1.0 + time_constant * freq)
class OneEuro:
    """Adaptive low-pass filter whose cutoff rises with the signal's rate of change.

    ``min_cutoff`` is the cutoff used for a still signal, ``beta`` scales how
    much the smoothed rate of change raises it, and ``d_cutoff`` is the cutoff
    of the filter applied to the rate of change itself.
    """

    def __init__(self, freq, min_cutoff=1.3, beta=0.25, d_cutoff=1.0):
        self.freq = freq
        self.min_cut = min_cutoff
        self.beta = beta
        self.d_cut = d_cutoff
        self.dx = LowPass()   # smooths the rate of change
        self.x = LowPass()    # smooths the signal
        self.last = None      # previous filtered output

    def filt(self, x):
        """Feed one sample and return the filtered value."""
        if self.last is None:
            # First sample: measure the rate against itself (i.e. zero).
            self.last = x
        rate = (x - self.last) * self.freq
        smoothed_rate = self.dx.filt(rate, alpha_from_cutoff(self.d_cut, self.freq))
        adaptive_cutoff = self.min_cut + self.beta * abs(smoothed_rate)
        result = self.x.filt(x, alpha_from_cutoff(adaptive_cutoff, self.freq))
        self.last = result
        return result
class LandmarkSmootherForMetrics:
    """Per-landmark OneEuro smoothing of x, y and visibility.

    One filter per landmark and per channel is kept across calls, so
    ``update`` must be fed frames in order.
    """

    def __init__(self, n_points=33, freq=30.0):
        self.fx = [OneEuro(freq, min_cutoff=1.3, beta=0.25, d_cutoff=1.0) for _ in range(n_points)]
        self.fy = [OneEuro(freq, min_cutoff=1.3, beta=0.25, d_cutoff=1.0) for _ in range(n_points)]
        self.fv = [OneEuro(freq, min_cutoff=1.5, beta=0.05, d_cutoff=1.0) for _ in range(n_points)]

    def update(self, raw):
        """Return smoothed copies of ``raw``, one object per landmark.

        Each item of ``raw`` must expose ``x``, ``y`` and ``visibility``. The
        returned objects carry ``x``, ``y``, ``visibility`` and ``z`` (``z`` is
        always 0.0).
        """
        class P:
            pass

        # Samples go through float32 before filtering.
        samples = np.array(
            [[lm.x, lm.y, lm.visibility] for lm in raw],
            dtype=np.float32,
        )

        smoothed = []
        for idx, sample in enumerate(samples):
            raw_x, raw_y, raw_vis = sample
            point = P()
            point.x = float(self.fx[idx].filt(raw_x))
            point.y = float(self.fy[idx].filt(raw_y))
            point.z = 0.0
            point.visibility = float(self.fv[idx].filt(raw_vis))
            smoothed.append(point)
        return smoothed
class HeadStabilizer:
    """Light stabilisation for selected landmarks: 70 % raw position, 30 % OneEuro-smoothed."""

    def __init__(self, n_points=33, freq=30.0):
        def new_bank():
            return [OneEuro(freq, 1.0, 0.15, 1.0) for _ in range(n_points)]

        self.fx = new_bank()
        self.fy = new_bank()

    def override(self, raw, use_ids):
        """Return ``{index: (x, y)}`` with blended coordinates for each index in ``use_ids``.

        ``raw`` is indexed by landmark id and each entry must expose ``x`` and
        ``y``. The filters for the listed ids advance by one sample per call.
        """
        blended = {}
        for idx in use_ids:
            point = raw[idx]
            raw_x, raw_y = point.x, point.y
            smooth_x = float(self.fx[idx].filt(raw_x))
            smooth_y = float(self.fy[idx].filt(raw_y))
            blended[idx] = (
                0.7 * raw_x + 0.3 * smooth_x,
                0.7 * raw_y + 0.3 * smooth_y,
            )
        return blended
class LandmarkMemory:
    """Remembers the last stored landmarks and hands them out for a limited number of reads."""

    def __init__(self, hold_frames=2):
        self.last = None
        self.hold = 0
        self.hold_frames = hold_frames

    def put(self, lm):
        """Store ``lm`` and reset the number of reads it may serve to ``hold_frames``."""
        self.last = lm
        self.hold = self.hold_frames

    def get(self):
        """Return the stored landmarks while reads remain (consuming one), else ``None``."""
        still_valid = self.hold > 0
        if still_valid:
            self.hold -= 1
        return self.last if still_valid else None
# -------- Video writer --------
def make_writer(base_no_ext, fps, size):
    """Open a video writer at ``base_no_ext`` and return ``(writer, path)``.

    MP4 (``mp4v``) is tried first; if that writer does not open, AVI
    (``MJPG``) is tried. ``size`` is ``(width, height)``.

    Raises:
        RuntimeError: if neither writer opens.
    """
    frame_w, frame_h = size

    def attempt(extension, codec):
        target = f"{base_no_ext}.{extension}"
        fourcc = cv2.VideoWriter_fourcc(*codec)
        return cv2.VideoWriter(target, fourcc, fps, (frame_w, frame_h)), target

    writer, path = attempt("mp4", "mp4v")
    if not writer.isOpened():
        writer.release()
        writer, path = attempt("avi", "MJPG")
        if not writer.isOpened():
            raise RuntimeError("VideoWriter failed for both MP4 and AVI")
    return writer, path
# -------- Simple coach report --------
def compose_simple_report(meta, m):
    """Build the coach-style HTML report and return it as a single string.

    ``meta`` supplies ``timestamp``. ``m`` supplies the metrics read below:
    ``conf_word``, ``stride_flag``, ``stride_avg_m``, ``stride_ratio_avg``,
    ``fwd_lean_pct`` (may be ``None``), and the ``sym_knee_*`` / ``sym_hip_*``
    diff and flag values. The HTML fragments are joined with newlines.
    """
    # Header + introduction.
    html = [
        "<html><head><meta charset='utf-8'><title>Running Form Report</title>",
        "<style>body{font-family:Arial,Helvetica,sans-serif;line-height:1.5;} h2,h3{margin:.5em 0;} ul{margin:.3em 0 .8em 1.2em;} li{margin:.25em 0;} .small{color:#666;font-size:12px}</style></head><body>",
        f"<h2>Running Form Report</h2><div class='small'>Created: {meta['timestamp']} • Confidence: {m['conf_word']}</div>",
        "<h3>1) Introduction</h3><ul>",
        "<li>Nice work getting this filmed — great base to build from.</li>",
    ]
    stride_flag = m['stride_flag']
    if stride_flag == 'Good':
        html.append("<li>Your step length looks natural for your height.</li>")
    html += [
        "<li>Overall rhythm and posture are coming along.</li>",
        "</ul>",
        "<h3>2) What we saw</h3>",
        "<b>Posture</b><ul>",
    ]

    # Observations.
    lean_pct = m['fwd_lean_pct']
    if lean_pct is None:
        html.append("<li>Posture looked steady. Stay tall and relaxed.</li>")
    else:
        html.append(f"<li>Forward lean in ~{int(round(lean_pct))}% of frames. Keep tall and hinge gently from the ankles.</li>")
    html += [
        "</ul><b>Arms</b><ul><li>Hands by pockets, swing straight forward/back. Relax the shoulders.</li></ul>",
        "<b>Legs & Stride</b><ul>",
        f"<li>Average stride length: {m['stride_avg_m']:.2f} m (~{int(round(m['stride_ratio_avg']*100))}% of height). Overall: <b>{stride_flag}</b>.</li>",
        "</ul><b>Balance</b><ul>",
        f"<li>Knees L/R: {m['sym_knee_flag']} (diff ~{m['sym_knee_diff']:.1f}°). Hips L/R: {m['sym_hip_flag']} (diff ~{m['sym_hip_diff']:.1f}°).</li>",
        "</ul>",
    ]

    # Pick the fixes that apply; at most three are listed.
    fixes = []
    if stride_flag == 'Overstride':
        fixes.append(("Overstriding", "Land closer under the hips to reduce braking."))
    if stride_flag == 'Short':
        fixes.append(("Short steps", "Push the ground back and let the leg travel behind."))
    if lean_pct and lean_pct > 20:
        fixes.append(("Big forward lean", "Stand tall; hinge gently from the ankles, not the waist."))
    if 'Uneven' in (m['sym_knee_flag'], m['sym_hip_flag']):
        fixes.append(("Left–right mismatch", "Aim for even steps and level hips."))
    if not fixes:
        fixes = [("Keep it steady", "You’re on track—focus on relaxed rhythm and tall posture.")]

    html.append("<h3>3) Top fixes</h3><ul>")
    html.extend(f"<li><b>{name}:</b> {why}</li>" for name, why in fixes[:3])
    html.append("</ul>")

    cues = ["Tall and relaxed", "Land under hips", "Push the ground back", "Quick, light steps"]
    html.append("<h3>4) Cues to remember</h3><ul>")
    html.extend(f"<li>{cue}</li>" for cue in cues[:4])
    html.append("</ul>")

    # Fixed weekly programme and sign-off.
    html += [
        "<h3>5) Do these each week</h3>",
        "<b>Drills (2–3×/week)</b><ul>",
        "<li>A-Skips — 2×20m</li><li>High-knee march — 2×20m</li><li>Ankling — 2×20m</li><li>4–6 relaxed strides (60–80m)</li>",
        "</ul><b>Strength & mobility (2–3×/week)</b><ul>",
        "<li>Glute bridge — 3×12</li><li>Split squat — 3×8/side</li><li>Core (dead bug or plank) — 3×30–40s</li><li>Calf raises — 3×12</li>",
        "</ul>",
        "<h3>6) Simple week plan</h3><ul>",
        "<li><b>Mon:</b> Easy run 20–30 min + A-Skips</li>",
        "<li><b>Wed:</b> Easy run 20–30 min + drills</li>",
        "<li><b>Fri:</b> Easy run 20–30 min + 4×60–80 m strides</li>",
        "<li><b>Tue/Thu/Sat:</b> Strength or mobility 25–35 min</li>",
        "<li><b>Sun:</b> Rest or easy walk</li></ul>",
        "<h3>7) Keep going</h3><ul><li>Small tweaks add up. Stay relaxed, tall, and consistent — you’ve got this!</li></ul>",
        "<div class='small'>Educational guidance only; not medical advice.</div></body></html>",
    ]
    return "\n".join(html)
# -------- Core processing --------
def _majority_label(window):
    """Return the most frequent label in ``window`` ("—" when empty); ties go to the milder label."""
    if not window:
        return "—"
    counts = Counter(window)
    rank = {"Good": 3, "Improve": 2, "Fix": 1}
    return max(counts, key=lambda label: (counts[label], rank.get(label, 0)))


def _mean_severity(counts):
    """Average severity score (Good=0, Improve=5, Fix=10) over a Counter of labels; 0.0 when empty."""
    total = sum(counts.values())
    if not total:
        return 0.0
    score = {"Good": 0, "Improve": 5, "Fix": 10}
    return sum(score.get(label, 0) * n for label, n in counts.items()) / total


def _balance(left_counts, right_counts):
    """Return ``(diff, flag)`` comparing mean severity of the left and right side."""
    diff = abs(_mean_severity(right_counts) - _mean_severity(left_counts))
    if diff < 3:
        flag = "Even"
    elif diff < 7:
        flag = "Slight"
    else:
        flag = "Uneven"
    return diff, flag


def process_video(video_path, out_prefix, units_metric, height_val, weight_val, slow_factor):
    """Annotate a running video and write normal/slow-motion copies plus an HTML report.

    Args:
        video_path: Input video, opened with ``cv2.VideoCapture``.
        out_prefix: Path prefix for the outputs (``<prefix>_normal.*``,
            ``<prefix>_slow.*``, ``<prefix>_report.html``).
        units_metric: Truthy when ``height_val`` is in centimetres; otherwise
            it is taken as inches.
        height_val: Runner height, used to scale pixel distances to metres.
        weight_val: Accepted for interface compatibility; not used here.
        slow_factor: Divisor applied to the source fps for the slow-motion
            output (also shown in its on-frame badge).

    Returns:
        dict with keys ``"normal"``, ``"slow"`` and ``"report"`` mapping to
        the written file paths.

    Raises:
        RuntimeError: If the input cannot be opened or no video writer can be
            created.
    """
    # Height in metres: centimetres when metric, inches otherwise.
    if units_metric:
        height_m = float(height_val) / 100.0
    else:
        height_m = float(height_val) * 2.54 / 100.0

    # Landmark indices and joint definitions never change, so build them once
    # instead of on every frame.
    lm_ids = mp_pose.PoseLandmark
    R_SH, R_EL, R_WR = lm_ids.RIGHT_SHOULDER.value, lm_ids.RIGHT_ELBOW.value, lm_ids.RIGHT_WRIST.value
    L_SH, L_EL, L_WR = lm_ids.LEFT_SHOULDER.value, lm_ids.LEFT_ELBOW.value, lm_ids.LEFT_WRIST.value
    R_HP, R_KN, R_AN = lm_ids.RIGHT_HIP.value, lm_ids.RIGHT_KNEE.value, lm_ids.RIGHT_ANKLE.value
    L_HP, L_KN, L_AN = lm_ids.LEFT_HIP.value, lm_ids.LEFT_KNEE.value, lm_ids.LEFT_ANKLE.value
    tracked_ids = [R_SH, R_EL, R_WR, L_SH, L_EL, L_WR, R_HP, R_KN, R_AN, L_HP, L_KN, L_AN]
    joint_triplets = {
        "Elbow_R": (R_SH, R_EL, R_WR),
        "Elbow_L": (L_SH, L_EL, L_WR),
        "Knee_R": (R_HP, R_KN, R_AN),
        "Knee_L": (L_HP, L_KN, L_AN),
        "Hip_R": (R_SH, R_HP, R_KN),
        "Hip_L": (L_SH, L_HP, L_KN),
    }
    overlay_order = ["Hip_R", "Hip_L", "Knee_R", "Knee_L", "Elbow_R", "Elbow_L"]
    label_colors = {"Good": COLOR_GREEN, "Improve": COLOR_AMBER, "Fix": COLOR_RED}

    # Whole-clip accumulators for the report. The rolling windows further down
    # only stabilise the on-frame labels; using them for the report would
    # summarise just the last LABEL_WINDOW_SEC seconds of the video.
    label_totals = {tag: Counter() for tag in joint_triplets}
    pose_frames = 0
    lean_frames = 0
    conf_total = 0.0
    stride_total = 0.0
    ratio_total = 0.0

    cap = cv2.VideoCapture(video_path)
    norm_writer = slow_writer = pose = None
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        norm_writer, norm_path = make_writer(f"{out_prefix}_normal", fps, (W, H))
        slow_fps = max(1.0, fps / max(1e-6, slow_factor))
        slow_writer, slow_path = make_writer(f"{out_prefix}_slow", slow_fps, (W, H))

        # Rolling windows (for stable on-frame labels & confidence).
        win = max(12, int(fps * LABEL_WINDOW_SEC))
        label_hist = {tag: deque(maxlen=win) for tag in joint_triplets}
        conf_hist = deque(maxlen=win)

        smoother = LandmarkSmootherForMetrics(n_points=33, freq=fps)
        headstab = HeadStabilizer(n_points=33, freq=fps)
        memory = LandmarkMemory(hold_frames=max(1, int(fps * MIN_DROPOUT_HOLD_SEC)))
        pose = mp_pose.Pose(static_image_mode=False, model_complexity=2, smooth_landmarks=False,
                            min_detection_confidence=0.6, min_tracking_confidence=0.6)

        def to_px(landmarks, idx):
            """Pixel position of landmark ``idx``."""
            return np.array([landmarks[idx].x * W, landmarks[idx].y * H])

        while True:
            ok, frame = cap.read()
            if not ok:
                break

            res = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if res.pose_landmarks:
                lm_raw = res.pose_landmarks.landmark
                memory.put(lm_raw)
            else:
                # Bridge short detection dropouts with the last known pose.
                lm_raw = memory.get()

            annotated = frame.copy()
            top_right_text = None

            if lm_raw:
                lm_smooth = smoother.update(lm_raw)
                pose_frames += 1

                # ---- Confidence ----
                conf_frame = float(np.mean([lm_raw[i].visibility for i in tracked_ids]))
                conf_hist.append(conf_frame)
                conf_total += conf_frame
                top_right_text = f"Confidence: {confidence_word(float(np.mean(conf_hist)))}"

                # ---- Joint labels (skipped when any of the three points is unreliable) ----
                for tag, (a, b, c) in joint_triplets.items():
                    if min(lm_smooth[a].visibility, lm_smooth[b].visibility,
                           lm_smooth[c].visibility) < VIS_THRESH:
                        continue
                    angle = calculate_angle(to_px(lm_smooth, a), to_px(lm_smooth, b), to_px(lm_smooth, c))
                    label, _ = sev_label(angle)
                    label_hist[tag].append(label)
                    label_totals[tag][label] += 1

                # ---- STRIDE (per-frame estimate for overlay) ----
                # Shoulder-to-ankle pixel distance is used as the pixel
                # equivalent of the runner's height.
                body_px = np.linalg.norm(to_px(lm_raw, R_SH) - to_px(lm_raw, R_AN))
                stride_m = 0.0
                if body_px > 1e-6:
                    px_per_m = body_px / max(1e-6, height_m)
                    ankle_dist_px = float(np.linalg.norm(to_px(lm_smooth, R_AN) - to_px(lm_smooth, L_AN)))
                    stride_m = ankle_dist_px / px_per_m
                stride_total += max(0.0, stride_m)
                ratio_total += (stride_m / height_m) if height_m > 1e-6 else 0.0

                # ---- Forward lean ----
                # Landmark x/y are scaled by W and H, as every other angle in
                # this function does, so the trunk angle is measured in pixels
                # and is not skewed by the frame's aspect ratio.
                dx = ((lm_smooth[R_SH].x + lm_smooth[L_SH].x)
                      - (lm_smooth[R_HP].x + lm_smooth[L_HP].x)) / 2.0 * W
                dy = ((lm_smooth[R_SH].y + lm_smooth[L_SH].y)
                      - (lm_smooth[R_HP].y + lm_smooth[L_HP].y)) / 2.0 * H
                angle_from_vertical = abs(math.degrees(math.atan2(abs(dx), abs(dy) + 1e-6)))
                if angle_from_vertical > 15:
                    lean_frames += 1

                # ==== DRAWING ====
                head_override = headstab.override(lm_raw, HEAD_IDS)
                draw_skeleton(annotated, lm_raw, W, H, head_override=head_override,
                              visibility_thresh=VIS_THRESH)
                if (lm_raw[R_AN].visibility > VIS_THRESH) and (lm_raw[L_AN].visibility > VIS_THRESH):
                    ra = (int(lm_raw[R_AN].x * W), int(lm_raw[R_AN].y * H))
                    la = (int(lm_raw[L_AN].x * W), int(lm_raw[L_AN].y * H))
                    cv2.line(annotated, ra, la, COLOR_CYAN, max(2, W // 400), LINE)

                items = [(f"Stride length: {stride_m:.2f} m (est.)", COLOR_CYAN)]
                for tag in overlay_order:
                    lbl = _majority_label(label_hist[tag])
                    items.append((f"{tag.replace('_', ' ')}: {lbl}", label_colors.get(lbl, COLOR_BORDER)))
                draw_bottom_labels(annotated, items, fs=0.9)

            # Fork the slow-motion frame BEFORE stamping the mode badge;
            # otherwise the translucent "Slow Motion" box is blended over the
            # "Real-Time" text and the old caption shows through.
            sm = annotated.copy()
            draw_box(annotated, "Real-Time (Simple)", (18, 18), fs=0.9, th=2)
            draw_box(sm, f"Slow Motion ({slow_factor}{TIMES})", (18, 18), fs=0.9, th=2)
            if top_right_text:
                w, _ = measure_box(top_right_text, fs=0.7, th=2)
                for canvas in (annotated, sm):
                    draw_box(canvas, top_right_text, (max(18, W - w - 18), 18), fs=0.7, th=2)

            norm_writer.write(annotated)
            slow_writer.write(sm)
    finally:
        # Release everything that was actually created, even when a writer or
        # the pose model failed part-way through setup.
        cap.release()
        for writer in (norm_writer, slow_writer):
            if writer is not None:
                writer.release()
        if pose is not None:
            pose.close()

    stride_avg = stride_total / pose_frames if pose_frames else 0.0
    ratio_avg = ratio_total / pose_frames if pose_frames else 0.0
    if ratio_avg < 0.55:
        stride_flag = "Short"
    elif ratio_avg > 1.00:
        stride_flag = "Overstride"
    else:
        stride_flag = "Good"

    kd, kflag = _balance(label_totals["Knee_L"], label_totals["Knee_R"])
    hd, hflag = _balance(label_totals["Hip_L"], label_totals["Hip_R"])
    conf_avg = conf_total / pose_frames if pose_frames else 0.8
    fwd_pct = (100.0 * lean_frames / pose_frames) if pose_frames else None

    metrics = {
        "stride_avg_m": stride_avg,
        "stride_ratio_avg": ratio_avg,
        "stride_flag": stride_flag,
        "sym_knee_diff": kd, "sym_knee_flag": kflag,
        "sym_hip_diff": hd, "sym_hip_flag": hflag,
        "fwd_lean_pct": fwd_pct,
        "conf_word": confidence_word(conf_avg),
    }
    html_report = compose_simple_report({"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M")}, metrics)
    html_path = f"{out_prefix}_report.html"
    # The report contains non-ASCII text and declares charset utf-8, so the
    # file must be written as UTF-8 regardless of the platform default.
    with open(html_path, "w", encoding="utf-8") as f:
        f.write(html_report)
    return {"normal": norm_path, "slow": slow_path, "report": html_path}
def make_zip(outs, bundle_name="outputs"):
    """Bundle the normal video, slow video and report from ``outs`` into one zip.

    Files that do not exist or are 128 bytes or smaller are left out. Returns
    the zip path inside ``OUTPUT_DIR``, or ``None`` when no file qualifies.
    """
    bundled = []
    for key in ('normal', 'slow', 'report'):
        candidate = outs[key]
        if os.path.exists(candidate) and os.path.getsize(candidate) > 128:
            bundled.append(candidate)
    if not bundled:
        return None

    zip_path = os.path.join(OUTPUT_DIR, f"{bundle_name}_outputs.zip")
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
        for member in bundled:
            # Store each file flat, without its directory.
            archive.write(member, arcname=os.path.basename(member))
    return zip_path