Spaces:
Runtime error
Runtime error
import streamlit as st | |
import os | |
from pinecone import Pinecone | |
from sentence_transformers import SentenceTransformer | |
from typing import List, Dict | |
import re # For parsing timestamp and extracting video ID | |
import streamlit.components.v1 as components # For embedding HTML | |
from openai import OpenAI # Import OpenAI library | |
import logging | |
# Setup logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# --- Helper Functions (Existing: parse_timestamp_to_seconds, get_youtube_video_id, add_timestamp_to_youtube_url, generate_youtube_embed_html) --- | |
def parse_timestamp_to_seconds(timestamp: str) -> int | None: | |
"""HH:MM:SS 또는 HH:MM:SS.ms 형식의 타임스탬프를 초 단위로 변환합니다.""" | |
if not isinstance(timestamp, str): | |
return None | |
# Remove milliseconds part if present | |
timestamp_no_ms = timestamp.split('.')[0] | |
parts = timestamp_no_ms.split(':') | |
try: | |
if len(parts) == 3: | |
h, m, s = map(int, parts) | |
return h * 3600 + m * 60 + s | |
elif len(parts) == 2: | |
m, s = map(int, parts) | |
return m * 60 + s | |
elif len(parts) == 1: | |
return int(parts[0]) | |
else: | |
return None | |
except ValueError: | |
return None | |
def get_youtube_video_id(url: str) -> str | None: | |
"""YouTube URL에서 비디오 ID를 추출합니다.""" | |
if not isinstance(url, str): | |
return None | |
# Standard YouTube URLs (youtube.com/watch?v=...), shortened URLs (youtu.be/...), etc. | |
match = re.search(r"(?:v=|/|youtu\.be/|embed/|shorts/)([0-9A-Za-z_-]{11})", url) | |
return match.group(1) if match else None | |
def add_timestamp_to_youtube_url(youtube_url: str, timestamp: str) -> str: | |
"""YouTube URL에 타임스탬프를 추가합니다.""" | |
seconds = parse_timestamp_to_seconds(timestamp) | |
if seconds is None or not youtube_url: | |
return youtube_url # Return original URL if timestamp is invalid or URL is empty | |
separator = '&' if '?' in youtube_url else '?' | |
# Remove existing t= parameter if present | |
cleaned_url = re.sub(r'[?&]t=\d+s?', '', youtube_url) | |
separator = '&' if '?' in cleaned_url else '?' # Re-check separator after cleaning | |
return f"{cleaned_url}{separator}t={seconds}s" | |
def generate_youtube_embed_html(youtube_url: str, timestamp: str) -> str | None: | |
"""타임스탬프가 적용된 YouTube 임베드 HTML 코드를 생성합니다. 가로 800px 고정, 세로 자동 조절.""" | |
video_id = get_youtube_video_id(youtube_url) | |
start_seconds = parse_timestamp_to_seconds(timestamp) | |
if not video_id: | |
logger.warning(f"Could not extract video ID from URL: {youtube_url}") | |
return None # Cannot generate embed code without video ID | |
start_param = f"start={start_seconds}" if start_seconds is not None else "" | |
# Use aspect ratio approach with fixed width 800px | |
return f''' | |
<div style="position: relative; width: 800px; padding-bottom: 450px; /* 800px * 9 / 16 = 450px */ height: 0; overflow: hidden;"> | |
<iframe | |
src="https://www.youtube.com/embed/{video_id}?{start_param}&autoplay=0&rel=0" | |
frameborder="0" | |
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" | |
referrerpolicy="strict-origin-when-cross-origin" | |
allowfullscreen | |
style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;"> | |
</iframe> | |
</div> | |
''' | |
# --- 설정 --- | |
# Pinecone 설정 | |
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY","pcsk_PZHLK_TRAvMCyNmJM4FKGCX7rbbY22a58fhnWYasx1mf3WL6sRasoASZXfsbnJYvCQ13w") # Load from environment variable | |
PINECONE_ENV = os.getenv("PINECONE_ENV", "us-east-1") | |
INDEX_NAME = "video-embeddings" | |
EMBEDDING_MODEL = "jhgan/ko-sroberta-multitask" | |
# OpenAI 설정 | |
OPENAI_API_KEY = "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" | |
# --- 리소스 로딩 (캐싱 활용) --- | |
def init_pinecone(): | |
"""Pinecone 클라이언트를 초기화합니다.""" | |
api_key = PINECONE_API_KEY | |
if not api_key: | |
st.error("Pinecone API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.") | |
st.stop() | |
try: | |
pc = Pinecone(api_key=api_key) | |
logger.info("Successfully connected to Pinecone.") | |
return pc | |
except Exception as e: | |
st.error(f"Pinecone 초기화 중 오류 발생: {e}") | |
st.stop() | |
def load_embedding_model(): | |
"""Sentence Transformer 모델을 로드합니다.""" | |
try: | |
model = SentenceTransformer(EMBEDDING_MODEL) | |
logger.info(f"Successfully loaded embedding model: {EMBEDDING_MODEL}") | |
return model | |
except Exception as e: | |
st.error(f"임베딩 모델 로딩 중 오류 발생: {e}") | |
st.stop() | |
def get_pinecone_index(_pc: Pinecone, index_name: str): | |
"""Pinecone 인덱스 객체를 가져옵니다.""" | |
try: | |
index = _pc.Index(index_name) | |
# Optionally, do a quick check like index.describe_index_stats() to confirm connection | |
stats = index.describe_index_stats() | |
logger.info(f"Successfully connected to Pinecone index '{index_name}'. Stats: {stats.get('total_vector_count', 'N/A')} vectors") | |
return index | |
except Exception as e: | |
st.error(f"Pinecone 인덱스 '{index_name}' 연결 중 오류 발생: {e}. 인덱스가 존재하고 활성 상태인지 확인하세요.") | |
st.stop() | |
def init_openai_client(): | |
"""OpenAI 클라이언트를 초기화합니다.""" | |
if not OPENAI_API_KEY: | |
st.error("OpenAI API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.") | |
st.stop() | |
try: | |
client = OpenAI(api_key=OPENAI_API_KEY) | |
# Test connection (optional, but recommended) | |
client.models.list() | |
logger.info("Successfully connected to OpenAI.") | |
return client | |
except Exception as e: | |
st.error(f"OpenAI 클라이언트 초기화 또는 연결 테스트 중 오류 발생: {e}") | |
st.stop() | |
# --- 검색 함수 --- | |
def search(query: str, top_k: int = 5, _index=None, _model=None) -> List[Dict]: | |
"""Pinecone 인덱스에서 검색을 수행하고 title과 original_text를 포함합니다.""" | |
if not query or _index is None or _model is None: | |
return [] | |
try: | |
query_vec = _model.encode(query, convert_to_numpy=True).tolist() | |
result = _index.query(vector=query_vec, top_k=top_k, include_metadata=True) | |
matches = result.get("matches", []) | |
search_results = [] | |
for m in matches: | |
metadata = m.get("metadata", {}) | |
search_results.append({ | |
"URL": metadata.get("url", "N/A"), | |
"타임스탬프": metadata.get("timestamp", "N/A"), | |
"타입": metadata.get("type", "N/A"), | |
"제목": metadata.get("title", "N/A"), # 제목 추가 | |
"요약": metadata.get("summary", "N/A"), | |
"원본텍스트": metadata.get("original_text", "N/A"), # 컨텍스트로 활용할 원본 텍스트 | |
"점수": m.get("score", 0.0) | |
}) | |
logger.info(f"Pinecone search returned {len(search_results)} results for query: '{query[:50]}...'") | |
return search_results | |
except Exception as e: | |
st.error(f"Pinecone 검색 중 오류 발생: {e}") | |
logger.error(f"Error during Pinecone search: {e}", exc_info=True) | |
return [] | |
# --- OpenAI 답변 생성 함수 --- | |
def generate_khan_answer(query: str, search_results: List[Dict], client: OpenAI) -> str: | |
"""사용자 질문과 검색 결과를 바탕으로 Khan 페르소나 답변을 생성합니다.""" | |
if not search_results: | |
# Return a persona-consistent message even when no results are found | |
return "현재 질문에 대해 참고할 만한 관련 영상을 찾지 못했습니다. 질문을 조금 더 명확하게 해주시거나 다른 방식으로 질문해주시면 도움이 될 것 같습니다." | |
# Build context string for OpenAI more robustly, including timestamped URL | |
context_parts = [] | |
for i, r in enumerate(search_results): | |
original_text_snippet = "" | |
if r.get('원본텍스트'): | |
snippet = r['원본텍스트'][:200] | |
original_text_snippet = f"\n(원본 내용 일부: {snippet}...)" | |
# Generate timestamped URL if possible | |
timestamped_url_str = "N/A" | |
url = r.get('URL', 'N/A') | |
timestamp = r.get('타임스탬프', 'N/A') | |
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url) | |
has_valid_timestamp = timestamp and timestamp != 'N/A' and parse_timestamp_to_seconds(timestamp) is not None | |
if is_youtube and has_valid_timestamp: | |
try: | |
timestamped_url_str = add_timestamp_to_youtube_url(url, timestamp) | |
except Exception: | |
timestamped_url_str = url # Fallback to original URL on error | |
elif url != "N/A": | |
timestamped_url_str = url # Use original URL if not YouTube/no timestamp | |
context_parts.append( | |
f"관련 정보 {i+1}:\n" | |
f"제목: {r.get('제목', 'N/A')}\n" | |
f"영상 URL (원본): {url}\n" | |
f"타임스탬프: {timestamp}\n" | |
f"타임스탬프 적용 URL: {timestamped_url_str}\n" # Add the timestamped URL here | |
f"내용 타입: {r.get('타입', 'N/A')}\n" | |
f"요약: {r.get('요약', 'N/A')}" | |
f"{original_text_snippet}" # Append the snippet safely | |
) | |
context = "\n\n---\n\n".join(context_parts) # Join the parts | |
# Updated system prompt to instruct Markdown link usage | |
system_prompt = """너는 현실적인 조언을 잘하는 PM 멘토 Khan이다. | |
- 말투는 단호하지만 공감력이 있다. "~입니다." 또는 "~죠." 와 같이 명확하게 끝맺는다. 존댓말을 사용한다. | |
- 모호한 위로보다는 구조적이고 실용적인 제안을 한다. 문제의 핵심을 파악하고 구체적인 해결책이나 다음 단계를 제시한다. | |
- 질문이 막연하면 구체화해서 되물어본다. | |
- (이전 대화 기록은 없으므로) 반복 질문에는 "이전에 유사한 내용을 찾아봤었죠. 다시 한번 살펴보면..." 과 같이 언급할 수 있다. | |
- 긴 설명보단 핵심을 빠르게 전달하고, 필요하다면 간결한 비유를 활용한다. | |
- 주어진 '관련 정보' (영상 제목, 요약, 원본 내용, 타임스탬프 적용 URL 등)를 바탕으로 답변해야 한다. 정보가 부족하거나 질문과 관련성이 낮으면, 그 점을 명확히 밝히고 추가 정보를 요청하거나 질문을 구체화하도록 유도한다. | |
- **답변 중 관련 정보를 참조할 때는, 반드시 '타임스탬프 적용 URL'을 사용하여 다음과 같은 Markdown 링크 형식으로 만들어야 한다: `[영상 제목](타임스탬프_적용_URL)`. 예를 들어, "자세한 내용은 [비개발자가 연봉 2억을 받는 현실적인 방법](https://www.youtube.com/watch?v=VIDEO_ID&t=178s) 영상을 참고하시면 도움이 될 겁니다." 와 같이 표시한다.** | |
- 답변은 한국어로 한다.""" | |
# Use triple quotes for the multi-line f-string | |
user_message = f"""사용자 질문: {query} | |
아래 관련 정보를 바탕으로 Khan 멘토로서 답변해주세요: | |
{context}""" | |
try: | |
logger.info("Calling OpenAI API...") | |
completion = client.chat.completions.create( | |
model="gpt-4o", # Use gpt-4 if available and preferred | |
messages=[ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_message} | |
], | |
temperature=0.5, # Slightly less creative, more focused on instructions | |
) | |
answer = completion.choices[0].message.content | |
logger.info("Received response from OpenAI.") | |
return answer.strip() | |
except Exception as e: | |
st.error(f"OpenAI 답변 생성 중 오류 발생: {e}") | |
logger.error(f"Error during OpenAI API call: {e}", exc_info=True) | |
return "답변을 생성하는 중에 문제가 발생했습니다. OpenAI API 키 또는 서비스 상태를 확인해주세요." | |
# --- Streamlit 앱 UI --- | |
st.set_page_config(page_title="Khan 멘토 (PM 영상 기반)", layout="wide") | |
st.title("✨ Khan 멘토에게 질문하기") | |
st.markdown("PM 관련 영상 내용을 기반으로 Khan 멘토가 답변해 드립니다.") | |
# --- API 키 확인 및 리소스 초기화 --- | |
openai_client = init_openai_client() | |
pc = init_pinecone() | |
model = load_embedding_model() | |
index = get_pinecone_index(pc, INDEX_NAME) | |
# --- 사용자 입력 --- | |
query = st.text_input("멘토에게 질문할 내용을 입력하세요:", placeholder="예: 신입 PM이 가장 먼저 해야 할 일은 무엇인가요?") | |
# --- 검색 및 답변 생성 실행 --- | |
if st.button("Khan 멘토에게 질문하기"): | |
# Always use top_k=3 for Pinecone search | |
if query and index and model and openai_client: | |
with st.spinner("관련 영상을 찾고 Khan 멘토가 답변을 준비하는 중..."): | |
# 1. Pinecone 검색 (Always use top_k=3) | |
pinecone_results = search(query, top_k=5, _index=index, _model=model) | |
# 2. OpenAI 답변 생성 | |
khan_answer = generate_khan_answer(query, pinecone_results, openai_client) | |
# 3. 결과 표시 | |
st.subheader("💡 Khan 멘토의 답변") | |
st.markdown(khan_answer) # 생성된 답변 표시 | |
# 4. 참고 자료 (Pinecone 검색 결과) 표시 | |
if pinecone_results: | |
with st.expander("답변에 참고한 영상 정보 보기"): | |
displayed_urls = set() # Keep track of displayed video URLs | |
# Display up to 3 *unique* results based on URL | |
for i, r in enumerate(pinecone_results): | |
url = r.get('URL', 'N/A') | |
# Skip if this video URL has already been displayed | |
if url in displayed_urls or url == 'N/A': | |
continue | |
# Add the URL to the set of displayed URLs | |
displayed_urls.add(url) | |
# --- Display unique video info --- | |
st.markdown(f"--- **참고 자료 {len(displayed_urls)} (유사도: {r['점수']:.4f})** ---") # Use length of set for counter | |
st.markdown(f"**제목:** {r.get('제목', 'N/A')}") | |
st.markdown(f"**요약:** {r.get('요약', 'N/A')}") | |
timestamp = r.get('타임스탬프', 'N/A') | |
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url) | |
start_seconds = None # Initialize start_seconds | |
# Try to calculate start_seconds if timestamp is valid | |
if is_youtube and timestamp and timestamp != 'N/A': | |
start_seconds = parse_timestamp_to_seconds(timestamp) | |
# Display timestamped link (still useful for user) | |
if is_youtube and start_seconds is not None: | |
try: | |
# We still generate timestamped URL for the link text | |
timestamped_link_url = add_timestamp_to_youtube_url(url, timestamp) | |
st.markdown(f"**영상 링크 (타임스탬프 포함):** [{timestamped_link_url}]({timestamped_link_url})") | |
except Exception as e: | |
logger.error(f"Error creating timestamped URL for link: {e}") | |
st.markdown(f"**영상 링크 (원본):** [{url}]({url})") # Fallback link | |
elif url != "N/A" and isinstance(url, str) and url.startswith("http"): | |
st.markdown(f"**URL:** [{url}]({url})") | |
else: | |
st.markdown(f"**URL:** {url}") | |
# Use st.video with original URL and start_time parameter | |
if is_youtube and url != "N/A": | |
# Create columns to control width, place video in the first column (50% width) | |
col1, col2 = st.columns(2) | |
with col1: | |
try: | |
# Pass original URL and calculated start_seconds to st.video | |
st.video(url, start_time=start_seconds or 0) | |
except Exception as e: | |
st.error(f"비디오({url}) 재생 중 오류 발생: {e}") | |
# Fallback link uses the original URL here as start_time likely failed | |
st.markdown(f"[YouTube에서 보기]({url})") | |
elif url != "N/A": # Try st.video for other potential video URLs (no start_time) | |
# Create columns for non-YouTube videos as well | |
col1, col2 = st.columns(2) | |
with col1: | |
try: | |
st.video(url) | |
except Exception as e: | |
logger.warning(f"st.video failed for non-YouTube URL {url}: {e}") | |
# Remove the display of original timestamp and type | |
# st.markdown(f"**타임스탬프 (원본):** {timestamp}") | |
# st.markdown(f"**내용 타입:** {r.get('타입', 'N/A')}") | |
elif not query: | |
st.warning("질문 내용을 입력해주세요.") | |
# API 키 등 다른 요소 부재 시 에러는 각 init 함수에서 처리됨 | |
st.markdown("---") | |
st.caption("Powered by Pinecone, Sentence Transformers, and OpenAI") |