Spaces:
No application file
No application file
File size: 2,887 Bytes
1b6bcbc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
import librosa
import os
import soundfile
import subprocess
from concurrent.futures import ThreadPoolExecutor
from .ext_files import get_files_by_ext
def ffmpeg_installed():
    """Report whether an ``ffmpeg`` executable can be run from this process.

    Runs ``ffmpeg -version`` with its output captured and prints which
    conversion backend will be used as a result.

    Returns:
        bool: True if the command started and exited with status 0, False
        if it could not be started or exited with a non-zero status.
    """
    try:
        subprocess.run(["ffmpeg", "-version"],
                       capture_output=True,
                       check=True)
    except (OSError, subprocess.SubprocessError):
        # OSError: the executable is missing or cannot be launched.
        # SubprocessError: it ran but exited non-zero (raised by check=True).
        print("ffmpeg not found, use librosa")
        return False
    print("find ffmpeg installed, use ffmpeg")
    return True
def convert_wav_ffmpeg(source_file: str,
                       target_file: str,
                       sample_rate: int,
                       number: int):
    """Convert one audio file to a single-channel file by running ``ffmpeg``.

    Args:
        source_file: Path of the audio file to read.
        target_file: Path of the file to write. An existing file is
            overwritten (``-y``); missing parent directories are created.
        sample_rate: Output sample rate, passed to ffmpeg as ``-ar``.
        number: Ordinal of the file, used only in progress messages.
    """
    # dirname is "" for a bare file name, and os.makedirs("") raises, so
    # only create the parent directory when there is one.
    parent_dir = os.path.dirname(target_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    print(f"file {number} start convert")
    cmd = ["ffmpeg", "-y", "-i", source_file, "-ar", f"{sample_rate}", "-ac", "1", "-v", "quiet", target_file]
    completed = subprocess.run(cmd)
    # "-v quiet" suppresses ffmpeg's own diagnostics, so the exit status is
    # the only sign that a conversion went wrong; report it instead of
    # dropping it.
    if completed.returncode != 0:
        print(f"file {number} convert failed: ffmpeg exited with code {completed.returncode}")
def convert_wav_librosa(source_file: str,
                        target_file: str,
                        sample_rate: int,
                        number: int):
    """Convert one audio file using ``librosa`` to read and ``soundfile`` to write.

    Args:
        source_file: Path of the audio file to read.
        target_file: Path of the file to write; missing parent directories
            are created.
        sample_rate: Sample rate passed to ``librosa.load`` as ``sr``.
        number: Ordinal of the file, used only in progress messages.
    """
    # dirname is "" for a bare file name, and os.makedirs("") raises, so
    # only create the parent directory when there is one.
    parent_dir = os.path.dirname(target_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    print(f"file {number} start convert")
    # Keep the rate reported by the loader in its own name rather than
    # rebinding the ``sample_rate`` argument.
    data, loaded_rate = librosa.load(source_file,
                                     sr=sample_rate,
                                     mono=True)
    soundfile.write(target_file, data, loaded_rate)
def convert_files(source_dir: str,
                  target_dir: str,
                  sample_rate: int,
                  max_threads = None,
                  force_librosa = False):
    """Convert the audio files found under ``source_dir`` into ``.wav`` files.

    Each file is converted on a thread pool. Files whose target already
    exists are skipped. Conversions that raise are reported on stdout after
    all work has finished; they do not abort the batch.

    Args:
        source_dir: Directory searched for input files.
        target_dir: Directory that receives the converted files; each output
            keeps the input's path as returned by ``get_files_by_ext`` with
            the extension replaced by ``.wav``.
        sample_rate: Sample rate forwarded to the converter.
        max_threads: Maximum worker threads; ``None`` means
            ``os.cpu_count()``.
        force_librosa: If true, use the librosa converter without probing
            for ffmpeg.
    """
    if max_threads is None:
        max_threads = os.cpu_count()
    # "acc" in the original list was a typo for the AAC extension.
    # NOTE(review): "wav" is left without a leading dot because the matching
    # rule of get_files_by_ext is not visible here -- confirm and normalise.
    ext_files = get_files_by_ext(source_dir, [".mp3", ".aac", "wav"])
    use_ffmpeg = (not force_librosa) and ffmpeg_installed()
    converter = convert_wav_ffmpeg if use_ffmpeg else convert_wav_librosa
    os.makedirs(target_dir, exist_ok=True)

    submitted = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        print(f"files count: {len(ext_files)}")
        print(f"max_threads = {max_threads}")
        for number, file in enumerate(ext_files, start=1):
            # presumably `file` is a path relative to source_dir -- verify
            # against get_files_by_ext.
            source_path = os.path.join(source_dir, file)
            target_path = os.path.join(target_dir, os.path.splitext(file)[0] + '.wav')
            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            if os.path.exists(target_path):
                continue
            future = executor.submit(converter,
                                     source_path,
                                     target_path,
                                     sample_rate,
                                     number)
            submitted.append((number, source_path, future))

    # Leaving the ``with`` block waits for every task. An exception raised in
    # a worker is stored on its future and would otherwise go unnoticed, so
    # surface it here.
    for number, source_path, future in submitted:
        error = future.exception()
        if error is not None:
            print(f"file {number} convert failed ({source_path}): {error!r}")