"""Build a TTS annotation list from raw speaker recordings.

The script resamples every audio file under --source_dir, transcribes it with
a ModelScope Paraformer ASR pipeline, slices the audio into sentence-level
clips, and writes a `path|speaker|language|text` list file.
"""

import argparse
import os
import re
import subprocess

import librosa
import numpy as np
import soundfile
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks


def get_sub_dirs(source_dir):
    """Return the non-hidden sub-directories of source_dir (one per speaker)."""
    sub_dirs = [f for f in os.listdir(source_dir) if not f.startswith('.')]
    return [f for f in sub_dirs if os.path.isdir(os.path.join(source_dir, f))]


def is_sentence_ending(sentence):
    """Return True if the sentence ends with sentence-final punctuation."""
    return re.search(r'[。?!…]$', sentence) is not None


def resample_audios(origin_dir, resample_dir, sample_rate):
    """Convert every .wav/.mp3 under origin_dir to a mono .wav at sample_rate."""
    print("start resampling audios")
    os.makedirs(resample_dir, exist_ok=True)
    sub_dirs = get_sub_dirs(origin_dir)

    # Prefer ffmpeg when it is available; otherwise fall back to librosa.
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
        ffmpeg_installed = True
        print("ffmpeg installed, using ffmpeg.")
    except Exception:
        ffmpeg_installed = False
        print("WARNING: ffmpeg is not installed, falling back to librosa.")

    for sub_dir in sub_dirs:
        source_dir = os.path.join(origin_dir, sub_dir)
        target_dir = os.path.join(resample_dir, sub_dir)
        os.makedirs(target_dir, exist_ok=True)
        listdir = list(os.listdir(source_dir))
        listdir_len = len(listdir)
        for index, f in enumerate(listdir, start=1):
            if not (f.endswith(".wav") or f.endswith(".mp3")):
                continue
            file_path = os.path.join(source_dir, f)
            target_path = os.path.join(target_dir, f)
            target_path = os.path.splitext(target_path)[0] + '.wav'
            if os.path.exists(target_path):
                continue  # already resampled on a previous run
            print(f"{index}/{listdir_len} file")
            if ffmpeg_installed:
                subprocess.run(["ffmpeg", "-y", "-i", file_path,
                                "-ar", f"{sample_rate}", "-ac", "1",
                                "-v", "quiet", target_path])
            else:
                try:
                    data, sr = librosa.load(file_path, sr=sample_rate, mono=True)
                    soundfile.write(target_path, data, sr)
                except Exception:
                    print(f"\n{file_path} failed to convert.")


def create_dataset(source_dir, target_dir, sample_rate, language, inference_pipeline, max_seconds):
    """Transcribe and slice the resampled audios, returning annotation lines."""
    roles = get_sub_dirs(source_dir)
    count = 0
    result = []
    for speaker_name in roles:
        source_audios = [f for f in os.listdir(os.path.join(source_dir, speaker_name)) if f.endswith(".wav")]
        source_audios = [os.path.join(source_dir, speaker_name, filename) for filename in source_audios]
        slice_dir = os.path.join(target_dir, speaker_name)
        os.makedirs(slice_dir, exist_ok=True)
        for audio_path in source_audios:
            # rec_result keys: 'text', 'text_postprocessed', 'time_stamp', 'sentences'
            rec_result = inference_pipeline(audio_in=audio_path)
            data, sr = librosa.load(audio_path, sr=sample_rate, mono=True)
            sentence_list = []
            audio_list = []
            time_length = 0
            for sentence in rec_result['sentences']:
                text = sentence['text'].strip()
                if text == "":
                    continue
                # Sentence timestamps are in milliseconds; convert to sample offsets.
                start = int((sentence['start'] / 1000) * sr)
                end = int((sentence['end'] / 1000) * sr)
                # Flush the buffered sentences early if adding this one would
                # push the slice past max_seconds.
                if time_length > 0 and time_length + ((sentence['end'] - sentence['start']) / 1000) > max_seconds:
                    sliced_audio_name = str(count).zfill(6)
                    sliced_audio_path = os.path.join(slice_dir, sliced_audio_name + ".wav")
                    s_sentence = "".join(sentence_list)
                    if not re.search(r"[。!?]$", s_sentence):
                        # Close the transcript with '。': replace a trailing clause
                        # mark, otherwise append (the original dropped the last
                        # character unconditionally here, which lost text).
                        if re.search(r"[,、;:…]$", s_sentence):
                            s_sentence = s_sentence[:-1] + '。'
                        else:
                            s_sentence = s_sentence + '。'
                    audio_concat = np.concatenate(audio_list)
                    if time_length > max_seconds:
                        print(f"[too long voice]: {sliced_audio_path}, voice_length: {time_length} seconds")
                    soundfile.write(sliced_audio_path, audio_concat, sr)
                    result.append(f"{sliced_audio_path}|{speaker_name}|{language}|{s_sentence}")
                    sentence_list = []
                    audio_list = []
                    time_length = 0
                    count += 1
                sentence_list.append(text)
                audio_list.append(data[start:end])
                time_length = time_length + ((sentence['end'] - sentence['start']) / 1000)
                # Emit a slice as soon as the buffered text forms a complete sentence.
                if is_sentence_ending(text):
                    sliced_audio_name = str(count).zfill(6)
                    sliced_audio_path = os.path.join(slice_dir, sliced_audio_name + ".wav")
                    s_sentence = "".join(sentence_list)
                    audio_concat = np.concatenate(audio_list)
                    soundfile.write(sliced_audio_path, audio_concat, sr)
                    result.append(f"{sliced_audio_path}|{speaker_name}|{language}|{s_sentence}")
                    sentence_list = []
                    audio_list = []
                    time_length = 0
                    count += 1
            # Note: any sentences still buffered at the end of an audio (i.e.
            # never closed by sentence-ending punctuation) are dropped.
    return result


def create_list(source_dir, target_dir, resample_dir, sample_rate, language, output_list, max_seconds):
    resample_audios(source_dir, resample_dir, sample_rate)
    inference_pipeline = pipeline(
        task=Tasks.auto_speech_recognition,
        model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
        model_revision="v1.2.4")
    result = create_dataset(resample_dir, target_dir,
                            sample_rate=sample_rate,
                            language=language,
                            inference_pipeline=inference_pipeline,
                            max_seconds=max_seconds)
    with open(output_list, "w", encoding="utf-8") as file:
        for line in result:
            try:
                file.write(line.strip() + '\n')
            except UnicodeEncodeError:
                print("UnicodeEncodeError: can't encode line:", line)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--source_dir", type=str, default="origin",
                        help="Source directory path. Default: origin")
    parser.add_argument("--target_dir", type=str, default="dataset",
                        help="Target directory path. Default: dataset")
    parser.add_argument("--resample_dir", type=str, default="origin_resample",
                        help="Resample directory path. Default: origin_resample")
    parser.add_argument("--sample_rate", type=int, default=44100,
                        help="Sample rate. Default: 44100")
    parser.add_argument("--language", type=str, default="ZH",
                        help="Language. Default: ZH")
    parser.add_argument("--output", type=str, default="demo.list",
                        help="Output list file. Default: demo.list")
    parser.add_argument("--max_seconds", type=int, default=15,
                        help="Max sliced voice length in seconds. Default: 15")
    args = parser.parse_args()
    create_list(args.source_dir, args.target_dir, args.resample_dir,
                args.sample_rate, args.language, args.output, args.max_seconds)
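
# Example invocation (the script file name "create_list.py" is assumed here;
# directory names follow the argparse defaults above):
#
#   python create_list.py --source_dir origin --resample_dir origin_resample \
#       --target_dir dataset --sample_rate 44100 --language ZH \
#       --max_seconds 15 --output demo.list
#
# Expected layout: one sub-directory per speaker under --source_dir, each
# holding .wav/.mp3 recordings. Each line of the resulting list file has the
# form <sliced_wav_path>|<speaker_name>|<language>|<transcript>.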