import requests, os, zipfile, subprocess, re, warnings

warnings.filterwarnings("ignore")
os.environ["CURL_CA_BUNDLE"] = ""

from io import BytesIO
from dotenv import load_dotenv

load_dotenv()

from datasets import load_dataset
import fitz
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

with open("documentation.md") as f:
    api_description = f.read()

app = FastAPI(
    title="Specification Retriever/Splitter API",
    description=api_description,
    docs_url="/"
)

origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Pre-indexed specification contents, flattened to lists of
# {"doc_id": ..., "section": ..., "content": ...} records.
spec_contents_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
spec_contents_etsi = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()

# 3GPP IDs look like "23.501" (optionally "-<part>"); ETSI IDs like "103 666-2".
spec_3gpp_format = re.compile(r'^\d{2}\.\d{3}(?:-\d+)?')
spec_etsi_format = re.compile(r'^\d{1,3} \d{1,3}(?:-\d+)?')

class SpecRequest(BaseModel):
    spec_id: str

def is_doc_indexed(spec_id: str):
    return any(spec_id == s["doc_id"] for s in spec_contents_3gpp) or \
           any(spec_id == s["doc_id"] for s in spec_contents_etsi)

def get_doc(spec_id: str):
    """Return the indexed document as one flat string, section by section."""
    doc = []
    for spec in spec_contents_3gpp + spec_contents_etsi:
        if spec["doc_id"] == spec_id:
            doc.append(f"{spec['section']}\n{spec['content']}")
    return "\n\n".join(doc)

def get_structured_doc(spec_id: str):
    """Return the indexed document as a {section: content} mapping."""
    doc = {}
    for spec in spec_contents_3gpp + spec_contents_etsi:
        if spec["doc_id"] == spec_id:
            doc[spec["section"]] = spec["content"]
    return doc

def get_pdf_data(request: SpecRequest):
    """Resolve the document URL, download the PDF and return (pdf, toc).

    Callers only reach this for documents that are not already indexed, so
    this helper always returns the (pdf, toc) pair.
    """
    specification = request.spec_id
    url = requests.post(
        "https://organizedprogrammers-docfinder.hf.space/find/single",
        verify=False,
        headers={"Content-Type": "application/json"},
        json={"doc_id": specification}
    )
    if url.status_code != 200:
        raise HTTPException(404, detail="Not found")
    url = url.json()['url']
    response = requests.get(
        url,
        verify=False,
        headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
    )
    pdf = fitz.open(stream=response.content, filetype="pdf")
    return pdf, pdf.get_toc()
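# A minimal sketch (not used by the endpoints) of how the two ID regexes above
# route a request. The sample values in the comments ("23.501", "103 666-2")
# are illustrative assumptions, not IDs guaranteed to exist in the datasets.
def classify_spec_id(spec_id: str) -> str:
    """Return which retrieval path the endpoints below would take for spec_id."""
    if spec_3gpp_format.match(spec_id):  # e.g. "23.501" or "23.501-1"
        return "3gpp"  # DOC/DOCX inside a (possibly nested) zip, via LibreOffice
    if spec_etsi_format.match(spec_id):  # e.g. "103 666-2"
        return "etsi"  # PDF parsed with PyMuPDF (fitz)
    return "invalid"  # the endpoints answer 400 for these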
@app.post("/extract_text/full")
def extract_full_spec(request: SpecRequest):
    specification = request.spec_id
    if is_doc_indexed(specification):
        return get_doc(specification)
    print(f"[WARNING] Document {specification} is not indexed or is a TDoc; if it is a specification, try reindexing")
    total_file = []
    if spec_3gpp_format.match(specification):
        # 3GPP specifications ship as a zip (sometimes a zip inside a zip)
        # containing DOC/DOCX files.
        url = requests.post(
            "https://organizedprogrammers-docfinder.hf.space/find/single",
            verify=False,
            headers={"Content-Type": "application/json"},
            json={"doc_id": specification}
        )
        if url.status_code != 200:
            raise HTTPException(404, detail="Not found")
        url = url.json()['url']
        response = requests.get(
            url,
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
        )
        zip_bytes = BytesIO(response.content)
        current_zip_file = zipfile.ZipFile(zip_bytes)
        # Unwrap a single nested zip, if present.
        for file_info in current_zip_file.infolist():
            if file_info.filename.endswith(".zip") and len(current_zip_file.namelist()) == 1:
                nested_zip_bytes = BytesIO(current_zip_file.read(file_info.filename))
                current_zip_file = zipfile.ZipFile(nested_zip_bytes)
                break
        for file_info in current_zip_file.infolist():
            filename = file_info.filename
            if (filename.endswith('.doc') or filename.endswith('.docx')) and ("cover" not in filename.lower() and "annex" not in filename.lower()):
                doc_bytes = current_zip_file.read(filename)
                ext = filename.split(".")[-1]
                input_path = f"/tmp/{specification}.{ext}"
                output_path = f"/tmp/{specification}.txt"
                with open(input_path, "wb") as f:
                    f.write(doc_bytes)
                # Convert DOC/DOCX to plain text with headless LibreOffice.
                subprocess.run([
                    "libreoffice", "--headless", "--convert-to", "txt", "--outdir", "/tmp", input_path
                ], check=True)
                with open(output_path, "r") as f:
                    txt_data = [line.strip() for line in f if line.strip()]
                os.remove(input_path)
                os.remove(output_path)
                total_file.extend(txt_data)
        if not total_file:
            raise HTTPException(status_code=404, detail="Not found!")
        # Note: the 3GPP path returns a list of lines.
        return total_file
    elif spec_etsi_format.match(specification):
        print("\n[INFO] Attempting to retrieve the text", flush=True)
        pdf, doc_toc = get_pdf_data(request)
        text = []
        # Skip the front matter: start at the first numbered TOC entry.
        first = 0
        for level, title, page in doc_toc:
            if title[0].isnumeric():
                first = page - 1
                break
        for page in pdf[first:]:
            text.append("\n".join(line.strip() for line in page.get_text().splitlines()))
        text = "\n".join(text)
        if not text or not doc_toc:
            print("\n[ERROR] No text/table of contents found!")
            return {}
        print(f"\n[INFO] Text of {request.spec_id} retrieved", flush=True)
        # Note: the ETSI path returns a single string.
        return text
    else:
        raise HTTPException(status_code=400, detail="Document ID format invalid!")
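# Example client call for the endpoint above (a sketch: host and port assume a
# local uvicorn run, and "23.501" is an illustrative spec ID):
#
#   curl -X POST http://localhost:8000/extract_text/full \
#        -H "Content-Type: application/json" \
#        -d '{"spec_id": "23.501"}'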
@app.post("/extract_text/structured")
def extract_full_spec_by_chapters(request: SpecRequest):
    specification = request.spec_id
    if is_doc_indexed(specification):
        return get_structured_doc(specification)
    print(f"[WARNING] Document {specification} is not indexed or is a TDoc; if it is a specification, try reindexing")
    # extract_full_spec returns a list of lines for 3GPP documents and a
    # single string for ETSI documents.
    text = extract_full_spec(request)
    if spec_3gpp_format.match(specification):
        chapters = []
        chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+$")
        for i, line in enumerate(text):
            if chapter_regex.fullmatch(line):
                chapters.append((i, line))
        document = {}
        for i in range(len(chapters)):
            start_index, chapter_title = chapters[i]
            end_index = chapters[i + 1][0] if i + 1 < len(chapters) else len(text)
            content_lines = text[start_index + 1:end_index]
            document[chapter_title.replace('\t', " ")] = "\n".join(content_lines)
        return document
    elif spec_etsi_format.match(specification):
        def extract_sections(text, titles):
            sections = {}
            # Sort the titles by their position in the text.
            sorted_titles = sorted(titles, key=lambda t: text.find(t))
            for i, title in enumerate(sorted_titles):
                start = text.find(title)
                end = text.find(sorted_titles[i + 1]) if i + 1 < len(sorted_titles) else len(text)
                sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip())
            return sections

        pdf, toc = get_pdf_data(request)
        if not text or not toc:
            print("\n[ERROR] No text/table of contents found!")
            return {}
        print(f"\n[INFO] Text of {request.spec_id} retrieved", flush=True)
        titles = []
        for level, title, page in toc:
            # The extracted page text joins a section number and its heading
            # with a newline, so rebuild each TOC title in that form before
            # looking it up.
            candidate = '\n'.join(title.strip().split(" ", 1))
            if title[0].isnumeric() and candidate in text:
                titles.append(candidate)
        return extract_sections(text, titles)
    else:
        raise HTTPException(status_code=400, detail="Document ID format invalid!")
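# Minimal local entry point (a sketch: the app may normally be launched by a
# Space/container runner instead; uvicorn is assumed to be installed alongside
# fastapi).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)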