from typing import List, Dict, Any, Optional import re import os import numpy as np from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings from langchain_core.documents import Document from ...socialspark.retrieval_qa.elastic import ElasticHitsResult, ElasticsearchQABase from .elastic import query_builder from .config import ALL_INDICES timestamp_pattern = re.compile(r"^(\d:\d{2}:\d{2}\.\d{3}),(\d:\d{2}:\d{2}\.\d{3})\n(.*)$") def parse_youtube_captions(caption_text: str) -> List[str]: timestamp_blocks = re.split(r'\n{2}', caption_text) parsed_blocks = [m.groups() for block in timestamp_blocks if (m:=timestamp_pattern.match(block)) if not None] video_length_mins = int(parsed_blocks[-1][0].split(':')[1]) # what is the block timing in minutes which gives 10 chunks? keep minimum 1 minute breaks to maintain context minute_difference = min(video_length_mins // 10, 1) prev_minute_start = 0 texts = [] text_block = [] for start, _, text in parsed_blocks: text_block.append(text) current_minute = int(start.split(':')[1]) if (current_minute - prev_minute_start) >= minute_difference: texts.append(' '.join(text_block)) text_block.clear() prev_minute_start = current_minute return texts def parse_candid_learning(text: str) -> List[str]: texts = [] for block in map(str.strip, re.split(r'\n{1,}', text, flags=re.I | re.M)): if ( 'back to top' in block.lower() or 'table of contents' in block.lower() ): continue texts.append(block) return texts class ElasticsearchQA(ElasticsearchQABase): indices: Optional[List[str]] = ALL_INDICES embedding: Any = HuggingFaceEndpointEmbeddings( huggingfacehub_api_token=os.getenv("HF_API_KEY"), # model="sentence-transformers/all-mpnet-base-v2" model="mixedbread-ai/mxbai-embed-large-v1" ) def build_query(self, query: str, **kwargs) -> List[Dict[str, Any]]: queries = query_builder(query=query, indices=self.indices) return queries def sub_section_alignment(self, query: str, document: List[str]) -> str: question_vector = np.array(self.embedding.embed_query(query), dtype='float32') vectors = np.array(self.embedding.embed_documents(document), dtype='float32') vectors = np.array(vectors, dtype='float32') size = max(5, int(0.1 * len(document))) doc_vector = vectors.sum(axis=0, keepdims=True) vectors /= np.linalg.norm(vectors, ord=2.0, axis=-1, keepdims=True) doc_vector /= np.linalg.norm(doc_vector, ord=2.0, axis=-1, keepdims=True) question_vector /= np.linalg.norm(question_vector, ord=2.0, axis=-1, keepdims=True) # similarity = (doc_vector * vectors).sum(axis=-1) similarity = (question_vector * vectors).sum(axis=-1) return '\n'.join(text for text, _ in sorted(zip(document, similarity), key=lambda x: 1 - x[-1])[:size]) def process_hit(self, hit: ElasticHitsResult, q: str) -> Document | None: if "news" in hit.index: doc = Document( page_content='\n\n'.join(v for k, v in hit.source["texts"].items() if v), metadata={ "source": "news", "source_id": hit.source['metadata']['link'] } ) elif "transactions" in hit.index: doc = Document( page_content='\n\n'.join(v for k, v in hit.source["semantic_texts"].items() if v), metadata={ "source": "cds-transactions", "source_id": hit.source["id"] } ) elif "organizations" in hit.index: source = hit.source org_gen = source.get("combined_organization_description_general", "") org_fin = source.get("combined_organization_description_financial", "") org_contact = source.get("combined_organization_description_contacts", "") mission = "" if source.get("mission_statement", None) is not None: mission = source.get("mission_statement", "") keyword = "" if source.get("keyword", None) is not None: keyword = source.get("keyword", "") programs = source.get("programs", "") doc = Document( page_content='\n\n'.join([org_gen, mission, keyword, org_fin, org_contact, programs]), metadata={ "source": "UP-organizations-QA", "source_id": hit.source["candid_entity_id"] } ) # elif "issuelab" in hit.index: # doc = Document( # page_content='\n\n'.join(v for k, v in hit.source["semantic_texts"].items() if v), # metadata={ # "source": "IssueLab", # "source_id": hit.source["resource_id"] # } # ) elif "issuelab-elser" in hit.index: title = hit.source.get("title", "") description = hit.source.get("description", "") doc = Document( page_content='\n\n'.join([title, description]), metadata={ "source": "IssueLab", "source_id": hit.source["resource_id"], "url": hit.source.get("permalink", "") } ) elif "youtube" in hit.index: title = hit.source.get("title", "") summary = self.sub_section_alignment( query=q, document=parse_youtube_captions(hit.source.get("text")) ) doc = Document( # page_content='\n\n'.join([title]), page_content=summary, metadata={ "source": "Candid's Youtube channel", "source_id": hit.source['video_id'], "url": f"https://www.youtube.com/watch?v={hit.source['video_id']}" } ) elif "candid-blog" in hit.index: excerpt = hit.source.get("excerpt", "") title = hit.source.get("title", "") doc = Document( page_content='\n\n'.join([title, excerpt]), metadata={ "source": "Candid Blog", "source_id": hit.source["id"], "url": hit.source["link"] } ) elif "candid-learning" in hit.index: # content = hit.source.get("content", "") title = hit.source.get("title", "") summary = self.sub_section_alignment( query=q, document=parse_candid_learning(hit.source.get("content", "")) ) doc = Document( # page_content='\n\n'.join([title]), page_content=summary, metadata={ "source": "Candid Learning", "source_id": hit.source["post_id"], "url": hit.source.get("url", "") } ) elif "candid-help" in hit.index: title = hit.source.get("title", "") content = hit.source.get("content", "") doc = Document( page_content='\n\n'.join([title, content]), metadata={ "source": "Candid Help", "source_id": hit.source["id"], "url": hit.source.get("link", "") } ) else: doc = None return doc