SpecSplitter / app.py
om4r932's picture
Add doc
38799af
import requests, os, zipfile, subprocess, re, warnings
warnings.filterwarnings("ignore")
os.environ["CURL_CA_BUNDLE"] = ""
from io import BytesIO
from dotenv import load_dotenv
load_dotenv()
from datasets import load_dataset
import fitz
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
app = FastAPI(title="Specification Retriever/Splitter API",
description=open('documentation.md').read(),
docs_url="/")
origins = [
"*",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
spec_contents_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecContent")
spec_contents_3gpp = spec_contents_3gpp["train"].to_list()
spec_contents_etsi = load_dataset("OrganizedProgrammers/ETSISpecContent")
spec_contents_etsi = spec_contents_etsi["train"].to_list()
spec_3gpp_format = re.compile(r'^\d{2}\.\d{3}(?:-\d+)?')
spec_etsi_format = re.compile(r'^\d{,3} \d{,3}(?:-\d+)?')
class SpecRequest(BaseModel):
spec_id: str
def is_doc_indexed(spec_id: str):
return any([True if spec_id == s["doc_id"] else False for s in spec_contents_3gpp]) or any([True if spec_id == s["doc_id"] else False for s in spec_contents_etsi])
def get_doc(spec_id: str):
doc = []
for spec in spec_contents_3gpp + spec_contents_etsi:
if spec["doc_id"] == spec_id:
doc.append(f"{spec['section']}\n{spec['content']}")
return "\n\n".join(doc)
def get_structured_doc(spec_id: str):
doc = {}
for spec in spec_contents_3gpp + spec_contents_etsi:
if spec["doc_id"] == spec_id:
doc[spec["section"]] = spec["content"]
return doc
def get_pdf_data(request: SpecRequest):
specification = request.spec_id
if is_doc_indexed(specification):
return get_doc(specification)
url = requests.post(
"https://organizedprogrammers-docfinder.hf.space/find/single",
verify=False,
headers={"Content-Type": "application/json"},
json={"doc_id": specification}
)
if url.status_code != 200:
raise HTTPException(404, detail="Not found")
url = url.json()['url']
response = requests.get(
url,
verify=False,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
)
pdf = fitz.open(stream=response.content, filetype="pdf")
return pdf, pdf.get_toc()
@app.post("/extract_text/full")
def extract_full_spec(request: SpecRequest):
specification = request.spec_id
if is_doc_indexed(specification):
return get_doc(specification)
print(f"[WARNING] Document no. {specification} not indexed or is a TDoc, if it's a specification, try to reindex")
total_file = []
if spec_3gpp_format.match(specification):
url = requests.post(
"https://organizedprogrammers-docfinder.hf.space/find/single",
verify=False,
headers={"Content-Type": "application/json"},
json={"doc_id": specification}
)
if url.status_code != 200:
raise HTTPException(404, detail="Not found")
url = url.json()['url']
response = requests.get(
url,
verify=False,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
)
zip_bytes = BytesIO(response.content)
current_zip_file = zipfile.ZipFile(zip_bytes)
for file_info in current_zip_file.infolist():
if file_info.filename.endswith(".zip") and len(current_zip_file.namelist()) == 1:
nested_zip_bytes = BytesIO(current_zip_file.read(file_info.filename))
current_zip_file = zipfile.ZipFile(nested_zip_bytes)
break
for file_info in current_zip_file.infolist():
filename = file_info.filename
if (filename.endswith('.doc') or filename.endswith('.docx')) and ("cover" not in filename.lower() and "annex" not in filename.lower()):
doc_bytes = current_zip_file.read(filename)
ext = filename.split(".")[-1]
input_path = f"/tmp/{specification}.{ext}"
output_path = f"/tmp/{specification}.txt"
with open(input_path, "wb") as f:
f.write(doc_bytes)
subprocess.run([
"libreoffice",
"--headless",
"--convert-to", "txt",
"--outdir", "/tmp",
input_path
], check=True)
with open(output_path, "r") as f:
txt_data = [line.strip() for line in f if line.strip()]
os.remove(input_path)
os.remove(output_path)
total_file.extend(txt_data)
if total_file == []:
raise HTTPException(status_code=404, detail="Not found !")
else:
return total_file
elif spec_etsi_format.match(specification):
print("\n[INFO] Tentative de récupération du texte", flush=True)
pdf, doc_toc = get_pdf_data(request)
text = []
first = 0
for level, title, page in doc_toc:
if title[0].isnumeric():
first = page - 1
break
for page in pdf[first:]:
text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
text = "\n".join(text)
if not text or not doc_toc:
print("\n[ERREUR] Pas de texte/table of contents trouvé !")
return {}
print(f"\n[INFO] Texte {request.spec_id} récupéré", flush=True)
return text
else:
raise HTTPException(status_code=400, detail="Document ID format invalid !")
@app.post("/extract_text/structured")
def extract_full_spec_by_chapters(request: SpecRequest):
specification = request.spec_id
if is_doc_indexed(request.spec_id):
return get_structured_doc(request.spec_id)
print(f"[WARNING] Document no. {specification} not indexed or is a TDoc, if it's a specification, try to reindex")
total_file = []
text = extract_full_spec(request)
if spec_3gpp_format.match(specification):
chapters = []
chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+$")
for i, line in enumerate(text):
if chapter_regex.fullmatch(line):
chapters.append((i, line))
document = {}
for i in range(len(chapters)):
start_index, chapter_title = chapters[i]
end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
content_lines = text[start_index + 1 : end_index]
document[chapter_title.replace('\t', " ")] = "\n".join(content_lines)
return document
elif spec_etsi_format.match(specification):
def extract_sections(text, titles):
sections = {}
# On trie les titres selon leur position dans le texte
sorted_titles = sorted(titles, key=lambda t: text.find(t))
for i, title in enumerate(sorted_titles):
start = text.find(title)
if i + 1 < len(sorted_titles):
end = text.find(sorted_titles[i + 1])
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
else:
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
return sections
pdf, toc = get_pdf_data(request)
if not text or not toc:
print("\n[ERREUR] Pas de texte/table of contents trouvé !")
return {}
print(f"\n[INFO] Texte {request.spec_id} récupéré", flush=True)
titles = []
for level, title, page in toc:
if title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
titles.append('\n'.join(title.strip().split(" ", 1)))
return extract_sections(text, titles)
else:
raise HTTPException(status_code=400, detail="Document ID format invalid !")