# autoshorts / app.py
# h4sh99's picture
# Create app.py
# bad8a14 verified
# app.py — AutoShorts Web UI (Streamlit)
# Run with: streamlit run app.py
# Requires: FFmpeg in PATH, and pip packages installed (see instructions)
import os, io, zipfile, tempfile, subprocess, textwrap
from pathlib import Path
import streamlit as st
from moviepy.editor import VideoFileClip
import numpy as np
# Transcription (faster-whisper)
from faster_whisper import WhisperModel
# ---------- Utilities ----------
def format_time(sec: float) -> str:
    """Format a duration in seconds as an ``HH:MM:SS,mmm`` timestamp.

    The value is rounded to the nearest whole millisecond *before* it is
    split into fields. Truncating each field independently from a float lets
    binary rounding noise drop a millisecond (``1.001`` would render as
    ``00:00:01,000``). Negative inputs are clamped to zero.

    Args:
        sec: Duration in seconds.

    Returns:
        The zero-padded timestamp string, e.g. ``"01:01:01,500"``.
    """
    total_ms = max(0, int(round(sec * 1000)))
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
def _srt_timestamp(t):
    """Render *t* seconds as an SRT ``HH:MM:SS,mmm`` timestamp."""
    # Round to whole milliseconds first so the seconds field can never be
    # formatted as "60.000" (which happens when 59.9996 is rounded late).
    total_ms = max(0, int(round(t * 1000)))
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def srt_from_segments(segs, clip_start, clip_end):
    """Build SRT subtitle text for the part of a transcript inside one clip.

    Args:
        segs: Iterable of dicts with ``"start"``, ``"end"`` (seconds in the
            source video) and ``"text"`` keys.
        clip_start: Clip start time in source-video seconds.
        clip_end: Clip end time in source-video seconds.

    Returns:
        SRT text whose timestamps are relative to ``clip_start``. Segments are
        trimmed to the clip range; segments that fall outside it, that only
        touch its boundary (zero-length after trimming), or whose text is
        blank are skipped. Returns ``""`` when no cue remains.
    """
    lines = []
    idx = 1
    for seg in segs:
        seg_start, seg_end = float(seg["start"]), float(seg["end"])
        # Trim the segment to the clip and shift it to clip-relative time.
        t0 = max(seg_start, clip_start) - clip_start
        t1 = min(seg_end, clip_end) - clip_start
        # Compare at millisecond resolution: that is what ends up in the file,
        # and an empty or inverted range means no visible overlap.
        if round(t1 * 1000) <= round(t0 * 1000):
            continue
        text = seg["text"].strip()
        if not text:
            continue
        # Wrap to short lines so burned-in captions fit a narrow vertical frame.
        wrapped = "\n".join(textwrap.wrap(text, width=36)) or text
        lines += [str(idx), f"{_srt_timestamp(t0)} --> {_srt_timestamp(t1)}", wrapped, ""]
        idx += 1
    return "\n".join(lines)
def score_windows(segs, window=22, stride=8, max_windows=50):
    """Score fixed-length windows of a transcript and pick the best ones.

    Windows of ``window`` seconds are slid from 0 in steps of ``stride``
    seconds up to the latest segment end. Each window is scored from the text
    of every segment that overlaps (or touches) it: hook phrases, sentence
    punctuation and a small, capped bonus for word count. Windows with no text
    are dropped.

    Args:
        segs: List of dicts with ``"start"``, ``"end"`` and ``"text"`` keys.
        window: Window length in seconds; must be positive.
        stride: Step between window starts in seconds; must be positive.
        max_windows: Upper bound on the number of windows returned.

    Returns:
        A list of ``{"start", "end", "score"}`` dicts, highest score first
        (ties keep chronological order), with no two windows overlapping.

    Raises:
        ValueError: If ``window`` or ``stride`` is not positive (a
            non-positive stride would otherwise never terminate).
    """
    if not segs:
        return []
    if window <= 0 or stride <= 0:
        raise ValueError("window and stride must be positive")

    # Use the latest end time rather than the last element, so the result
    # does not depend on the segments being sorted.
    t_end = max(float(s["end"]) for s in segs)
    # NOTE: hooks are matched as plain substrings, so e.g. "now" also matches
    # inside "know".
    hooks = ("how to", "here's why", "the secret", "you need", "do this",
             "mistake", "tip", "hack", "lesson", "watch", "today", "now")

    windows = []
    t = 0.0
    while t + window <= t_end:
        w_end = t + window
        text = " ".join(
            s["text"] for s in segs if s["end"] >= t and s["start"] <= w_end
        ).lower()
        if text.strip():
            score = 2.0 * sum(1 for k in hooks if k in text)
            score += text.count(".") * 0.2 + text.count("!") * 0.4 + text.count("?") * 0.3
            # Word-count bonus, capped at 1.5.
            score += min(1.5, max(1, len(text.split())) / 40.0)
            windows.append({"start": t, "end": w_end, "score": score})
        t += stride

    # list.sort is stable, so equal scores stay in chronological order.
    windows.sort(key=lambda w: w["score"], reverse=True)

    # Greedy non-overlapping pick, best score first.
    selected = []
    for w in windows:
        if len(selected) >= max_windows:
            break
        if all(w["start"] >= s["end"] or w["end"] <= s["start"] for s in selected):
            selected.append(w)
    return selected
def _remove_quietly(path):
    """Delete *path* as best-effort cleanup; a missing or locked file is ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def export_clip(input_path, output_path, t0, t1, burn_srt_text=None,
                target_h=1920, target_w=1080):
    """Cut ``[t0, t1]`` from a video, crop it to a vertical frame and save it.

    The sub-clip is scaled so it fully covers ``target_w`` x ``target_h`` and
    then center-cropped to exactly that size. The result is first encoded to a
    temporary file next to the output. If ``burn_srt_text`` is non-blank it is
    written to a sidecar ``.srt`` and burned in by an ``ffmpeg`` pass that
    writes the final file; otherwise the temporary file is moved into place.
    Temporary files created here are removed even when a step fails.

    Args:
        input_path: Path of the source video.
        output_path: Destination path of the exported clip.
        t0: Clip start in seconds.
        t1: Clip end in seconds.
        burn_srt_text: Optional SRT text to burn into the clip.
        target_h: Output height in pixels.
        target_w: Output width in pixels.

    Raises:
        subprocess.CalledProcessError: If the ffmpeg subtitle pass fails.
    """
    out_path = os.path.abspath(str(output_path))
    os.makedirs(os.path.dirname(out_path), exist_ok=True)

    # Derive sibling paths from the extension only; a plain str.replace would
    # also rewrite ".mp4" occurring earlier in the path, and would leave the
    # temp path equal to the output path for non-.mp4 outputs.
    root, ext = os.path.splitext(out_path)
    tmp = f"{root}_tmp{ext or '.mp4'}"
    srt_path = f"{root}.srt"
    wrote_srt = False

    try:
        # 1) Create vertical video temp file
        with VideoFileClip(input_path) as v:
            sub = v.subclip(t0, t1)
            aspect = sub.w / sub.h
            vertical_aspect = target_w / target_h
            if aspect > vertical_aspect:
                # Source is wider than the target: match height, trim the sides.
                new_h = target_h
                new_w = int(aspect * new_h)
                resized = sub.resize(height=new_h)
                x1 = (new_w - target_w) // 2
                base = resized.crop(x1=x1, y1=0, x2=x1 + target_w, y2=target_h)
            else:
                # Source is as narrow or narrower: match width, trim top/bottom.
                new_w = target_w
                new_h = int(new_w / aspect)
                resized = sub.resize(width=new_w)
                y1 = (new_h - target_h) // 2
                base = resized.crop(x1=0, y1=y1, x2=target_w, y2=y1 + target_h)
            base.write_videofile(
                tmp, codec="libx264", audio_codec="aac", fps=30, threads=4,
                verbose=False, logger=None
            )

        # 2) Burn subtitles if provided and non-empty
        if burn_srt_text and burn_srt_text.strip():
            with open(srt_path, "w", encoding="utf-8") as f:
                f.write(burn_srt_text)
            wrote_srt = True
            # Windows/FFmpeg-safe paths
            tmp_ff = tmp.replace("\\", "/")
            srt_ff = srt_path.replace("\\", "/")
            out_ff = out_path.replace("\\", "/")
            # Escape drive colon: C:/ -> C\:/
            # NOTE: a single quote inside the path is not escaped here.
            srt_ff_escaped = srt_ff.replace(":", r"\:")
            cmd = [
                "ffmpeg", "-y",
                "-i", tmp_ff,
                "-vf", f"subtitles=filename='{srt_ff_escaped}'",
                "-c:a", "copy",
                out_ff
            ]
            subprocess.run(cmd, check=True)
        else:
            # No subs -> just move the temp video into place. os.replace
            # overwrites an existing destination, so no prior delete is needed.
            os.replace(tmp, out_path)
    finally:
        # Each removal is independent so one failure cannot skip the other.
        _remove_quietly(tmp)
        if wrote_srt:
            _remove_quietly(srt_path)
def transcribe_with_whisper(audio_path, model_size="base", language="en"):
    """Transcribe an audio file with faster-whisper.

    A new ``WhisperModel`` is constructed on every call.

    Args:
        audio_path: Path of the audio file to transcribe.
        model_size: Model name passed to ``WhisperModel`` (e.g. ``"tiny"``,
            ``"base"``, ``"small"``).
        language: Language code forwarded to ``model.transcribe``. Defaults
            to ``"en"``, the value that was previously hard-coded.
            Presumably ``None`` asks faster-whisper to auto-detect the
            language; confirm against the library documentation.

    Returns:
        A list of ``{"start": float, "end": float, "text": str}`` dicts, one
        per transcribed segment, with surrounding whitespace stripped from
        the text.
    """
    # CPU-friendly compute_type
    model = WhisperModel(model_size, compute_type="int8")
    segments, _info = model.transcribe(
        audio_path, language=language, vad_filter=True, beam_size=5
    )
    # Materialize the segments into plain dicts before returning.
    return [
        {"start": float(s.start), "end": float(s.end), "text": s.text.strip()}
        for s in segments
    ]
def extract_audio_16k_mono(input_path, out_path):
    """Extract the audio of ``input_path`` to ``out_path`` as 16 kHz mono.

    Runs ``ffmpeg`` with video disabled (``-vn``), one audio channel
    (``-ac 1``) and a 16000 Hz sample rate (``-ar 16000``), overwriting
    ``out_path`` if it exists (``-y``).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    ffmpeg_args = ["-y", "-i", input_path, "-vn", "-ac", "1", "-ar", "16000", out_path]
    subprocess.run(["ffmpeg", *ffmpeg_args], check=True)
def plan_windows(segs, video_duration, clips, min_sec, max_sec):
    """Choose up to ``clips`` time windows to export from a video.

    Transcript-scored windows (from ``score_windows``) are preferred. When
    there are fewer than ``clips`` of them and the duration is known, the
    remainder is filled with evenly spaced slices so the requested count is
    still reached where the video is long enough.

    Args:
        segs: Transcript segments (dicts with ``"start"``, ``"end"``,
            ``"text"``); may be empty.
        video_duration: Video length in seconds; a falsy value disables the
            fallback and the clamping.
        clips: Number of windows wanted.
        min_sec: Minimum clip length in seconds.
        max_sec: Maximum clip length in seconds (wins if below ``min_sec``).

    Returns:
        A list of ``{"start", "end", "score"}`` dicts: scored windows first
        (best score first), then fallback slices with score ``0.0``.
    """
    base_window = min(max_sec, max(min_sec, 22))
    if video_duration and base_window >= video_duration:
        base_window = max(6, int(video_duration * 0.9))

    # score_windows returns [] for an empty transcript, so skip the call.
    best = score_windows(segs, window=base_window, stride=8)[:clips] if segs else []
    if not video_duration:
        return best

    # Transcript timestamps may run slightly past the video; keep windows
    # inside it so the later cut never reads beyond the end.
    best = [
        {**w, "end": min(w["end"], video_duration)}
        for w in best
        if w["start"] < video_duration
    ]
    if len(best) >= clips:
        return best

    # Fallback: fill with evenly spaced slices
    missing = clips - len(best)
    # Honor min_sec as well as max_sec when sizing fallback slices.
    seg_len = min(max_sec, max(min_sec, int(video_duration / max(1, clips)), 1))
    if clips > 1 and video_duration > seg_len:
        starts = [i * (video_duration - seg_len) / (clips - 1) for i in range(clips)]
    elif not best:
        starts = [0.0]
    else:
        # The video fits in one slice and a scored window already covers it.
        starts = []

    free, blocked = [], []
    for s in starts:
        cand = {"start": s, "end": min(video_duration, s + seg_len), "score": 0.0}
        clashes = any(
            cand["start"] < w["end"] and cand["end"] > w["start"] for w in best
        )
        (blocked if clashes else free).append(cand)

    # Prefer slices that do not repeat a scored window; fall back to
    # overlapping ones only when needed to reach the requested count.
    return best + (free + blocked)[:missing]
# ---------- Streamlit UI ----------
# Page chrome and sidebar settings, followed by the one-shot pipeline that
# runs when the button is pressed: save upload -> transcribe -> plan windows
# -> export clips -> preview + ZIP download.
st.set_page_config(page_title="AutoShorts", page_icon="🎬", layout="centered")
st.title("🎬 AutoShorts — Long video ➜ vertical clips")

with st.sidebar:
    st.header("Settings")
    clips = st.slider("Number of clips", 1, 12, 4, 1)
    min_sec = st.slider("Min seconds per clip", 4, 40, 6, 1)
    max_sec = st.slider("Max seconds per clip", 6, 60, 12, 1)
    captions = st.checkbox("Burn captions (if speech detected)", value=True)
    model_size = st.selectbox("Whisper model", ["tiny", "base", "small"], index=1,
                              help="Bigger = better accuracy, slower download/compute.")
    # The two sliders are independent, so the range can be inverted.
    if min_sec > max_sec:
        st.warning("Min seconds is larger than max seconds; using the max for both.")
        min_sec = max_sec

uploaded = st.file_uploader("Upload a .mp4 (or .mov/.mkv)", type=["mp4", "mov", "mkv"])
run = st.button("Make Clips 🚀", type="primary", disabled=uploaded is None)

if run and uploaded is not None:
    # Work area: everything below lives in a temp dir removed on exit.
    with tempfile.TemporaryDirectory() as workdir:
        # Keep the real extension so .mov/.mkv uploads are not mislabelled.
        suffix = Path(uploaded.name).suffix.lower() or ".mp4"
        in_path = os.path.join(workdir, f"input{suffix}")
        with open(in_path, "wb") as f:
            f.write(uploaded.getvalue())

        # Read duration early
        try:
            with VideoFileClip(in_path) as v:
                duration = float(v.duration or 0.0)
        except Exception as e:
            st.error(f"Could not open video: {e}")
            st.stop()

        # Audio extract + transcription
        with st.status("🔊 Extracting audio + transcribing...", expanded=True) as status:
            audio_path = os.path.join(workdir, "audio.m4a")
            segs = []
            try:
                extract_audio_16k_mono(in_path, audio_path)
            except (subprocess.CalledProcessError, OSError):
                # ffmpeg failed or could not be started. Clips can still be
                # planned without a transcript, so degrade instead of aborting.
                st.warning("FFmpeg failed to extract audio. Proceeding without captions.")
            else:
                st.write("✅ Audio extracted (16kHz mono)")
                try:
                    segs = transcribe_with_whisper(audio_path, model_size=model_size)
                except Exception as e:
                    st.warning(f"Transcription failed ({e}). Proceeding without captions.")
                    segs = []
                else:
                    st.write(f"✅ Transcription ok — {len(segs)} segments")

            status.update(label="🧠 Planning best windows...", state="running")
            # Plan windows (guarantee N)
            windows = plan_windows(segs, duration, clips, min_sec, max_sec)
            status.update(label="✅ Analysis complete", state="complete")

        if not windows:
            st.error("Could not plan any clips. Try smaller min/max or a longer video.")
            st.stop()

        # Export
        out_dir = os.path.join(workdir, "exports")
        os.makedirs(out_dir, exist_ok=True)
        st.subheader("Exporting clips")
        progress = st.progress(0.0, text="Starting...")
        logs = st.empty()  # single-line placeholder, overwritten per clip
        out_files = []
        total = len(windows)
        for i, w in enumerate(windows, start=1):
            t0 = float(w["start"])
            t1 = float(w["end"])
            outp = os.path.join(out_dir, f"short_{i:02d}.mp4")
            srt_text = srt_from_segments(segs, clip_start=t0, clip_end=t1) if (captions and segs) else None
            logs.write(f"Clip {i}/{total}: {t0:.2f}s → {t1:.2f}s")
            try:
                try:
                    export_clip(in_path, outp, t0, t1, burn_srt_text=srt_text)
                except subprocess.CalledProcessError:
                    # If subtitle burn fails, retry without captions
                    export_clip(in_path, outp, t0, t1, burn_srt_text=None)
                    # Persistent message: the placeholder above is overwritten
                    # by the next clip and would lose it.
                    st.warning(f"⚠️ Captions failed on clip {i}; exported without captions.")
                out_files.append(outp)
            except Exception as e2:
                # One bad clip must not abort the remaining exports.
                st.error(f"❌ Failed clip {i}: {e2}")
            progress.progress(i / total, text=f"Exported {i}/{total}")

        if not out_files:
            st.error("No clips could be exported.")
            st.stop()
        st.success(f"Done. Exported {len(out_files)} clip(s).")

        # Show players + build ZIP
        for fp in out_files:
            # Hand over bytes rather than a path so playback does not depend
            # on the temp directory, which is deleted when this block exits.
            st.video(Path(fp).read_bytes())

        # Zip for download
        mem_zip = io.BytesIO()
        with zipfile.ZipFile(mem_zip, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
            for fp in out_files:
                zf.write(fp, arcname=os.path.basename(fp))
        mem_zip.seek(0)
        st.download_button(
            "Download all clips (ZIP)",
            mem_zip,
            file_name="autoshorts_exports.zip",
            mime="application/zip",
        )
else:
    st.info("Upload a video, tweak settings in the sidebar, then click **Make Clips 🚀**.")