# app.py — AutoShorts Web UI (Streamlit)
# Run with: streamlit run app.py
# Requires: FFmpeg in PATH, and pip packages installed (see instructions)
import io
import os
import subprocess
import tempfile
import textwrap
import zipfile
from contextlib import suppress
from pathlib import Path

import numpy as np
import streamlit as st
# Transcription (faster-whisper)
from faster_whisper import WhisperModel
from moviepy.editor import VideoFileClip
# ---------- Utilities ----------
def format_time(sec: float) -> str:
    """Render a duration in seconds as an SRT-style timestamp HH:MM:SS,mmm."""
    hours, rem = divmod(sec, 3600)
    minutes, seconds = divmod(rem, 60)
    millis = int((sec % 1) * 1000)
    return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02},{millis:03}"
def srt_from_segments(segs, clip_start, clip_end):
    """Build an SRT document for the part of the transcript inside
    [clip_start, clip_end], with timestamps re-based to the clip start.

    Parameters
    ----------
    segs : list[dict]
        Segments with float "start"/"end" (source-video seconds) and "text".
    clip_start, clip_end : float
        Clip bounds in source-video seconds.

    Returns the SRT text ("" when no usable segment overlaps the clip).
    """
    # Hoisted out of the loop (was redefined on every iteration).
    def ts(t: float) -> str:
        # SRT timestamp HH:MM:SS,mmm — comma as the decimal separator.
        h = int(t // 3600)
        m = int((t % 3600) // 60)
        return f"{h:02d}:{m:02d}:{t % 60:06.3f}".replace(".", ",")

    lines, idx = [], 1
    for s in segs:
        s_start, s_end = float(s["start"]), float(s["end"])
        # "<=" excludes segments that only touch a boundary — they would
        # otherwise produce zero-duration cues.
        if s_end <= clip_start or s_start >= clip_end:
            continue
        text = s["text"].strip()
        if not text:
            continue  # skip blank cues (whitespace-only transcript segments)
        t0 = max(s_start, clip_start) - clip_start
        t1 = min(s_end, clip_end) - clip_start
        wrapped = "\n".join(textwrap.wrap(text, width=36)) or text
        lines += [str(idx), f"{ts(t0)} --> {ts(t1)}", wrapped, ""]
        idx += 1
    return "\n".join(lines)
def score_windows(segs, window=22, stride=8):
    """Score candidate [t, t+window] spans by crude transcript heuristics and
    return the best non-overlapping ones, ordered by descending score.

    Heuristics: presence of hook phrases (2 points each), sentence-ending
    punctuation counts, and a capped word-density bonus. Purely lexical.
    """
    if not segs:
        return []
    transcript_end = float(segs[-1]["end"])
    hook_phrases = ("how to", "here's why", "the secret", "you need", "do this",
                    "mistake", "tip", "hack", "lesson", "watch", "today", "now")

    def window_text(lo, hi):
        # Concatenate (lowercased) every segment overlapping [lo, hi].
        parts = (s["text"] for s in segs if s["end"] >= lo and s["start"] <= hi)
        return " ".join(parts).lower()

    candidates = []
    start = 0.0
    while start + window <= transcript_end:
        text = window_text(start, start + window)
        if text.strip():
            score = 0.0
            score += sum(1 for phrase in hook_phrases if phrase in text) * 2.0
            score += text.count(".") * 0.2 + text.count("!") * 0.4 + text.count("?") * 0.3
            score += min(1.5, max(1, len(text.split())) / 40.0)
            candidates.append({"start": start, "end": start + window, "score": score})
        start += stride

    candidates.sort(key=lambda c: c["score"], reverse=True)

    # Greedy pick: keep the highest-scoring windows that don't overlap a kept one.
    picked = []
    for cand in candidates:
        overlaps = any(cand["start"] < p["end"] and cand["end"] > p["start"]
                       for p in picked)
        if not overlaps:
            picked.append(cand)
            if len(picked) >= 50:  # hard cap on returned candidates
                break
    return picked
def export_clip(input_path, output_path, t0, t1, burn_srt_text=None,
                target_h=1920, target_w=1080):
    """Cut [t0, t1] from input_path, center-crop to a vertical
    target_w x target_h frame, and write it to output_path; optionally
    hard-burn SRT captions with ffmpeg.

    Parameters
    ----------
    input_path : str          source video readable by MoviePy.
    output_path : str | Path  destination .mp4 (parent dirs are created).
    t0, t1 : float            clip start/end in source-video seconds.
    burn_srt_text : str|None  SRT document to burn; None/blank skips burning.
    target_h, target_w : int  output frame size (default 1080x1920 portrait).

    Raises
    ------
    subprocess.CalledProcessError
        If the ffmpeg subtitle-burn pass fails (intermediates are still
        cleaned up so the caller can retry without captions).
    """
    out_path = os.path.abspath(str(output_path))
    os.makedirs(os.path.dirname(out_path), exist_ok=True)

    # Suffix-safe name derivation (str.replace would also hit ".mp4" mid-name).
    root, ext = os.path.splitext(out_path)
    tmp = f"{root}_tmp{ext or '.mp4'}"

    # 1) Cut + center-crop to the vertical aspect, write an intermediate file.
    with VideoFileClip(input_path) as v:
        sub = v.subclip(t0, t1)
        aspect = sub.w / sub.h
        vertical_aspect = target_w / target_h
        if aspect > vertical_aspect:
            # Wider than target: match height, crop excess width in the middle.
            resized = sub.resize(height=target_h)
            new_w = int(aspect * target_h)
            x1 = (new_w - target_w) // 2
            base = resized.crop(x1=x1, y1=0, x2=x1 + target_w, y2=target_h)
        else:
            # Taller/narrower: match width, crop excess height in the middle.
            resized = sub.resize(width=target_w)
            new_h = int(target_w / aspect)
            y1 = (new_h - target_h) // 2
            base = resized.crop(x1=0, y1=y1, x2=target_w, y2=y1 + target_h)
        base.write_videofile(
            tmp, codec="libx264", audio_codec="aac", fps=30, threads=4,
            verbose=False, logger=None
        )

    if burn_srt_text and burn_srt_text.strip():
        # 2) Burn subtitles into the final output with ffmpeg.
        srt_path = f"{root}.srt"
        with open(srt_path, "w", encoding="utf-8") as f:
            f.write(burn_srt_text)
        # Forward slashes + escaped drive colon (C:/ -> C\:/) keep the
        # subtitles filter argument parseable on Windows.
        srt_ff_escaped = os.path.abspath(srt_path).replace("\\", "/").replace(":", r"\:")
        cmd = [
            "ffmpeg", "-y",
            "-i", os.path.abspath(tmp).replace("\\", "/"),
            "-vf", f"subtitles=filename='{srt_ff_escaped}'",
            "-c:a", "copy",
            os.path.abspath(out_path).replace("\\", "/"),
        ]
        try:
            subprocess.run(cmd, check=True)
        finally:
            # Clean up intermediates even when ffmpeg fails (previously they
            # leaked on failure), so a caption-less retry starts clean.
            with suppress(OSError):
                os.remove(srt_path)
            with suppress(OSError):
                os.remove(tmp)
    else:
        # No captions: the intermediate IS the final video.
        # os.replace overwrites atomically; no pre-remove needed.
        os.replace(tmp, out_path)
def transcribe_with_whisper(audio_path, model_size="base", language="en"):
    """Transcribe an audio file with faster-whisper.

    Parameters
    ----------
    audio_path : str
        Path to the audio file (this app feeds it 16 kHz mono).
    model_size : str
        Whisper checkpoint name ("tiny", "base", "small", ...).
    language : str | None
        ISO language code; None lets the model auto-detect.
        (Was hard-coded to "en"; default preserves old behavior.)

    Returns a list of {"start": float, "end": float, "text": str} segments.
    """
    # int8 compute keeps this CPU-friendly on machines without a GPU.
    model = WhisperModel(model_size, compute_type="int8")
    segments, _info = model.transcribe(audio_path, language=language,
                                       vad_filter=True, beam_size=5)
    return [{"start": float(s.start), "end": float(s.end), "text": s.text.strip()}
            for s in segments]
def extract_audio_16k_mono(input_path, out_path):
    """Strip the video track and transcode the audio to 16 kHz mono via ffmpeg.

    Raises subprocess.CalledProcessError when ffmpeg exits non-zero.
    """
    command = [
        "ffmpeg", "-y",      # overwrite output without prompting
        "-i", input_path,
        "-vn",               # drop the video stream
        "-ac", "1",          # downmix to mono
        "-ar", "16000",      # resample to 16 kHz
        out_path,
    ]
    subprocess.run(command, check=True)
def plan_windows(segs, video_duration, clips, min_sec, max_sec):
    """Choose up to `clips` windows: transcript-scored picks first, then
    evenly spaced filler slices so the caller gets N clips whenever the
    video is long enough.
    """
    # Aim for ~22 s windows, clamped into [min_sec, max_sec]; shrink for
    # videos shorter than the window itself.
    base_window = min(max_sec, max(min_sec, 22))
    if video_duration and base_window >= video_duration:
        base_window = max(6, int(video_duration * 0.9))

    best = score_windows(segs, window=base_window, stride=8)[:clips]

    if video_duration and len(best) < clips:
        # Fallback: evenly spaced slices spanning the whole video.
        missing = clips - len(best)
        seg_len = max(5, min(max_sec, int(video_duration / max(1, clips))))
        if clips > 1 and video_duration > seg_len:
            step = (video_duration - seg_len) / (clips - 1)
            starts = [i * step for i in range(clips)]
        else:
            starts = [0.0]
        filler = [
            {"start": s, "end": min(video_duration, s + seg_len), "score": 0.0}
            for s in starts[len(best):len(best) + missing]
        ]
        best = best + filler
    return best
# ---------- Streamlit UI ----------
# Page chrome.
st.set_page_config(page_title="AutoShorts", page_icon="🎬", layout="centered")
st.title("🎬 AutoShorts — Long video → vertical clips")

with st.sidebar:
    st.header("Settings")
    clips = st.slider("Number of clips", 1, 12, 4, 1)
    min_sec = st.slider("Min seconds per clip", 4, 40, 6, 1)
    max_sec = st.slider("Max seconds per clip", 6, 60, 12, 1)
    captions = st.checkbox("Burn captions (if speech detected)", value=True)
    model_size = st.selectbox("Whisper model", ["tiny", "base", "small"], index=1,
                              help="Bigger = better accuracy, slower download/compute.")

uploaded = st.file_uploader("Upload a .mp4 (or .mov/.mkv)", type=["mp4", "mov", "mkv"])
run = st.button("Make Clips 🚀", type="primary", disabled=uploaded is None)

if run and uploaded is not None:
    # The independent sliders allow min > max; clamp instead of planning nonsense.
    if min_sec > max_sec:
        st.warning("Min seconds per clip exceeds max; using the max value for both.")
        min_sec = max_sec

    # All intermediates live in a temp dir that is removed when we're done.
    with tempfile.TemporaryDirectory() as workdir:
        in_path = os.path.join(workdir, "input.mp4")
        with open(in_path, "wb") as f:
            f.write(uploaded.read())

        # Read duration early so window planning can clamp/fill.
        try:
            with VideoFileClip(in_path) as v:
                duration = float(v.duration or 0.0)
        except Exception as e:
            st.error(f"Could not open video: {e}")
            st.stop()

        # Audio extraction + transcription (feeds captions and window scoring).
        with st.status("🔍 Extracting audio + transcribing...", expanded=True) as status:
            audio_path = os.path.join(workdir, "audio.m4a")
            try:
                extract_audio_16k_mono(in_path, audio_path)
                st.write("✅ Audio extracted (16kHz mono)")
            except subprocess.CalledProcessError:
                st.error("FFmpeg failed to extract audio.")
                st.stop()
            try:
                segs = transcribe_with_whisper(audio_path, model_size=model_size)
                st.write(f"✅ Transcription ok — {len(segs)} segments")
            except Exception as e:
                # Transcription is best-effort: fall back to caption-less clips.
                st.warning(f"Transcription failed ({e}). Proceeding without captions.")
                segs = []
            status.update(label="🧠 Planning best windows...", state="running")

        # Plan windows (plan_windows pads with evenly spaced slices to reach N).
        windows = plan_windows(segs, duration, clips, min_sec, max_sec)
        if not windows:
            st.error("Could not plan any clips. Try smaller min/max or a longer video.")
            st.stop()

        # Export each planned window as a vertical clip.
        out_dir = os.path.join(workdir, "exports")
        os.makedirs(out_dir, exist_ok=True)
        st.subheader("Exporting clips")
        progress = st.progress(0.0, text="Starting...")
        logs = st.empty()
        out_files = []
        for i, w in enumerate(windows, start=1):
            t0 = float(w["start"])
            t1 = float(w["end"])
            outp = os.path.join(out_dir, f"short_{i:02d}.mp4")
            srt_text = srt_from_segments(segs, clip_start=t0, clip_end=t1) if (captions and segs) else None
            logs.write(f"Clip {i}/{clips}: {t0:.2f}s → {t1:.2f}s")
            try:
                export_clip(in_path, outp, t0, t1, burn_srt_text=srt_text)
                out_files.append(outp)
            except subprocess.CalledProcessError:
                # Subtitle burn failed; retry the same window without captions.
                try:
                    export_clip(in_path, outp, t0, t1, burn_srt_text=None)
                    out_files.append(outp)
                    logs.write(f"⚠️ Captions failed on clip {i}; exported without captions.")
                except Exception as e2:
                    logs.write(f"❌ Failed clip {i}: {e2}")
            progress.progress(i / len(windows), text=f"Exported {i}/{len(windows)}")

        st.success(f"Done. Exported {len(out_files)} clip(s).")

        # Inline players + one ZIP download. The ZIP is built in memory so it
        # outlives the temp dir's deletion.
        for fp in out_files:
            st.video(fp)
        mem_zip = io.BytesIO()
        with zipfile.ZipFile(mem_zip, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
            for fp in out_files:
                zf.write(fp, arcname=os.path.basename(fp))
        mem_zip.seek(0)
        st.download_button(
            "Download all clips (ZIP)",
            mem_zip,
            file_name="autoshorts_exports.zip",
            mime="application/zip",
        )
else:
    st.info("Upload a video, tweak settings in the sidebar, then click **Make Clips 🚀**.")