import argparse
import os
import re
import subprocess
import librosa
import numpy as np
import soundfile
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
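
# Build a sliced, transcribed speech dataset from raw recordings:
#   1. resample_audios(): convert every .wav/.mp3 under <source_dir>/<speaker>/ to
#      mono WAV at the requested sample rate (ffmpeg if available, librosa otherwise).
#   2. create_dataset(): run the ModelScope Paraformer ASR pipeline (VAD + punctuation)
#      on each resampled file and cut the audio into sentence-aligned slices of at most
#      --max_seconds seconds.
#   3. create_list(): write one "path|speaker_name|language|text" annotation line per
#      slice to the --output list file.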

def get_sub_dirs(source_dir):
    # Non-hidden sub-directories of source_dir; each one is treated as a speaker.
    sub_dir = [f for f in os.listdir(source_dir) if not f.startswith('.')]
    sub_dir = [f for f in sub_dir if os.path.isdir(os.path.join(source_dir, f))]
    return sub_dir

def is_sentence_ending(sentence):
    # True if the text ends with sentence-final punctuation.
    if re.search(r'[。?!……]$', sentence):
        return True
    return False
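
# Resample step: mirror the origin_dir/<speaker>/ layout into resample_dir, converting
# every .wav/.mp3 to a mono .wav at `sample_rate`. Files that already exist in the
# target directory are skipped, so this step can be re-run safely.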
def resample_audios(origin_dir, resample_dir, sample_rate):
    print("start resampling audios")
    os.makedirs(resample_dir, exist_ok=True)
    dirs = get_sub_dirs(origin_dir)
    # Prefer ffmpeg for conversion; fall back to librosa if it is not available.
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
        ffmpeg_installed = True
        print("ffmpeg installed, using ffmpeg.")
    except Exception:
        ffmpeg_installed = False
        print("WARNING: ffmpeg is not installed, falling back to librosa.")
    for dir in dirs:
        source_dir = os.path.join(origin_dir, dir)
        target_dir = os.path.join(resample_dir, dir)
        os.makedirs(target_dir, exist_ok=True)
        listdir = list(os.listdir(source_dir))
        listdir_len = len(listdir)
        for index, f in enumerate(listdir, start=1):
            if f.endswith(".wav") or f.endswith(".mp3"):
                file_path = os.path.join(source_dir, f)
                target_path = os.path.join(target_dir, f)
                target_path = os.path.splitext(target_path)[0] + '.wav'
                # Skip files already converted in a previous run.
                if os.path.exists(target_path):
                    continue
                if ffmpeg_installed:
                    subprocess.run(["ffmpeg", "-y", "-i", file_path, "-ar", f"{sample_rate}", "-ac", "1", "-v", "quiet", target_path])
                else:
                    try:
                        print(f"{index}/{listdir_len} file")
                        data, sr = librosa.load(file_path, sr=sample_rate, mono=True)
                        soundfile.write(target_path, data, sr)
                    except Exception:
                        print(f"\n{file_path} conversion failed.")
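
# Slicing step: walk the resampled speaker directories, transcribe each file, and merge
# consecutive ASR sentences into one slice until either the accumulated duration would
# exceed max_seconds or a sentence-ending punctuation mark is reached. Each slice is
# written as <target_dir>/<speaker>/NNNNNN.wav with a matching annotation line
# "path|speaker_name|language|text".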
def create_dataset(source_dir, target_dir, sample_rate, language, inference_pipeline, max_seconds):
    roles = get_sub_dirs(source_dir)
    count = 0
    result = []
    for speaker_name in roles:
        source_audios = [f for f in os.listdir(os.path.join(source_dir, speaker_name)) if f.endswith(".wav")]
        source_audios = [os.path.join(source_dir, speaker_name, filename) for filename in source_audios]
        slice_dir = os.path.join(target_dir, speaker_name)
        os.makedirs(slice_dir, exist_ok=True)
        for audio_path in source_audios:
            rec_result = inference_pipeline(audio_in=audio_path)  # dict_keys(['text', 'text_postprocessed', 'time_stamp', 'sentences'])
            data, sample_rate = librosa.load(audio_path, sr=sample_rate, mono=True)
            sentence_list = []
            audio_list = []
            time_length = 0
            for sentence in rec_result['sentences']:
                text = sentence['text'].strip()
                if text == "":
                    continue
                start = int((sentence['start'] / 1000) * sample_rate)
                end = int((sentence['end'] / 1000) * sample_rate)
                # Flush the buffer first if adding this sentence would exceed max_seconds.
                if time_length > 0 and time_length + ((sentence['end'] - sentence['start']) / 1000) > max_seconds:
                    sliced_audio_name = f"{count:06d}"
                    sliced_audio_path = os.path.join(slice_dir, sliced_audio_name + ".wav")
                    s_sentence = "".join(sentence_list)
                    if not re.search(r"[。!?]$", s_sentence):
                        # Cut mid-sentence: replace the trailing character (typically a comma
                        # inserted by the ASR punctuation model) with a full stop.
                        s_sentence = s_sentence[:-1] + '。'
                    audio_concat = np.concatenate(audio_list)
                    if time_length > max_seconds:
                        print(f"[too long voice]: {sliced_audio_path}, voice_length: {time_length} seconds")
                    soundfile.write(sliced_audio_path, audio_concat, sample_rate)
                    result.append(
                        f"{sliced_audio_path}|{speaker_name}|{language}|{s_sentence}"
                    )
                    sentence_list = []
                    audio_list = []
                    time_length = 0
                    count += 1
                sentence_list.append(text)
                audio_list.append(data[start:end])
                time_length += (sentence['end'] - sentence['start']) / 1000
                # Flush at a natural sentence boundary.
                if is_sentence_ending(text):
                    sliced_audio_name = f"{count:06d}"
                    sliced_audio_path = os.path.join(slice_dir, sliced_audio_name + ".wav")
                    s_sentence = "".join(sentence_list)
                    audio_concat = np.concatenate(audio_list)
                    soundfile.write(sliced_audio_path, audio_concat, sample_rate)
                    result.append(
                        f"{sliced_audio_path}|{speaker_name}|{language}|{s_sentence}"
                    )
                    sentence_list = []
                    audio_list = []
                    time_length = 0
                    count += 1
    # Note: a leftover buffer that neither reaches max_seconds nor ends with
    # sentence-final punctuation is not written out.
    return result
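
# End-to-end entry: resample, build the ModelScope ASR pipeline (Paraformer-large with
# VAD and punctuation, 16 kHz Chinese model), slice, and write the annotation list.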
def create_list(source_dir, target_dir, resample_dir, sample_rate, language, output_list, max_seconds):
    resample_audios(source_dir, resample_dir, sample_rate)
    inference_pipeline = pipeline(
        task=Tasks.auto_speech_recognition,
        model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
        model_revision="v1.2.4")
    result = create_dataset(resample_dir, target_dir, sample_rate=sample_rate, language=language,
                            inference_pipeline=inference_pipeline, max_seconds=max_seconds)
    with open(output_list, "w", encoding="utf-8") as file:
        for line in result:
            try:
                file.write(line.strip() + '\n')
            except UnicodeEncodeError:
                print("UnicodeEncodeError: failed to write line:", line)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--source_dir", type=str, default="origin", help="Source directory path. Default: origin")
    parser.add_argument("--target_dir", type=str, default="dataset", help="Target directory path. Default: dataset")
    parser.add_argument("--resample_dir", type=str, default="origin_resample", help="Resample directory path. Default: origin_resample")
    parser.add_argument("--sample_rate", type=int, default=44100, help="Sample rate. Default: 44100")
    parser.add_argument("--language", type=str, default="ZH", help="Language. Default: ZH")
    parser.add_argument("--output", type=str, default="demo.list", help="List file. Default: demo.list")
    parser.add_argument("--max_seconds", type=int, default=15, help="Max sliced voice length (seconds). Default: 15")
    args = parser.parse_args()
    create_list(args.source_dir, args.target_dir, args.resample_dir, args.sample_rate, args.language, args.output, args.max_seconds)