# Hugging Face Space: retrieval-augmented chat over the dotnet/xharness repository.
import json
import os
from pathlib import Path

import faiss
import gradio as gr
from git import Repo
from huggingface_hub import snapshot_download
from openai import AzureOpenAI, OpenAI
from sentence_transformers import SentenceTransformer
# ─── Configuration ───────────────────────────────────────────────────────────
# Source repository the retrieval index was built from, and its local checkout.
REPO_URL = "https://github.com/dotnet/xharness.git"
REPO_LOCAL_DIR = Path("artifacts/repo_code")

# Hugging Face repo holding the prebuilt FAISS index and snippet metadata.
HF_REPO_ID = "kotlarmilos/repository-learning"
HF_BASE_DIR = Path("artifacts/repo_hf")
HF_INDEX_DIR = HF_BASE_DIR / "dotnet-xharness" / "index"
METADATA_PATH = HF_INDEX_DIR / "metadata.json"

# Root used to resolve the relative file paths stored in the metadata.
ROOT_DIR = REPO_LOCAL_DIR  # where your repo code lives

# Azure OpenAI connection settings (read from the environment; may be None
# if unset — the client constructor will fail later in that case).
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
API_VERSION = "2024-12-01-preview"

# Local embedding model — must match the model used to build the index.
EMBEDDER_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# Azure OpenAI chat deployment used for answering and query rewriting.
OPENAI_MODEL = "gpt-4o-mini"

# Number of nearest snippets retrieved per question.
TOP_K = 5
# ─── Step 1: Acquire code and artifacts ──────────────────────────────────────
# Keep a local checkout of the source repository: update it when a clone is
# already present, otherwise clone it fresh.
has_checkout = REPO_LOCAL_DIR.exists() and (REPO_LOCAL_DIR / ".git").exists()
if has_checkout:
    Repo(REPO_LOCAL_DIR).remotes.origin.pull()
else:
    Repo.clone_from(REPO_URL, REPO_LOCAL_DIR)

# Download the prebuilt index and metadata snapshot from the Hub.
snapshot_download(
    repo_id=HF_REPO_ID,
    local_dir=str(HF_BASE_DIR),
    local_dir_use_symlinks=False,
    token=os.getenv("HUGGINGFACE_HUB_TOKEN"),
)
# ─── Step 2: Load FAISS index & metadata ─────────────────────────────────────
# The metadata list is aligned with the FAISS ids: metadata[i] describes the
# snippet whose embedding sits at index position i.
index = faiss.read_index(str(HF_INDEX_DIR / "index.faiss"))
metadata = json.loads(METADATA_PATH.read_text(encoding="utf-8"))
# ─── Step 3: Prepare embedder & OpenAI client ────────────────────────────────
# Sentence-transformer for query embeddings; Azure-hosted chat model for
# answer generation and follow-up rewriting.
embedder = SentenceTransformer(EMBEDDER_MODEL)
openai = AzureOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_key=AZURE_OPENAI_API_KEY,
    api_version=API_VERSION,
)
# ─── Helper: load code snippets by FAISS ID ──────────────────────────────────
def load_snippets_from_metadata(ids, metadata, root_dir):
    """Resolve FAISS result ids to code snippets read from disk.

    Args:
        ids: Sequence of integer FAISS ids (indices into ``metadata``).
            Negative ids — FAISS's padding for "no result" — are skipped.
        metadata: List of dicts with ``file``, ``start_line``, ``end_line``
            and (optionally) ``llm_description`` keys.
        root_dir: Directory the relative ``file`` paths are resolved against.

    Returns:
        List of dicts with ``file``, ``lines`` (1-based inclusive tuple),
        ``code``, and ``description`` keys, in the order of ``ids``.
    """
    snippets = []
    for idx in ids:
        # BUG FIX: faiss pads its result array with -1 when fewer than top_k
        # neighbours exist; metadata[-1] would silently return the *last*
        # entry instead of failing, so skip negative ids explicitly.
        if idx < 0:
            continue
        entry = metadata[idx]
        file_rel = entry["file"]
        start, end = entry["start_line"], entry["end_line"]
        file_path = Path(root_dir) / file_rel
        try:
            lines = file_path.read_text(encoding="utf-8").splitlines()
            # start/end are 1-based and inclusive, hence the -1 / end slice.
            code = "\n".join(lines[start - 1 : end]).rstrip()
        except FileNotFoundError:
            # Repo checkout and index can drift; degrade to a marker snippet.
            code = f"# ERROR: {file_rel} not found"
        snippets.append({
            "file": file_rel,
            "lines": (start, end),
            "code": code,
            "description": entry.get("llm_description", ""),
        })
    return snippets
# ─── Core: embed question, retrieve, and call OpenAI ─────────────────────────
def answer_from_index(question: str, top_k: int = TOP_K) -> str:
    """Answer ``question`` via retrieval-augmented generation.

    Embeds the question, fetches the ``top_k`` nearest code snippets from the
    FAISS index, and asks the chat model to answer using them as context.

    Returns the model's answer text, stripped of surrounding whitespace.
    """
    # 1) Encode the question (2-D array: one row per input string).
    q_emb = embedder.encode([question])

    # 2) Search FAISS. BUG FIX: faiss pads the id array with -1 when fewer
    #    than top_k neighbours exist; drop those before indexing metadata.
    _, indices = index.search(q_emb, top_k)
    ids = [i for i in indices[0].tolist() if i >= 0]

    # 3) Resolve ids to code snippets on disk.
    snippets = load_snippets_from_metadata(ids, metadata, ROOT_DIR)

    # 4) Build the context block for the prompt.
    context_parts = []
    for snip in snippets:
        context_parts.append(
            f"File: {snip['file']} (lines {snip['lines'][0]}–{snip['lines'][1]})\n"
            "```python\n"
            f"{snip['code']}\n"
            "```\n"
            f"Description: {snip['description']}"
        )
    context_block = "\n\n".join(context_parts)

    # 5) Ask the chat model. Low temperature keeps answers grounded in the
    #    provided snippets.
    prompt = (
        "You are a code assistant. Use the following code snippets to answer the user's question.\n\n"
        f"{context_block}\n\n"
        "Question:\n"
        f"{question}\n\n"
        "Answer:"
    )
    resp = openai.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
        max_tokens=512,
    )
    return resp.choices[0].message.content.strip()
def rewrite_followup(history: list[tuple[str, str]], followup: str) -> str:
    """Rewrite a follow-up message into a standalone question.

    Uses up to the last four (user, assistant) exchanges from ``history`` as
    context and asks the chat model for a self-contained reformulation, so
    the retrieval step does not depend on conversational pronouns.
    """
    recent_turns = [
        f"User: {user}\nAssistant: {assistant}"
        for user, assistant in history[-4:]
    ]
    convo = "\n".join(recent_turns)
    prompt = (
        "Given the conversation below, rewrite the final user query into "
        "a standalone question.\n\n"
        f"{convo}\nUser: {followup}\n\nStandalone question:"
    )
    # temperature=0: the rewrite should be deterministic, not creative.
    resp = openai.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        max_tokens=128,
    )
    return resp.choices[0].message.content.strip()
def respond(
    message,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
):
    """Gradio ChatInterface callback: rewrite the follow-up, then answer.

    NOTE(review): ``system_message``, ``max_tokens``, ``temperature``, and
    ``top_p`` arrive from the extra UI controls but are not forwarded to the
    retrieval pipeline — presumably intentional; confirm.

    Yields one complete answer (no token-by-token streaming).
    """
    # Resolve pronouns/references against recent history before retrieval.
    standalone = rewrite_followup(history, message)
    yield answer_from_index(standalone)
# ─── Gradio interface ────────────────────────────────────────────────────────
def on_submit(question: str) -> str:
    """Single-turn entry point: answer ``question`` with no chat history."""
    return answer_from_index(question)
""" | |
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface | |
""" | |
demo = gr.ChatInterface( | |
respond, | |
additional_inputs=[ | |
gr.Textbox(value="You are a friendly Chatbot.", label="System message"), | |
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), | |
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), | |
gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
label="Top-p (nucleus sampling)", | |
), | |
], | |
) | |
if __name__ == "__main__": | |
demo.launch() | |