from src.services.utils import load_data, stem, set_gemini
import itertools
import json

import numpy as np
import requests
from datasets import concatenate_datasets
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')


def retrieve_constraints(prompt):
    """Query the hosted LLM and parse the JSON object embedded in its reply."""
    request_input = {
        "models": ["meta-llama/llama-4-scout-17b-16e-instruct"],
        "messages": [{"role": "user", "content": prompt}],
    }
    response = requests.post(
        "https://organizedprogrammers-bettergroqinterface.hf.space/chat",
        json=request_input,
    )
    decoded_content = json.loads(response.content.decode())
    llm_response = decoded_content["content"]
    print(f"llm response: {llm_response}")

    # Extract the outermost JSON object from the reply. Using rfind for the
    # closing brace keeps nested objects intact, where searching for the
    # first '}' would truncate them.
    start_index = llm_response.find('{')
    end_index = llm_response.rfind('}')
    json_str = llm_response[start_index:end_index + 1].strip()

    constraints_json = json.loads(json_str)
    return constraints_json


def remove_over_repeated_technologies(result):
    """Drop technologies that appear in more than 30% of the problem lists."""
    total_lists = len(result)
    tech_counts = {}
    for item in result:
        for tech in item['technologies']:
            name = tech[0]['name']
            tech_counts[name] = tech_counts.get(name, 0) + 1

    threshold = total_lists * 0.3
    print(threshold)
    print(tech_counts)

    to_delete = []
    for tech, count in tech_counts.items():
        if count > threshold:
            print(f"This technology appears too often across lists: {tech}")
            to_delete.append(tech)

    for idx, item in enumerate(result):
        result[idx]['technologies'] = [
            tech for tech in item['technologies'] if tech[0]['name'] not in to_delete
        ]
    return result


def get_contrastive_similarities(constraints, dataset):
    """Score every (constraint, technology) pair by embedding similarity.

    Returns a flat list of scored pairs plus a constraints x technologies
    similarity matrix.
    """
    selected_pairs = []
    matrix = []
    constraint_descriptions = [c["description"] for c in constraints]
    constraint_embeddings = model.encode(constraint_descriptions, show_progress_bar=False)

    for i, constraint in enumerate(constraints):
        constraint_embedding = constraint_embeddings[i]
        constraint_matrix = []
        for j, row in enumerate(dataset):
            tech_embedding = row["embeddings"]

            # similarity() returns a 1x1 tensor; .item() unwraps it to a float.
            purpose_sim = model.similarity(constraint_embedding, tech_embedding).item()
            if np.isnan(purpose_sim):
                purpose_sim = 0.0

            selected_pairs.append({
                "constraint": constraint,
                "id2": j,
                "similarity": purpose_sim,
            })
            constraint_matrix.append(purpose_sim)
        matrix.append(constraint_matrix)

    return selected_pairs, matrix
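# A minimal usage sketch for get_contrastive_similarities (hypothetical data,
# for illustration only): matrix[i][j] holds the similarity between
# constraint i and technology j.
#
#   constraints = [{"title": "autonomy", "description": "must run 24h on battery"}]
#   dataset = [{"name": "e-ink display", "embeddings": model.encode("low-power e-ink display")}]
#   pairs, matrix = get_contrastive_similarities(constraints, dataset)
#   print(matrix[0][0])  # similarity of constraint 0 vs. technology 0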
" "Returning an empty list as no complete combinations can be formed.") else: possible_matches_for_each_l1.append((valid_matches_for_l1_element, row_i)) result = [] for tech_list, problem in possible_matches_for_each_l1: sorted_list = sorted( tech_list, key=lambda x: x[1].item() if hasattr(x[1], 'item') else float(x[1]), reverse=True ) top5 = sorted_list[:5] result.append({ 'technologies': top5, 'problem': problem }) result = remove_over_repeated_technologies(result) return result def search_technology_by_name(user_input, dataset): url = "https://heymenn-search-technologies-api.hf.space/search-technologies" headers = { "accept": "application/json", "Content-Type": "application/json" } results = [] for input in user_input: payload = { "title": input, "type": "title" } response = requests.post(url, headers=headers, json=payload) print(response.json()) results.append(response.json()) technologies = [] for result in results: technology = dataset.filter(lambda row: row["name"] == result["title"]) technologies.append(technology) combined_dataset = concatenate_datasets(technologies) return combined_dataset def select_technologies(problem_technology_list, forced_technology_list=[]): distinct_techs = set() candidate_map = [] if len(forced_technology_list) == 0: for problem_data in forced_technology_list: cand_dict = {} for tech_info, sim in problem_data['technologies']: tech_id = tech_info['id'] distinct_techs.add(tech_id) cand_dict[tech_id] = float(sim) for problem_data in problem_technology_list: cand_dict = {} for tech_info, sim in problem_data['technologies']: tech_id = tech_info['id'] distinct_techs.add(tech_id) cand_dict[tech_id] = float(sim) if cand_dict not in candidate_map: candidate_map.append(cand_dict) distinct_techs = sorted(list(distinct_techs)) n = len(problem_technology_list) if n == 0: return set() min_k = None best_set = None best_avg = -1 print(f"Distinct technologies: {distinct_techs}") print(f"Candidate map: {candidate_map}") print(f"Number of problems: {n}") for k in range(1, len(distinct_techs)+1): if min_k is not None and k > min_k: break for T in itertools.combinations(distinct_techs, k): total_sim = 0.0 covered = True for i in range(n): max_sim = -1.0 found = False for tech in T: if tech in candidate_map[i]: found = True sim_val = candidate_map[i][tech] if sim_val > max_sim: max_sim = sim_val if not found: covered = False break else: total_sim += max_sim if covered: avg_sim = total_sim / n if min_k is None or k < min_k: min_k = k best_set = T best_avg = avg_sim elif k == min_k and avg_sim > best_avg: best_set = T best_avg = avg_sim if min_k is not None and k == min_k: break if best_set is None: return set() return set(best_set) def load_titles(techno, data_type): if data_type == "pydantic": technology_titles = [tech.name for tech in techno] else: # data_type == "dict" technologies = techno["technologies"] technology_titles = [tech["name"] for tech in technologies] return technology_titles def search_prior_art(technologies_input: list, data: str, data_type: str, techno_type: str) -> json: """ Searches for prior art patents online that solve a given technical problem using a set of specified technologies, leveraging the Gemini model's search capabilities. """ technology_titles = load_titles(technologies_input, techno_type) if data_type == "problem": prompt = f"Find prior art patents or research paper online that address the technical problem: '{data}'. 
" \ elif data_type == "constraints": prompt = f"Find prior art patents or research paper online that address those constraints: '{data}'. " \ prompt += f"Using any combination of the following technologies: {', '.join(technology_titles)}. " \ f"Specifically look for patents that integrate multiple of these technologies." \ f"Indicate for each document found what technologies is used inside of it from the provided list" \ f"Indicate for each document the solution, then the twist of this solution," \ f"What makes it different from all the other existing solutions." \ f"Output only one sentence for the solution and the twist." \ client,config = set_gemini() response = client.models.generate_content( model="gemini-2.5-flash", contents=prompt, config=config, ) return response def add_citations_and_collect_uris(response): try: print(response) text = response.text supports = response.candidates[0].grounding_metadata.grounding_supports chunks = response.candidates[0].grounding_metadata.grounding_chunks sorted_supports = sorted(supports, key=lambda s: s.segment.end_index, reverse=True) uris_added = set() for support in sorted_supports: end_index = support.segment.end_index if support.grounding_chunk_indices: citation_links = [] for i in support.grounding_chunk_indices: if i < len(chunks): uri = chunks[i].web.uri if uri not in text and uri not in uris_added: citation_links.append(f"[{i + 1}]({uri})") uris_added.add(uri) if citation_links: citation_string = ", ".join(citation_links) text = text[:end_index] + citation_string + text[end_index:] return {"content": text,"uris": list(uris_added)} except Exception as e: print(f"Error : {e}") return {"content": e, "uris": []}