import xml.etree.ElementTree as ET

import numpy as np
import torch
from elasticsearch import Elasticsearch, helpers
from transformers import CLIPProcessor, CLIPModel

from server.models.database import DocumentModel
from server.utils.database import get_db
from server.utils.model import get_clip_model
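
# Shared CLIP model and processor, loaded once at import time via the
# project's get_clip_model() helper.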
clip_model, clip_processor = get_clip_model()


def insert_data(bulk_data, db=None, index_name="patents"):
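    """Bulk-index a list of pre-built Elasticsearch actions.

    Each action in `bulk_data` is expected to carry its own `_index`;
    `index_name` is kept for signature symmetry with the other helpers here.
    """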
    if db is None:
        db = get_db()
    if bulk_data:
        helpers.bulk(db, bulk_data)
        return f"Inserted {len(bulk_data)} patent records with embeddings into Elasticsearch."
    return "No patent records found to insert."


def search_data(embedding: list[float] | None = None, db=None, top_k=5, index_name="patents"):
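    """Run a cosine-similarity search against the stored CLIP embeddings.

    `embedding` must be a 512-dimensional CLIP vector; hits are returned as
    plain dicts containing the patent metadata stored at index time.
    """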
    if db is None:
        db = get_db()
    if embedding is None or len(embedding) != 512:
        raise ValueError("Embedding must be a list of 512 floats.")
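
    # Elasticsearch requires script_score scripts to return a non-negative
    # score, hence the `+ 1.0` offset on the raw cosine similarity.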
    body = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": embedding}
                }
            }
        }
    }

    res = db.search(index=index_name, body=body)
    results = []
    for hit in res['hits']['hits']:
        doc = hit['_source']
        results.append({
            "patent_id": doc.get('doc_numbers', [''])[0] if doc.get('doc_numbers') else '',
            "invention_title": doc.get('invention_title', ''),
            "assignors": doc.get('assignors', ''),
            "assignees": doc.get('assignees', ''),
            "conveyance_text": doc.get('conveyance_text', ''),
            "reel_no": doc.get('reel_no', ''),
            "frame_no": doc.get('frame_no', '')
        })
    return results


def insert_patent_data(xml_file, index_name: str = "patents"):
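    """Parse a patent-assignment XML file, embed each invention title with
    CLIP, and bulk-index the resulting records into Elasticsearch.
    """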
    tree = ET.parse(xml_file)
    root = tree.getroot()
    es = get_db()
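
    # Create the index on first use; `embedding` is a 512-dim dense_vector so
    # the cosine script_score queries in this module can run against it.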
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, body={
            "mappings": {
                "properties": {
                    "reel_no": {"type": "keyword"},
                    "frame_no": {"type": "keyword"},
                    "assignors": {"type": "text"},
                    "assignees": {"type": "text"},
                    "invention_title": {"type": "text"},
                    "conveyance_text": {"type": "text"},
                    "doc_numbers": {"type": "keyword"},
                    "raw_text": {"type": "text"},
                    "embedding": {"type": "dense_vector", "dims": 512, "index": True, "similarity": "cosine"}
                }
            }
        })
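
    # Pull stripped text out of an optional XML element.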
    def get_text(el):
        return el.text.strip() if el is not None and el.text else ""

    bulk_data = []
    for pa in root.findall('.//patent-assignment'):
        record = pa.find('assignment-record')
        if record is None:
            continue
        reel_no = get_text(record.find('reel-no'))
        frame_no = get_text(record.find('frame-no'))
        conveyance_text = get_text(record.find('conveyance-text'))
        assignors = ", ".join([
            get_text(a.find('name')) for a in pa.findall('.//patent-assignor') if get_text(a.find('name'))
        ])
        assignees = ", ".join([
            get_text(a.find('name')) for a in pa.findall('.//patent-assignee') if get_text(a.find('name'))
        ])
        invention_title = ""
        doc_numbers = []
        for prop in pa.findall('.//patent-property'):
            title = prop.find('invention-title')
            if title is not None:
                invention_title = get_text(title)
            for doc in prop.findall('document-id'):
                doc_num = get_text(doc.find('doc-number'))
                if doc_num:
                    doc_numbers.append(doc_num)

        # Embed the invention title with CLIP; records without a title fall
        # back to a zero vector. Note that a `cosine` dense_vector mapping with
        # `index: true` rejects zero-magnitude vectors, so such records may
        # fail to index.
        if invention_title:
            inputs = clip_processor(text=[invention_title], return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                embedding = clip_model.get_text_features(**inputs)[0].cpu().numpy().astype(np.float32).tolist()
        else:
            embedding = [0.0] * 512

        doc = {
            "reel_no": reel_no,
            "frame_no": frame_no,
            "assignors": assignors,
            "assignees": assignees,
            "invention_title": invention_title,
            "conveyance_text": conveyance_text,
            "doc_numbers": doc_numbers,
            "raw_text": invention_title,
            "embedding": embedding
        }
        bulk_data.append({"_index": index_name, "_source": doc})

    if bulk_data:
        helpers.bulk(es, bulk_data)
        return f"Inserted {len(bulk_data)} patent records with embeddings into Elasticsearch."
    return "No patent records found to insert."


def search_patents(query=None, image_path=None, top_k=5, index_name="patents"):
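    """Search patents by free-text query or by image.

    The query text (or image) is embedded with CLIP and compared against the
    stored title embeddings via cosine similarity; returns at most `top_k` hits.
    """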
    es = get_db()
    if query:
        inputs = clip_processor(text=[query], return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            embedding = clip_model.get_text_features(**inputs)[0].cpu().numpy().astype(np.float32).tolist()
    elif image_path:
        # Lazy import so Pillow is only needed when searching by image.
        from PIL import Image
        image = Image.open(image_path).convert("RGB")
        inputs = clip_processor(images=image, return_tensors="pt")
        with torch.no_grad():
            embedding = clip_model.get_image_features(**inputs)[0].cpu().numpy().astype(np.float32).tolist()
    else:
        return []
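
    # Same cosine script_score query as search_data(), built from the CLIP embedding.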
    body = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": embedding}
                }
            }
        }
    }

    res = es.search(index=index_name, body=body)
    results = []
    for hit in res['hits']['hits']:
        doc = hit['_source']
        results.append({
            "patent_id": doc.get('doc_numbers', [''])[0] if doc.get('doc_numbers') else '',
            "invention_title": doc.get('invention_title', ''),
            "assignors": doc.get('assignors', ''),
            "assignees": doc.get('assignees', ''),
            "conveyance_text": doc.get('conveyance_text', ''),
            "reel_no": doc.get('reel_no', ''),
            "frame_no": doc.get('frame_no', '')
        })
    return results