import shutil
import bm25s
from bm25s.hf import BM25HF
import threading, re, time, concurrent.futures, requests, os, hashlib, traceback, io, zipfile, subprocess, tempfile, json, fitz
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from datasets import load_dataset, Dataset
from datasets.data_files import EmptyDatasetError
from dotenv import load_dotenv

load_dotenv()
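
# This script maintains three document indexes pushed to the Hugging Face Hub:
#   - TDocIndexer     : crawls the 3GPP FTP tree and maps TDoc IDs to ZIP URLs
#                       (OrganizedProgrammers/3GPPTDocLocation)
#   - Spec3GPPIndexer : downloads 3GPP specifications, extracts their sections and
#                       builds BM25 indexes (3GPPSpecContent / 3GPPSpecMetadata,
#                       3GPPBM25IndexSections / 3GPPBM25IndexSingle)
#   - SpecETSIIndexer : same pipeline for ETSI TS/TR deliverables
#                       (ETSISpecContent / ETSISpecMetadata, ETSIBM25Index*)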
class TDocIndexer:
    def __init__(self, max_workers=33):
        self.indexer_length = 0
        self.dataset = "OrganizedProgrammers/3GPPTDocLocation"
        self.indexer = self.load_indexer()
        self.main_ftp_url = "https://3gpp.org/ftp"
        self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
        self.max_workers = max_workers
        self.print_lock = threading.Lock()
        self.indexer_lock = threading.Lock()
        self.total_indexed = 0
        self.processed_count = 0
        self.total_count = 0
    def load_indexer(self):
        self.indexer_length = 0
        all_docs = {}
        tdoc_locations = load_dataset(self.dataset)
        tdoc_locations = tdoc_locations["train"].to_list()
        for doc in tdoc_locations:
            self.indexer_length += 1
            all_docs[doc["doc_id"]] = doc["url"]
        return all_docs

    def save_indexer(self):
        """Save the updated index"""
        data = []
        for doc_id, url in self.indexer.items():
            data.append({"doc_id": doc_id, "url": url})
        dataset = Dataset.from_list(data)
        dataset.push_to_hub(self.dataset, token=os.environ["HF"])
        self.indexer = self.load_indexer()
    def get_docs_from_url(self, url):
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            with self.print_lock:
                print(f"Error while accessing {url}: {e}")
            return []
    def is_valid_document_pattern(self, filename):
        return bool(self.valid_doc_pattern.match(filename))

    def is_zip_file(self, filename):
        return filename.lower().endswith('.zip')

    def extract_doc_id(self, filename):
        if self.is_valid_document_pattern(filename):
            match = self.valid_doc_pattern.match(filename)
            if match:
                # Return the full document ID (e.g. S1-12345)
                full_id = filename.split('.')[0]  # Drop the extension if present
                return full_id.split('_')[0]  # Drop any suffix after an underscore
        return None
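
    # Illustrative examples, derived from the pattern above:
    #   extract_doc_id("S1-123456.zip")      -> "S1-123456"
    #   extract_doc_id("C3-240123_rev1.zip") -> "C3-240123"
    #   extract_doc_id("notes.zip")          -> None (does not match the TDoc pattern)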
    def process_zip_files(self, files_list, base_url, workshop=False):
        """Scan a list of files and index the valid ZIP archives"""
        indexed_count = 0
        for file in files_list:
            if file in ['./', '../', 'ZIP/', 'zip/']:
                continue
            # Check that it is a ZIP file and that it matches the TDoc pattern
            if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
                file_url = f"{base_url}/{file}"
                # Extract the document ID
                doc_id = self.extract_doc_id(file)
                if doc_id is None:
                    doc_id = file.split('.')[0]
                if doc_id:
                    # Skip files that are already indexed under the same URL
                    with self.indexer_lock:
                        if doc_id in self.indexer and self.indexer[doc_id] == file_url:
                            continue
                        # Add or update the index entry
                        self.indexer[doc_id] = file_url
                        indexed_count += 1
                        self.total_indexed += 1
        return indexed_count
    def process_meeting(self, meeting, wg_url, workshop=False):
        """Process a single meeting folder (called from the thread pool)"""
        try:
            if meeting in ['./', '../']:
                return 0
            meeting_url = f"{wg_url}/{meeting}"
            with self.print_lock:
                print(f"Checking meeting: {meeting}")
            # Inspect the meeting folder
            meeting_contents = self.get_docs_from_url(meeting_url)
            key = None
            if "docs" in [x.lower() for x in meeting_contents]:
                key = "docs"
            elif "tdocs" in [x.lower() for x in meeting_contents]:
                key = "tdocs"
            elif "tdoc" in [x.lower() for x in meeting_contents]:
                key = "tdoc"
            if key is not None:
                docs_url = f"{meeting_url}/{key}"
                with self.print_lock:
                    print(f"Checking the documents in {docs_url}")
                # List the files in the Docs folder
                docs_files = self.get_docs_from_url(docs_url)
                # 1. Index the ZIP files located directly in the Docs folder
                docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
                if docs_indexed_count > 0:
                    with self.print_lock:
                        print(f"{docs_indexed_count} files found")
                # 2. Check the ZIP subfolder if it exists
                if "zip" in [x.lower() for x in docs_files]:
                    zip_url = f"{docs_url}/zip"
                    with self.print_lock:
                        print(f"Checking the ./zip folder: {zip_url}")
                    # List the files in the ZIP subfolder
                    zip_files = self.get_docs_from_url(zip_url)
                    # Index the ZIP files found in the ZIP subfolder
                    zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
                    if zip_indexed_count > 0:
                        with self.print_lock:
                            print(f"{zip_indexed_count} files found")
            # Update the progress counter
            with self.indexer_lock:
                self.processed_count += 1
            # Report progress
            with self.print_lock:
                progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
                print(f"\rProgress: {self.processed_count}/{self.total_count} meetings processed ({progress:.1f}%)")
            return 1  # Meeting processed successfully
        except Exception as e:
            with self.print_lock:
                print(f"\nError while processing meeting {meeting}: {str(e)}")
            return 0
    def process_workgroup(self, wg, main_url):
        """Process a working group, handling its meetings in parallel"""
        if wg in ['./', '../']:
            return
        wg_url = f"{main_url}/{wg}"
        with self.print_lock:
            print(f"Checking working group: {wg}")
        # Fetch the meeting folders
        meeting_folders = self.get_docs_from_url(wg_url)
        # Add them to the total counter
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
        # Process the meetings in parallel with a ThreadPoolExecutor
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, wg_url)
                       for meeting in meeting_folders if meeting not in ['./', '../']]
            # Wait for every task to finish
            concurrent.futures.wait(futures)
    def index_all_tdocs(self):
        """Index every ZIP document in the 3GPP FTP tree using multithreading"""
        print("Starting the full 3GPP TDoc indexing")
        start_time = time.time()
        docs_count_before = self.indexer_length
        # Main TSG groups
        main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]  # Add others if needed
        for main_tsg in main_groups:
            print(f"Indexing {main_tsg.upper()}...")
            main_url = f"{self.main_ftp_url}/{main_tsg}"
            # Fetch the working groups
            workgroups = self.get_docs_from_url(main_url)
            # Process each working group sequentially
            # (the meetings inside each one are processed in parallel)
            for wg in workgroups:
                self.process_workgroup(wg, main_url)
        docs_count_after = len(self.indexer)
        new_docs_count = abs(docs_count_after - docs_count_before)
        print(f"Indexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")
        return self.indexer
    def index_all_workshops(self):
        print("Starting the 3GPP workshop ZIP indexing...")
        start_time = time.time()
        docs_count_before = len(self.indexer)
        print("\nIndexing the 'workshop' folder")
        main_url = f"{self.main_ftp_url}/workshop"
        # Fetch the meeting folders
        meeting_folders = self.get_docs_from_url(main_url)
        # Add them to the total counter
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
        # Process the meetings in parallel with a ThreadPoolExecutor
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
                       for meeting in meeting_folders if meeting not in ['./', '../']]
            concurrent.futures.wait(futures)
        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before
        print(f"\nIndexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")
        return self.indexer
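
# Usage sketch for TDocIndexer (illustrative, not part of the original control flow):
#
#   tdoc_indexer = TDocIndexer(max_workers=33)
#   tdoc_indexer.index_all_tdocs()       # crawl tsg_sa / tsg_ct / tsg_ran
#   tdoc_indexer.index_all_workshops()   # crawl the workshop folder
#   tdoc_indexer.save_indexer()          # push OrganizedProgrammers/3GPPTDocLocation
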
class Spec3GPPIndexer:
    def __init__(self, max_workers=16):
        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0
        self.DICT_LOCK = threading.Lock()
        self.DOCUMENT_LOCK = threading.Lock()
        self.STOP_EVENT = threading.Event()
        self.max_workers = max_workers
        self.LIBREOFFICE_SEMAPHORE = threading.Semaphore(self.max_workers)

    def _make_doc_index(self, specs):
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index
    def version_to_code(self, version_str):
        chars = "0123456789abcdefghijklmnopqrstuvwxyz"
        parts = version_str.split('.')
        if len(parts) != 3:
            return None
        try:
            x, y, z = [int(p) for p in parts]
        except ValueError:
            return None
        if x < 36 and y < 36 and z < 36:
            return f"{chars[x]}{chars[y]}{chars[z]}"
        else:
            return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"
    def hasher(self, specification, version_code):
        return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()

    def get_scope(self, content):
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""
    def get_text(self, specification, version_code):
        if self.STOP_EVENT.is_set():
            return []
        doc_id = specification
        series = doc_id.split(".")[0]
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
        try:
            response = requests.get(url, verify=False)
            if response.status_code != 200:
                return []
            zip_bytes = io.BytesIO(response.content)
            with zipfile.ZipFile(zip_bytes) as zip_file:
                # Keep only .doc and .docx files
                docx_files = [f for f in zip_file.namelist() if f.lower().endswith(('.doc', '.docx'))]
                if not docx_files:
                    return []
                full_text = []
                for doc_file in docx_files:
                    with tempfile.TemporaryDirectory() as tmpdir:
                        extracted_path = os.path.join(tmpdir, os.path.basename(doc_file))
                        with open(extracted_path, 'wb') as f:
                            f.write(zip_file.read(doc_file))
                        # Dedicated temporary LibreOffice profile
                        profile_dir = tempfile.mkdtemp(prefix="libreoffice_profile_")
                        try:
                            with self.LIBREOFFICE_SEMAPHORE:
                                cmd = [
                                    'soffice',
                                    '--headless',
                                    f'-env:UserInstallation=file://{profile_dir}',
                                    '--convert-to', 'txt:Text',
                                    '--outdir', tmpdir,
                                    extracted_path
                                ]
                                subprocess.run(cmd, check=True, timeout=60*5, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                                txt_file = os.path.splitext(extracted_path)[0] + '.txt'
                                if os.path.exists(txt_file):
                                    with open(txt_file, 'r', encoding='utf-8', errors='ignore') as ftxt:
                                        full_text.extend(ftxt.readlines())
                        finally:
                            shutil.rmtree(profile_dir, ignore_errors=True)
                return full_text
        except Exception as e:
            print(f"Error getting text for {specification} v{version_code}: {e}")
            return []
    def get_spec_content(self, specification, version_code):
        if self.STOP_EVENT.is_set():
            return {}
        text = self.get_text(specification, version_code)
        if not text:
            return {}
        chapters = []
        chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")
        for i, line in enumerate(text):
            if chapter_regex.fullmatch(line):
                chapters.append((i, line))
        document = {}
        for i in range(len(chapters)):
            start_index, chapter_title = chapters[i]
            end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
            content_lines = text[start_index + 1:end_index]
            document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)
        return document
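
    # The chapter_regex above recognizes 3GPP-style heading lines in the converted
    # text, i.e. a section number, a tab, then a capitalized title, for example:
    #   "4\tGeneral"
    #   "5.2.1\tFunctional description"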
    def fetch_spec_table(self):
        response = requests.get(
            'https://www.3gpp.org/dynareport?code=status-report.htm',
            headers={"User-Agent": 'Mozilla/5.0'},
            verify=False
        )
        dfs = pd.read_html(io.StringIO(response.text))
        for x in range(len(dfs)):
            dfs[x] = dfs[x].replace({np.nan: None})
        columns_needed = [0, 1, 2, 3, 4]
        extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
        columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
        specifications = []
        for df in extracted_dfs:
            for index, row in df.iterrows():
                doc = row.to_list()
                doc_dict = dict(zip(columns, doc))
                specifications.append(doc_dict)
        return specifications
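
    # Note: the dynareport table headers contain non-breaking spaces, which the code
    # above replaces with underscores; process_specification then reads the resulting
    # keys such as spec['spec_num'], spec['vers'], spec['title'], spec['type'] and
    # spec['WG']. The exact header names are an assumption inferred from that usage.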
    def process_specification(self, spec):
        if self.STOP_EVENT.is_set():
            return
        try:
            doc_id = str(spec['spec_num'])
            version_code = self.version_to_code(str(spec['vers']))
            if not version_code:
                with self.DICT_LOCK:
                    self.processed_count += 1
                return
            document = None
            already_indexed = False
            with self.DOCUMENT_LOCK:
                doc_in_cache = doc_id in self.documents_by_spec_num and \
                    self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version_code)
            if doc_in_cache and doc_id not in self.specifications_passed:
                document = self.documents_by_spec_num[doc_id]
                self.specifications_passed.add(doc_id)
                already_indexed = True
            elif doc_id not in self.specifications_passed:
                doc_content = self.get_spec_content(doc_id, version_code)
                if doc_content:
                    document = {"content": doc_content, "hash": self.hasher(doc_id, version_code)}
                    with self.DOCUMENT_LOCK:
                        self.documents_by_spec_num[doc_id] = document
                    self.specifications_passed.add(doc_id)
                    already_indexed = False
            if document:
                url = f"https://www.3gpp.org/ftp/Specs/archive/{doc_id.split('.')[0]}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
                metadata = {
                    "id": doc_id,
                    "title": spec.get("title", ""),
                    "type": spec.get("type", ""),
                    "version": str(spec.get("vers", "")),
                    "working_group": spec.get("WG", ""),
                    "url": url,
                    "scope": self.get_scope(document["content"])
                }
                key = f"{doc_id}+-+{spec.get('title', '')}+-+{spec.get('type', '')}+-+{spec.get('vers', '')}+-+{spec.get('WG', '')}"
                with self.DICT_LOCK:
                    self.indexed_specifications[key] = metadata
            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
        except Exception as e:
            traceback.print_exc()
            print(f"Error processing spec {spec.get('spec_num')} v{spec.get('vers')}: {e}")
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")
    def get_document(self, spec_id: str, spec_title: str):
        text = [f"{spec_id} - {spec_title}\n"]
        for section in self.spec_contents:
            if spec_id == section["doc_id"]:
                text.extend([f"{section['section']}\n\n{section['content']}"])
        return text
    def create_bm25_index(self):
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            for section in self.spec_contents:
                if specification['id'] == section['doc_id']:
                    corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section['section'],
                        "version": specification['version'],
                        "type": specification['type'],
                        "working_group": specification['working_group'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
            unique_specs.add(specification['id'])  # avoid indexing the same spec twice
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSections", token=os.environ.get("HF"))

        unique_specs = set()
        corpus_json = []
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1: continue
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSingle", token=os.environ.get("HF"))
    def run(self):
        print("Fetching specification tables from 3GPP...")
        specifications = self.fetch_spec_table()
        self.total_count = len(specifications)
        print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_specification, spec) for spec in specifications]
            for f in concurrent.futures.as_completed(futures):
                if self.STOP_EVENT.is_set():
                    break
        print("All specs processed.")

    # Saving (identical to the original script)
    def save(self):
        print("Saving indexed data...")
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        print("Flattening doc contents")
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        print("Creating datasets ...")
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)
        # Token handling assumed set in environment
        print("Pushing ...")
        push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])
        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Save finished.")
class SpecETSIIndexer:
    def __init__(self, max_workers=16):
        self.session = requests.Session()
        self.session.verify = False
        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0
        self.DICT_LOCK = threading.Lock()
        self.DOCUMENT_LOCK = threading.Lock()
        self.STOP_EVENT = threading.Event()
        self.max_workers = max_workers
        self.df = self._fetch_spec_table()

    def _make_doc_index(self, specs):
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index
    def _fetch_spec_table(self):
        # Log in to the portal and fetch the TS/TR CSV catalogues
        print("Logging in to the ETSI portal...")
        self.session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."},
            data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
        )
        print("Fetching TS/TR metadata...")
        url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
        url_tr = url_ts.replace("stdType=TS", "stdType=TR")
        data_ts = self.session.get(url_ts, verify=False).content
        data_tr = self.session.get(url_tr, verify=False).content
        df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
        df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)
        backup_ts = df_ts["ETSI deliverable"]
        backup_tr = df_tr["ETSI deliverable"]
        df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        df_ts["Version"] = version1[0]
        df_tr["Version"] = version2[0]

        def ver_tuple(v):
            return tuple(map(int, v.split(".")))

        df_ts["temp"] = df_ts["Version"].apply(ver_tuple)
        df_tr["temp"] = df_tr["Version"].apply(ver_tuple)
        df_ts["Type"] = "TS"
        df_tr["Type"] = "TR"
        df = pd.concat([df_ts, df_tr])
        unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()]
        unique_df = unique_df.drop(columns="temp")
        unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))]
        df = df.drop(columns="temp")
        df = df[(~df["title"].str.contains("3GPP", case=True, na=False))]
        return df
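
    # Illustrative example of the extraction above: a catalogue row whose
    # "ETSI deliverable" cell reads "ETSI TS 103 666-1 V15.0.0 (2019-11)" becomes
    #   ETSI deliverable -> "103 666-1"
    #   Version          -> "15.0.0"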
    def hasher(self, specification: str, version: str):
        return hashlib.md5(f"{specification}{version}".encode()).hexdigest()

    def get_scope(self, content):
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""
    def get_document(self, spec_id: str, spec_title: str):
        text = [f"{spec_id} - {spec_title}\n"]
        for section in self.spec_contents:
            if spec_id == section["doc_id"]:
                text.extend([f"{section['section']}\n\n{section['content']}"])
        return text
    def get_text(self, specification: str):
        if self.STOP_EVENT.is_set():
            return None, []
        print(f"\n[INFO] Trying to retrieve specification {specification}", flush=True)
        try:
            # Find the row that holds the right PDF link
            row = self.df[self.df["ETSI deliverable"] == specification]
            if row.empty:
                print(f"[WARN] Specification {specification} is missing from the table")
                return None, []
            pdf_link = row.iloc[0]["PDF link"]
            response = self.session.get(
                pdf_link,
                headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...'}
            )
            if response.status_code != 200:
                print(f"[ERROR] Failed to download the PDF for {specification}.")
                return None, []
            pdf = fitz.open(stream=response.content, filetype="pdf")
            return pdf, pdf.get_toc()
        except Exception as e:
            print(f"[ERROR] get_text failed for {specification}: {e}", flush=True)
            return None, []
    def get_spec_content(self, specification: str):
        def extract_sections(text, titles):
            sections = {}
            sorted_titles = sorted(titles, key=lambda t: text.find(t))
            for i, title in enumerate(sorted_titles):
                start = text.find(title)
                if i + 1 < len(sorted_titles):
                    end = text.find(sorted_titles[i + 1])
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
                else:
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
            return sections

        if self.STOP_EVENT.is_set():
            return {}
        print(f"[INFO] Extracting the content of {specification}", flush=True)
        pdf, doc_toc = self.get_text(specification)
        text = []
        if not pdf or not doc_toc:
            print("[ERROR] No text or table of contents found!")
            return {}
        # Start from the first page actually referenced in the table of contents
        first_page = 0
        for level, title, page in doc_toc:
            first_page = page - 1
            break
        for page in pdf[first_page:]:
            text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
        text = "\n".join(text)
        if not text or not doc_toc or self.STOP_EVENT.is_set():
            print("[ERROR] No text/table of contents retrieved!")
            return {}
        titles = []
        for level, title, page in doc_toc:
            if self.STOP_EVENT.is_set():
                return {}
            if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
                titles.append('\n'.join(title.strip().split(" ", 1)))
        return extract_sections(text, titles)
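
    # Note on the matching above: the code expects each heading in the extracted page
    # text to appear as the section number on one line followed by its title on the
    # next, so a TOC entry such as "4.2 Architecture" is rewritten as
    # "4.2\nArchitecture" before it is searched for and used to slice the document
    # into sections.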
    def process_specification(self, spec):
        if self.STOP_EVENT.is_set():
            return
        try:
            version = spec.get('Version')
            if not version: return
            doc_id = str(spec.get("ETSI deliverable"))
            document = None
            already_indexed = False
            with self.DOCUMENT_LOCK:
                if (doc_id in self.documents_by_spec_num
                        and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version)
                        and doc_id not in self.specifications_passed):
                    document = self.documents_by_spec_num[doc_id]
                    self.specifications_passed.add(doc_id)
                    already_indexed = True
                elif doc_id in self.specifications_passed:
                    document = self.documents_by_spec_num[doc_id]
                    already_indexed = True
                else:
                    document_content = self.get_spec_content(doc_id)
                    if document_content:
                        self.documents_by_spec_num[doc_id] = {"content": document_content, "hash": self.hasher(doc_id, version)}
                        document = {"content": document_content, "hash": self.hasher(doc_id, version)}
                        self.specifications_passed.add(doc_id)
                        already_indexed = False
            if document:
                string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}"
                metadata = {
                    "id": str(doc_id),
                    "title": spec["title"],
                    "type": spec["Type"],
                    "version": version,
                    "url": spec["PDF link"],
                    "scope": "" if not document else self.get_scope(document["content"])
                }
                with self.DICT_LOCK:
                    self.indexed_specifications[string_key] = metadata
            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
        except Exception as e:
            traceback.print_exc()
            print(f"\n[ERROR] Failed to process {spec.get('ETSI deliverable')} {spec.get('Version')}: {e}", flush=True)
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")
    def run(self):
        print("Starting the ETSI indexing...")
        specifications = self.df.to_dict(orient="records")
        self.total_count = len(specifications)
        print(f"Processing {self.total_count} specs with {self.max_workers} threads...\n")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_specification, spec) for spec in specifications]
            for f in concurrent.futures.as_completed(futures):
                if self.STOP_EVENT.is_set():
                    break
        print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")
    def save(self):
        print("\nSaving...", flush=True)
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)
        push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF"])
        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Save finished.")
    def create_bm25_index(self):
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            for section in self.spec_contents:
                if specification['id'] == section['doc_id']:
                    corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section['section'],
                        "version": specification['version'],
                        "type": specification['type'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
            unique_specs.add(specification['id'])  # avoid indexing the same spec twice
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSections", token=os.environ.get("HF"))

        unique_specs = set()
        corpus_json = []
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1: continue
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSingle", token=os.environ.get("HF"))