""" |
|
optimized_rag_proper.py |
|
LLM์ ์ ์งํ๋ฉด์ ์๋ต์๊ฐ์ ๊ฐ์ ํ๋ ์ฌ๋ฐ๋ฅธ ๋ฐฉ๋ฒ |
|
๋ชฉํ: 1์ด ์ด๋ด (ํ์ค์ ๋ชฉํ) |
|
""" |
|
|
|
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from typing import List, Dict, Any
import os
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError


class ProperlyOptimizedRAG:
    """
    Optimize response time the right way, while keeping the LLM.
    """

    def __init__(self, vector_store: FAISS):
        self.vector_store = vector_store

        # Small, fast chat model with a short output budget and a 5-second request timeout
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1,
            max_tokens=300,
            api_key=os.getenv("OPENAI_API_KEY"),
            request_timeout=5
        )

        # Minimal prompt to cut input tokens
        self.prompt_template = self._create_minimal_prompt()

        # Simple in-memory cache: {hash(question): (answer_dict, stored_at)}
        self.cache = {}
        self.cache_ttl = 3600  # seconds

        # Long-lived thread pool for the vector search
        self.executor = ThreadPoolExecutor(max_workers=2)

    def _create_minimal_prompt(self) -> PromptTemplate:
        """
        Minimal prompt (fewer input tokens).
        """
        template = """Manual: {context}

Question: {question}

Answer briefly and clearly, using only the manual content:"""

        return PromptTemplate(
            template=template,
            input_variables=["context", "question"]
        )

    def answer_question(self, question: str) -> Dict[str, Any]:
        """
        Generate an optimized answer (the LLM is still used).
        """
        start_time = time.time()

        # 1) Cache lookup: return immediately on a fresh hit
        cache_key = hash(question)
        if cache_key in self.cache:
            cached_answer, cached_time = self.cache[cache_key]
            if time.time() - cached_time < self.cache_ttl:
                return {
                    "question": question,
                    "answer": cached_answer['answer'],
                    "source_pages": cached_answer['pages'],
                    "response_time": time.time() - start_time,
                    "cached": True
                }

        # 2) Vector search on the shared thread pool with a hard 0.5 s budget.
        #    Submit to the long-lived executor directly; wrapping it in a `with`
        #    block would shut the pool down after the first question.
        future_search = self.executor.submit(self._fast_vector_search, question)
        try:
            search_results = future_search.result(timeout=0.5)
        except FuturesTimeoutError:
            search_results = []

        if not search_results:
            return self._fallback_response(question, start_time)

        # 3) Compress the retrieved chunks and make a single LLM call
        context = self._optimize_context(search_results)
        answer = self._fast_llm_call(question, context)

        # 4) Keep up to three distinct source pages
        pages = list(set(
            doc.metadata.get('page', 0)
            for doc in search_results
        ))[:3]

        response = {
            "question": question,
            "answer": answer,
            "source_pages": sorted(pages),
            "response_time": time.time() - start_time,
            "cached": False
        }

        # 5) Store the result for later identical questions
        self.cache[cache_key] = (
            {"answer": answer, "pages": pages},
            time.time()
        )

        return response

    def _fast_vector_search(self, question: str, k: int = 3) -> List[Document]:
        """
        Fast vector search (small k, plain similarity search instead of MMR).
        """
        try:
            docs = self.vector_store.similarity_search(
                question,
                k=k,
                fetch_k=k  # no over-fetching; only relevant when a filter is applied
            )
            return docs
        except Exception as e:
            print(f"Vector search error: {e}")
            return []

    def _optimize_context(self, docs: List[Document]) -> str:
        """
        Optimize the context (drop duplicates, keep only the essentials).
        """
        seen_content = set()
        optimized = []

        for doc in docs:
            content = doc.page_content.strip()

            # Skip near-duplicate chunks (same first 50 characters)
            content_hash = hash(content[:50])
            if content_hash in seen_content:
                continue
            seen_content.add(content_hash)

            # Cap each chunk at 500 characters to keep the prompt small
            if len(content) > 500:
                content = content[:500]

            optimized.append(content)

        return "\n---\n".join(optimized[:3])

    def _fast_llm_call(self, question: str, context: str) -> str:
        """
        Fast LLM call.
        """
        try:
            prompt = self.prompt_template.format(
                context=context,
                question=question
            )

            response = self.llm.invoke(prompt)

            # ChatOpenAI returns an AIMessage; fall back to str() just in case
            if hasattr(response, 'content'):
                return response.content
            else:
                return str(response)

        except Exception as e:
            print(f"LLM call error: {e}")
            return "An error occurred while generating the answer."

    def _fallback_response(self, question: str, start_time: float) -> Dict:
        """
        Fallback response (used when vector search fails).
        """
        return {
            "question": question,
            "answer": (
                "The requested information could not be found in the manual. "
                "Please rephrase your question or contact the service center (1577-0001)."
            ),
            "source_pages": [],
            "response_time": time.time() - start_time,
            "cached": False
        }

    def batch_test(self, questions: List[str]) -> Dict[str, Any]:
        """
        Batch test with performance measurement.
        """
        results = []
        times = []

        print("\n" + "=" * 60)
        print("Optimized RAG performance test (LLM retained)")
        print("=" * 60)

        for i, question in enumerate(questions, 1):
            result = self.answer_question(question)
            results.append(result)
            times.append(result['response_time'])

            status = "cache" if result.get('cached') else "search"
            print(f"\n[{i}] {question}")
            print(f"  time: {result['response_time']:.2f}s ({status})")
            print(f"  pages: {result['source_pages']}")
            print(f"  answer: {result['answer'][:150]}...")

        avg_time = sum(times) / len(times)
        cached_count = sum(1 for r in results if r.get('cached'))

        print("\n" + "=" * 60)
        print("Performance summary:")
        print(f"  average response: {avg_time:.2f}s")
        print(f"  min/max: {min(times):.2f}s / {max(times):.2f}s")
        print(f"  cache hits: {cached_count}/{len(questions)}")
        print(f"  target met: {'yes' if avg_time < 1.0 else 'no (missed the 1 s target)'}")
        print("=" * 60)

        return {
            "average": avg_time,
            "min": min(times),
            "max": max(times),
            "results": results
        }
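

# --- Illustrative sketch (not part of the original script) ---------------------
# The cache in ProperlyOptimizedRAG keys on hash(question), so two questions that
# differ only in whitespace, case, or a trailing "?" miss each other. The helper
# below is one hypothetical way to normalise the key before hashing; the function
# name and the normalisation rules are assumptions, not an API used above.
def normalize_cache_key(question: str) -> int:
    """Hash a lightly normalised form of the question for cache lookups."""
    normalized = " ".join(question.strip().lower().split()).rstrip("?")
    return hash(normalized)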


class AsyncOptimizedRAG:
    """
    Even faster responses via asynchronous processing (experimental).
    """

    def __init__(self, vector_store: FAISS):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo-1106",
            temperature=0.1,
            max_tokens=300,
            api_key=os.getenv("OPENAI_API_KEY")
        )
        self.cache = {}  # reserved; not consulted in this class yet

    async def answer_question_async(self, question: str) -> Dict[str, Any]:
        """
        Generate an answer asynchronously.
        """
        start_time = time.time()

        # Run the vector search in a worker thread so the event loop stays free
        docs = await self._async_vector_search(question)

        if not docs:
            return {
                "question": question,
                "answer": "The information could not be found.",
                "source_pages": [],
                "response_time": time.time() - start_time
            }

        # Small context: first 300 characters of at most three chunks
        context = "\n".join([d.page_content[:300] for d in docs[:3]])
        answer = await self._async_llm_call(question, context)

        return {
            "question": question,
            "answer": answer,
            "source_pages": [d.metadata.get('page', 0) for d in docs],
            "response_time": time.time() - start_time
        }

    async def _async_vector_search(self, question: str) -> List[Document]:
        """Asynchronous vector search."""
        return await asyncio.to_thread(
            self.vector_store.similarity_search,
            question,
            k=3
        )

    async def _async_llm_call(self, question: str, context: str) -> str:
        """Asynchronous LLM call."""
        prompt = f"Manual: {context}\nQuestion: {question}\nAnswer:"

        # Run the blocking invoke() in a worker thread instead of blocking the loop
        response = await asyncio.to_thread(
            self.llm.invoke,
            prompt
        )

        return response.content if hasattr(response, 'content') else str(response)
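

# --- Illustrative usage sketch (not exercised by the __main__ block below) -----
# The async variant pays off when several questions are answered concurrently.
# This helper is an assumed example, not part of the original script: it simply
# fans answer_question_async out over a list of questions with asyncio.gather.
async def run_async_demo(vector_store: FAISS, questions: List[str]) -> List[Dict[str, Any]]:
    rag = AsyncOptimizedRAG(vector_store)
    return await asyncio.gather(*(rag.answer_question_async(q) for q in questions))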


if __name__ == "__main__":
    from embeddings import VehicleManualEmbeddings

    if not os.getenv("OPENAI_API_KEY"):
        print("Enter your OpenAI API key:")
        api_key = input("sk-... : ").strip()
        os.environ["OPENAI_API_KEY"] = api_key

    print("Loading vector index...")
    embedder = VehicleManualEmbeddings()
    vector_store = embedder.load_index()

    rag = ProperlyOptimizedRAG(vector_store)

    test_questions = [
        "What is the engine oil change interval?",
        "What is the correct tire pressure?",
        "What should I do when a warning light comes on?",
        "Precautions for driving on snowy roads",
        "Driver assistance system settings"
    ]

    print("\n### First run (no cache) ###")
    stats1 = rag.batch_test(test_questions)

    print("\n### Second run (cache in use) ###")
    stats2 = rag.batch_test(test_questions[:3])

    improvement = ((stats1['average'] - stats2['average']) / stats1['average']) * 100
    print(f"\nCache effect: {improvement:.1f}% faster")