Spaces:
Sleeping
Sleeping
import gradio as gr | |
import tempfile | |
import imageio | |
import torch | |
import time | |
import os | |
from openai import OpenAI | |
from transformers import pipeline | |
from diffusers import DiffusionPipeline | |
# ---------- Load OpenAI Key from Hugging Face Secret ---------- | |
client = OpenAI(api_key=os.getenv("OPENAI_KEY")) | |
# ---------- Configuration ---------- | |
AVAILABLE_MODELS = { | |
"Codette Fine-Tuned (v9)": "ft:gpt-4.1-2025-04-14:raiffs-bits:codette-final:BO907H7Z", | |
"GPT-2 (small, fast)": "gpt2", | |
"Falcon (TII UAE)": "tiiuae/falcon-7b-instruct", | |
"Mistral (OpenAccess)": "mistralai/Mistral-7B-v0.1" | |
} | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
text_model_cache = {} | |
chat_memory = {} | |
last_usage_time = {} | |
MAX_PROMPTS_PER_SESSION = 5 | |
THROTTLE_SECONDS = 30 | |
# ---------- Load Image Generator ---------- | |
try: | |
image_generator = DiffusionPipeline.from_pretrained( | |
"runwayml/stable-diffusion-v1-5", | |
safety_checker=None, | |
torch_dtype=torch.float16 if device == "cuda" else torch.float32 | |
) | |
image_generator.to(device) | |
image_enabled = True | |
except Exception as e: | |
print(f"[Image Model Load Error]: {e}") | |
image_generator = None | |
image_enabled = False | |
# ---------- Load Video Generator ---------- | |
try: | |
video_pipeline = DiffusionPipeline.from_pretrained( | |
"damo-vilab/text-to-video-ms-1.7b", | |
safety_checker=None, | |
torch_dtype=torch.float16 if device == "cuda" else torch.float32 | |
) | |
video_pipeline.to(device) | |
video_enabled = True | |
except Exception as e: | |
print(f"[Video Model Load Error]: {e}") | |
video_pipeline = None | |
video_enabled = False | |
# ---------- Main Terminal with Rate Limits ---------- | |
def codette_terminal_limited(prompt, model_name, generate_image, generate_video, | |
session_id, batch_size, video_steps, fps): | |
if session_id not in chat_memory: | |
chat_memory[session_id] = [] | |
if prompt.lower() in ["exit", "quit"]: | |
chat_memory[session_id] = [] | |
yield "π§ Codette signing off... Session reset.", None, None | |
return | |
if model_name == "Codette Fine-Tuned (v9)": | |
count = sum(1 for line in chat_memory[session_id] if line.startswith("ποΈ You >")) | |
if count >= MAX_PROMPTS_PER_SESSION: | |
yield "[π Limit] Max 5 prompts per session.", None, None | |
return | |
now = time.time() | |
if now - last_usage_time.get(session_id, 0) < THROTTLE_SECONDS: | |
wait = int(THROTTLE_SECONDS - (now - last_usage_time[session_id])) | |
yield f"[β³ Wait] Try again in {wait} sec.", None, None | |
return | |
last_usage_time[session_id] = now | |
if model_name == "Codette Fine-Tuned (v9)": | |
try: | |
response = client.chat.completions.create( | |
model=AVAILABLE_MODELS[model_name], | |
messages=[{"role": "user", "content": prompt}], | |
temperature=0.7, | |
max_tokens=256 | |
) | |
output = response.choices[0].message.content.strip() | |
except Exception as e: | |
yield f"[OpenAI error]: {e}", None, None | |
return | |
else: | |
if model_name not in text_model_cache: | |
try: | |
text_model_cache[model_name] = pipeline( | |
"text-generation", | |
model=AVAILABLE_MODELS[model_name], | |
device=0 if device == "cuda" else -1 | |
) | |
except Exception as e: | |
yield f"[Text model error]: {e}", None, None | |
return | |
try: | |
output = text_model_cache[model_name]( | |
prompt, max_length=100, do_sample=True, num_return_sequences=1 | |
)[0]['generated_text'].strip() | |
except Exception as e: | |
yield f"[Generation error]: {e}", None, None | |
return | |
# Stream text output | |
response_so_far = "" | |
for char in output: | |
response_so_far += char | |
temp_log = chat_memory[session_id][:] | |
temp_log.append(f"ποΈ You > {prompt}") | |
temp_log.append(f"π§ Codette > {response_so_far}") | |
yield "\n".join(temp_log[-10:]), None, None | |
time.sleep(0.01) | |
chat_memory[session_id].append(f"ποΈ You > {prompt}") | |
chat_memory[session_id].append(f"π§ Codette > {output}") | |
imgs, vid = None, None | |
if generate_image and image_enabled: | |
try: | |
result = image_generator(prompt, num_images_per_prompt=batch_size) | |
imgs = result.images | |
except Exception as e: | |
response_so_far += f"\n[Image error]: {e}" | |
if generate_video and video_enabled: | |
try: | |
result = video_pipeline(prompt, num_inference_steps=video_steps) | |
frames = result.frames | |
temp_video_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name | |
imageio.mimsave(temp_video_path, frames, fps=fps) | |
vid = temp_video_path | |
except Exception as e: | |
response_so_far += f"\n[Video error]: {e}" | |
yield "\n".join(chat_memory[session_id][-10:]), imgs, vid | |
# ---------- Gradio UI ---------- | |
with gr.Blocks(title="𧬠Codette Terminal β Streamed AI Chat") as demo: | |
gr.Markdown("## 𧬠Codette Terminal (Chat + Image + Video + Fine-Tuned AI)") | |
gr.Markdown("Type a prompt, choose a model, and generate responses. Type `'exit'` to reset the session.") | |
with gr.Row(): | |
session_id = gr.Textbox(value="session_default", visible=False) | |
model_dropdown = gr.Dropdown(choices=list(AVAILABLE_MODELS.keys()), value="GPT-2 (small, fast)", label="Language Model") | |
with gr.Row(): | |
generate_image_toggle = gr.Checkbox(label="Generate Image(s)?", value=False, interactive=image_enabled) | |
generate_video_toggle = gr.Checkbox(label="Generate Video?", value=False, interactive=video_enabled) | |
with gr.Row(): | |
batch_size_slider = gr.Slider(label="Number of Images", minimum=1, maximum=4, step=1, value=1) | |
video_steps_slider = gr.Slider(label="Video Inference Steps", minimum=10, maximum=100, step=10, value=50) | |
fps_slider = gr.Slider(label="Video FPS", minimum=4, maximum=24, step=2, value=8) | |
with gr.Row(): | |
user_input = gr.Textbox( | |
label="Your Prompt", | |
placeholder="e.g. A robot dreaming on Mars", | |
lines=1 | |
) | |
with gr.Row(): | |
output_text = gr.Textbox(label="Codette Output", lines=15, interactive=False) | |
with gr.Row(): | |
output_image = gr.Gallery(label="Generated Image(s)", columns=2) | |
output_video = gr.Video(label="Generated Video") | |
user_input.submit( | |
codette_terminal_limited, | |
inputs=[ | |
user_input, model_dropdown, generate_image_toggle, generate_video_toggle, | |
session_id, batch_size_slider, video_steps_slider, fps_slider | |
], | |
outputs=[output_text, output_image, output_video] | |
) | |
# ---------- Launch ---------- | |
if __name__ == "__main__": | |
demo.launch(mcp_server=True) | |