File size: 6,792 Bytes
d17ca98 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
import xml.etree.ElementTree as ET
from elasticsearch import Elasticsearch, helpers
import torch
from transformers import CLIPProcessor, CLIPModel
import numpy as np
from server.utils.database import get_db
from server.utils.model import get_clip_model
from server.models.database import DocumentModel
# Load the CLIP model and processor once at import time so every request
# reuses the same weights instead of reloading them on each call.
# get_clip_model() is a project helper; presumably it returns a
# (CLIPModel, CLIPProcessor) pair — confirm against server.utils.model.
clip_model, clip_processor = get_clip_model()
def insert_data(bulk_data, db=None, index_name="patents"):
    """Bulk-insert pre-built patent documents into Elasticsearch.

    Args:
        bulk_data: List of Elasticsearch bulk actions (dicts carrying
            "_index" and "_source"), as produced by insert_patent_data.
        db: Elasticsearch client. Defaults to None; a client is then
            obtained from get_db() at call time. (The previous default,
            ``db=get_db()``, was evaluated once at import time, creating
            an eager connection shared by every call.)
        index_name: Unused — each bulk action already names its index.
            Kept so existing callers that pass it keep working.

    Returns:
        A human-readable status message.
    """
    # Guard clause: nothing to insert, so avoid touching the database at all.
    if not bulk_data:
        return "No patent records found to insert."
    if db is None:
        db = get_db()
    helpers.bulk(db, bulk_data)
    return f"Inserted {len(bulk_data)} patent records with embeddings into Elasticsearch."
def search_data(embedding: list[float] = None, db=None, top_k=5, index_name="patents"):
    """Run a cosine-similarity search against stored patent embeddings.

    Args:
        embedding: Query vector; must be exactly 512 floats (the CLIP
            text/image feature size used at index time).
        db: Elasticsearch client. Defaults to None; a client is then
            obtained from get_db() at call time. (The previous default,
            ``db=get_db()``, connected eagerly at import time.)
        top_k: Maximum number of hits to return.
        index_name: Index to search.

    Returns:
        List of dicts with patent_id, invention_title, assignors,
        assignees, conveyance_text, reel_no and frame_no per hit.

    Raises:
        ValueError: If embedding is missing or not 512 elements long.
    """
    # Validate before resolving the client so bad input never opens a connection.
    if embedding is None or len(embedding) != 512:
        raise ValueError("Embedding must be a list of 512 floats.")
    if db is None:
        db = get_db()
    body = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    # +1.0 shifts cosine similarity into [0, 2]; ES script_score
                    # requires non-negative scores.
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": embedding}
                }
            }
        }
    }
    res = db.search(index=index_name, body=body)
    results = []
    for hit in res['hits']['hits']:
        doc = hit['_source']
        doc_numbers = doc.get('doc_numbers')
        results.append({
            # First document number stands in as the patent id when present.
            "patent_id": doc_numbers[0] if doc_numbers else '',
            "invention_title": doc.get('invention_title', ''),
            "assignors": doc.get('assignors', ''),
            "assignees": doc.get('assignees', ''),
            "conveyance_text": doc.get('conveyance_text', ''),
            "reel_no": doc.get('reel_no', ''),
            "frame_no": doc.get('frame_no', '')
        })
    return results
# CRUD: Insert patent data from XML
# CRUD: Insert patent data from XML
def insert_patent_data(xml_file, index_name: str = "patents"):
    """Parse a USPTO patent-assignment XML file, embed each invention
    title with CLIP, and bulk-index the records into Elasticsearch.

    Args:
        xml_file: Path or file object accepted by xml.etree.ElementTree.parse.
        index_name: Target Elasticsearch index; created with an explicit
            mapping if it does not already exist.

    Returns:
        A human-readable status message (count inserted, or a notice
        when the file yields no assignment records).
    """
    root = ET.parse(xml_file).getroot()
    es = get_db()
    _ensure_index(es, index_name)
    bulk_data = [
        {"_index": index_name, "_source": doc}
        for doc in (_assignment_to_doc(pa) for pa in root.findall('.//patent-assignment'))
        if doc is not None  # skip assignments with no assignment-record
    ]
    # Delegate the bulk write (and the status messages) to insert_data.
    return insert_data(bulk_data, es)


def _ensure_index(es, index_name):
    """Create index_name with the patent mapping if it does not exist yet."""
    if es.indices.exists(index=index_name):
        return
    es.indices.create(index=index_name, body={
        "mappings": {
            "properties": {
                "reel_no": {"type": "keyword"},
                "frame_no": {"type": "keyword"},
                "assignors": {"type": "text"},
                "assignees": {"type": "text"},
                "invention_title": {"type": "text"},
                "conveyance_text": {"type": "text"},
                "doc_numbers": {"type": "keyword"},
                "raw_text": {"type": "text"},
                # 512 dims matches CLIP text/image feature size; cosine
                # similarity matches the script_score queries used at search time.
                "embedding": {"type": "dense_vector", "dims": 512, "index": True, "similarity": "cosine"}
            }
        }
    })


def _get_text(el):
    """Return the stripped text of an XML element, or "" when the element
    or its text is missing."""
    return el.text.strip() if el is not None and el.text else ""


def _embed_title(title):
    """Return the 512-float CLIP text embedding for title, or a zero
    vector when title is empty (keeps the dense_vector field populated)."""
    if not title:
        return [0.0] * 512
    inputs = clip_processor(text=[title], return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        return clip_model.get_text_features(**inputs)[0].cpu().numpy().astype(np.float32).tolist()


def _assignment_to_doc(pa):
    """Convert one <patent-assignment> element into an index document,
    or None when it has no <assignment-record> child."""
    record = pa.find('assignment-record')
    if record is None:
        return None
    assignors = ", ".join(
        _get_text(a.find('name')) for a in pa.findall('.//patent-assignor') if _get_text(a.find('name'))
    )
    assignees = ", ".join(
        _get_text(a.find('name')) for a in pa.findall('.//patent-assignee') if _get_text(a.find('name'))
    )
    # Last non-missing invention-title wins; every doc-number is collected.
    invention_title = ""
    doc_numbers = []
    for prop in pa.findall('.//patent-property'):
        title = prop.find('invention-title')
        if title is not None:
            invention_title = _get_text(title)
        for doc_id in prop.findall('document-id'):
            doc_num = _get_text(doc_id.find('doc-number'))
            if doc_num:
                doc_numbers.append(doc_num)
    return {
        "reel_no": _get_text(record.find('reel-no')),
        "frame_no": _get_text(record.find('frame-no')),
        "assignors": assignors,
        "assignees": assignees,
        "invention_title": invention_title,
        "conveyance_text": _get_text(record.find('conveyance-text')),
        "doc_numbers": doc_numbers,
        "raw_text": invention_title,
        "embedding": _embed_title(invention_title)
    }
# CRUD: Search patents by text or image
def search_patents(query=None, image_path=None, top_k=5, index_name="patents"):
es = get_db()
if query:
inputs = clip_processor(text=[query], return_tensors="pt", padding=True, truncation=True)
with torch.no_grad():
embedding = clip_model.get_text_features(**inputs)[0].cpu().numpy().astype(np.float32).tolist()
elif image_path:
from PIL import Image
image = Image.open(image_path).convert("RGB")
inputs = clip_processor(images=image, return_tensors="pt")
with torch.no_grad():
embedding = clip_model.get_image_features(**inputs)[0].cpu().numpy().astype(np.float32).tolist()
else:
return []
body = {
"size": top_k,
"query": {
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
"params": {"query_vector": embedding}
}
}
}
}
res = es.search(index=index_name, body=body)
results = []
for hit in res['hits']['hits']:
doc = hit['_source']
results.append({
"patent_id": doc.get('doc_numbers', [''])[0] if doc.get('doc_numbers') else '',
"invention_title": doc.get('invention_title', ''),
"assignors": doc.get('assignors', ''),
"assignees": doc.get('assignees', ''),
"conveyance_text": doc.get('conveyance_text', ''),
"reel_no": doc.get('reel_no', ''),
"frame_no": doc.get('frame_no', '')
})
return results
|