"""
Flask 音声同期エディター(波形なし/2本バー+線マッピング)
▶ 使い方
1) 必要: Python 3.9+ / FFmpeg が PATH で使えること
2) 依存関係: pip install Flask
3) 実行: python app.py
4) ブラウザで http://127.0.0.1:5000 を開く
機能:
- 2つの音声をアップロード
- 横向きの2本バー (上=音声1, 下=音声2)
- 上下バー間にドラッグ&ドロップで線を引いて対応(アンカー)を作成
- 複数線の斜め対応により区間毎の速度補正を計算
- FFmpeg の atempo を区間ごとに適用 (分割→速度変更→結合)
- 再生/一時停止、拡大/縮小(時間スケール)、現在秒表示
- 変換後、UI の音声2を処理後に差し替え、ダウンロード可能
注意:
- atempo は 0.5〜2.0 の範囲。範囲外はチェインで分割適用。
- 区間境界はハードカット (必要なら acrossfade 等の導入を検討)
- 簡易実装のため同時複数ユーザーは想定していません
"""
import math
import os
import shlex
import subprocess
import json
from pathlib import Path
from flask import (
Flask,
jsonify,
render_template_string,
request,
send_from_directory,
)
BASE_DIR = Path(os.getcwd()).resolve()
UPLOAD_DIR = BASE_DIR / "uploads"
OUTPUT_DIR = BASE_DIR / "static" / "out"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 # 1GB
INDEX_HTML = r"""
音声同期エディター (2本バー+線)
音声同期エディター バー+線
0.000 s
ズーム: 100 px/s
"""
def run_cmd(cmd):
"""サブプロセス実行のヘルパ (実行内容とログをprint)"""
print("=== 実行コマンド ===")
print(" ".join(shlex.quote(c) for c in cmd))
proc = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
text=True
)
print("=== 標準出力 ===")
print(proc.stdout.strip())
print("=== 標準エラー ===")
print(proc.stderr.strip())
print("=== 戻り値 ===", proc.returncode)
return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
def ffprobe_duration(path: Path) -> float:
"""ffprobe で秒数(float)を取得。失敗時は 0."""
cmd = [
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(path)
]
code, out, err = run_cmd(cmd)
if code == 0:
try:
return float(out)
except Exception:
return 0.0
return 0.0
def ffprobe_has_video(path: Path) -> bool:
"""ffprobe でビデオストリームがあるかチェック"""
cmd = [
"ffprobe", "-v", "error",
"-select_streams", "v",
"-show_entries", "stream=codec_type",
"-of", "default=noprint_wrappers=1:nokey=1",
str(path)
]
code, out, err = run_cmd(cmd)
return code == 0 and "video" in out
def ffprobe_has_audio(path: Path) -> bool:
"""ffprobe でオーディオストリームがあるかチェック"""
cmd = [
"ffprobe", "-v", "error",
"-select_streams", "a",
"-show_entries", "stream=codec_type",
"-of", "default=noprint_wrappers=1:nokey=1",
str(path)
]
code, out, err = run_cmd(cmd)
return code == 0 and "audio" in out
def atempo_chain(tempo: float) -> str:
"""atempo は 0.5〜2.0 の範囲制限があるため、複数段に分解してチェインする。"""
if tempo <= 0:
tempo = 1.0
filters = []
t = tempo
# 大きい場合は 2.0 で割り続ける
while t > 2.0:
filters.append("atempo=2.0")
t /= 2.0
# 小さい場合は 0.5 で割って逆にする
while t < 0.5:
filters.append("atempo=0.5")
t /= 0.5
# 最後の 0.5〜2.0 範囲を追加
filters.append(f"atempo={t:.6f}")
return ",".join(filters)
@app.route('/')
def index():
return render_template_string(INDEX_HTML)
@app.route('/upload', methods=['POST'])
def upload():
f1 = request.files.get('audio1')
f2 = request.files.get('audio2')
if not f1 or not f2:
return jsonify(ok=False, error='audio1 と audio2 が必要です'), 400
# 拡張子
ext1 = os.path.splitext(f1.filename or '')[1] or '.wav'
ext2 = os.path.splitext(f2.filename or '')[1] or '.wav'
p1 = UPLOAD_DIR / f"audio1{ext1}"
p2 = UPLOAD_DIR / f"audio2{ext2}"
# 既存ファイルを削除
for p in (p1, p2):
try:
if p.exists(): p.unlink()
except Exception:
pass
f1.save(p1)
f2.save(p2)
# ファイルタイプをチェック
audio1_is_video = ffprobe_has_video(p1)
audio2_is_video = ffprobe_has_video(p2)
# レスポンスデータの基本設定
response_data = {
'ok': True,
'audio1_url': f"/media/{p1.name}",
'audio2_url': f"/media/{p2.name}",
'audio1_is_video': audio1_is_video,
'audio2_is_video': audio2_is_video,
}
# 動画ファイルの場合、音声のみのバージョンも作成(オプション)
try:
if audio1_is_video:
audio_only_path = UPLOAD_DIR / f"audio1_audio.wav"
if audio_only_path.exists():
audio_only_path.unlink()
# 動画から音声を抽出
cmd = [
"ffmpeg", "-y", "-i", str(p1),
"-vn", "-acodec", "pcm_s16le", "-ar", "44100",
str(audio_only_path)
]
code, out, err = run_cmd(cmd)
if code == 0 and audio_only_path.exists():
response_data['audio1_audio_url'] = f"/media/audio1_audio.wav"
if audio2_is_video:
audio_only_path = UPLOAD_DIR / f"audio2_audio.wav"
if audio_only_path.exists():
audio_only_path.unlink()
# 動画から音声を抽出
cmd = [
"ffmpeg", "-y", "-i", str(p2),
"-vn", "-acodec", "pcm_s16le", "-ar", "44100",
str(audio_only_path)
]
code, out, err = run_cmd(cmd)
if code == 0 and audio_only_path.exists():
response_data['audio2_audio_url'] = f"/media/audio2_audio.wav"
except Exception as e:
print(f"音声抽出エラー: {e}")
# エラーが発生しても続行
return jsonify(response_data)
@app.route('/media/')
def media(filename):
return send_from_directory(UPLOAD_DIR, filename, as_attachment=False)
@app.route('/convert', methods=['POST'])
def convert_audio():
data = request.get_json(silent=True) or {}
mappings = data.get('mappings') or []
is_video2 = data.get('is_video2', False)
# 入力ファイル探索
cand1 = sorted(UPLOAD_DIR.glob('audio1.*'), key=os.path.getmtime)
cand2 = sorted(UPLOAD_DIR.glob('audio2.*'), key=os.path.getmtime)
if not cand1 or not cand2:
return jsonify(ok=False, error='先に /upload にファイルを送信してください'), 400
src1 = cand1[-1]
src2 = cand2[-1]
# 動画ファイルに音声ストリームがあるかチェック
has_audio_in_video = ffprobe_has_audio(src2) if is_video2 else True
dur1 = ffprobe_duration(src1)
dur2 = ffprobe_duration(src2)
if dur1 <= 0 or dur2 <= 0:
return jsonify(ok=False, error='ファイルの長さを取得できませんでした'), 400
# アンカー生成: 0 と 終端を補完
anchors = []
anchors.append({ 't1': 0.0, 't2': 0.0 })
# ユーザー指定のアンカー (t1 昇順)
for m in sorted(mappings, key=lambda x: x.get('t1', 0.0)):
try:
t1 = float(m.get('t1', 0.0))
t2 = float(m.get('t2', 0.0))
if 0 <= t1 <= dur1 and 0 <= t2 <= dur2:
anchors.append({'t1': t1, 't2': t2})
except Exception:
pass
anchors.append({ 't1': dur1, 't2': dur2 })
# 単調増加になるようにフィルタリング
filtered = [anchors[0]]
for a in anchors[1:]:
if a['t1'] > filtered[-1]['t1'] and a['t2'] > filtered[-1]['t2']:
filtered.append(a)
anchors = filtered
if len(anchors) < 2:
return jsonify(ok=False, error='有効な対応線がありません'), 400
# セグメントごとに atempo を計算
segs = [] # (start2, end2, tempo)
for i in range(len(anchors)-1):
s1, e1 = anchors[i]['t1'], anchors[i+1]['t1']
s2, e2 = anchors[i]['t2'], anchors[i+1]['t2']
src_len = max(0.0, e2 - s2)
dst_len = max(0.001, e1 - s1) # 0割回避
if src_len <= 0: # 無効
continue
tempo = src_len / dst_len # 入力/出力
segs.append((s2, e2, tempo))
if not segs:
return jsonify(ok=False, error='有効な区間が作成できませんでした'), 400
# 出力ファイルの設定
if is_video2:
output_path = OUTPUT_DIR / "adjusted_video2.mp4"
else:
output_path = OUTPUT_DIR / "adjusted_audio2.wav"
# 既存ファイルを削除
if output_path.exists():
try:
output_path.unlink()
except Exception:
pass
def video_tempo_chain(tempo):
"""映像用の速度調整チェーン(0.5-2.0の制限対応)"""
if tempo <= 0:
tempo = 1.0
filters = []
t = tempo
# 大きい場合は2.0で割り続ける
while t > 2.0:
filters.append("setpts=PTS/2.0")
t /= 2.0
# 小さい場合は0.5で割って逆にする
while t < 0.5:
filters.append("setpts=PTS/0.5")
t /= 0.5
# 最後の0.5〜2.0範囲を追加
if abs(t - 1.0) > 0.001: # 1.0と大きく異なる場合のみ追加
filters.append(f"setpts=PTS/{t:.6f}")
return ",".join(filters) if filters else "null"
# FFmpeg フィルタ構築
if is_video2:
try:
# 映像と音声のフィルタを別々に構築
video_filters = []
audio_filters = []
video_labels = []
audio_labels = []
for idx, (st, ed, tempo) in enumerate(segs):
v_lab = f"v{idx}"
a_lab = f"a{idx}"
video_tempo_filter = video_tempo_chain(tempo)
# 映像フィルタ: trim + setptsで速度調整
video_filters.append(
f"[0:v]trim=start={st:.6f}:end={ed:.6f},setpts=PTS-STARTPTS,{video_tempo_filter}[{v_lab}]"
)
# 音声フィルタ: atrim + atempoで速度調整
if has_audio_in_video:
atempo_f = atempo_chain(tempo)
audio_filters.append(
f"[0:a]atrim=start={st:.6f}:end={ed:.6f},asetpts=PTS-STARTPTS,{atempo_f}[{a_lab}]"
)
video_labels.append(f"[{v_lab}]")
if has_audio_in_video:
audio_labels.append(f"[{a_lab}]")
# concatフィルタ
if has_audio_in_video:
concat_inputs = "".join([f"{vl}{al}" for vl, al in zip(video_labels, audio_labels)])
concat_filter = f"{concat_inputs}concat=n={len(segs)}:v=1:a=1[outv][outa]"
filter_complex = ";".join(video_filters + audio_filters + [concat_filter])
cmd = [
"ffmpeg", "-y",
"-i", str(src2),
"-filter_complex", filter_complex,
"-map", "[outv]",
"-map", "[outa]",
"-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
"-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart",
str(output_path)
]
else:
# 音声がない場合
concat_inputs = "".join(video_labels)
concat_filter = f"{concat_inputs}concat=n={len(segs)}:v=1:a=0[outv]"
filter_complex = ";".join(video_filters + [concat_filter])
cmd = [
"ffmpeg", "-y",
"-i", str(src2),
"-filter_complex", filter_complex,
"-map", "[outv]",
"-an", # 音声なし
"-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
str(output_path)
]
code, out, err = run_cmd(cmd)
if code != 0:
return jsonify(ok=False, error=f"動画処理失敗: {err[:1000]}")
except Exception as e:
return jsonify(ok=False, error=f"動画処理エラー: {str(e)}")
else:
# 音声のみ処理(既存のまま)
filters = []
labels = []
for idx, (st, ed, tempo) in enumerate(segs):
atempo_f = atempo_chain(tempo)
lab = f"a{idx}"
f = (
f"[0:a]atrim=start={st:.6f}:end={ed:.6f},"
f"asetpts=PTS-STARTPTS,{atempo_f}[{lab}]"
)
filters.append(f)
labels.append(f"[{lab}]")
concat = f"{''.join(labels)}concat=n={len(segs)}:v=0:a=1[outa]"
filter_complex = ";".join(filters + [concat])
cmd = [
"ffmpeg", "-y",
"-i", str(src2),
"-filter_complex", filter_complex,
"-map", "[outa]",
str(output_path)
]
code, out, err = run_cmd(cmd)
if code != 0:
return jsonify(ok=False, error=f"FFmpeg 失敗: {err[:4000]}")
response_data = {
'ok': True,
'output_url': f"/static/out/{output_path.name}",
'output_filename': output_path.name,
}
# 動画ファイルの場合、音声のみのバージョンも作成
if is_video2 and has_audio_in_video:
try:
audio_only_path = OUTPUT_DIR / "adjusted_audio.wav"
if audio_only_path.exists():
audio_only_path.unlink()
cmd_audio_extract = [
"ffmpeg", "-y", "-i", str(output_path),
"-vn", "-acodec", "pcm_s16le", "-ar", "44100",
str(audio_only_path)
]
code, out, err = run_cmd(cmd_audio_extract)
if code == 0:
response_data['output_audio_url'] = f"/static/out/adjusted_audio.wav"
except:
pass
return jsonify(response_data)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True, threaded=True)