ask-candid / ask_candid /tools /elastic /index_details_tool.py
brainsqueeze's picture
Adding optional news data source
bea5044 verified
raw
history blame
2.47 kB
import json
import logging
from typing import Type, Optional

from elasticsearch import Elasticsearch
from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain.tools.base import BaseTool
from pydantic import BaseModel, Field

from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA
# Module-level logging setup: configure the root logger at INFO and create the
# named logger used by the tool below.
logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")

# Shared Elasticsearch client, created once at import time from the
# SEMANTIC_ELASTIC_QA connection settings and reused by every tool call.
es = Elasticsearch(
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    verify_certs=True,
    request_timeout=180,  # 3 minutes
)
class IndexDetailsInput(BaseModel):
    """Argument schema for the index-details tool: one required index name."""

    # `...` as the default marks the field as required.
    index_name: str = Field(
        default=...,
        description="The name of the index for which the details are to be retrieved",
    )
class IndexDetailsTool(BaseTool):
    """Tool for getting information about a single Elasticsearch index.

    Given an index name, the tool asks the module-level ``es`` client for the
    index's aliases, field mappings and settings, and returns all three as a
    single JSON string. Only synchronous execution is supported.
    """

    # Tool metadata declared as BaseTool fields.
    name: str = "elastic_index_show_details"
    description: str = (
        "Input is an index name, output is a JSON-based string with the aliases, mappings containing the field names, and settings of an index."
    )
    # Input schema: a single required ``index_name`` string.
    args_schema: Optional[Type[BaseModel]] = IndexDetailsInput

    def _run(
        self,
        index_name: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Get information about a single Elasticsearch index.

        Args:
            index_name: Name of the index to describe.
            run_manager: Optional callback manager; not used by this tool.

        Returns:
            A JSON object string with the keys ``"alias"``, ``"field_mappings"``
            and ``"settings"``, or an empty string if anything goes wrong
            (the failure is logged with its traceback).
        """
        try:
            alias = es.indices.get_alias(index=index_name)
            field_mappings = es.indices.get_field_mapping(index=index_name, fields="*")
            field_settings = es.indices.get_settings(index=index_name)

            # Each response is looked up by `index_name`; if the key is absent
            # the KeyError is handled below like any other failure.
            # NOTE(review): presumably the responses are keyed by concrete
            # index name, so an alias or wildcard input may miss — confirm.
            details = {
                "alias": alias[index_name],
                "field_mappings": field_mappings[index_name],
                "settings": field_settings[index_name],
            }

            # Serialize with json.dumps rather than str(): str() yields a
            # Python dict repr (single quotes, True/None), which is not the
            # JSON string that `description` promises.
            return json.dumps(details)
        except Exception as e:
            # Best-effort tool boundary: log the failure and return an empty
            # result instead of propagating the exception to the caller.
            logger.exception(
                "Could not fetch index information for %s: %s", index_name, e
            )
            return ""

    async def _arun(
        self,
        index_name: str = "",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Async entry point; always raises because only sync use is supported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("IndexDetailsTool does not support async operations")