Spaces:
Running
Running
# analyzer.py | |
import os, time, zipfile, numpy as np, cv2, math, pathlib | |
from datetime import datetime | |
from collections import deque, Counter | |
import mediapipe as mp | |
# -------- Paths -------- | |
OUTPUT_DIR = "/tmp/form_analysis" | |
os.makedirs(OUTPUT_DIR, exist_ok=True) | |
# -------- Styles & constants -------- | |
mp_pose = mp.solutions.pose | |
FONT=cv2.FONT_HERSHEY_SIMPLEX; LINE=cv2.LINE_AA | |
COLOR_TEXT=(255,255,255); COLOR_BOX=(30,30,30); COLOR_BORDER=(80,80,80) | |
COLOR_BONE=(255,220,180); COLOR_BONE_SHADOW=(35,35,35) | |
COLOR_JOINT=(255,80,80); COLOR_JOINT_EDGE=(255,255,255) | |
COLOR_GREEN=(80,190,80); COLOR_AMBER=(80,170,255); COLOR_RED=(40,60,255) | |
COLOR_CYAN=(255,255,0) # ankle line + stride label border (BGR) | |
TIMES="x"; VIS_THRESH=0.60 | |
LABEL_WINDOW_SEC=1.5 | |
MIN_DROPOUT_HOLD_SEC=0.07 | |
# -------- Drawing helpers -------- | |
def measure_box(text, fs=0.9, th=2, pad_x=12, pad_y=10): | |
(w,h),_=cv2.getTextSize(text,FONT,fs,th) | |
return w+2*pad_x, h+2*pad_y | |
def draw_box(img, text, xy, border_color=COLOR_BORDER, fs=0.9, th=2, alpha=0.82): | |
x,y=xy; w,h=measure_box(text,fs,th) | |
overlay=img.copy() | |
cv2.rectangle(overlay,(x,y),(x+w,y+h),COLOR_BOX,-1) | |
cv2.addWeighted(overlay,alpha,img,1-alpha,0,img) | |
cv2.rectangle(img,(x,y),(x+w,y+h),border_color,2,LINE) | |
(tw,thh),_=cv2.getTextSize(text,FONT,fs,th) | |
cv2.putText(img,text,(x+12,y+10+thh),FONT,fs,COLOR_TEXT,th,LINE) | |
return (x,y,w,h) | |
def draw_bottom_labels(img, items, gap=10, margin=16, fs=0.9): | |
H,W=img.shape[:2] | |
x=margin; y=H-margin | |
row_h=0 | |
rows=[[]]; widths=[margin] | |
for text,color in items: | |
w,h=measure_box(text,fs) | |
if widths[-1]+w+margin>W and rows[-1]: | |
rows.append([]); widths.append(margin) | |
rows[-1].append((text,color,w,h)) | |
widths[-1]+=w+gap | |
row_h=max(row_h,h) | |
for row in rows[::-1]: | |
x=margin; y-=row_h+4 | |
for text,color,w,h in row: | |
draw_box(img,text,(x,y),border_color=color,fs=fs) | |
x+=w+gap | |
# -------- Skeleton (raw for zero lag; slight head stabilization) -------- | |
POSE_EDGES = list(mp_pose.POSE_CONNECTIONS) | |
HEAD_IDS = [ | |
mp_pose.PoseLandmark.NOSE.value, | |
mp_pose.PoseLandmark.LEFT_EYE.value, mp_pose.PoseLandmark.RIGHT_EYE.value, | |
mp_pose.PoseLandmark.LEFT_EAR.value, mp_pose.PoseLandmark.RIGHT_EAR.value, | |
mp_pose.PoseLandmark.MOUTH_LEFT.value, mp_pose.PoseLandmark.MOUTH_RIGHT.value | |
] | |
def draw_skeleton(img, lm_raw, W, H, head_override=None, visibility_thresh=0.5): | |
t_line = max(2, W//360); t_shadow = t_line+2 | |
r_joint = max(3, W//190); r_edge = r_joint+2 | |
pts={} | |
for i,p in enumerate(lm_raw): | |
if p.visibility<visibility_thresh: continue | |
x = int((head_override.get(i)[0] if (head_override and i in head_override) else p.x)*W) | |
y = int((head_override.get(i)[1] if (head_override and i in head_override) else p.y)*H) | |
pts[i]=(x,y) | |
for a,b in POSE_EDGES: | |
if a in pts and b in pts: | |
ax,ay=pts[a]; bx,by=pts[b] | |
cv2.line(img,(ax,ay),(bx,by),COLOR_BONE_SHADOW,t_shadow,LINE) | |
cv2.line(img,(ax,ay),(bx,by),COLOR_BONE,t_line,LINE) | |
for i,(x,y) in pts.items(): | |
cv2.circle(img,(x,y),r_edge,COLOR_JOINT_EDGE,-1,LINE) | |
cv2.circle(img,(x,y),r_joint,COLOR_JOINT,-1,LINE) | |
# -------- Math & smoothing -------- | |
def calculate_angle(a,b,c): | |
a,b,c=np.array(a,float),np.array(b,float),np.array(c,float) | |
ang=abs(np.degrees(np.arctan2(c[1]-b[1],c[0]-b[0])-np.arctan2(a[1]-b[1],a[0]-b[0]))) | |
return int(round(360-ang if ang>180 else ang)) | |
def sev_label(angle): | |
if angle<30 or angle>160: return "Fix",COLOR_RED | |
if angle<50 or angle>140: return "Improve",COLOR_AMBER | |
return "Good",COLOR_GREEN | |
def confidence_word(avgv): return "High" if avgv>=0.85 else ("Medium" if avgv>=0.7 else "Low") | |
class LowPass: | |
def __init__(self): self.y=None | |
def filt(self,x,a): self.y=x if self.y is None else (a*x+(1-a)*self.y); return self.y | |
def alpha_from_cutoff(cutoff,freq): | |
if cutoff<=0: return 1.0 | |
tau=1.0/(2*np.pi*cutoff); return 1.0/(1.0+tau*freq) | |
class OneEuro: | |
def __init__(self,freq,min_cutoff=1.3,beta=0.25,d_cutoff=1.0): | |
self.freq=freq; self.min_cut=min_cutoff; self.beta=beta; self.d_cut=d_cutoff | |
self.dx=LowPass(); self.x=LowPass(); self.last=None | |
def filt(self,x): | |
if self.last is None: self.last=x | |
dx=(x-self.last)*self.freq | |
edx=self.dx.filt(dx, alpha_from_cutoff(self.d_cut, self.freq)) | |
cutoff=self.min_cut + self.beta*abs(edx) | |
out=self.x.filt(x, alpha_from_cutoff(cutoff, self.freq)) | |
self.last=out; return out | |
class LandmarkSmootherForMetrics: | |
def __init__(self,n_points=33,freq=30.0): | |
self.fx=[OneEuro(freq,1.3,0.25,1.0) for _ in range(n_points)] | |
self.fy=[OneEuro(freq,1.3,0.25,1.0) for _ in range(n_points)] | |
self.fv=[OneEuro(freq,1.5,0.05,1.0) for _ in range(n_points)] | |
def update(self, raw): | |
arr=np.array([[p.x,p.y,p.visibility] for p in raw],dtype=np.float32) | |
class P: pass | |
out=[] | |
for i,(x,y,v) in enumerate(arr): | |
p=P(); p.x=float(self.fx[i].filt(x)); p.y=float(self.fy[i].filt(y)) | |
p.z=0.0; p.visibility=float(self.fv[i].filt(v)); out.append(p) | |
return out | |
class HeadStabilizer: | |
def __init__(self, n_points=33, freq=30.0): | |
self.fx=[OneEuro(freq,1.0,0.15,1.0) for _ in range(n_points)] | |
self.fy=[OneEuro(freq,1.0,0.15,1.0) for _ in range(n_points)] | |
def override(self, raw, use_ids): | |
out={} | |
for i in use_ids: | |
rx,ry = raw[i].x, raw[i].y | |
sx = float(self.fx[i].filt(rx)) | |
sy = float(self.fy[i].filt(ry)) | |
out[i] = (0.7*rx + 0.3*sx, 0.7*ry + 0.3*sy) | |
return out | |
class LandmarkMemory: | |
def __init__(self, hold_frames=2): self.last=None; self.hold=0; self.hold_frames=hold_frames | |
def put(self,lm): self.last=lm; self.hold=self.hold_frames | |
def get(self): | |
if self.hold>0: self.hold-=1; return self.last | |
return None | |
# -------- Video writer -------- | |
def make_writer(base_no_ext, fps, size): | |
W,H=size | |
path=f"{base_no_ext}.mp4" | |
four=cv2.VideoWriter_fourcc(*'mp4v') | |
vw=cv2.VideoWriter(path,four,fps,(W,H)) | |
if vw.isOpened(): return vw,path | |
vw.release() | |
path=f"{base_no_ext}.avi" | |
four=cv2.VideoWriter_fourcc(*'MJPG') | |
vw=cv2.VideoWriter(path,four,fps,(W,H)) | |
if vw.isOpened(): return vw,path | |
raise RuntimeError("VideoWriter failed for both MP4 and AVI") | |
# -------- Simple coach report -------- | |
def compose_simple_report(meta, m): | |
H=[] | |
H.append("<html><head><meta charset='utf-8'><title>Running Form Report</title>") | |
H.append("<style>body{font-family:Arial,Helvetica,sans-serif;line-height:1.5;} h2,h3{margin:.5em 0;} ul{margin:.3em 0 .8em 1.2em;} li{margin:.25em 0;} .small{color:#666;font-size:12px}</style></head><body>") | |
H.append(f"<h2>Running Form Report</h2><div class='small'>Created: {meta['timestamp']} • Confidence: {m['conf_word']}</div>") | |
H.append("<h3>1) Introduction</h3><ul>") | |
H.append("<li>Nice work getting this filmed — great base to build from.</li>") | |
if m['stride_flag']=='Good': H.append("<li>Your step length looks natural for your height.</li>") | |
H.append("<li>Overall rhythm and posture are coming along.</li>") | |
H.append("</ul>") | |
H.append("<h3>2) What we saw</h3>") | |
H.append("<b>Posture</b><ul>") | |
if m['fwd_lean_pct'] is not None: | |
H.append(f"<li>Forward lean in ~{int(round(m['fwd_lean_pct']))}% of frames. Keep tall and hinge gently from the ankles.</li>") | |
else: | |
H.append("<li>Posture looked steady. Stay tall and relaxed.</li>") | |
H.append("</ul><b>Arms</b><ul><li>Hands by pockets, swing straight forward/back. Relax the shoulders.</li></ul>") | |
H.append("<b>Legs & Stride</b><ul>") | |
H.append(f"<li>Average stride length: {m['stride_avg_m']:.2f} m (~{int(round(m['stride_ratio_avg']*100))}% of height). Overall: <b>{m['stride_flag']}</b>.</li>") | |
H.append("</ul><b>Balance</b><ul>") | |
H.append(f"<li>Knees L/R: {m['sym_knee_flag']} (diff ~{m['sym_knee_diff']:.1f}°). Hips L/R: {m['sym_hip_flag']} (diff ~{m['sym_hip_diff']:.1f}°).</li>") | |
H.append("</ul>") | |
fixes=[] | |
if m['stride_flag']=='Overstride': fixes.append(("Overstriding","Land closer under the hips to reduce braking.")) | |
if m['stride_flag']=='Short': fixes.append(("Short steps","Push the ground back and let the leg travel behind.")) | |
if m['fwd_lean_pct'] and m['fwd_lean_pct']>20: fixes.append(("Big forward lean","Stand tall; hinge gently from the ankles, not the waist.")) | |
if m['sym_knee_flag']=='Uneven' or m['sym_hip_flag']=='Uneven': fixes.append(("Left–right mismatch","Aim for even steps and level hips.")) | |
if not fixes: fixes=[("Keep it steady","You’re on track—focus on relaxed rhythm and tall posture.")] | |
H.append("<h3>3) Top fixes</h3><ul>") | |
for name,why in fixes[:3]: H.append(f"<li><b>{name}:</b> {why}</li>") | |
H.append("</ul>") | |
cues=["Tall and relaxed","Land under hips","Push the ground back","Quick, light steps"] | |
H.append("<h3>4) Cues to remember</h3><ul>"); [H.append(f"<li>{c}</li>") for c in cues[:4]]; H.append("</ul>") | |
H.append("<h3>5) Do these each week</h3>") | |
H.append("<b>Drills (2–3×/week)</b><ul>") | |
H.append("<li>A-Skips — 2×20m</li><li>High-knee march — 2×20m</li><li>Ankling — 2×20m</li><li>4–6 relaxed strides (60–80m)</li>") | |
H.append("</ul><b>Strength & mobility (2–3×/week)</b><ul>") | |
H.append("<li>Glute bridge — 3×12</li><li>Split squat — 3×8/side</li><li>Core (dead bug or plank) — 3×30–40s</li><li>Calf raises — 3×12</li>") | |
H.append("</ul>") | |
H.append("<h3>6) Simple week plan</h3><ul>") | |
H.append("<li><b>Mon:</b> Easy run 20–30 min + A-Skips</li>") | |
H.append("<li><b>Wed:</b> Easy run 20–30 min + drills</li>") | |
H.append("<li><b>Fri:</b> Easy run 20–30 min + 4×60–80 m strides</li>") | |
H.append("<li><b>Tue/Thu/Sat:</b> Strength or mobility 25–35 min</li>") | |
H.append("<li><b>Sun:</b> Rest or easy walk</li></ul>") | |
H.append("<h3>7) Keep going</h3><ul><li>Small tweaks add up. Stay relaxed, tall, and consistent — you’ve got this!</li></ul>") | |
H.append("<div class='small'>Educational guidance only; not medical advice.</div></body></html>") | |
return "\n".join(H) | |
# -------- Core processing -------- | |
def process_video(video_path, out_prefix, units_metric, height_val, weight_val, slow_factor): | |
# height to meters | |
if units_metric: height_m=float(height_val)/100.0 | |
else: height_m=float(height_val)*2.54/100.0 | |
cap=cv2.VideoCapture(video_path) | |
if not cap.isOpened(): raise RuntimeError(f"Could not open video: {video_path}") | |
total_frames=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0 | |
fps=cap.get(cv2.CAP_PROP_FPS) or 30.0 | |
W=int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H=int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
norm_writer, norm_path = make_writer(f"{out_prefix}_normal", fps, (W,H)) | |
slow_fps=max(1.0, fps/max(1e-6, slow_factor)) | |
slow_writer, slow_path = make_writer(f"{out_prefix}_slow", slow_fps, (W,H)) | |
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M") | |
# rolling windows (for stable labels & confidence) | |
win = max(12, int(fps*LABEL_WINDOW_SEC)) | |
label_hist = {k: deque(maxlen=win) for k in ["Elbow_R","Elbow_L","Knee_R","Knee_L","Hip_R","Hip_L"]} | |
conf_hist = deque(maxlen=win) | |
stride_hist= deque(maxlen=win) | |
ratio_hist = deque(maxlen=win) | |
fwd_flags = [] | |
smoother = LandmarkSmootherForMetrics(n_points=33, freq=fps) | |
headstab = HeadStabilizer(n_points=33, freq=fps) | |
memory = LandmarkMemory(hold_frames=max(1,int(fps*MIN_DROPOUT_HOLD_SEC))) | |
pose = mp_pose.Pose(static_image_mode=False, model_complexity=2, smooth_landmarks=False, | |
min_detection_confidence=0.6, min_tracking_confidence=0.6) | |
frame_idx=0; start=time.time() | |
try: | |
while True: | |
ok,frame=cap.read() | |
if not ok: break | |
frame_idx+=1 | |
rgb=cv2.cvtColor(frame,cv2.COLOR_BGR2RGB) | |
res=pose.process(rgb) | |
lm_raw=None | |
if res.pose_landmarks: | |
lm_raw = res.pose_landmarks.landmark | |
memory.put(lm_raw) | |
else: | |
lm_raw = memory.get() | |
annotated=frame.copy() | |
top_right_text=None | |
if lm_raw: | |
lm_smooth = smoother.update(lm_raw) | |
def P_raw(i): return [lm_raw[i].x*W, lm_raw[i].y*H] | |
def V_raw(i): return lm_raw[i].visibility | |
def P(i): return [lm_smooth[i].x*W, lm_smooth[i].y*H] | |
def V(i): return lm_smooth[i].visibility | |
idxs=[mp_pose.PoseLandmark.RIGHT_SHOULDER.value, mp_pose.PoseLandmark.RIGHT_ELBOW.value, | |
mp_pose.PoseLandmark.RIGHT_WRIST.value, mp_pose.PoseLandmark.LEFT_SHOULDER.value, | |
mp_pose.PoseLandmark.LEFT_ELBOW.value, mp_pose.PoseLandmark.LEFT_WRIST.value, | |
mp_pose.PoseLandmark.RIGHT_HIP.value, mp_pose.PoseLandmark.RIGHT_KNEE.value, | |
mp_pose.PoseLandmark.RIGHT_ANKLE.value, mp_pose.PoseLandmark.LEFT_HIP.value, | |
mp_pose.PoseLandmark.LEFT_KNEE.value, mp_pose.PoseLandmark.LEFT_ANKLE.value] | |
conf_frame=float(np.mean([V_raw(i) for i in idxs])) | |
conf_hist.append(conf_frame) | |
conf_avg=float(np.mean(conf_hist)) | |
top_right_text=f"Confidence: {confidence_word(conf_avg)}" | |
R_SH,R_EL,R_WR = mp_pose.PoseLandmark.RIGHT_SHOULDER.value, mp_pose.PoseLandmark.RIGHT_ELBOW.value, mp_pose.PoseLandmark.RIGHT_WRIST.value | |
L_SH,L_EL,L_WR = mp_pose.PoseLandmark.LEFT_SHOULDER.value, mp_pose.PoseLandmark.LEFT_ELBOW.value, mp_pose.PoseLandmark.LEFT_WRIST.value | |
R_HP,R_KN,R_AN = mp_pose.PoseLandmark.RIGHT_HIP.value, mp_pose.PoseLandmark.RIGHT_KNEE.value, mp_pose.PoseLandmark.RIGHT_ANKLE.value | |
L_HP,L_KN,L_AN = mp_pose.PoseLandmark.LEFT_HIP.value, mp_pose.PoseLandmark.LEFT_KNEE.value, mp_pose.PoseLandmark.LEFT_ANKLE.value | |
def label_for(A,B,C): | |
if min(V(A),V(B),V(C)) < VIS_THRESH: return None | |
ang=calculate_angle(P(A),P(B),P(C)) | |
lbl,_=sev_label(ang); return lbl | |
frame_labels={ | |
"Elbow_R": label_for(R_SH,R_EL,R_WR), | |
"Elbow_L": label_for(L_SH,L_EL,L_WR), | |
"Knee_R": label_for(R_HP,R_KN,R_AN), | |
"Knee_L": label_for(L_HP,L_KN,L_AN), | |
"Hip_R": label_for(R_SH,R_HP,R_KN), | |
"Hip_L": label_for(L_SH,L_HP,L_KN) | |
} | |
for k,v in frame_labels.items(): | |
if v: label_hist[k].append(v) | |
# ---- STRIDE (per-frame estimate for overlay) ---- | |
r_sh_raw, r_an_raw, l_an_raw = P_raw(R_SH), P_raw(R_AN), P_raw(L_AN) | |
body_px = np.linalg.norm(np.array(r_sh_raw)-np.array(r_an_raw)) | |
stride_m=0.0 | |
if body_px>1e-6: | |
px_per_m = body_px / max(1e-6, height_m) | |
r_an, l_an = P(R_AN), P(L_AN) | |
ankle_dist_px=float(np.linalg.norm(np.array(r_an)-np.array(l_an))) | |
stride_m=ankle_dist_px/px_per_m | |
stride_hist.append(max(0.0,stride_m)) | |
ratio_hist.append((stride_m/height_m) if height_m>1e-6 else 0.0) | |
# forward-lean % | |
sh_mid = ((lm_smooth[R_SH].x + lm_smooth[L_SH].x)/2.0, | |
(lm_smooth[R_SH].y + lm_smooth[L_SH].y)/2.0) | |
hp_mid = ((lm_smooth[R_HP].x + lm_smooth[L_HP].x)/2.0, | |
(lm_smooth[R_HP].y + lm_smooth[L_HP].y)/2.0) | |
dx = (sh_mid[0]-hp_mid[0]); dy = (sh_mid[1]-hp_mid[1]) | |
angle_from_vertical = abs(math.degrees(math.atan2(abs(dx), abs(dy)+1e-6))) | |
fwd_flags.append(1 if angle_from_vertical>15 else 0) | |
# ==== DRAWING ==== | |
head_override = headstab.override(lm_raw, HEAD_IDS) | |
draw_skeleton(annotated, lm_raw, W, H, head_override=head_override, visibility_thresh=VIS_THRESH) | |
if (lm_raw[R_AN].visibility>VIS_THRESH) and (lm_raw[L_AN].visibility>VIS_THRESH): | |
ra=(int(lm_raw[R_AN].x*W), int(lm_raw[R_AN].y*H)) | |
la=(int(lm_raw[L_AN].x*W), int(lm_raw[L_AN].y*H)) | |
cv2.line(annotated, ra, la, COLOR_CYAN, max(2, W//400), LINE) | |
def maj(tag): | |
dq=label_hist[tag] | |
if not dq: return "—" | |
c=Counter(dq); order={"Good":3,"Improve":2,"Fix":1} | |
return sorted(c.items(), key=lambda kv:(kv[1], order.get(kv[0],0)), reverse=True)[0][0] | |
items=[(f"Stride length: {stride_m:.2f} m (est.)", COLOR_CYAN)] | |
for tag in ["Hip_R","Hip_L","Knee_R","Knee_L","Elbow_R","Elbow_L"]: | |
lbl=maj(tag); col={"Good":COLOR_GREEN,"Improve":COLOR_AMBER,"Fix":COLOR_RED}.get(lbl, COLOR_BORDER) | |
items.append((f"{tag.replace('_',' ')}: {lbl}", col)) | |
draw_bottom_labels(annotated, items, fs=0.9) | |
draw_box(annotated,"Real-Time (Simple)",(18,18),fs=0.9,th=2) | |
if top_right_text: | |
w,_=measure_box(top_right_text,fs=0.7,th=2) | |
draw_box(annotated, top_right_text, (max(18, W-w-18), 18), fs=0.7, th=2) | |
norm_writer.write(annotated) | |
sm=annotated.copy() | |
draw_box(sm,f"Slow Motion ({slow_factor}{TIMES})",(18,18),fs=0.9,th=2) | |
slow_writer.write(sm) | |
finally: | |
cap.release(); norm_writer.release(); slow_writer.release() | |
stride_avg = float(np.mean(stride_hist)) if stride_hist else 0.0 | |
ratio_avg = float(np.mean(ratio_hist)) if ratio_hist else 0.0 | |
if ratio_avg < 0.55: stride_flag="Short" | |
elif ratio_avg > 1.00: stride_flag="Overstride" | |
else: stride_flag="Good" | |
def balance(histL, histR): | |
mapv={"Good":0,"Improve":5,"Fix":10} | |
l=np.mean([mapv.get(x,0) for x in histL]) if histL else 0 | |
r=np.mean([mapv.get(x,0) for x in histR]) if histR else 0 | |
diff=abs(r-l); flag="Even" if diff<3 else ("Slight" if diff<7 else "Uneven") | |
return diff,flag | |
kd,kflag=balance(label_hist["Knee_L"], label_hist["Knee_R"]) | |
hd,hflag=balance(label_hist["Hip_L"], label_hist["Hip_R"]) | |
conf_avg = float(np.mean(conf_hist)) if conf_hist else 0.8 | |
fwd_pct = (100.0*sum(fwd_flags)/len(fwd_flags)) if fwd_flags else None | |
metrics = { | |
"stride_avg_m": stride_avg, | |
"stride_ratio_avg": ratio_avg, | |
"stride_flag": stride_flag, | |
"sym_knee_diff": kd, "sym_knee_flag": kflag, | |
"sym_hip_diff": hd, "sym_hip_flag": hflag, | |
"fwd_lean_pct": fwd_pct, | |
"conf_word": confidence_word(conf_avg), | |
} | |
html_report = compose_simple_report({"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M")}, metrics) | |
html_path = f"{out_prefix}_report.html" | |
with open(html_path,"w") as f: f.write(html_report) | |
return {"normal":norm_path,"slow":slow_path,"report":html_path} | |
def make_zip(outs, bundle_name="outputs"): | |
files3=[outs['normal'], outs['slow'], outs['report']] | |
files3=[p for p in files3 if os.path.exists(p) and os.path.getsize(p)>128] | |
if not files3: | |
return None | |
zip_path=os.path.join(OUTPUT_DIR, f"{bundle_name}_outputs.zip") | |
with zipfile.ZipFile(zip_path,'w',compression=zipfile.ZIP_DEFLATED) as z: | |
for p in files3: z.write(p, arcname=os.path.basename(p)) | |
return zip_path | |