|
import os |
|
import json |
|
import asyncio |
|
from enum import Enum |
|
import platform |
|
import re |
|
import subprocess |
|
import tempfile |
|
import xml.etree.ElementTree as ET |
|
import zipfile |
|
from typing import Any, Dict, List, Tuple |
|
|
|
import gradio as gr |
|
import numpy as np |
|
import torch |
|
import torchvision.transforms.functional as TF |
|
import trimesh |
|
import ast |
|
from agents import Agent, Runner, function_tool |
|
from llama_index.embeddings.clip import ClipEmbedding |
|
from llama_index.embeddings.openai import OpenAIEmbedding, OpenAIEmbeddingMode |
|
from loguru import logger |
|
from PIL import Image |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
from torch import Tensor |
|
|
|
from llm_service import LLMService |
|
from mv_utils_zs import Realistic_Projection |
|
from onshape.onshape_translation import OnshapeTranslation |
|
from onshape.onshape_download import OnshapeDownload |
|
|
|
# NOTE(review): the value returned by this lookup is discarded, so the
# statement has no effect. If the goal was to default the Gradio cache
# directory, os.environ.setdefault(...) is probably what was meant — confirm.
os.environ.get("GRADIO_TEMP_DIR", "gradio_cache")
# Host operating system name; convert_step_to_obj_with_freecad reads it to
# choose the FreeCAD executable.
os_name = platform.system()

if os_name == "Linux":
    print("Running on Linux")
elif os_name == "Darwin":
    print("Running on macOS")
else:
    print(f"Running on an unsupported OS: {os_name}")


# Extensions that convert_to_obj returns unchanged.
GRADIO_3D_MODEL_DEFAULT_FORMAT = [".obj", ".glb", ".gltf", ".stl", ".splat", ".ply"]
USER_REQUIRE_FORMAT = [".3dxml", ".step"]
# Extensions loaded with Part.read() inside the generated FreeCAD script.
FREECAD_LOW_LEVEL_FORMAT = [".step", ".igs", ".iges", ".stp"]
# Extensions sent through onshape_converter before any further conversion.
ONSHAPE_SUPPORTED_FORMAT = [".prt", ".asm", ".jt"]
# Extensions opened with FreeCAD.open() inside the generated FreeCAD script.
FREECAD_NATIVE_FORMAT = [".fcstd"]
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
# Cosine-similarity results must be strictly greater than this to be kept.
SIMILARITY_SCORE_THRESHOLD = 0.7


# Shared LLM client used by the metadata and description helpers in this module.
llm_service = LLMService.from_partner()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_step_to_obj_with_freecad(step_path, obj_path):
    """Convert a CAD file to a mesh file by running a generated script in FreeCAD.

    The FreeCAD executable is chosen from the module-level ``os_name``. Files
    whose extension is in ``FREECAD_LOW_LEVEL_FORMAT`` are loaded with
    ``Part.read``; files in ``FREECAD_NATIVE_FORMAT`` are opened as FreeCAD
    documents and every object that has a ``Shape`` is exported.

    Args:
        step_path: Path of the input CAD file.
        obj_path: Path the mesh is exported to.

    Returns:
        A ``(stdout, stderr)`` tuple with the decoded output of the FreeCAD
        process. The exit status is not inspected here.

    Raises:
        Exception: If the OS is neither Linux nor macOS, or the input
            extension is not supported.
    """
    if os_name == "Linux":
        freecad_executable = "/usr/bin/freecadcmd"
    elif os_name == "Darwin":
        freecad_executable = "/Applications/FreeCAD.app/Contents/MacOS/FreeCAD"
    else:
        raise Exception("Unsupported OS for FreeCAD execution: " + os_name)

    _, ext = os.path.splitext(step_path)
    ext = ext.lower()

    # The paths are embedded with ``!r`` so they become proper Python string
    # literals: quotes or backslashes in a path can neither break the generated
    # script nor inject code into it.
    if ext in FREECAD_LOW_LEVEL_FORMAT:
        python_script = """
import FreeCAD
import Part
import Mesh

doc = FreeCAD.newDocument()
shape = Part.read({step_path!r})
obj = doc.addObject("Part::Feature", "MyPart")
obj.Shape = shape
doc.recompute()

Mesh.export([obj], {obj_path!r})
""".format(step_path=os.fspath(step_path), obj_path=os.fspath(obj_path))
    elif ext in FREECAD_NATIVE_FORMAT:
        python_script = """
import FreeCAD
import Part
import Mesh

doc = FreeCAD.open({step_path!r})
to_export = [o for o in doc.Objects if hasattr(o, 'Shape')]
Mesh.export(to_export, {obj_path!r})
""".format(step_path=os.fspath(step_path), obj_path=os.fspath(obj_path))
    else:
        logger.error(f"Not support {ext} format")
        raise Exception(f"Not support {ext} format")

    # Argument-list form (no shell): the script travels as one argv entry.
    command = [freecad_executable, "-c", python_script]
    completed = subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    return completed.stdout.decode(), completed.stderr.decode()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def convert_to_obj(file: str | None) -> str | None:
    """Return a path that the 3D viewer can load for ``file``.

    Files FreeCAD can read are converted to a sibling ``.obj`` file (the
    conversion is skipped when that file already exists). Files already in a
    viewer format are returned unchanged.

    Args:
        file: Path of the uploaded model, or ``None``.

    Returns:
        The path of the ``.obj`` file or the original path; ``None`` when
        ``file`` is ``None``.

    Raises:
        Exception: If the extension is not handled, or the FreeCAD run did not
            produce the expected output file.
    """
    if file is None:
        return None

    logger.info(f"Converting {file} to .obj")
    prefix_path, ext = os.path.splitext(file)
    ext = ext.lower()

    if ext in FREECAD_LOW_LEVEL_FORMAT + FREECAD_NATIVE_FORMAT:
        response_path = prefix_path + ".obj"
        if not os.path.exists(response_path):
            _, stderr = convert_step_to_obj_with_freecad(file, response_path)
            # The converter only hands back process output, so the presence of
            # the file is the signal that the export actually worked.
            if not os.path.exists(response_path):
                logger.error(f"FreeCAD did not produce {response_path}: {stderr}")
                raise Exception(f"Failed to convert {file} to .obj")
        return response_path

    if ext in GRADIO_3D_MODEL_DEFAULT_FORMAT:
        return file

    logger.warning(f"Do nothing at convert_to_obj with file {file}")
    raise Exception(f"Do nothing at convert_to_obj with file {file}")
|
|
|
|
|
async def onshape_converter(
    input_file_path: str,
    output_file: str | None = None,
    did: str = "ef42d7639096f3e61a4d4f07",
    wid: str = "5fcd0f25ce3dee08bbb823bf",
    format_name: str = "STEP",
    poll_interval: float = 6.0,
) -> Dict:
    """Convert a proprietary 3D file to an open format using the Onshape API.

    The file is uploaded for translation, the translation status is polled
    until it reports ``DONE`` or ``FAILED``, and the first resulting element
    is downloaded.

    Args:
        input_file_path: Path of the file to upload.
        output_file: Destination path. When omitted it is derived from the
            input path, the element id and the lower-cased format name.
        did: Onshape document id passed to the translation and download helpers.
        wid: Onshape workspace id passed to the same helpers.
        format_name: Target format name handed to the translation helper.
        poll_interval: Seconds to wait between two status requests.

    Returns:
        A dict with the element id (``"eid"``) and the downloaded file path
        (``"output_file"``).

    Raises:
        gr.Error: If the translation fails or yields no result element.
    """
    translator = OnshapeTranslation(did, wid, input_file_path, format_name)
    response = translator.upload_and_translate()

    response = translator.get_translation_status(response.id)
    while response.request_state not in ("DONE", "FAILED"):
        logger.info(
            f"Waiting for translation to complete. Current state: {response.request_state}"
        )
        # Wait first, then ask again: polling immediately after a status
        # request only repeats the answer that was just received.
        await asyncio.sleep(poll_interval)
        response = translator.get_translation_status(response.id)

    if response.request_state == "FAILED":
        logger.error(f"Translation failed: {response.failure_reason}")
        raise gr.Error(f"Translation failed: {response.failure_reason}")
    logger.success(f"Translation completed with state: {response.request_state}")

    # Explicit check instead of ``assert`` so it also runs under ``python -O``.
    if not response.result_element_ids:
        logger.error("No result element IDs found in translation response")
        raise gr.Error("No result element IDs found in translation response")
    eid = response.result_element_ids[0]

    if output_file is None:
        prefix_path, _ = os.path.splitext(input_file_path)
        output_file = f"{prefix_path}_{eid}.{format_name.lower()}"

    downloader = OnshapeDownload(did, wid, eid, output_file)
    downloader.download()

    return {
        "eid": eid,
        "output_file": output_file,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_geometric_features(obj_path: str) -> Dict[str, Any]:
    """Read basic size measurements of a mesh file with trimesh.

    Args:
        obj_path: Path of the mesh file to load.

    Returns:
        A dict with ``Volume``, ``Surface_Area``, ``Width``, ``Height`` and
        ``Depth``. ``Volume`` and ``Surface_Area`` are ``None`` when the loaded
        object has no such attribute. The three extents are the differences
        between the two corners of ``mesh.bounds``, in that order. An empty
        dict is returned when anything goes wrong while loading or measuring.
    """
    try:
        mesh = trimesh.load(obj_path)

        # getattr with a default: not every object trimesh returns is
        # guaranteed to expose these attributes.
        volume = getattr(mesh, "volume", None)
        surface_area = getattr(mesh, "area", None)

        min_corner, max_corner = mesh.bounds
        width, height, depth = max_corner - min_corner

        return {
            "Volume": volume,
            "Surface_Area": surface_area,
            "Width": width,
            "Height": height,
            "Depth": depth,
        }
    except Exception as e:
        # Best effort by design: report through the module logger (as the
        # other extractors do) and let the caller continue without features.
        logger.error(f"Error reading file {obj_path}: {e}")
        return {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def reformat_and_return_top_k_results(
    private_model_paths: List[str],
    model_names: List[str],
    similarity_scores: List[float | None],
    top_k: int = 4,
):
    """Flatten search hits into one list of paths, names and score labels.

    The result is three consecutive sections — paths, names, score labels —
    each padded to ``top_k`` entries (``None`` for paths, ``""`` for names and
    labels) when fewer hits are supplied.

    Args:
        private_model_paths: File paths of the hits.
        model_names: Display names of the hits.
        similarity_scores: Score of each hit; entries that are not floating
            point numbers (for example ``None``) produce the bare label
            ``"Score: "``.
        top_k: Number of slots per section.

    Returns:
        The flat list described above.
    """
    assert (
        len(private_model_paths) == len(model_names) == len(similarity_scores)
    ), "Length mismatch in similarity search results"

    # All three inputs have the same length, so one padding count serves all.
    missing = top_k - len(private_model_paths)

    # NumPy scalars such as float32 are not ``float`` subclasses; accept them
    # too so their score is not silently dropped from the label.
    score_labels = [
        f"Score: {score:.4f}"
        if isinstance(score, (float, np.floating))
        else "Score: "
        for score in similarity_scores
    ]

    result = list(private_model_paths) + [None] * missing
    result += list(model_names) + [""] * missing
    result += score_labels + [""] * missing

    logger.info(
        f"Found {len(model_names)} similar objects for the query. They are: {model_names}"
    )
    return result
|
|
|
|
|
def search_3D_similarity(filepath: str | None, embedding_dict: dict, top_k: int = 4):
    """Rank the other stored models by image-embedding similarity to one model.

    Args:
        filepath: Key of the reference model in ``embedding_dict``.
        embedding_dict: Mapping from model path to a dict that may contain an
            ``"image_embedding"`` vector.
        top_k: Maximum number of hits to report.

    Returns:
        The flat list built by ``reformat_and_return_top_k_results`` from the
        hits whose cosine similarity exceeds ``SIMILARITY_SCORE_THRESHOLD``,
        best first.

    Raises:
        gr.Error: If no file is selected or fewer than two models are stored.
        ValueError: If the reference model has no image embedding.
    """
    if filepath is None:
        raise gr.Error("Please select a file!")
    if len(embedding_dict) < 2:
        raise gr.Error("Require at least two 3D files to search similarity")
    if (
        filepath not in embedding_dict
        or "image_embedding" not in embedding_dict[filepath]
    ):
        raise ValueError(f"No embedding found for {filepath}")

    candidates = [
        (fp, data["image_embedding"])
        for fp, data in embedding_dict.items()
        if "image_embedding" in data and fp != filepath
    ]

    ranked = []
    # Two stored models do not guarantee that another one has an embedding;
    # cosine_similarity rejects an empty matrix, so skip it in that case.
    if candidates:
        reference = np.array(embedding_dict[filepath]["image_embedding"]).reshape(1, -1)
        feature_matrix = np.array([feat for _, feat in candidates])
        similarities = cosine_similarity(reference, feature_matrix)[0]
        ranked = sorted(
            zip((fp for fp, _ in candidates), similarities),
            key=lambda pair: pair[1],
            reverse=True,
        )
        ranked = [pair for pair in ranked if pair[1] > SIMILARITY_SCORE_THRESHOLD]

    top_hits = ranked[:top_k]
    return reformat_and_return_top_k_results(
        private_model_paths=[fp for fp, _ in top_hits],
        model_names=[os.path.basename(fp) for fp, _ in top_hits],
        similarity_scores=[score for _, score in top_hits],
        top_k=top_k,
    )
|
|
|
|
|
|
|
|
|
|
|
class Query3DObjectMethod(Enum):
    """Retrieval strategies that query_3D_object can dispatch to."""

    # Handled by query_3D_object_by_hybrid_search_method.
    HYBRID_SEARCH = "hybrid_search"
    # Handled by query_3D_object_by_semantic_search_method.
    SEMANTIC_SEARCH = "semantic_search"
|
|
|
|
|
async def query_3D_object(
    query: str,
    current_obj_path: str,
    embedding_dict: dict,
    top_k: int = 4,
    method: Query3DObjectMethod = Query3DObjectMethod.HYBRID_SEARCH,
) -> List:
    """Run a text query against the stored models with the chosen strategy.

    Args:
        query: Free-text query; must not be empty.
        current_obj_path: Path of the model currently shown to the user.
        embedding_dict: Per-model data the search strategies read from.
        top_k: Number of result slots per section.
        method: Strategy to use.

    Returns:
        A list whose first element is the reply text, followed by the
        ``3 * top_k`` entries (paths, names, score labels) from the search.

    Raises:
        gr.Error: If ``query`` is the empty string.
        Exception: If ``method`` is not a supported strategy.
    """
    if query == "":
        raise gr.Error("Query cannot be empty!")

    stock_reply = f"Here are the top-{top_k} results for your query: `{query}`"

    if method == Query3DObjectMethod.SEMANTIC_SEARCH:
        logger.info("Running query_3D_object_by_semantic_search_method")
        hits = query_3D_object_by_semantic_search_method(
            query, current_obj_path, embedding_dict, top_k
        )
        reply = stock_reply
    elif method == Query3DObjectMethod.HYBRID_SEARCH:
        logger.info("Running query_3D_object_by_hybrid_search_method")
        outcome = await query_3D_object_by_hybrid_search_method(
            query, current_obj_path, embedding_dict, top_k
        )
        # The agent's own wording wins when it supplied one.
        reply = outcome.get("final_output", stock_reply)
        hits = outcome.get("tripplet", [])
    else:
        raise Exception(
            f"Unsupported query method: {method}. Supported methods are: {list(Query3DObjectMethod)}"
        )

    assert len(hits) == 3 * top_k
    return [reply] + hits
|
|
|
|
|
def query_3D_object_by_semantic_search_method(
    query: str, current_obj_path: str, embedding_dict: dict, top_k: int = 4
) -> List:
    """Rank stored models by similarity between the query and their text embedding.

    Args:
        query: Free-text query, embedded with ``text_embedding_model``.
        current_obj_path: Not used by this strategy; kept so all strategies
            share one call shape.
        embedding_dict: Mapping from model path to a dict that may contain a
            ``"text_embedding"`` vector.
        top_k: Maximum number of hits to report.

    Returns:
        The flat list built by ``reformat_and_return_top_k_results`` from the
        hits whose cosine similarity exceeds ``SIMILARITY_SCORE_THRESHOLD``,
        best first.
    """
    candidates = [
        (fp, data["text_embedding"])
        for fp, data in embedding_dict.items()
        if "text_embedding" in data
    ]

    ranked = []
    # cosine_similarity rejects an empty matrix; with nothing to compare
    # against, skip both the embedding request and the comparison.
    if candidates:
        query_vector = np.array(
            text_embedding_model.get_text_embedding(text=query)
        ).reshape(1, -1)
        feature_matrix = np.array([feat for _, feat in candidates])
        similarities = cosine_similarity(query_vector, feature_matrix)[0]
        ranked = sorted(
            zip((fp for fp, _ in candidates), similarities),
            key=lambda pair: pair[1],
            reverse=True,
        )
        ranked = [pair for pair in ranked if pair[1] > SIMILARITY_SCORE_THRESHOLD]

    top_hits = ranked[:top_k]
    return reformat_and_return_top_k_results(
        private_model_paths=[fp for fp, _ in top_hits],
        model_names=[os.path.basename(fp) for fp, _ in top_hits],
        similarity_scores=[score for _, score in top_hits],
        top_k=top_k,
    )
|
|
|
|
|
async def query_3D_object_by_hybrid_search_method(
    query: str, current_obj_path: str, embedding_dict: dict, top_k: int = 4
) -> Dict:
    """Answer a query by letting an LLM agent pick one of several retrieval tools.

    A "Datum Agent" is built with tools for semantic search, image-similarity
    search and reading the stored description of a model, plus a hand-off to a
    "Keyword Search Agent" that filters models with a generated ``match``
    function. The output of the last tool call in the run is parsed as the
    result list.

    Args:
        query: Free-text query from the user.
        current_obj_path: Path of the model currently shown to the user.
        embedding_dict: Per-model data read by the tools.
        top_k: Number of result slots per section expected in the result.

    Returns:
        A dict with ``"tripplet"`` (``3 * top_k`` entries: paths, names, score
        labels) and ``"final_output"`` (the agent's final text). When no tool
        was called, or the last tool output is not a Python literal, the
        triplet is an all-empty placeholder.

    Raises:
        gr.Error: If the last tool output parses to something other than a
            list of ``3 * top_k`` entries.
    """
    # NOTE: the tool functions below are documented with comments rather than
    # docstrings so that whatever ``function_tool`` derives from the function
    # objects stays exactly as it was.

    @function_tool
    def query_3D_object_by_keyword_search(query: str, match_code: str, top_k: int = 4):
        # Filters the stored models with the generated ``match`` function.
        logger.info("Datum Agent is running query_3D_object_by_keyword_search")
        logger.info(f"The 'match' function is:\n```\n{match_code}\n```")

        # Check the expected signature BEFORE running anything, with an
        # explicit test that also survives ``python -O``.
        if "def match(metadata: dict) -> bool:" not in match_code:
            raise gr.Error(
                "Your query did not generate a valid match function. Try your query again."
            )

        # SECURITY NOTE(review): ``match_code`` is model-generated text and
        # exec() runs it with the full privileges of this process. The
        # signature check above is not a sandbox; consider a restricted
        # evaluator before exposing this to untrusted users.
        exec_globals = {}
        try:
            exec(match_code, exec_globals)
            match = exec_globals["match"]
        except Exception:
            raise gr.Error(
                "Your query did not generate a valid match function. Try your query again."
            )

        matched_obj_paths = []
        for obj_path, data in embedding_dict.items():
            metadata = data.get("metadata_dictionary")
            if metadata is None:
                # Nothing to filter on: the entry cannot satisfy a constraint.
                continue
            try:
                if match(metadata):
                    matched_obj_paths.append(obj_path)
            except Exception as e:
                # One bad record must not abort the whole search.
                logger.warning(f"match() failed for {obj_path}: {e}")

        logger.info(
            f"Found {len(matched_obj_paths)} matching objects for the query `{query}`:\n```{matched_obj_paths}```"
        )

        return reformat_and_return_top_k_results(
            private_model_paths=[x for x in matched_obj_paths[:top_k]],
            model_names=[os.path.basename(x) for x in matched_obj_paths[:top_k]],
            similarity_scores=[None] * len(matched_obj_paths[:top_k]),
            top_k=top_k,
        )

    METADATA_SCHEMA = """Schema of metadata_dictionary:
- Volume: float
- Surface_Area: float
- Width: float
- Height: float
- Depth: float
- Description: str
- Description_Level: int
- FileName: str
- Created: str
- Authors: str
- Organizations: str
- Preprocessor: str
- OriginatingSystem: str
- Authorization: str
- Schema: str
"""

    QUERY_EXAMPLES = """Examples of natural language queries and their intended matching logic:

### Example 1: "width greater than 7"
```python
def match(metadata: dict) -> bool:
    try:
        return float(metadata.get("Width", 0)) > 7
    except:
        return False
````

### Example 2: "description contains STEP"

```python
def match(metadata: dict) -> bool:
    return "step" in str(metadata.get("Description", "")).lower()
```

### Example 3: "originating system is ASCON Math Kernel"

```python
def match(metadata: dict) -> bool:
    return str(metadata.get("OriginatingSystem", "")).lower() == "ascon math kernel"
```

### Example 4: "volume < 200 and surface area > 300"

```python
def match(metadata: dict) -> bool:
    try:
        return float(metadata.get("Volume", 0)) < 200 and float(metadata.get("Surface_Area", 0)) > 300
    except:
        return False
```

### Example 5: "schema contains 214"

```python
def match(metadata: dict) -> bool:
    return "214" in str(metadata.get("Schema", ""))
```
"""

    MATCH_GEN_INSTRUCTION = """You are a Python code generator. Your job is to translate a natural language query into a function named `match(metadata: dict) -> bool`.

Requirements:
- Only use keys present in the schema.
- Match strings case-insensitively.
- For numerical comparisons, cast to float.
- Combine conditions using logical `and`, `or` as inferred from natural language.
- Handle missing keys by returning False.
Return only the function code, nothing else.
"""

    @function_tool
    def get_prompt_to_generate_match_code(query: str) -> str:
        """
        Generate a prompt to create a match function based on the user's query.
        """
        return (
            METADATA_SCHEMA
            + QUERY_EXAMPLES
            + MATCH_GEN_INSTRUCTION
            + f"\nQuery: {query}\n"
        )

    KEYWORD_SEARCH_AGENT_INSTRUCTIONS = """You are a Keyword Search Agent specialized in metadata-based filtering.
Given a natural language query from the user, you will automatically generate an executable `match` function based on the prompt provided by `get_prompt_to_generate_match_code`.
The `match` function is crucial for handling constraints on keys and values. Ensure that the keys match those defined in the schema.
For values, in cases where it is unclear whether the value to filter is a lower or upper bound, prioritize using the word as it appears in the user's query.
Combine the `match` function with `query_3D_object_by_keyword_search` to filter the top-K matching 3D object paths."""

    keyword_search_agent = Agent(
        name="Keyword Search Agent",
        instructions=KEYWORD_SEARCH_AGENT_INSTRUCTIONS,
        tools=[get_prompt_to_generate_match_code, query_3D_object_by_keyword_search],
    )

    @function_tool
    def query_3D_object_by_semantic_search(query: str, top_k: int = 4):
        # Thin tool wrapper around the text-embedding search.
        logger.info("Datum Agent is running query_3D_object_by_semantic_search")
        response = query_3D_object_by_semantic_search_method(
            query, current_obj_path, embedding_dict, top_k
        )
        logger.info(
            f"Found {len(response) // 3} matching objects for the query `{query}`:\n```{response[: len(response) // 3]}```"
        )
        return response

    @function_tool
    def search_3D_similarity_factory(
        query: str, selected_filepath: str, top_k: int = 4
    ):
        # Thin tool wrapper around the image-embedding similarity search.
        logger.info("Datum Agent is running search_3D_similarity_factory")
        response = search_3D_similarity(selected_filepath, embedding_dict, top_k)
        logger.info(
            f"Found {len(response) // 3} similar objects for the query `{query}`:\n```{response[: len(response) // 3]}```"
        )
        return response

    @function_tool
    def get_description_of_model_to_analysis(current_obj_path: str | None) -> str:
        # Returns the stored description of one model.
        if current_obj_path is None:
            raise gr.Error("Please select a file!")
        # The path comes from the model; answer with the same placeholder
        # render_3D_metadata uses instead of failing on an unknown key.
        return embedding_dict.get(current_obj_path, {}).get(
            "description", "No description found!"
        )

    DATUM_AGENT_INSTRUCTIONS = """You are the Datum Agent: you retrieve the top-K most relevant 3D objects using three strategies:
* Use `query_3D_object_by_semantic_search` for abstract or descriptive queries.
* Use `search_3D_similarity_factory` when the query mentions the object currently displayed on the screen and aims to find similar objects.
* Use **Keyword Search Agent** for precise metadata constraints or comparative/filtering information in the query.
Return only the final tuple of file paths and display names. If the response contains private paths which duplicated name, please ignore them!
Moreover, you can able to generate a comprehensive response when our users ask for a description of the current 3D object. In these cases, you are required to:
* Use `get_description_of_model_to_analysis` to retrieve the description of the current 3D object for analysis when receiving a user's query related to analysis or a description of the current view object.
# ---
{schema_metadata}
# ---
{examples}
"""
    DATUM_AGENT_EXAMPLES = """
**Examples:**
1. "Find something that looks like a camera mount." → Use `query_3D_object_by_semantic_search` (abstract visual concept).
2. "Show me more models similar to the one I'm viewing." → Use `search_3D_similarity_factory` (based on current object).
3. "Find objects with height greater than 10 cm and material is steel." → Use **Keyword Search Agent** (metadata-based filtering).
4. "Describe what I'm seeing." → Use `get_description_of_model_to_analysis`.
5. "I need a part shaped like a robotic joint." → Use `query_3D_object_by_semantic_search` (descriptive shape-based intent).
6. "Give me parts that look like this but slightly longer." → Use `search_3D_similarity_factory` (contextual similarity from current view).
7. "List components with width less than 5mm and made of plastic." → Use **Keyword Search Agent** (exact attribute constraints).
8. "What is this component used for?" → Use `get_description_of_model_to_analysis`.
9. "Search for something resembling a gear or cog." → Use `query_3D_object_by_semantic_search` (visual-concept query).
10. "Filter models labeled TS6 with height between 10 and 15." → Use **Keyword Search Agent** (keyword and numeric filtering).
11. "Do any have 12 holes?" → Use `query_3D_object_by_semantic_search` (because the key in the query does not match any defined metadata keys, so semantic search is the only viable option).
"""

    HANDOFF_DESCRIPTION = """Handing off to Datum Agent: you can perform semantic search, keyword-based filtering, or visual similarity search.
If metadata filtering is required, delegate to the **Keyword Search Agent** by calling `get_prompt_to_generate_match_code`.
"""

    datum_agent = Agent(
        name="Datum Agent",
        handoff_description=HANDOFF_DESCRIPTION,
        instructions=DATUM_AGENT_INSTRUCTIONS.format(
            examples=DATUM_AGENT_EXAMPLES, schema_metadata=METADATA_SCHEMA
        ),
        tools=[
            query_3D_object_by_semantic_search,
            search_3D_similarity_factory,
            get_description_of_model_to_analysis,
        ],
        handoffs=[keyword_search_agent],
    )

    prompt_input = f"""An user is watching a 3D object and wants to query it.
The query is: `{query}`.
The current 3D object is `{current_obj_path}`.
You need to find the most relevant 3D objects based on the query and return the top-k results.
"""

    response = await Runner.run(datum_agent, prompt_input)

    # Placeholder used whenever the run produced no usable result list.
    empty_result = {
        "tripplet": [None] * top_k + [""] * top_k + ["Score: "] * top_k,
        "final_output": response.final_output,
    }

    function_call_output_list = [
        item
        for item in response.to_input_list()
        if item.get("type") == "function_call_output"
    ]
    if not function_call_output_list:
        # The agent answered without calling any tool; there is nothing to parse.
        logger.warning("Datum Agent did not call any tool; returning an empty result")
        return empty_result

    files_result = function_call_output_list[-1]
    logger.info(f"Datum Agent raw response: {files_result}")

    try:
        result = ast.literal_eval(files_result.get("output", "[]"))
    except Exception as e:
        # Typical when the last tool returned free text (e.g. a description).
        logger.error(
            f"Datum Agent did not return a valid list of file paths due to {e}"
        )
        return empty_result

    if not isinstance(result, list):
        raise gr.Error("Datum Agent did not return a valid list of file paths.")

    # Explicit check instead of ``assert`` so it also runs under ``python -O``.
    if len(result) != 3 * top_k:
        raise gr.Error("Datum Agent did not return a valid list of file paths.")

    return {
        "tripplet": result,
        "final_output": response.final_output,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_header_from_3dxml(file_path):
    """Collect the ``Header`` fields of the XML documents inside a 3DXML archive.

    Every archive member whose name ends in ``.3dxml`` or ``.xml`` is parsed.
    If its root element has a direct ``Header`` child, each child of that
    header contributes one entry: the tag name (namespace removed) mapped to
    its stripped text, or ``""`` when it has no text. Later members overwrite
    earlier ones on equal tag names.

    Members are read straight from the archive. Nothing is extracted to disk,
    so files left over from a previously processed archive can never leak
    into the result.

    Args:
        file_path: Path of the ``.3dxml`` (zip) file.

    Returns:
        A dict of header tag names to text values; empty when no header exists.
    """
    header_info = {}

    with zipfile.ZipFile(file_path, "r") as archive:
        for member in archive.namelist():
            if not member.endswith((".3dxml", ".xml")):
                continue
            try:
                with archive.open(member) as xml_file:
                    root_el = ET.parse(xml_file).getroot()

                if root_el.tag.startswith("{"):
                    # Namespaced document: look the header up in the same
                    # namespace as the root element.
                    namespace = root_el.tag[1:].split("}")[0]
                    header = root_el.find("ns:Header", {"ns": namespace})
                else:
                    header = root_el.find("Header")

                if header is None:
                    continue
                for child in header:
                    tag = child.tag.split("}")[-1]
                    header_info[tag] = child.text.strip() if child.text else ""
            except Exception as e:
                # Best effort: an unreadable member must not hide the others.
                logger.error(f"Failed to parse {member}: {e}")

    return header_info
|
|
|
|
|
|
|
|
|
|
|
def extract_step_metadata(file_path):
    """Extract the HEADER records of a STEP file with regular expressions.

    Reads ``FILE_DESCRIPTION``, ``FILE_NAME`` and ``FILE_SCHEMA``. Only the
    text before the first ``ENDSEC;`` is searched (the whole text when that
    marker is absent), so the lazy patterns never scan the data section.
    Whitespace around the separators of a record is tolerated.

    Args:
        file_path: Path of the STEP file.

    Returns:
        A dict with whichever of these keys could be extracted:
        ``Description``, ``Description_Level``, ``FileName``, ``Created``,
        ``Authors``, ``Organizations``, ``Preprocessor``,
        ``OriginatingSystem``, ``Authorization``, ``Schema``. List-valued
        fields are returned as the raw list text with the quotes removed. An
        error while reading is logged and yields what was collected so far.
    """
    metadata = {}

    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()

        header_end = content.find("ENDSEC;")
        if header_end != -1:
            content = content[:header_end]

        desc_match = re.search(
            r"FILE_DESCRIPTION\s*\(\s*\((.*?)\)\s*,\s*'([^']*)'\s*\)\s*;",
            content,
            re.DOTALL,
        )
        if desc_match:
            metadata["Description"] = desc_match.group(1).replace("'", "")
            metadata["Description_Level"] = desc_match.group(2)

        name_match = re.search(
            r"FILE_NAME\s*\(\s*'(.*?)'\s*,\s*'(.*?)'\s*,\s*\((.*?)\)\s*,\s*\((.*?)\)"
            r"\s*,\s*'(.*?)'\s*,\s*'(.*?)'\s*,\s*'(.*?)'\s*\)\s*;",
            content,
            re.DOTALL,
        )
        if name_match:
            metadata["FileName"] = name_match.group(1)
            metadata["Created"] = name_match.group(2)
            metadata["Authors"] = name_match.group(3).replace("'", "")
            metadata["Organizations"] = name_match.group(4).replace("'", "")
            metadata["Preprocessor"] = name_match.group(5)
            metadata["OriginatingSystem"] = name_match.group(6)
            metadata["Authorization"] = name_match.group(7)

        schema_match = re.search(
            r"FILE_SCHEMA\s*\(\s*\((.*?)\)\s*\)\s*;", content, re.DOTALL
        )
        if schema_match:
            metadata["Schema"] = schema_match.group(1).replace("'", "")

    except Exception as e:
        logger.error(f"Failed to read STEP file: {e}")

    return metadata
|
|
|
|
|
async def extract_step_metadata_using_llm(file_path: str) -> Dict:
    """Extract STEP header metadata by asking the LLM to return it as JSON.

    The file text up to the first ``ENDSEC;`` (or the whole text when the
    marker is missing) is embedded in a prompt and sent through
    ``llm_service.chat_with_text``. The reply is parsed with ``json.loads``.

    Args:
        file_path: Path of the STEP file.

    Returns:
        The parsed JSON object. An empty dict when reading, the LLM call or
        parsing fails, or when the reply is valid JSON but not an object.
    """
    logger.info("Extracting STEP metadata using LLM")
    metadata = {}

    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()

        # Keep only the header so the prompt does not carry the geometry data.
        endsec_index = content.find("ENDSEC;")
        if endsec_index != -1:
            content = content[:endsec_index].strip() + "\nENDSEC;"
            logger.info("Using trimmed content up to ENDSEC;")
        else:
            logger.warning("No ENDSEC; found in the STEP file, using full content.")

        system_prompt = """You are a STEP file expert. Given the HEADER section of a STEP file, extract the following fields in JSON format:\n
- Description (from FILE_DESCRIPTION)\n
- Description_Level\n
- FileName\n
- Created\n
- Authors (as a comma-separated string)\n
- Organizations (as a comma-separated string)\n
- Preprocessor\n
- OriginatingSystem\n
- Authorization\n
- Schema\n\n
Only return a valid JSON object with these fields.
Here is the content of the STEP file:\n
content = ```step\n{content}\n```
"""

        raw_response = await llm_service.chat_with_text(
            prompt=system_prompt.format(content=content),
            return_as_json=True,
        )

        dict_response = json.loads(raw_response)
        # Callers iterate the result as a mapping; a JSON list or scalar from
        # the model must not be passed on as if it were one.
        if not isinstance(dict_response, dict):
            raise ValueError(
                f"expected a JSON object, got {type(dict_response).__name__}"
            )
        return dict_response

    except Exception as e:
        logger.error(f"Failed to extract STEP metadata with LLM: {e}")

    return metadata
|
|
|
|
|
def dict_to_markdown(metadata: dict) -> str:
    """Render a mapping as text, one ``key: value`` pair per line.

    Args:
        metadata: Mapping to render; iteration order is kept.

    Returns:
        The lines joined with newlines; ``""`` for an empty mapping.
    """
    rendered_lines = []
    for key in metadata:
        rendered_lines.append(f"{key}: {metadata[key]}")
    return "\n".join(rendered_lines)
|
|
|
|
|
|
|
async def parse_3d_file(original_filepath: str) -> Dict[str, Any]:
    """Extract header metadata from an uploaded file, based on its extension.

    ``.3dxml`` files go to ``extract_header_from_3dxml`` and ``.step`` files
    to ``extract_step_metadata_using_llm``. The extension is compared
    case-insensitively, as elsewhere in this module.

    Args:
        original_filepath: Path of the uploaded file, or ``None``.

    Returns:
        The extracted metadata dict; an empty dict for other extensions. For
        ``None`` the string ``"No file"`` is returned (existing behaviour,
        despite the annotation).
    """
    if original_filepath is None:
        return "No file"

    ext = os.path.splitext(original_filepath)[1].lower()
    if ext == ".3dxml":
        return extract_header_from_3dxml(original_filepath)
    if ext == ".step":
        return await extract_step_metadata_using_llm(original_filepath)

    logger.warning(f"No metadata found in the file {original_filepath}")
    return {}
|
|
|
|
|
def render_3D_metadata(
    original_filepath: str, obj_path: str, embedding_dict: dict
) -> Tuple[str, str]:
    """Look up the stored metadata and description of one model.

    Args:
        original_filepath: Path of the uploaded file; only written to the log.
        obj_path: Key of the model in ``embedding_dict``.
        embedding_dict: Per-model data holding ``"metadata"`` and
            ``"description"`` entries.

    Returns:
        ``(metadata, description)``, each replaced by a "not found"
        placeholder when the model or the entry is missing.
    """
    logger.info(f"Rendering 3D metadata for {original_filepath} and {obj_path}")
    entry = embedding_dict.get(obj_path, {})
    metadata_text = entry.get("metadata", "No metadata found!")
    description_text = entry.get("description", "No description found!")
    return (metadata_text, description_text)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shared projector instance; render_depth_images_from_obj calls its get_img().
pc_views = Realistic_Projection()
|
|
|
|
|
def render_depth_images_from_obj(obj_path: str, imsize: int = 512) -> List[np.ndarray]:
    """Render the vertices of a mesh into a list of view images.

    The mesh vertices are passed to ``pc_views.get_img``; the returned images
    are resized to ``imsize`` x ``imsize`` with bilinear interpolation and
    converted, one by one, to NumPy arrays via PIL.

    Args:
        obj_path: Path of the mesh file.
        imsize: Edge length of the resized images.

    Returns:
        One array per image produced by the projector.
    """
    vertices: Tensor = torch.tensor(trimesh.load_mesh(obj_path).vertices).float()
    # The projector is given a batch; add the leading axis for a single cloud.
    if vertices.ndim == 2:
        vertices = vertices.unsqueeze(0)

    views: Tensor = torch.nn.functional.interpolate(
        pc_views.get_img(vertices),
        size=(imsize, imsize),
        mode="bilinear",
        align_corners=True,
    )
    return [np.array(TF.to_pil_image(view.cpu())) for view in views]
|
|
|
|
|
def aggregate_images(
    np_images: list[np.ndarray], n_rows: int = 2, n_cols: int = 5
) -> np.ndarray:
    """Tile equally sized images into one grid image, filled row by row.

    Works for images with a channel axis (``H x W x C``) as well as plain
    ``H x W`` images. Grid cells without an image stay zero.

    Args:
        np_images: Images to place; all must share the shape and dtype of the
            first one.
        n_rows: Number of grid rows.
        n_cols: Number of grid columns.

    Returns:
        An array of ``n_rows * H`` by ``n_cols * W`` (plus any trailing axes
        of the input images) with the dtype of the first image.

    Raises:
        ValueError: If no image is given or there are more images than cells.
    """
    if len(np_images) == 0:
        raise ValueError("aggregate_images needs at least one image")
    if len(np_images) > n_rows * n_cols:
        raise ValueError(
            f"{len(np_images)} images do not fit into a {n_rows}x{n_cols} grid"
        )

    first = np_images[0]
    img_height, img_width = first.shape[:2]
    # ``shape[2:]`` is empty for 2-D images, so no channel axis is forced.
    aggregate_img = np.zeros(
        (img_height * n_rows, img_width * n_cols) + tuple(first.shape[2:]),
        dtype=first.dtype,
    )

    for i, img in enumerate(np_images):
        row, col = divmod(i, n_cols)
        aggregate_img[
            row * img_height : (row + 1) * img_height,
            col * img_width : (col + 1) * img_width,
        ] = img

    return aggregate_img
|
|
|
|
|
# Prompt sent together with the tiled depth-map image by
# generate_description_from_aggregated_depth_map. The text is consumed by the
# model at runtime, so edits here change the generated descriptions.
DESCRIPTION_AGGREGATED_DEPTH_MAP_PROMPT = """You are a manufacturing expert analyzing 3D objects for production purposes. Given a set of multi-view depth maps of a single object, extract all possible special features relevant to manufacturing.

Your output must follow the structured format provided below and be as complete and specific as possible, even if some features are inferred or uncertain.
```
🔎 Extracted Manufacturing Features from Depth Maps

1. Geometric Features
Dimensions: <!-- List key dimensions such as height, width, depth, thickness, or aspect ratios. Use units if possible. Mention estimated ranges if exact values are unclear. -->
Notable Shapes: <!-- Describe the overall shape and form (e.g., cylindrical body with a tapered end, flat rectangular base, spherical top). Mention symmetry or irregularities. -->
Holes: <!-- Count and describe hole types (e.g., through-holes, blind holes), location if visible, and their arrangement or pattern (e.g., circular array, linear slot). -->
Surface Features: <!-- Include textures, fillets, chamfers, ribs, grooves, steps, and engravings. Identify raised or recessed areas that are not part of the base shape. -->
Other: <!-- Any other geometric characteristics not covered above (e.g., draft angles, deformation, cutouts). -->

2. Material-Related Inferences
Likely Material: <!-- Infer from shape, thickness, or typical use cases (e.g., plastic, aluminum, cast iron). State if uncertain or not visible. -->
Surface Texture: <!-- Describe the expected finish (e.g., rough, matte, polished) based on depth gradients or edge sharpness. -->
Durability Hints: <!-- Mention any features that suggest mechanical strength or wear resistance (e.g., thick load-bearing sections, reinforcement patterns). -->

3. Manufacturing-Related Features
Manufacturing Process: <!-- Suggest most likely processes (e.g., injection molding, CNC milling, casting) based on geometry and typical industry practices. -->
Draft Angles: <!-- Indicate presence and estimate angles if the object appears designed for mold release. -->
Undercuts: <!-- Identify any undercut areas that may require complex tooling or multi-part molds. -->
Mold Flow Considerations: <!-- Comment on how the material might flow during molding or casting, and whether the geometry supports or hinders it. -->

4. Functional and Assembly Features
Mounting Points: <!-- Identify places where fasteners or brackets might attach (e.g., holes, bosses, flanges). -->
Jointing Features: <!-- Describe features used to join with other parts, such as snap fits, tabs, slots, dovetails, etc. -->
Alignment Aids: <!-- Note features like pins, grooves, or guide rails that help align components during assembly. -->
Modularity: <!-- Assess whether the object is likely part of a modular system based on interface shapes or repeated features. -->

5. Inspection and Quality Features
Critical Dimensions: <!-- Highlight any dimensions likely to be functionally critical or require tight tolerance. -->
Surface Finish Zones: <!-- Point out areas that may require fine finishing or polishing for performance or cosmetic reasons. -->
Datums: <!-- Indicate flat surfaces or edges likely to serve as reference datums during measurement or machining. -->
Tolerances: <!-- Mention if any tolerances can be inferred, e.g., tight fits, loose clearances, or any standard class assumptions. -->

```
If any feature cannot be determined from the depth maps, state “Not visible” or “Cannot be inferred.”
Use clear technical vocabulary appropriate for manufacturing and quality control."""
|
|
|
|
|
async def generate_description_from_aggregated_depth_map(np_image: np.ndarray) -> str:
    """Ask the LLM for a manufacturing-feature description of a depth-map image.

    Args:
        np_image: Image array, encoded with ``llm_service.encode_image``
            before it is sent.

    Returns:
        The reply of ``llm_service.chat_with_image`` for
        ``DESCRIPTION_AGGREGATED_DEPTH_MAP_PROMPT`` and the encoded image.
    """
    encoded_image = llm_service.encode_image(image=np_image)
    reply = await llm_service.chat_with_image(
        prompt=DESCRIPTION_AGGREGATED_DEPTH_MAP_PROMPT,
        image=encoded_image,
    )
    return reply
|
|
|
|
|
# Image embedding model; used by aget_image_embedding_from_np_image.
# NOTE(review): embed_batch_size=1536 equals the `dimensions` value given to
# the text model below — confirm a batch size of 1536 is really intended here.
clip_embedding_model = ClipEmbedding(
    embed_batch_size=1536,
)
# Text embedding model; used by query_3D_object_by_semantic_search_method to
# embed the user's query.
text_embedding_model = OpenAIEmbedding(
    mode=OpenAIEmbeddingMode.TEXT_SEARCH_MODE,
    model="text-embedding-3-small",
    api_key=OPENAI_API_KEY,
    dimensions=1536,
    embed_batch_size=512,
)
|
|
|
|
|
async def aget_image_embedding_from_np_image(np_image: np.ndarray):
    """Compute the CLIP image embedding of an in-memory image.

    The embedding model is called with a file path, so the array is written
    to a temporary PNG first. The file is removed again in every case,
    including when saving or embedding raises.

    Args:
        np_image: Image array accepted by ``PIL.Image.fromarray``.

    Returns:
        The value returned by ``clip_embedding_model.aget_image_embedding``.
    """
    # Only reserve a name here; the handle is closed before PIL writes to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
        temp_file_path = temp_file.name

    try:
        Image.fromarray(np_image).save(temp_file_path)
        return await clip_embedding_model.aget_image_embedding(temp_file_path)
    finally:
        os.remove(temp_file_path)
|
|
|
|
|
async def embedding_3d_object(obj_path: str) -> Dict[str, Any]:
    """Build the description and image embedding for one OBJ file.

    Depth images are rendered with ``render_depth_images_from_obj``, merged
    with ``aggregate_images``, and the merged image is then described and
    embedded.

    Args:
        obj_path: Path of the OBJ file to process.

    Returns:
        A dict with the keys ``"description"`` and ``"image_embedding"``.
    """
    composite_view = aggregate_images(render_depth_images_from_obj(obj_path=obj_path))

    # Dict values are evaluated top to bottom: description first, then the
    # image embedding.
    return {
        "description": await generate_description_from_aggregated_depth_map(
            np_image=composite_view
        ),
        "image_embedding": await aget_image_embedding_from_np_image(
            np_image=composite_view
        ),
    }
|
|
|
|
|
# Absolute, machine-specific cache directory.
# NOTE(review): hard-coded path; consider making it configurable.
BASE_SAMPLE_DIR = "/Users/tridoan/Spartan/Datum/service-ai/poc/3D/gradio_cache/"

# Initial entries for the file state and the file dropdown (currently none).
sample_files: List[str] = []
|
|
|
|
|
|
|
|
|
|
|
def normalize_metadata(metadata: Dict[str, Any]) -> Dict[str, object]:
    """Return a copy of *metadata* with values coerced to ``float`` where possible.

    ``None`` becomes the string ``"None"``. Values that ``float()`` rejects
    are kept as they are, except that strings are stripped of surrounding
    whitespace.
    """

    def _coerce(raw: Any) -> object:
        if raw is None:
            return "None"
        try:
            return float(raw)
        except (ValueError, TypeError):
            return raw.strip() if isinstance(raw, str) else raw

    return {key: _coerce(raw) for key, raw in metadata.items()}
|
|
|
|
|
async def accumulate_and_embedding(
    input_files: List[str],
    file_list: List[str],
    embedding_dict: Dict[str, Any],
    converting_store_map: Dict[str, str],
):
    """Convert, describe and embed every newly uploaded file.

    Args:
        input_files: Current value of the upload widget: a list of paths, a
            single path, or ``None`` when the widget has been cleared.
        file_list: Files known from the previous call; only entries of
            ``input_files`` beyond ``len(file_list)`` are processed.
        embedding_dict: Store keyed by OBJ path; updated in place with the
            metadata, description and embeddings of each new file.
        converting_store_map: Maps each uploaded path to its converted OBJ
            path; updated in place.

    Returns:
        ``(all_files, dropdown_update, embedding_dict, converting_store_map)``
        where ``dropdown_update`` is ``gr.update(choices=all_files)``.
    """
    # A cleared upload widget reports None; treat it as "no files" instead of
    # wrapping it into [None], which would crash in os.path.splitext below.
    if input_files is None:
        input_files = []
    elif not isinstance(input_files, list):
        input_files = [input_files]

    all_files = input_files
    # Assumes already-processed uploads stay at the front of the list, so
    # everything past the previously known count is new.
    new_files = input_files[len(file_list) :]

    for file_path in new_files:
        # The path must be part of the message: loguru drops positional
        # arguments that have no "{}" placeholder.
        logger.info(f"Processing new upload file: {file_path}")

        _, ext = os.path.splitext(file_path)
        if ext.lower() in ONSHAPE_SUPPORTED_FORMAT:
            response = await onshape_converter(input_file_path=file_path)
            step_path = response.get("output_file", "")
            logger.info(
                f"Converted {file_path} to {step_path} using Onshape converter."
            )
        else:
            step_path = None

        # The Onshape output, when there is one, replaces the raw upload for
        # both OBJ conversion and metadata parsing.
        source_path = file_path if step_path is None else step_path

        obj_path = await convert_to_obj(source_path)
        if step_path is not None:
            logger.info(f"Converted {step_path} to {obj_path} using FreeCAD converter.")
        else:
            logger.info(f"Converted {file_path} to {obj_path}.")

        embeddings = await embedding_3d_object(obj_path)

        metadata_extraction = await parse_3d_file(original_filepath=source_path)
        if step_path is not None:
            logger.info(f"Extracted metadata from STEP file: {metadata_extraction}")

        metadata_aggregation = extract_geometric_features(obj_path)
        metadata = (
            dict_to_markdown(metadata_aggregation)
            + "\n"
            + dict_to_markdown(metadata_extraction)
        )

        # NOTE(review): the separator below expands to twenty literal "n"
        # characters; it may have been meant as newlines or a rule. Left
        # unchanged because it alters the embedded text.
        text_embedding = await text_embedding_model.aget_text_embedding(
            text="The 3D object is: "
            + embeddings["description"]
            + f".\n {'n' * 20}\nMetadata: "
            + metadata
        )
        metadata_aggregation.update(metadata_extraction)

        # The entry is created only after every await has succeeded, so a
        # failure never leaves a half-filled record in the store.
        entry = embedding_dict.setdefault(obj_path, {})
        entry["metadata"] = metadata
        entry["metadata_dictionary"] = normalize_metadata(metadata_aggregation)
        entry["description"] = embeddings["description"]
        entry["image_embedding"] = embeddings["image_embedding"]
        entry["text_embedding"] = text_embedding

        converting_store_map[file_path] = obj_path

    return all_files, gr.update(choices=all_files), embedding_dict, converting_store_map
|
|
|
|
|
def select_file(filename, file_list):
    """Return a short preview of the entry in *file_list* named *filename*.

    Entries are compared by their ``name`` attribute and the first match is
    used. The preview holds the first 300 characters of the file, read as
    UTF-8 with undecodable bytes ignored. When nothing matches, the string
    ``"File not found."`` is returned.
    """
    matched = next((entry for entry in file_list if entry.name == filename), None)
    if matched is None:
        return "File not found."

    with open(matched.name, "r", encoding="utf-8", errors="ignore") as handle:
        preview = handle.read()[:300]
    return f"Selected: {matched.name}\n---\n{preview}..."
|
|
|
|
|
async def render_3D_object(filepath, converting_store_map) -> Tuple[str, str]:
    """Resolve the path to display for *filepath*.

    Args:
        filepath: Path of the selected file.
        converting_store_map: Mapping from original paths to converted paths.

    Returns:
        ``(display_path, filepath)``. ``display_path`` is *filepath* itself
        unless the extension is one of the convertible formats; in that case
        it is the stored conversion, or the result of ``convert_to_obj`` when
        none is stored.
    """
    suffix = os.path.splitext(filepath)[1].lower()
    convertible_formats = (
        USER_REQUIRE_FORMAT
        + FREECAD_LOW_LEVEL_FORMAT
        + FREECAD_NATIVE_FORMAT
        + ONSHAPE_SUPPORTED_FORMAT
    )

    # Formats in GRADIO_3D_MODEL_DEFAULT_FORMAT win over the convertible
    # check; unknown extensions are passed through unchanged as well.
    if suffix in GRADIO_3D_MODEL_DEFAULT_FORMAT or suffix not in convertible_formats:
        return filepath, filepath

    if filepath not in converting_store_map:
        return await convert_to_obj(filepath), filepath
    return converting_store_map[filepath], filepath
|
|
|
|
|
|
|
|
|
|
|
# Extensions accepted by the upload widget: the de-duplicated union of all
# format lists, followed by the same extensions in upper case.
valid_file_types = list(
    {
        *GRADIO_3D_MODEL_DEFAULT_FORMAT,
        *USER_REQUIRE_FORMAT,
        *FREECAD_NATIVE_FORMAT,
        *FREECAD_LOW_LEVEL_FORMAT,
        *ONSHAPE_SUPPORTED_FORMAT,
    }
)
valid_file_types += [suffix.upper() for suffix in valid_file_types]
|
# NOTE(review): the nesting below is reconstructed from a source without
# indentation. In particular, the two search tabs are placed inside the
# right-hand column; confirm against the intended layout.
with gr.Blocks() as demo:
    # Session state and the upload widget.
    with gr.Row():
        file_state = gr.State(sample_files)

        embedding_store = gr.State({})
        converting_store_map = gr.State({})
        file_input = gr.File(
            label="Upload files (You can append more)",
            file_count="multiple",
            file_types=valid_file_types,
        )

    with gr.Row():
        # Left: file picker plus the metadata/description of the shown model.
        with gr.Column(scale=1):
            file_dropdown = gr.Dropdown(
                choices=sample_files, label="Select a file to process", interactive=True
            )
            metadata_render = gr.Textbox(label="Metadata", lines=6)
            description_render = gr.Textbox(label="Description", lines=6)
        # Right: the main viewer and the hidden textbox carrying its path.
        with gr.Column(scale=1):
            model_render = gr.Model3D(label="3D", height=500, interactive=False)
            model_hidden_filepath = gr.Textbox(visible=False)

            with gr.Tab("Text Query Search"):
                query_input = gr.Textbox(placeholder="Which 3D CAD contains 2 holes?")
                query_button = gr.Button("Query Search")
                response_box = gr.Textbox(placeholder="Thinking...", label="Response")

                with gr.Row():
                    with gr.Row():
                        model_q_1 = gr.Model3D(label="3D Top 1", interactive=False)
                        model_q_1_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_q_1_btn = gr.Button(value="3D Top 1", size="sm")
                    with gr.Row():
                        model_q_2 = gr.Model3D(label="3D Top 2", interactive=False)
                        model_q_2_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_q_2_btn = gr.Button(value="3D Top 2", size="sm")

                with gr.Row():
                    with gr.Row():
                        model_q_3 = gr.Model3D(label="3D Top 3", interactive=False)
                        model_q_3_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_q_3_btn = gr.Button(value="3D Top 3", size="sm")
                    with gr.Row():
                        model_q_4 = gr.Model3D(label="3D Top 4", interactive=False)
                        model_q_4_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_q_4_btn = gr.Button(value="3D Top 4", size="sm")

            with gr.Tab("3D Similarity Search"):
                sim_button = gr.Button("Similarity Search")
                with gr.Row():
                    with gr.Row():
                        model_s_1 = gr.Model3D(label="3D Sim 1", interactive=False)
                        model_s_1_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_s_1_btn = gr.Button(value="3D Sim 1", size="sm")
                    with gr.Row():
                        model_s_2 = gr.Model3D(label="3D Sim 2", interactive=False)
                        model_s_2_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_s_2_btn = gr.Button(value="3D Sim 2", size="sm")
                with gr.Row():
                    with gr.Row():
                        model_s_3 = gr.Model3D(label="3D Sim 3", interactive=False)
                        model_s_3_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_s_3_btn = gr.Button(value="3D Sim 3", size="sm")
                    with gr.Row():
                        model_s_4 = gr.Model3D(label="3D Sim 4", interactive=False)
                        model_s_4_score = gr.Text(
                            label="", value="Score: ", interactive=False
                        )
                        model_s_4_btn = gr.Button(value="3D Sim 4", size="sm")

    # Result slots grouped per tab so the wiring below can be expressed once.
    _query_viewers = [model_q_1, model_q_2, model_q_3, model_q_4]
    _query_buttons = [model_q_1_btn, model_q_2_btn, model_q_3_btn, model_q_4_btn]
    _query_scores = [model_q_1_score, model_q_2_score, model_q_3_score, model_q_4_score]
    _sim_viewers = [model_s_1, model_s_2, model_s_3, model_s_4]
    _sim_buttons = [model_s_1_btn, model_s_2_btn, model_s_3_btn, model_s_4_btn]
    _sim_scores = [model_s_1_score, model_s_2_score, model_s_3_score, model_s_4_score]
    # Every "show this model" action writes to the same two components.
    _main_view_outputs = [model_render, model_hidden_filepath]

    # New uploads are converted and embedded; the dropdown choices follow.
    file_input.change(
        fn=accumulate_and_embedding,
        inputs=[file_input, file_state, embedding_store, converting_store_map],
        outputs=[file_state, file_dropdown, embedding_store, converting_store_map],
    )

    query_button.click(
        fn=query_3D_object,
        inputs=[query_input, model_render, embedding_store],
        outputs=[response_box, *_query_viewers, *_query_buttons, *_query_scores],
    )
    for _button, _viewer in zip(_query_buttons, _query_viewers):
        _button.click(
            fn=render_3D_object,
            inputs=[_viewer, converting_store_map],
            outputs=_main_view_outputs,
        )

    sim_button.click(
        fn=search_3D_similarity,
        inputs=[model_render, embedding_store],
        outputs=[*_sim_viewers, *_sim_buttons, *_sim_scores],
    )
    for _button, _viewer in zip(_sim_buttons, _sim_viewers):
        _button.click(
            fn=render_3D_object,
            inputs=[_viewer, converting_store_map],
            outputs=_main_view_outputs,
        )

    file_dropdown.change(
        fn=render_3D_object,
        inputs=[file_dropdown, converting_store_map],
        outputs=_main_view_outputs,
    )

    # A change of the hidden path refreshes the metadata/description boxes.
    model_hidden_filepath.change(
        fn=render_3D_metadata,
        inputs=[model_hidden_filepath, model_render, embedding_store],
        outputs=[metadata_render, description_render],
    )
|
|
|
if __name__ == "__main__":
    # ENVIRONMENT defaults to "dev"; a share link is requested only for the
    # "dev" and "prod" environments.
    _env = os.environ.get("ENVIRONMENT", "dev")
    demo.launch(share=_env in ("dev", "prod"))
|
|