import os
import time
import numpy as np
import networkx as nx
from textwrap import dedent
from dotenv import load_dotenv
from openai import AzureOpenAI
from huggingface_hub import InferenceClient
from lightrag import LightRAG
from lightrag.utils import EmbeddingFunc
from lightrag.kg.shared_storage import initialize_pipeline_status

load_dotenv()
# Load the environment variables
HF_API_TOKEN = os.environ["HF_TOKEN"]
HF_API_ENDPOINT = os.environ["HF_API_ENDPOINT"]
AZURE_OPENAI_API_VERSION = os.environ["AZURE_OPENAI_API_VERSION"]
AZURE_OPENAI_DEPLOYMENT = os.environ["AZURE_OPENAI_DEPLOYMENT"]
AZURE_OPENAI_API_KEY = os.environ["AZURE_OPENAI_API_KEY"]
AZURE_OPENAI_ENDPOINT = os.environ["AZURE_OPENAI_ENDPOINT"]
AZURE_EMBEDDING_DEPLOYMENT = os.environ["AZURE_EMBEDDING_DEPLOYMENT"]
AZURE_EMBEDDING_API_VERSION = os.environ["AZURE_EMBEDDING_API_VERSION"]

WORKING_DIR = "./sample"
GRAPHML_FILE = WORKING_DIR + "/graph_chunk_entity_relation.graphml"
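
# Added safeguard: make sure the working directory exists before LightRAG writes its
# storage files there (harmless if LightRAG creates the directory itself).
os.makedirs(WORKING_DIR, exist_ok=True)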

MODEL_LIST = [
    "EmergentMethods/Phi-3-mini-128k-instruct-graph",
    "OpenAI/GPT-4.1-mini",
]

# Read the system prompt
sys_prompt_file = "./data/sys_prompt.txt"
with open(sys_prompt_file, 'r', encoding='utf-8') as file:
    sys_prompt = file.read()
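
# Overview (added comment): LLMGraph supports two extraction back ends.
#   * MODEL_LIST[0]: Phi-3-mini-128k-instruct-graph served through the Hugging Face
#     Inference API, returning the knowledge graph as JSON text.
#   * MODEL_LIST[1]: an Azure OpenAI deployment driven by LightRAG, returning the graph
#     as a node-link dict read back from the generated GraphML file.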


class LLMGraph:
    """
    A class to interact with LLMs for knowledge graph extraction.
    """

    async def initialize_rag(self, embedding_dimension=3072):
        """
        Initialize the LightRAG instance with the specified embedding dimension.
        The default of 3072 matches, for example, an Azure text-embedding-3-large deployment.
        """
        if self.rag is None:
            self.rag = LightRAG(
                working_dir=WORKING_DIR,
                llm_model_func=self._llm_model_func,
                embedding_func=EmbeddingFunc(
                    embedding_dim=embedding_dimension,
                    max_token_size=8192,
                    func=self._embedding_func,
                ),
            )
            await self.rag.initialize_storages()
            await initialize_pipeline_status()

    # async def test_responses(self):
    #     """
    #     Test the LLM and embedding functions.
    #     """
    #     result = await self._llm_model_func("How are you?")
    #     print("Response from llm_model_func: ", result)
    #     result = await self._embedding_func(["How are you?"])
    #     print("Result of embedding_func: ", result.shape)
    #     print("Dimension of embedding: ", result.shape[1])
    #     return True

    def __init__(self):
        """
        Initialize the LLMGraph client: set up the Hugging Face Inference API client
        and defer creation of the LightRAG instance until initialize_rag() is called.
        """
        # Hugging Face Inference API for Phi-3-mini-128k-instruct-graph
        self.hf_client = InferenceClient(
            model=HF_API_ENDPOINT,
            token=HF_API_TOKEN
        )
        self.rag = None  # Lazy loading of RAG instance

    def _generate(self, messages):
        """
        Generate a response from the model based on the provided messages.
        """
        # Use the chat_completion method
        response = self.hf_client.chat_completion(
            messages=messages,
            max_tokens=1024,
        )
        # Access the generated text
        generated_text = response.choices[0].message.content
        return generated_text

    def _get_messages(self, text):
        """
        Construct the message list for the chat model.
        """
        context = dedent(sys_prompt)
        user_message = dedent(f"""\n
        -------Text begin-------
        {text}
        -------Text end-------
        """)
        messages = [
            {
                "role": "system",
                "content": context
            },
            {
                "role": "user",
                "content": user_message
            }
        ]
        return messages

    def extract(self, text, model_name=MODEL_LIST[0]):
        """
        Extract a knowledge graph in structured format from text.
        """
        if model_name == MODEL_LIST[0]:
            # Use Hugging Face Inference API with Phi-3-mini-128k-instruct-graph
            messages = self._get_messages(text)
            json_graph = self._generate(messages)
            return json_graph
        else:
            # Use LightRAG with Azure OpenAI
            if self.rag is None:
                raise RuntimeError("Call initialize_rag() before extracting with this model.")
            self.rag.insert(text)  # Insert the text into the RAG storage
            # Wait for GRAPHML_FILE to be created
            while not os.path.exists(GRAPHML_FILE):
                time.sleep(0.1)  # Sleep for 0.1 seconds before checking again
            # Extract dict format of the knowledge graph
            G = nx.read_graphml(GRAPHML_FILE)
            # Convert the graph to node-link data format (the edges= keyword replaces
            # the deprecated link= argument in recent networkx releases)
            dict_graph = nx.node_link_data(G, edges="edges")
            return dict_graph

    async def _llm_model_func(self, prompt, system_prompt=None, history_messages=None, **kwargs) -> str:
        """
        Call the Azure OpenAI chat completions endpoint with the given prompt and
        optional system prompt and history messages.
        """
        llm_client = AzureOpenAI(
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_OPENAI_API_VERSION,
            azure_endpoint=AZURE_OPENAI_ENDPOINT,
        )
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        if history_messages:
            messages.extend(history_messages)
        messages.append({"role": "user", "content": prompt})
        chat_completion = llm_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT,
            messages=messages,
            temperature=kwargs.get("temperature", 0),
            top_p=kwargs.get("top_p", 1),
            n=kwargs.get("n", 1),
        )
        return chat_completion.choices[0].message.content

    async def _embedding_func(self, texts: list[str]) -> np.ndarray:
        """
        Call the Azure OpenAI embeddings endpoint with the given texts.
        """
        emb_client = AzureOpenAI(
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_EMBEDDING_API_VERSION,
            azure_endpoint=AZURE_OPENAI_ENDPOINT,
        )
        response = emb_client.embeddings.create(model=AZURE_EMBEDDING_DEPLOYMENT, input=texts)
        embeddings = [item.embedding for item in response.data]
        return np.array(embeddings)


# if __name__ == "__main__":
#     # Initialize the LLMGraph model
#     model = LLMGraph()
#     asyncio.run(model.initialize_rag())  # Ensure RAG is initialized
#     print("LLMGraph model initialized.")
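#
#     # Hedged usage sketch (not part of the original app): running the block above
#     # also requires `import asyncio`; the sample sentence below is illustrative only.
#     sample_text = "Marie Curie discovered polonium and radium in Paris."
#     json_graph = model.extract(sample_text, model_name=MODEL_LIST[0])  # Phi-3 via HF Inference API
#     dict_graph = model.extract(sample_text, model_name=MODEL_LIST[1])  # Azure OpenAI via LightRAG
#     print(json_graph)
#     print(dict_graph)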