""" Flask 音声同期エディター(波形なし/2本バー+線マッピング) ▶ 使い方 1) 必要: Python 3.9+ / FFmpeg が PATH で使えること 2) 依存関係: pip install Flask 3) 実行: python app.py 4) ブラウザで http://127.0.0.1:5000 を開く 機能: - 2つの音声をアップロード - 横向きの2本バー (上=音声1, 下=音声2) - 上下バー間にドラッグ&ドロップで線を引いて対応(アンカー)を作成 - 複数線の斜め対応により区間毎の速度補正を計算 - FFmpeg の atempo を区間ごとに適用 (分割→速度変更→結合) - 再生/一時停止、拡大/縮小(時間スケール)、現在秒表示 - 変換後、UI の音声2を処理後に差し替え、ダウンロード可能 注意: - atempo は 0.5〜2.0 の範囲。範囲外はチェインで分割適用。 - 区間境界はハードカット (必要なら acrossfade 等の導入を検討) - 簡易実装のため同時複数ユーザーは想定していません """ import math import os import shlex import subprocess import json from pathlib import Path from flask import ( Flask, jsonify, render_template_string, request, send_from_directory, ) BASE_DIR = Path(os.getcwd()).resolve() UPLOAD_DIR = BASE_DIR / "uploads" OUTPUT_DIR = BASE_DIR / "static" / "out" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 # 1GB INDEX_HTML = r""" 音声同期エディター (2本バー+線)

音声同期エディター バー+線

ファイル1を選択:
ファイル2を選択:
0.000 s ズーム: 100 px/s
ファイル1 (基準タイムライン)
ファイル2 (速度調整対象)
音声プレビュー
""" def run_cmd(cmd): """サブプロセス実行のヘルパ (実行内容とログをprint)""" print("=== 実行コマンド ===") print(" ".join(shlex.quote(c) for c in cmd)) proc = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, text=True ) print("=== 標準出力 ===") print(proc.stdout.strip()) print("=== 標準エラー ===") print(proc.stderr.strip()) print("=== 戻り値 ===", proc.returncode) return proc.returncode, proc.stdout.strip(), proc.stderr.strip() def ffprobe_duration(path: Path) -> float: """ffprobe で秒数(float)を取得。失敗時は 0.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(path) ] code, out, err = run_cmd(cmd) if code == 0: try: return float(out) except Exception: return 0.0 return 0.0 def ffprobe_has_video(path: Path) -> bool: """ffprobe でビデオストリームがあるかチェック""" cmd = [ "ffprobe", "-v", "error", "-select_streams", "v", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1:nokey=1", str(path) ] code, out, err = run_cmd(cmd) return code == 0 and "video" in out def ffprobe_has_audio(path: Path) -> bool: """ffprobe でオーディオストリームがあるかチェック""" cmd = [ "ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1:nokey=1", str(path) ] code, out, err = run_cmd(cmd) return code == 0 and "audio" in out def atempo_chain(tempo: float) -> str: """atempo は 0.5〜2.0 の範囲制限があるため、複数段に分解してチェインする。""" if tempo <= 0: tempo = 1.0 filters = [] t = tempo # 大きい場合は 2.0 で割り続ける while t > 2.0: filters.append("atempo=2.0") t /= 2.0 # 小さい場合は 0.5 で割って逆にする while t < 0.5: filters.append("atempo=0.5") t /= 0.5 # 最後の 0.5〜2.0 範囲を追加 filters.append(f"atempo={t:.6f}") return ",".join(filters) @app.route('/') def index(): return render_template_string(INDEX_HTML) @app.route('/upload', methods=['POST']) def upload(): f1 = request.files.get('audio1') f2 = request.files.get('audio2') if not f1 or not f2: return jsonify(ok=False, error='audio1 と audio2 が必要です'), 400 # 拡張子 ext1 = os.path.splitext(f1.filename or '')[1] or '.wav' ext2 = os.path.splitext(f2.filename or '')[1] or '.wav' p1 = UPLOAD_DIR / f"audio1{ext1}" p2 = UPLOAD_DIR / f"audio2{ext2}" # 既存ファイルを削除 for p in (p1, p2): try: if p.exists(): p.unlink() except Exception: pass f1.save(p1) f2.save(p2) # ファイルタイプをチェック audio1_is_video = ffprobe_has_video(p1) audio2_is_video = ffprobe_has_video(p2) # レスポンスデータの基本設定 response_data = { 'ok': True, 'audio1_url': f"/media/{p1.name}", 'audio2_url': f"/media/{p2.name}", 'audio1_is_video': audio1_is_video, 'audio2_is_video': audio2_is_video, } # 動画ファイルの場合、音声のみのバージョンも作成(オプション) try: if audio1_is_video: audio_only_path = UPLOAD_DIR / f"audio1_audio.wav" if audio_only_path.exists(): audio_only_path.unlink() # 動画から音声を抽出 cmd = [ "ffmpeg", "-y", "-i", str(p1), "-vn", "-acodec", "pcm_s16le", "-ar", "44100", str(audio_only_path) ] code, out, err = run_cmd(cmd) if code == 0 and audio_only_path.exists(): response_data['audio1_audio_url'] = f"/media/audio1_audio.wav" if audio2_is_video: audio_only_path = UPLOAD_DIR / f"audio2_audio.wav" if audio_only_path.exists(): audio_only_path.unlink() # 動画から音声を抽出 cmd = [ "ffmpeg", "-y", "-i", str(p2), "-vn", "-acodec", "pcm_s16le", "-ar", "44100", str(audio_only_path) ] code, out, err = run_cmd(cmd) if code == 0 and audio_only_path.exists(): response_data['audio2_audio_url'] = f"/media/audio2_audio.wav" except Exception as e: print(f"音声抽出エラー: {e}") # エラーが発生しても続行 return jsonify(response_data) @app.route('/media/') def media(filename): return send_from_directory(UPLOAD_DIR, filename, as_attachment=False) @app.route('/convert', methods=['POST']) def convert_audio(): data = request.get_json(silent=True) or {} mappings = data.get('mappings') or [] is_video2 = data.get('is_video2', False) # 入力ファイル探索 cand1 = sorted(UPLOAD_DIR.glob('audio1.*'), key=os.path.getmtime) cand2 = sorted(UPLOAD_DIR.glob('audio2.*'), key=os.path.getmtime) if not cand1 or not cand2: return jsonify(ok=False, error='先に /upload にファイルを送信してください'), 400 src1 = cand1[-1] src2 = cand2[-1] # 動画ファイルに音声ストリームがあるかチェック has_audio_in_video = ffprobe_has_audio(src2) if is_video2 else True dur1 = ffprobe_duration(src1) dur2 = ffprobe_duration(src2) if dur1 <= 0 or dur2 <= 0: return jsonify(ok=False, error='ファイルの長さを取得できませんでした'), 400 # アンカー生成: 0 と 終端を補完 anchors = [] anchors.append({ 't1': 0.0, 't2': 0.0 }) # ユーザー指定のアンカー (t1 昇順) for m in sorted(mappings, key=lambda x: x.get('t1', 0.0)): try: t1 = float(m.get('t1', 0.0)) t2 = float(m.get('t2', 0.0)) if 0 <= t1 <= dur1 and 0 <= t2 <= dur2: anchors.append({'t1': t1, 't2': t2}) except Exception: pass anchors.append({ 't1': dur1, 't2': dur2 }) # 単調増加になるようにフィルタリング filtered = [anchors[0]] for a in anchors[1:]: if a['t1'] > filtered[-1]['t1'] and a['t2'] > filtered[-1]['t2']: filtered.append(a) anchors = filtered if len(anchors) < 2: return jsonify(ok=False, error='有効な対応線がありません'), 400 # セグメントごとに atempo を計算 segs = [] # (start2, end2, tempo) for i in range(len(anchors)-1): s1, e1 = anchors[i]['t1'], anchors[i+1]['t1'] s2, e2 = anchors[i]['t2'], anchors[i+1]['t2'] src_len = max(0.0, e2 - s2) dst_len = max(0.001, e1 - s1) # 0割回避 if src_len <= 0: # 無効 continue tempo = src_len / dst_len # 入力/出力 segs.append((s2, e2, tempo)) if not segs: return jsonify(ok=False, error='有効な区間が作成できませんでした'), 400 # 出力ファイルの設定 if is_video2: output_path = OUTPUT_DIR / "adjusted_video2.mp4" else: output_path = OUTPUT_DIR / "adjusted_audio2.wav" # 既存ファイルを削除 if output_path.exists(): try: output_path.unlink() except Exception: pass def video_tempo_chain(tempo): """映像用の速度調整チェーン(0.5-2.0の制限対応)""" if tempo <= 0: tempo = 1.0 filters = [] t = tempo # 大きい場合は2.0で割り続ける while t > 2.0: filters.append("setpts=PTS/2.0") t /= 2.0 # 小さい場合は0.5で割って逆にする while t < 0.5: filters.append("setpts=PTS/0.5") t /= 0.5 # 最後の0.5〜2.0範囲を追加 if abs(t - 1.0) > 0.001: # 1.0と大きく異なる場合のみ追加 filters.append(f"setpts=PTS/{t:.6f}") return ",".join(filters) if filters else "null" # FFmpeg フィルタ構築 if is_video2: try: # 映像と音声のフィルタを別々に構築 video_filters = [] audio_filters = [] video_labels = [] audio_labels = [] for idx, (st, ed, tempo) in enumerate(segs): v_lab = f"v{idx}" a_lab = f"a{idx}" video_tempo_filter = video_tempo_chain(tempo) # 映像フィルタ: trim + setptsで速度調整 video_filters.append( f"[0:v]trim=start={st:.6f}:end={ed:.6f},setpts=PTS-STARTPTS,{video_tempo_filter}[{v_lab}]" ) # 音声フィルタ: atrim + atempoで速度調整 if has_audio_in_video: atempo_f = atempo_chain(tempo) audio_filters.append( f"[0:a]atrim=start={st:.6f}:end={ed:.6f},asetpts=PTS-STARTPTS,{atempo_f}[{a_lab}]" ) video_labels.append(f"[{v_lab}]") if has_audio_in_video: audio_labels.append(f"[{a_lab}]") # concatフィルタ if has_audio_in_video: concat_inputs = "".join([f"{vl}{al}" for vl, al in zip(video_labels, audio_labels)]) concat_filter = f"{concat_inputs}concat=n={len(segs)}:v=1:a=1[outv][outa]" filter_complex = ";".join(video_filters + audio_filters + [concat_filter]) cmd = [ "ffmpeg", "-y", "-i", str(src2), "-filter_complex", filter_complex, "-map", "[outv]", "-map", "[outa]", "-c:v", "libx264", "-preset", "veryfast", "-crf", "23", "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", str(output_path) ] else: # 音声がない場合 concat_inputs = "".join(video_labels) concat_filter = f"{concat_inputs}concat=n={len(segs)}:v=1:a=0[outv]" filter_complex = ";".join(video_filters + [concat_filter]) cmd = [ "ffmpeg", "-y", "-i", str(src2), "-filter_complex", filter_complex, "-map", "[outv]", "-an", # 音声なし "-c:v", "libx264", "-preset", "veryfast", "-crf", "23", str(output_path) ] code, out, err = run_cmd(cmd) if code != 0: return jsonify(ok=False, error=f"動画処理失敗: {err[:1000]}") except Exception as e: return jsonify(ok=False, error=f"動画処理エラー: {str(e)}") else: # 音声のみ処理(既存のまま) filters = [] labels = [] for idx, (st, ed, tempo) in enumerate(segs): atempo_f = atempo_chain(tempo) lab = f"a{idx}" f = ( f"[0:a]atrim=start={st:.6f}:end={ed:.6f}," f"asetpts=PTS-STARTPTS,{atempo_f}[{lab}]" ) filters.append(f) labels.append(f"[{lab}]") concat = f"{''.join(labels)}concat=n={len(segs)}:v=0:a=1[outa]" filter_complex = ";".join(filters + [concat]) cmd = [ "ffmpeg", "-y", "-i", str(src2), "-filter_complex", filter_complex, "-map", "[outa]", str(output_path) ] code, out, err = run_cmd(cmd) if code != 0: return jsonify(ok=False, error=f"FFmpeg 失敗: {err[:4000]}") response_data = { 'ok': True, 'output_url': f"/static/out/{output_path.name}", 'output_filename': output_path.name, } # 動画ファイルの場合、音声のみのバージョンも作成 if is_video2 and has_audio_in_video: try: audio_only_path = OUTPUT_DIR / "adjusted_audio.wav" if audio_only_path.exists(): audio_only_path.unlink() cmd_audio_extract = [ "ffmpeg", "-y", "-i", str(output_path), "-vn", "-acodec", "pcm_s16le", "-ar", "44100", str(audio_only_path) ] code, out, err = run_cmd(cmd_audio_extract) if code == 0: response_data['output_audio_url'] = f"/static/out/adjusted_audio.wav" except: pass return jsonify(response_data) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=True, threaded=True)