Spaces:
Running
Running
from typing import List, Optional, Callable, Any | |
import logging | |
import re | |
from thefuzz import fuzz | |
from langchain.output_parsers.openai_tools import JsonOutputToolsParser | |
from langchain_core.runnables import RunnableSequence | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_core.language_models.llms import LLM | |
from langchain_core.messages import AIMessage | |
from langgraph.constants import END | |
from pydantic import BaseModel, Field | |
from ask_candid.agents.schema import AgentState | |
from ask_candid.services.org_search import OrgSearch | |
# Shared organization-search client. `generate_org_link_dict` calls it as
# `search(org, name_only=True)`, once per organization name.
search = OrgSearch()
# Set the root logging format to "[LEVEL] (timestamp) :: message".
# NOTE(review): this runs at import time; `logging.basicConfig` is a no-op when
# the root logger already has handlers, so the host app's config may win.
logging.basicConfig(format="[%(levelname)s] (%(asctime)s) :: %(message)s")
# Module-level logger used by the graph nodes in this file; emits INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Tool schema bound to the LLM in `extract_org_links_from_chatbot` via
# `llm.bind_tools([OrganizationNames])`; the parsed tool call's "orgnames"
# argument is what that function returns.
# NOTE(review): the class docstring and the Field description are presumably
# forwarded to the model as the tool/parameter descriptions - confirm before
# rewording either, since that would change the prompt the model sees.
class OrganizationNames(BaseModel):
    """List of names of social-sector organizations, such as nonprofits and foundations."""
    # Names extracted from the chatbot text, one string per organization.
    orgnames: List[str] = Field(description="List of organization names")
def extract_org_links_from_chatbot(chatbot_output: str, llm: LLM) -> List[str]:
    """Extract the organization names mentioned in a chatbot response.

    The model is bound to the `OrganizationNames` tool schema and asked to
    return the names through a tool call; the first tool call's ``orgnames``
    argument is returned.

    Args:
        chatbot_output (str): The chatbot output containing organization names and other content.
        llm (LLM): Model object exposing ``bind_tools``; it is invoked once per call.

    Returns:
        list: Organization names extracted from the text. An empty list is
        returned when the model reports no names, when its output does not have
        the expected shape, or when the model call fails. This function is
        best-effort and does not raise.
    """
    # NOTE: the closing code fence was previously two backticks; it now matches
    # the opening three so the input text is delimited unambiguously.
    prompt_text = """Extract only the names of officially recognized organizations, foundations, and government entities
from the text below. Do not include any entries that contain descriptions, regional identifiers, or explanations
within parentheses or following the name. Strictly exclude databases, resources, crowdfunding platforms, and general
terms. Provide the output only in the specified JSON format.
input text below:
```{chatbot_output}```
output format:
{{
'orgnames' : [list of organization names without any additional descriptions or identifiers]
}}
"""
    try:
        extraction_prompt = ChatPromptTemplate.from_template(prompt_text)
        model = llm.bind_tools([OrganizationNames])
        chain = RunnableSequence(extraction_prompt, model, JsonOutputToolsParser())
        # Run the chain with the input data
        result = chain.invoke({"chatbot_output": chatbot_output})
    except Exception:
        # Deliberate best-effort boundary: a failed extraction must not break
        # the conversation, it only means no links get inserted.
        logger.exception("Organization name extraction failed")
        return []

    # No tool call at all simply means the model found no organizations.
    if not result:
        logger.info("No organization names found in chatbot output")
        return []

    try:
        org_names = result[0]["args"].get("orgnames", [])
    except (IndexError, KeyError, TypeError, AttributeError) as e:
        logger.warning("Unexpected tool-call structure from organization extraction: %s", e)
        return []

    if not isinstance(org_names, list):
        logger.warning("Unexpected output format: 'orgnames' should be a list")
        return []

    # Downstream code lower-cases and regex-escapes each name, so drop anything
    # that is not a non-blank string.
    return [name for name in org_names if isinstance(name, str) and name.strip()]
def is_similar(name: str, list_of_dict: list, threshold: int = 80) -> bool:
    """
    Returns True if `name` is similar to any names in `list_of_dict` based on a similarity threshold.

    Comparison is case-insensitive and uses `fuzz.ratio`; a score greater than
    or equal to `threshold` counts as a match.

    Args:
        name (str): The name to look for.
        list_of_dict (list): Iterable of dictionaries, each expected to carry a
            string under the ``"name"`` key. Malformed items are skipped.
        threshold (int): Minimum `fuzz.ratio` score that counts as similar.

    Returns:
        bool: True on the first sufficiently similar entry, otherwise False.
        Invalid input (non-string `name`, empty or non-iterable
        `list_of_dict`) yields False rather than raising.
    """
    if not isinstance(name, str):
        logger.warning("Cannot compare non-string name: %r", name)
        return False
    if not list_of_dict:
        return False

    # Hoisted: the needle does not change between candidates.
    target = name.lower()
    try:
        for item in list_of_dict:
            # Resolve the candidate first so that one malformed entry is
            # skipped instead of aborting the whole scan.
            try:
                candidate = item["name"]
            except (KeyError, TypeError):
                logger.warning("Skipping item without a usable 'name' key: %r", item)
                continue
            if not isinstance(candidate, str):
                logger.warning("Skipping item with non-string 'name': %r", item)
                continue
            if fuzz.ratio(target, candidate.lower()) >= threshold:
                return True
    except TypeError as e:
        # Non-iterable `list_of_dict` or a threshold that cannot be compared.
        logger.warning("TypeError: %s", e)
    return False
def generate_org_link_dict(org_names_list: list) -> dict:
    """
    Maps organization names to their Candid profile URLs if available.

    For each organization in `org_names_list`, this function queries the
    module-level `search` client with ``name_only=True`` and inspects the first
    result. If one of that result's names is similar to the requested name (see
    `is_similar`) and a Candid entity ID is present, a profile URL is built.
    In every other case, including any error while searching, the name maps to
    an empty string.

    Args:
        org_names_list (list): List of organization names (str) to retrieve Candid profile links for.
            ``None`` is treated as an empty list.

    Returns:
        dict: Dictionary with organization names as keys and Candid profile URLs or empty strings as values.

    Example:
        generate_org_link_dict(['New York-Presbyterian Hospital'])
        # {'New York-Presbyterian Hospital': 'https://app.candid.org/profile/6915255'}
    """
    link_dict = {}
    for org in org_names_list or []:
        # The same name can be mentioned several times; search only once.
        if org in link_dict:
            continue
        # Default to "no link"; overwritten below only on a confirmed match.
        link_dict[org] = ""
        try:
            response = search(org, name_only=True)
            if not response:
                continue
            top_hit = response[0]
            if not is_similar(org, top_hit.get("names") or []):
                continue
            candid_entity_id = top_hit.get("candid_entity_id")
            if candid_entity_id:
                link_dict[org] = f"https://app.candid.org/profile/{candid_entity_id}"
        except Exception as e:
            # Best-effort per organization: one failed lookup must not prevent
            # the remaining names from being resolved.
            logger.warning("An error occurred for organization '%s': %s", org, e)
    return link_dict
def embed_org_links_in_text(input_text: str, org_link_dict: dict) -> str:
    """
    Replaces organization names in `input_text` with links from `org_link_dict` and appends a Candid info message.

    All names are substituted in a single pass, longest name first, so that:
      * a name that is a substring of another name (or of an inserted URL)
        never gets linked inside markup that was just inserted, and
      * names containing backslashes are inserted literally instead of being
        interpreted as regex replacement escapes.

    Args:
        input_text (str): The text containing organization names.
        org_link_dict (dict): Mapping of organization names to URLs. Entries with
            an empty name or empty URL are ignored.

    Returns:
        str: Updated text with linked organization names and an appended Candid message.
        If the inputs are unusable (non-string text, non-mapping dict) the text
        is returned unchanged.
    """
    if not isinstance(input_text, str):
        logger.warning("Cannot embed organization links in non-string text: %r", type(input_text))
        return input_text

    try:
        # Keep only entries that can actually produce a link. An empty name
        # would otherwise match at every position in the text.
        linkable = {
            name: url
            for name, url in org_link_dict.items()
            if isinstance(name, str) and name and url
        }
        if linkable:
            # Longest first so "Ford Foundation" wins over "Ford" at the same position.
            ordered_names = sorted(linkable, key=len, reverse=True)
            pattern = re.compile("|".join(re.escape(name) for name in ordered_names))

            def _anchor(match):
                # Callable replacement: the returned string is inserted verbatim.
                org_name = match.group(0)
                return (
                    f"<a href={linkable[org_name]} target='_blank' rel='noreferrer' "
                    f"class='candid-org-link'>{org_name}</a>"
                )

            input_text = pattern.sub(_anchor, input_text)

        # Append Candid information message at the end
        input_text += (
            "<p class='candid-app-link'> "
            "Visit <a href=https://app.candid.org/ target='_blank' rel='noreferrer' class='candid-org-link'>Candid</a> "
            "to get nonprofit information you need.</p>"
        )
    except (TypeError, AttributeError, re.error) as e:
        logger.warning("Could not embed organization links: %s", e)
    return input_text
def has_org_name(
    state: AgentState,
    llm: LLM,
    user_callback: Optional[Callable[[str], Any]] = None
) -> AgentState:
    """Decide whether the latest message mentions organizations that can be linked.

    Parameters
    ----------
    state : AgentState
        The current state of the agent; its ``"messages"`` list is read and the
        content of the last entry is scanned for organization names.
    llm : LLM
        Model handed to `extract_org_links_from_chatbot` for name extraction.
    user_callback : Optional[Callable[[str], Any]], optional
        Optional UI callback to inform the user of app state, by default None.
        A failing callback is logged and otherwise ignored.

    Returns
    -------
    AgentState
        ``{"next": "insert_org_link", "org_dict": ...}`` when at least one
        organization name was extracted, otherwise ``{"next": END, "messages": ...}``.
    """
    logger.info("---HAS ORG NAMES?---")

    # Status notification is best-effort: it must never break the node.
    if user_callback is not None:
        try:
            user_callback("Checking for relevant organizations")
        except Exception as ex:
            logger.warning("User callback was passed in but failed: %s", ex)

    history = state["messages"]
    org_names = extract_org_links_from_chatbot(history[-1].content, llm=llm)

    # Only hit the search service when the model actually returned names.
    org_links = {}
    if org_names:
        org_links = generate_org_link_dict(org_names)

    if not org_links:
        logger.info("---NO ORG NAMES FOUND---")
        return {"next": END, "messages": history}

    logger.info("---FOUND ORG NAMES---")
    return {"next": "insert_org_link", "org_dict": org_links}
def insert_org_link(state: AgentState) -> AgentState:
    """
    Rewrites the latest message with organization links embedded.

    The last entry of ``state["messages"]`` is removed from that list in place
    and re-issued as a new AIMessage whose content has the links from
    ``state["org_dict"]`` inserted by `embed_org_links_in_text`.

    Args:
        state (dict): The current state, including the organization links and latest message.

    Returns:
        dict: A dictionary whose ``"messages"`` entry is a one-element list holding the linked AIMessage.
    """
    logger.info("---INSERT ORG LINKS---")
    history = state["messages"]
    original_text = history[-1].content
    # Drop the un-linked original; the linked copy returned below takes its place.
    history.pop()
    linked_text = embed_org_links_in_text(original_text, state["org_dict"])
    return {"messages": [AIMessage(content=linked_text)]}