Spaces:
Running
Running
File size: 5,063 Bytes
52f3c67 a6c3a18 8396721 a6c3a18 8396721 a6c3a18 8396721 a6c3a18 8396721 a6c3a18 8396721 a6c3a18 8396721 a6c3a18 69afb83 a6c3a18 ad9e788 52f3c67 7f095bd e48f73e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mimetypes
import os
import re
from typing import Optional
import gradio as gr
from smolagents.agent_types import AgentAudio, AgentImage, AgentText, handle_agent_output_types
from smolagents.agents import ActionStep
from smolagents.memory import MemoryStep
from smolagents.utils import _is_package_available
class GradioUI:
def __init__(self, agent, file_upload_folder: str = "./uploads"):
self.agent = agent
self.file_upload_folder = file_upload_folder
if not os.path.exists(self.file_upload_folder):
os.makedirs(self.file_upload_folder)
def interact_with_agent(self, prompt, messages):
messages.append(gr.ChatMessage(role="user", content=prompt))
yield messages
for msg in stream_to_gradio(self.agent, task=prompt, reset_agent_memory=False):
messages.append(msg)
yield messages
yield messages
def upload_file(
self,
file,
file_uploads_log,
allowed_file_types=[
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"text/plain",
],
):
if file is None:
return gr.Textbox("No file uploaded", visible=True), file_uploads_log
try:
mime_type, _ = mimetypes.guess_type(file.name)
except Exception as e:
return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log
if mime_type not in allowed_file_types:
return gr.Textbox("File type disallowed", visible=True), file_uploads_log
original_name = os.path.basename(file.name)
sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)
type_to_ext = {}
for ext, t in mimetypes.types_map.items():
if t not in type_to_ext:
type_to_ext[t] = ext
name_without_ext = ".".join(sanitized_name.split(".")[:-1])
ext = type_to_ext.get(mime_type, "")
if not ext.startswith("."):
ext = "." + ext if ext else ""
sanitized_name = f"{name_without_ext}{ext}"
file_path = os.path.join(self.file_upload_folder, sanitized_name)
with open(file_path, "wb") as f:
f.write(file.read())
file_uploads_log.append(file_path)
return gr.Textbox(f"File uploaded: {sanitized_name}", visible=True), file_uploads_log
def build_ui(self):
chatbot = gr.Chatbot()
msg = gr.Textbox(placeholder="Ask something...", label="Your message")
clear = gr.Button("Clear")
file_uploads_log = []
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
chatbot.render()
msg.render()
clear.render()
msg.submit(self.interact_with_agent, [msg, chatbot], chatbot)
clear.click(lambda: None, None, chatbot, queue=False)
return demo
def pull_messages_from_step(step_log: MemoryStep):
if isinstance(step_log, ActionStep):
step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")
if hasattr(step_log, "model_output") and step_log.model_output is not None:
model_output = step_log.model_output.strip()
model_output = re.sub(r"```\s*<end_code>", "```", model_output)
model_output = re.sub(r"<end_code>\s*```", "```", model_output)
model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output)
model_output = model_output.strip()
yield gr.ChatMessage(role="assistant", content=model_output)
if hasattr(step_log, "tool_calls") and step_log.tool_calls is not None:
first_tool_call = step_log.tool_calls[0]
used_code = first_tool_call.name == "python_interpreter"
parent_id = f"call_{len(step_log.tool_calls)}"
args = first_tool_call.arguments
content = str(args.get("answer", str(args))) if isinstance(args, dict) else str(args).strip()
if used_code:
content = re.sub(r"```.*?\n", "", content)
content = re.sub(r"\s*<end_code>\s*", "", content).strip()
if not content.startswith("```python"):
content = f"```python\n{content}\n```"
|