khanpm / app.py
morethanair's picture
Add files via upload
07af1d2
raw
history blame
18.2 kB
import streamlit as st
import os
from pinecone import Pinecone
from sentence_transformers import SentenceTransformer
from typing import List, Dict
import re # For parsing timestamp and extracting video ID
import streamlit.components.v1 as components # For embedding HTML
from openai import OpenAI # Import OpenAI library
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Helper Functions (Existing: parse_timestamp_to_seconds, get_youtube_video_id, add_timestamp_to_youtube_url, generate_youtube_embed_html) ---
def parse_timestamp_to_seconds(timestamp: str) -> int | None:
"""HH:MM:SS 또는 HH:MM:SS.ms 형식의 타임스탬프를 초 단위로 변환합니다."""
if not isinstance(timestamp, str):
return None
# Remove milliseconds part if present
timestamp_no_ms = timestamp.split('.')[0]
parts = timestamp_no_ms.split(':')
try:
if len(parts) == 3:
h, m, s = map(int, parts)
return h * 3600 + m * 60 + s
elif len(parts) == 2:
m, s = map(int, parts)
return m * 60 + s
elif len(parts) == 1:
return int(parts[0])
else:
return None
except ValueError:
return None
def get_youtube_video_id(url: str) -> str | None:
"""YouTube URL에서 비디오 ID를 추출합니다."""
if not isinstance(url, str):
return None
# Standard YouTube URLs (youtube.com/watch?v=...), shortened URLs (youtu.be/...), etc.
match = re.search(r"(?:v=|/|youtu\.be/|embed/|shorts/)([0-9A-Za-z_-]{11})", url)
return match.group(1) if match else None
def add_timestamp_to_youtube_url(youtube_url: str, timestamp: str) -> str:
"""YouTube URL에 타임스탬프를 추가합니다."""
seconds = parse_timestamp_to_seconds(timestamp)
if seconds is None or not youtube_url:
return youtube_url # Return original URL if timestamp is invalid or URL is empty
separator = '&' if '?' in youtube_url else '?'
# Remove existing t= parameter if present
cleaned_url = re.sub(r'[?&]t=\d+s?', '', youtube_url)
separator = '&' if '?' in cleaned_url else '?' # Re-check separator after cleaning
return f"{cleaned_url}{separator}t={seconds}s"
def generate_youtube_embed_html(youtube_url: str, timestamp: str) -> str | None:
"""타임스탬프가 적용된 YouTube 임베드 HTML 코드를 생성합니다. 가로 800px 고정, 세로 자동 조절."""
video_id = get_youtube_video_id(youtube_url)
start_seconds = parse_timestamp_to_seconds(timestamp)
if not video_id:
logger.warning(f"Could not extract video ID from URL: {youtube_url}")
return None # Cannot generate embed code without video ID
start_param = f"start={start_seconds}" if start_seconds is not None else ""
# Use aspect ratio approach with fixed width 800px
return f'''
<div style="position: relative; width: 800px; padding-bottom: 450px; /* 800px * 9 / 16 = 450px */ height: 0; overflow: hidden;">
<iframe
src="https://www.youtube.com/embed/{video_id}?{start_param}&autoplay=0&rel=0"
frameborder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;">
</iframe>
</div>
'''
# --- 설정 ---
# Pinecone 설정
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY","pcsk_PZHLK_TRAvMCyNmJM4FKGCX7rbbY22a58fhnWYasx1mf3WL6sRasoASZXfsbnJYvCQ13w") # Load from environment variable
PINECONE_ENV = os.getenv("PINECONE_ENV", "us-east-1")
INDEX_NAME = "video-embeddings"
EMBEDDING_MODEL = "jhgan/ko-sroberta-multitask"
# OpenAI 설정
OPENAI_API_KEY = "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"
# --- 리소스 로딩 (캐싱 활용) ---
@st.cache_resource
def init_pinecone():
"""Pinecone 클라이언트를 초기화합니다."""
api_key = PINECONE_API_KEY
if not api_key:
st.error("Pinecone API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.")
st.stop()
try:
pc = Pinecone(api_key=api_key)
logger.info("Successfully connected to Pinecone.")
return pc
except Exception as e:
st.error(f"Pinecone 초기화 중 오류 발생: {e}")
st.stop()
@st.cache_resource
def load_embedding_model():
"""Sentence Transformer 모델을 로드합니다."""
try:
model = SentenceTransformer(EMBEDDING_MODEL)
logger.info(f"Successfully loaded embedding model: {EMBEDDING_MODEL}")
return model
except Exception as e:
st.error(f"임베딩 모델 로딩 중 오류 발생: {e}")
st.stop()
@st.cache_resource
def get_pinecone_index(_pc: Pinecone, index_name: str):
"""Pinecone 인덱스 객체를 가져옵니다."""
try:
index = _pc.Index(index_name)
# Optionally, do a quick check like index.describe_index_stats() to confirm connection
stats = index.describe_index_stats()
logger.info(f"Successfully connected to Pinecone index '{index_name}'. Stats: {stats.get('total_vector_count', 'N/A')} vectors")
return index
except Exception as e:
st.error(f"Pinecone 인덱스 '{index_name}' 연결 중 오류 발생: {e}. 인덱스가 존재하고 활성 상태인지 확인하세요.")
st.stop()
@st.cache_resource
def init_openai_client():
"""OpenAI 클라이언트를 초기화합니다."""
if not OPENAI_API_KEY:
st.error("OpenAI API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.")
st.stop()
try:
client = OpenAI(api_key=OPENAI_API_KEY)
# Test connection (optional, but recommended)
client.models.list()
logger.info("Successfully connected to OpenAI.")
return client
except Exception as e:
st.error(f"OpenAI 클라이언트 초기화 또는 연결 테스트 중 오류 발생: {e}")
st.stop()
# --- 검색 함수 ---
def search(query: str, top_k: int = 5, _index=None, _model=None) -> List[Dict]:
"""Pinecone 인덱스에서 검색을 수행하고 title과 original_text를 포함합니다."""
if not query or _index is None or _model is None:
return []
try:
query_vec = _model.encode(query, convert_to_numpy=True).tolist()
result = _index.query(vector=query_vec, top_k=top_k, include_metadata=True)
matches = result.get("matches", [])
search_results = []
for m in matches:
metadata = m.get("metadata", {})
search_results.append({
"URL": metadata.get("url", "N/A"),
"타임스탬프": metadata.get("timestamp", "N/A"),
"타입": metadata.get("type", "N/A"),
"제목": metadata.get("title", "N/A"), # 제목 추가
"요약": metadata.get("summary", "N/A"),
"원본텍스트": metadata.get("original_text", "N/A"), # 컨텍스트로 활용할 원본 텍스트
"점수": m.get("score", 0.0)
})
logger.info(f"Pinecone search returned {len(search_results)} results for query: '{query[:50]}...'")
return search_results
except Exception as e:
st.error(f"Pinecone 검색 중 오류 발생: {e}")
logger.error(f"Error during Pinecone search: {e}", exc_info=True)
return []
# --- OpenAI 답변 생성 함수 ---
def generate_khan_answer(query: str, search_results: List[Dict], client: OpenAI) -> str:
"""사용자 질문과 검색 결과를 바탕으로 Khan 페르소나 답변을 생성합니다."""
if not search_results:
# Return a persona-consistent message even when no results are found
return "현재 질문에 대해 참고할 만한 관련 영상을 찾지 못했습니다. 질문을 조금 더 명확하게 해주시거나 다른 방식으로 질문해주시면 도움이 될 것 같습니다."
# Build context string for OpenAI more robustly, including timestamped URL
context_parts = []
for i, r in enumerate(search_results):
original_text_snippet = ""
if r.get('원본텍스트'):
snippet = r['원본텍스트'][:200]
original_text_snippet = f"\n(원본 내용 일부: {snippet}...)"
# Generate timestamped URL if possible
timestamped_url_str = "N/A"
url = r.get('URL', 'N/A')
timestamp = r.get('타임스탬프', 'N/A')
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url)
has_valid_timestamp = timestamp and timestamp != 'N/A' and parse_timestamp_to_seconds(timestamp) is not None
if is_youtube and has_valid_timestamp:
try:
timestamped_url_str = add_timestamp_to_youtube_url(url, timestamp)
except Exception:
timestamped_url_str = url # Fallback to original URL on error
elif url != "N/A":
timestamped_url_str = url # Use original URL if not YouTube/no timestamp
context_parts.append(
f"관련 정보 {i+1}:\n"
f"제목: {r.get('제목', 'N/A')}\n"
f"영상 URL (원본): {url}\n"
f"타임스탬프: {timestamp}\n"
f"타임스탬프 적용 URL: {timestamped_url_str}\n" # Add the timestamped URL here
f"내용 타입: {r.get('타입', 'N/A')}\n"
f"요약: {r.get('요약', 'N/A')}"
f"{original_text_snippet}" # Append the snippet safely
)
context = "\n\n---\n\n".join(context_parts) # Join the parts
# Updated system prompt to instruct Markdown link usage
system_prompt = """너는 현실적인 조언을 잘하는 PM 멘토 Khan이다.
- 말투는 단호하지만 공감력이 있다. "~입니다." 또는 "~죠." 와 같이 명확하게 끝맺는다. 존댓말을 사용한다.
- 모호한 위로보다는 구조적이고 실용적인 제안을 한다. 문제의 핵심을 파악하고 구체적인 해결책이나 다음 단계를 제시한다.
- 질문이 막연하면 구체화해서 되물어본다.
- (이전 대화 기록은 없으므로) 반복 질문에는 "이전에 유사한 내용을 찾아봤었죠. 다시 한번 살펴보면..." 과 같이 언급할 수 있다.
- 긴 설명보단 핵심을 빠르게 전달하고, 필요하다면 간결한 비유를 활용한다.
- 주어진 '관련 정보' (영상 제목, 요약, 원본 내용, 타임스탬프 적용 URL 등)를 바탕으로 답변해야 한다. 정보가 부족하거나 질문과 관련성이 낮으면, 그 점을 명확히 밝히고 추가 정보를 요청하거나 질문을 구체화하도록 유도한다.
- **답변 중 관련 정보를 참조할 때는, 반드시 '타임스탬프 적용 URL'을 사용하여 다음과 같은 Markdown 링크 형식으로 만들어야 한다: `[영상 제목](타임스탬프_적용_URL)`. 예를 들어, "자세한 내용은 [비개발자가 연봉 2억을 받는 현실적인 방법](https://www.youtube.com/watch?v=VIDEO_ID&t=178s) 영상을 참고하시면 도움이 될 겁니다." 와 같이 표시한다.**
- 답변은 한국어로 한다."""
# Use triple quotes for the multi-line f-string
user_message = f"""사용자 질문: {query}
아래 관련 정보를 바탕으로 Khan 멘토로서 답변해주세요:
{context}"""
try:
logger.info("Calling OpenAI API...")
completion = client.chat.completions.create(
model="gpt-4o", # Use gpt-4 if available and preferred
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message}
],
temperature=0.5, # Slightly less creative, more focused on instructions
)
answer = completion.choices[0].message.content
logger.info("Received response from OpenAI.")
return answer.strip()
except Exception as e:
st.error(f"OpenAI 답변 생성 중 오류 발생: {e}")
logger.error(f"Error during OpenAI API call: {e}", exc_info=True)
return "답변을 생성하는 중에 문제가 발생했습니다. OpenAI API 키 또는 서비스 상태를 확인해주세요."
# --- Streamlit 앱 UI ---
st.set_page_config(page_title="Khan 멘토 (PM 영상 기반)", layout="wide")
st.title("✨ Khan 멘토에게 질문하기")
st.markdown("PM 관련 영상 내용을 기반으로 Khan 멘토가 답변해 드립니다.")
# --- API 키 확인 및 리소스 초기화 ---
openai_client = init_openai_client()
pc = init_pinecone()
model = load_embedding_model()
index = get_pinecone_index(pc, INDEX_NAME)
# --- 사용자 입력 ---
query = st.text_input("멘토에게 질문할 내용을 입력하세요:", placeholder="예: 신입 PM이 가장 먼저 해야 할 일은 무엇인가요?")
# --- 검색 및 답변 생성 실행 ---
if st.button("Khan 멘토에게 질문하기"):
# Always use top_k=3 for Pinecone search
if query and index and model and openai_client:
with st.spinner("관련 영상을 찾고 Khan 멘토가 답변을 준비하는 중..."):
# 1. Pinecone 검색 (Always use top_k=3)
pinecone_results = search(query, top_k=5, _index=index, _model=model)
# 2. OpenAI 답변 생성
khan_answer = generate_khan_answer(query, pinecone_results, openai_client)
# 3. 결과 표시
st.subheader("💡 Khan 멘토의 답변")
st.markdown(khan_answer) # 생성된 답변 표시
# 4. 참고 자료 (Pinecone 검색 결과) 표시
if pinecone_results:
with st.expander("답변에 참고한 영상 정보 보기"):
displayed_urls = set() # Keep track of displayed video URLs
# Display up to 3 *unique* results based on URL
for i, r in enumerate(pinecone_results):
url = r.get('URL', 'N/A')
# Skip if this video URL has already been displayed
if url in displayed_urls or url == 'N/A':
continue
# Add the URL to the set of displayed URLs
displayed_urls.add(url)
# --- Display unique video info ---
st.markdown(f"--- **참고 자료 {len(displayed_urls)} (유사도: {r['점수']:.4f})** ---") # Use length of set for counter
st.markdown(f"**제목:** {r.get('제목', 'N/A')}")
st.markdown(f"**요약:** {r.get('요약', 'N/A')}")
timestamp = r.get('타임스탬프', 'N/A')
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url)
start_seconds = None # Initialize start_seconds
# Try to calculate start_seconds if timestamp is valid
if is_youtube and timestamp and timestamp != 'N/A':
start_seconds = parse_timestamp_to_seconds(timestamp)
# Display timestamped link (still useful for user)
if is_youtube and start_seconds is not None:
try:
# We still generate timestamped URL for the link text
timestamped_link_url = add_timestamp_to_youtube_url(url, timestamp)
st.markdown(f"**영상 링크 (타임스탬프 포함):** [{timestamped_link_url}]({timestamped_link_url})")
except Exception as e:
logger.error(f"Error creating timestamped URL for link: {e}")
st.markdown(f"**영상 링크 (원본):** [{url}]({url})") # Fallback link
elif url != "N/A" and isinstance(url, str) and url.startswith("http"):
st.markdown(f"**URL:** [{url}]({url})")
else:
st.markdown(f"**URL:** {url}")
# Use st.video with original URL and start_time parameter
if is_youtube and url != "N/A":
# Create columns to control width, place video in the first column (50% width)
col1, col2 = st.columns(2)
with col1:
try:
# Pass original URL and calculated start_seconds to st.video
st.video(url, start_time=start_seconds or 0)
except Exception as e:
st.error(f"비디오({url}) 재생 중 오류 발생: {e}")
# Fallback link uses the original URL here as start_time likely failed
st.markdown(f"[YouTube에서 보기]({url})")
elif url != "N/A": # Try st.video for other potential video URLs (no start_time)
# Create columns for non-YouTube videos as well
col1, col2 = st.columns(2)
with col1:
try:
st.video(url)
except Exception as e:
logger.warning(f"st.video failed for non-YouTube URL {url}: {e}")
# Remove the display of original timestamp and type
# st.markdown(f"**타임스탬프 (원본):** {timestamp}")
# st.markdown(f"**내용 타입:** {r.get('타입', 'N/A')}")
elif not query:
st.warning("질문 내용을 입력해주세요.")
# API 키 등 다른 요소 부재 시 에러는 각 init 함수에서 처리됨
st.markdown("---")
st.caption("Powered by Pinecone, Sentence Transformers, and OpenAI")