#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Ask a Gemini model for the speech segments of every WAV file in a directory.

Each ``*.wav`` file found under ``--speech_audio_dir`` is uploaded, the model
is prompted for the start/end time of every speech segment, and one JSON line
per file is appended to ``--output_file``.  Files whose name already appears in
the output file are skipped, so an interrupted run can simply be restarted.
"""
import argparse
import json
import os
import re
from pathlib import Path
import sys

# Make the repository root importable so that ``project_settings`` resolves.
pwd = os.path.abspath(os.path.dirname(__file__))
sys.path.append(os.path.join(pwd, "../../"))

from google import genai
from google.genai import types

from project_settings import environment, project_path


# Prompt sent with every audio file (written in Chinese).  It asks for the
# start and end time of each speech segment, in seconds with millisecond
# precision, as a fenced JSON list of [start, end] pairs, or an empty list
# when the audio holds no speech.
# NOTE: the example in the prompt ends with a trailing comma; the parser below
# tolerates an answer that copies it.
VAD_PROMPT = """
给我这段音频中的语音分段的开始和结束时间,单位为秒,精确到毫秒,并输出JSON格式,
例如:
```json
[[0.254, 1.214], [2.200, 3.100]],
```
如果没有语音段则输出:
```json
[]
```
""".strip()

# First fenced ```json ... ``` block of a model answer; compiled once because
# it is applied to every file.
_JSON_FENCE = re.compile("```json(.+?)```", flags=re.DOTALL | re.IGNORECASE)


def get_args():
    """Parse the command-line options of the script.

    Options:
        --google_application_credentials: path exported as
            ``GOOGLE_APPLICATION_CREDENTIALS``.
        --model_name: model used for ``generate_content``.
        --speech_audio_dir: directory searched recursively for ``*.wav`` files.
        --output_file: JSONL file the results are appended to.
        --gemini_api_key: API key of the developer client; the default is read
            through the project's ``environment`` helper.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--google_application_credentials",
        default=(project_path / "dotenv/potent-veld-462405-t3-8091a29b2894.json").as_posix(),
        type=str
    )
    parser.add_argument(
        "--model_name",
        default="gemini-2.5-pro",
        type=str
    )
    parser.add_argument(
        "--speech_audio_dir",
        default=r"D:\Users\tianx\HuggingDatasets\nx_noise\data\speech\nx-speech\en-SG\2025-06-17",
        type=str
    )
    parser.add_argument(
        "--output_file",
        # default=r"D:\Users\tianx\HuggingDatasets\nx_noise\data\noise\nx-noise\en-SG\2025-06-17\vad.jsonl",
        default=r"vad.jsonl",
        type=str
    )
    parser.add_argument(
        "--gemini_api_key",
        default=environment.get("GEMINI_API_KEY", dtype=str),
        type=str
    )
    args = parser.parse_args()
    return args


def _load_finished_names(output_file: Path) -> set:
    """Return the ``name`` of every row already stored in *output_file*."""
    finished = set()
    if not output_file.exists():
        return finished

    with output_file.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            # A blank line (e.g. a stray trailing newline) is not a row.
            if not line:
                continue
            finished.add(json.loads(line)["name"])
    return finished


def _request_vad_answer(developer_client, model_name: str, config, audio_path: Path) -> str:
    """Upload *audio_path*, ask the model for its speech segments, return the raw answer.

    The uploaded file is deleted again whether or not the request succeeds.
    """
    # upload
    audio_file = developer_client.files.upload(
        file=audio_path.as_posix(),
        config=None
    )
    print(f"upload file: {audio_file.name}")

    try:
        contents = [
            types.Content(
                role="user",
                parts=[
                    types.Part(text=VAD_PROMPT),
                    types.Part.from_uri(
                        file_uri=audio_file.uri,
                        mime_type=audio_file.mime_type,
                    )
                ]
            )
        ]
        response: types.GenerateContentResponse = developer_client.models.generate_content(
            model=model_name,
            contents=contents,
            config=config,
        )
        answer = response.candidates[0].content.parts[0].text
        print(answer)
        return answer
    finally:
        # delete
        print(f"delete file: {audio_file.name}")
        developer_client.files.delete(name=audio_file.name)


def _parse_vad_segments(answer):
    """Decode the JSON payload of the first fenced ``json`` block in *answer*.

    Raises:
        AssertionError: if *answer* is empty or holds no fenced ``json`` block.
        json.JSONDecodeError: if the fenced payload is not valid JSON.
    """
    match = _JSON_FENCE.search(answer or "")
    if match is None:
        raise AssertionError(f"answer: {answer}")

    # The example in the prompt ends with a trailing comma, which the model may
    # copy.  A valid JSON document never ends with a comma, so dropping it
    # cannot change the meaning of an answer that already parsed.
    payload = match.group(1).strip().rstrip(",")
    return json.loads(payload)


def main():
    """Run the labelling loop over every unprocessed WAV file.

    One row ``{"name", "filename", "vad_segments"}`` is appended to the output
    file per audio file.
    """
    args = get_args()

    speech_audio_dir = Path(args.speech_audio_dir)
    output_file = Path(args.output_file)

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = args.google_application_credentials
    os.environ["gemini_api_key"] = args.gemini_api_key

    developer_client = genai.Client(
        api_key=args.gemini_api_key,
    )
    # NOTE(review): this Vertex AI client is constructed but never used below;
    # every request goes through ``developer_client``.  Confirm whether it can go.
    client = genai.Client(
        vertexai=True,
        project="potent-veld-462405-t3",
        location="global",
    )
    generate_content_config = types.GenerateContentConfig(
        temperature=1,
        top_p=0.95,
        max_output_tokens=8192,
        response_modalities=["TEXT"],
    )

    # finished
    finished_set = _load_finished_names(output_file)
    print(f"finished count: {len(finished_set)}")

    with output_file.open("a", encoding="utf-8") as f:
        for filename in speech_audio_dir.glob("**/*.wav"):
            # Resume is keyed on the base name only, so files sharing a name in
            # different sub-directories count as the same item.
            name = filename.name
            if name in finished_set:
                continue
            finished_set.add(name)

            answer = _request_vad_answer(
                developer_client,
                args.model_name,
                generate_content_config,
                filename,
            )
            vad_segments = _parse_vad_segments(answer)

            row = {
                "name": name,
                "filename": filename.as_posix(),
                "vad_segments": vad_segments
            }
            row = json.dumps(row, ensure_ascii=False)
            f.write(f"{row}\n")
            # Every row costs an upload and a model call; persist it right away
            # so a later failure does not discard finished work.
            f.flush()


if __name__ == "__main__":
    main()