import spaces from snac import SNAC import torch import gradio as gr from transformers import AutoModelForCausalLM, AutoTokenizer from huggingface_hub import snapshot_download import google.generativeai as genai import re import logging # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Check if CUDA is available device = "cuda" if torch.cuda.is_available() else "cpu" print("Loading SNAC model...") snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz") snac_model = snac_model.to(device) model_name = "canopylabs/orpheus-3b-0.1-ft" model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16) model.to(device) tokenizer = AutoTokenizer.from_pretrained(model_name) print(f"Orpheus model loaded to {device}") @spaces.GPU() def generate_podcast_script(api_key, content, uploaded_file, duration, num_hosts): try: genai.configure(api_key=api_key) model = genai.GenerativeModel('gemini-2.5-pro-preview-03-25') combined_content = content or "" if uploaded_file: file_content = uploaded_file.read().decode('utf-8') combined_content += "\n" + file_content if combined_content else file_content prompt = f""" Create a podcast script for {'one person' if num_hosts == 1 else 'two people'} discussing: {combined_content} Duration: {duration}. Include natural speech, humor, and occasional off-topic thoughts. Use speech fillers like um, ah. Vary emotional tone. Format: {'Monologue' if num_hosts == 1 else 'Alternating dialogue'} without speaker labels. Separate {'paragraphs' if num_hosts == 1 else 'lines'} with blank lines. Use emotion tags in angle brackets: , , , , , , , . Example: "I can't believe I stayed up all night only to find out the meeting was canceled ." Ensure content flows naturally and stays on topic. Match the script length to {duration}. """ response = model.generate_content(prompt) return re.sub(r'[^a-zA-Z0-9\s.,?!<>]', '', response.text) except Exception as e: logger.error(f"Error generating podcast script: {str(e)}") raise def process_prompt(prompt, voice, tokenizer, device): prompt = f"{voice}: {prompt}" input_ids = tokenizer(prompt, return_tensors="pt").input_ids start_token = torch.tensor([[128259]], dtype=torch.int64) end_tokens = torch.tensor([[128009, 128260]], dtype=torch.int64) modified_input_ids = torch.cat([start_token, input_ids, end_tokens], dim=1) attention_mask = torch.ones_like(modified_input_ids) return modified_input_ids.to(device), attention_mask.to(device) def parse_output(generated_ids): token_to_find = 128257 token_to_remove = 128258 token_indices = (generated_ids == token_to_find).nonzero(as_tuple=True) if len(token_indices[1]) > 0: last_occurrence_idx = token_indices[1][-1].item() cropped_tensor = generated_ids[:, last_occurrence_idx+1:] else: cropped_tensor = generated_ids processed_rows = [] for row in cropped_tensor: masked_row = row[row != token_to_remove] processed_rows.append(masked_row) code_lists = [] for row in processed_rows: row_length = row.size(0) new_length = (row_length // 7) * 7 trimmed_row = row[:new_length] trimmed_row = [t - 128266 for t in trimmed_row] code_lists.append(trimmed_row) return code_lists[0] @spaces.GPU() def generate_speech(text, voice, temperature, top_p, repetition_penalty, max_new_tokens, progress=gr.Progress()): if not text.strip(): return None try: progress(0.1, "Processing text...") input_ids, attention_mask = process_prompt(text, voice, tokenizer, device) progress(0.3, "Generating speech tokens...") with torch.no_grad(): generated_ids = model.generate( input_ids, attention_mask=attention_mask, do_sample=True, temperature=temperature, top_p=top_p, repetition_penalty=repetition_penalty, max_new_tokens=max_new_tokens, num_return_sequences=1, eos_token_id=128258, ) progress(0.6, "Processing speech tokens...") code_list = parse_output(generated_ids) progress(0.8, "Converting to audio...") audio_samples = redistribute_codes(code_list, snac_model) return (24000, audio_samples) # Return sample rate and audio except Exception as e: print(f"Error generating speech: {e}") return None # Create Gradio interface with gr.Blocks(title="AI Podcaster") as demo: with gr.Row(): with gr.Column(scale=1): gemini_api_key = gr.Textbox(label="Gemini API Key", type="password") content = gr.Textbox(label="Content", lines=5) uploaded_file = gr.File(label="Upload File") duration = gr.Slider(minimum=1, maximum=60, value=5, step=1, label="Duration (minutes)") num_hosts = gr.Radio(["1", "2"], label="Number of Hosts", value="1") generate_script_btn = gr.Button("Generate Podcast Script") with gr.Column(scale=2): script_output = gr.Textbox(label="Generated Script", lines=10) text_input = gr.Textbox(label="Text to speak", lines=5) voice = gr.Dropdown(choices=["Narrator", "Male", "Female"], value="Narrator", label="Voice") with gr.Row(): temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="Temperature") top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.1, label="Top P") with gr.Row(): repetition_penalty = gr.Slider(minimum=1.0, maximum=2.0, value=1.2, step=0.1, label="Repetition Penalty") max_new_tokens = gr.Slider(minimum=100, maximum=1000, value=500, step=50, label="Max New Tokens") submit_btn = gr.Button("Generate Speech") clear_btn = gr.Button("Clear") with gr.Column(scale=2): audio_output = gr.Audio(label="Generated Speech", type="numpy") # Set up event handlers generate_script_btn.click( fn=generate_podcast_script, inputs=[gemini_api_key, content, uploaded_file, duration, num_hosts], outputs=script_output ) submit_btn.click( fn=generate_speech, inputs=[text_input, voice, temperature, top_p, repetition_penalty, max_new_tokens], outputs=audio_output ) clear_btn.click( fn=lambda: (None, None), inputs=[], outputs=[text_input, audio_output] ) # Launch the app if __name__ == "__main__": demo.queue().launch(share=False, ssr_mode=False)