import re
import fitz  # PyMuPDF
import language_tool_python
from typing import List, Dict, Any, Tuple
from collections import Counter
import json
import traceback  # Keep for debugging, but try to minimize in final user-facing JSON
import io
import tempfile
import os
import gradio as gr

# Set JAVA_HOME environment variable
if 'JAVA_HOME' not in os.environ:
    potential_java_homes = [
        '/usr/lib/jvm/java-11-openjdk-amd64',
        '/usr/lib/jvm/java-17-openjdk-amd64',
        # For macOS users with Homebrew OpenJDK (common paths):
        # '/opt/homebrew/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home',  # M1/M2 Macs
        # '/usr/local/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home',  # Intel Macs
        # '/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home',  # Default OpenJDK Homebrew
    ]
    # User-specific path from environment if available
    user_java_home = os.environ.get('USER_JAVA_HOME_CONFIG')  # Example custom env var
    if user_java_home and os.path.exists(user_java_home):
        potential_java_homes.insert(0, user_java_home)
    for jh in potential_java_homes:
        if os.path.exists(jh):
            os.environ['JAVA_HOME'] = jh
            print(f"Set JAVA_HOME to: {jh}")
            break
    if 'JAVA_HOME' not in os.environ:
        print("Warning: JAVA_HOME not found or set. LanguageTool might fail.")
        print("Please set JAVA_HOME environment variable to your JDK (version 11+) installation path,")
        print("or ensure your LanguageTool setup (e.g., remote server) does not require it locally.")

# ------------------------------
# Text Extraction & Analysis Functions
# ------------------------------

def extract_pdf_text_for_general_checks(file_path_or_stream) -> str:
    """Extracts full text from a PDF file using PyMuPDF4LLM for general regex checks."""
    temp_file_path_holder = []
    pdf_path_for_pymupdf4llm = None
    try:
        if isinstance(file_path_or_stream, str) and os.path.exists(file_path_or_stream):
            pdf_path_for_pymupdf4llm = file_path_or_stream
        elif hasattr(file_path_or_stream, 'read'):
            # Gradio File(type="binary") gives bytes, wrapped in BytesIO
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
                file_path_or_stream.seek(0)
                temp_file.write(file_path_or_stream.read())
                pdf_path_for_pymupdf4llm = temp_file.name
                temp_file_path_holder.append(pdf_path_for_pymupdf4llm)
        elif isinstance(file_path_or_stream, bytes):
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
                temp_file.write(file_path_or_stream)
                pdf_path_for_pymupdf4llm = temp_file.name
                temp_file_path_holder.append(pdf_path_for_pymupdf4llm)
        else:
            print(f"Unsupported input type for PyMuPDF4LLM: {type(file_path_or_stream)}")
            return ""

        if not pdf_path_for_pymupdf4llm:
            print("PDF path could not be determined for PyMuPDF4LLM.")
            return ""

        import pymupdf4llm
        full_text = pymupdf4llm.to_markdown(pdf_path_for_pymupdf4llm)
        return full_text
    except Exception as e:
        print(f"Error extracting text with PyMuPDF4LLM: {str(e)}")
        return ""
    finally:
        if temp_file_path_holder:
            try:
                os.remove(temp_file_path_holder[0])
            except OSError as e_os:
                print(f"Warning: Could not remove temp file {temp_file_path_holder[0]}: {e_os}")
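# Note on the word/offset convention used below (added explanation, not part of the
# original control flow): extract_word_data_and_text_for_lt joins all PDF words with
# single spaces and records each word's start offset in that joined string, e.g.
# ["Quick", "brown", "fox"] -> "Quick brown fox" with start offsets 0, 6 and 12.
# LanguageTool offsets computed against that same string can then be mapped back to
# word rectangles for highlighting.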
print(f"Unsupported input type for extract_word_data_and_text_for_lt: {type(file_path_or_stream)}") return "", [] except Exception as e: print(f"Error opening PDF in extract_word_data_and_text_for_lt: {e}") return "", [] word_coords_data_intermediate = [] for page_idx, page in enumerate(doc): # Using sort=True attempts to get words in reading order. words_on_page = page.get_text("words", sort=True) for w_info in words_on_page: # (x0, y0, x1, y1, "word", block_no, line_no, word_no) word_text = w_info[4] word_rect = fitz.Rect(w_info[0:4]) if word_text.strip(): # Ensure word is not just whitespace word_coords_data_intermediate.append({ 'text': word_text, 'page_num': page_idx, 'rect': word_rect, }) doc.close() text_for_lt = " ".join([item['text'] for item in word_coords_data_intermediate]) word_coords_data_final = [] current_char_pos_recalc = 0 for i, item_data in enumerate(word_coords_data_intermediate): final_item = item_data.copy() final_item['start_offset'] = current_char_pos_recalc word_coords_data_final.append(final_item) current_char_pos_recalc += len(final_item['text']) if i < len(word_coords_data_intermediate) - 1: # Add 1 for the space current_char_pos_recalc += 1 return text_for_lt, word_coords_data_final def check_text_presence(full_text: str, search_terms: List[str]) -> Dict[str, bool]: return {term: term.lower() in full_text.lower() for term in search_terms} def check_metadata(full_text: str) -> Dict[str, Any]: return { "author_email_present": bool(re.search(r'\b[\w.-]+?@\w+?\.\w+?\b', full_text)), "authors_list_heading_present": bool(re.search(r'(?:Authors?|AUTHORS?):\s*', full_text)), "keywords_list_heading_present": bool(re.search(r'(?:Keywords?|KEYWORDS?):\s*', full_text, re.IGNORECASE)), "word_count_estimate": len(full_text.split()) if full_text else "Missing" } def check_disclosures(full_text: str) -> Dict[str, bool]: search_terms = [ "conflict of interest statement", "COI statement", "ethics statement", "ethical approval", "funding statement", "acknowledgment of funding", "data availability statement", "data access statement" ] # Use a more robust check by looking for variations and combining results results = {} results["conflict_of_interest_statement"] = any(term.lower() in full_text.lower() for term in search_terms[0:2]) results["ethics_statement"] = any(term.lower() in full_text.lower() for term in search_terms[2:4]) results["funding_statement"] = any(term.lower() in full_text.lower() for term in search_terms[4:6]) results["data_access_statement"] = any(term.lower() in full_text.lower() for term in search_terms[6:8]) has_author_contribution = ("author contribution" in full_text.lower() or # Catches singular and plural "authors contribution" in full_text.lower()) # Common variation results["author_contribution_statement"] = has_author_contribution return results def check_figures_and_tables_overview(full_text: str) -> Dict[str, bool]: return { "figures_mentioned": bool(re.search(r'Fig(?:ure)?\s*\d+', full_text, re.IGNORECASE)), "tables_mentioned": bool(re.search(r'Table\s*\d+', full_text, re.IGNORECASE)), } def check_references_overview(full_text: str) -> Dict[str, Any]: has_references_section = bool(re.search(r"^\s*(?:References|Bibliography)\s*$", full_text, re.IGNORECASE | re.MULTILINE)) citations_in_text = re.findall(r'\[\d+(?:,\s*\d+)*(?:–\d+)?\]', full_text) # Matches [1], [1,2], [1-3], [1, 2-5] reference_list_items = [] if has_references_section: match_ref_sec = re.search(r"^\s*(?:References|Bibliography)\s*$(.*)", full_text, re.IGNORECASE | re.MULTILINE | 
def check_references_overview(full_text: str) -> Dict[str, Any]:
    has_references_section = bool(re.search(r"^\s*(?:References|Bibliography)\s*$", full_text, re.IGNORECASE | re.MULTILINE))
    # Matches [1], [1,2] and ranges such as [1-3] or [1–3] (hyphen or en dash)
    citations_in_text = re.findall(r'\[\d+(?:,\s*\d+)*(?:\s*[-–]\s*\d+)?\]', full_text)
    reference_list_items = []
    if has_references_section:
        match_ref_sec = re.search(r"^\s*(?:References|Bibliography)\s*$(.*)", full_text,
                                  re.IGNORECASE | re.MULTILINE | re.DOTALL)
        if match_ref_sec:
            references_text_block = match_ref_sec.group(1)
            reference_list_items = re.findall(r"^\s*(?:\[\d+\]|\d+\.\s)", references_text_block, re.MULTILINE)  # [1] or 1.
    return {
        "references_section_heading_present": has_references_section,
        "citations_in_text_count": len(citations_in_text),
        "reference_list_items_count_heuristic": len(reference_list_items),
        "old_references_present_pre_1995": bool(re.search(r'\b(?:19[0-8]\d|199[0-4])\b', full_text)),
    }


def check_structure_overview(full_text: str) -> Dict[str, bool]:
    # IMRaD sections: Introduction, Methods / Materials and Methods, Results, Discussion
    imrad_found_count = 0
    if re.search(r"^\s*Introduction\b", full_text, re.IGNORECASE | re.MULTILINE):
        imrad_found_count += 1
    if re.search(r"^\s*(?:Methods|Materials\s+and\s+Methods)\b", full_text, re.IGNORECASE | re.MULTILINE):
        imrad_found_count += 1
    if re.search(r"^\s*Results\b", full_text, re.IGNORECASE | re.MULTILINE):
        imrad_found_count += 1
    if re.search(r"^\s*Discussion\b", full_text, re.IGNORECASE | re.MULTILINE):
        imrad_found_count += 1
    has_abstract_section = bool(re.search(r"^\s*Abstract\b", full_text, re.IGNORECASE | re.MULTILINE))
    return {
        "abstract_section_heading_present": has_abstract_section,
        "imrad_structure_partially_present": imrad_found_count >= 3,  # i.e. at least 3 of the 4 main sections
        "imrad_sections_detected_count": imrad_found_count
    }


def check_language_issues(text_for_lt: str) -> Dict[str, Any]:
    try:
        tool_path = os.environ.get('LT_PATH')
        # If LT_PATH is set, use it; otherwise try the remote server. When neither a
        # server nor a path is given, language_tool_python will usually start its own
        # managed server. remote_server is forced to None when LT_PATH is given.
        language_tool = language_tool_python.LanguageTool(
            'en-US',
            remote_server='http://localhost:8081' if not tool_path else None,
            language_tool_path=tool_path if tool_path else None
        )
        matches = language_tool.check(text_for_lt)
        issues = []
        for match in matches:
            # Example: ignore common false positives or stylistic choices
            if match.ruleId in ["EN_SPLIT_WORDS_HYPHEN", "UPPERCASE_SENTENCE_START", "MORFOLOGIK_RULE_EN_US"]:
                continue
            issues.append({
                "message": match.message,
                "context": match.context.strip(),
                "error_text_segment": match.context[match.contextOffset: match.contextOffset + match.errorLength],
                "suggestions": match.replacements[:3] if match.replacements else [],
                "category": match.category,
                "rule_id": match.ruleId,
                "offset": match.offset,
                "length": match.errorLength,
            })

        # Custom check: citation bracket glued to the preceding word, e.g. "word[12]"
        regex_pattern = r'\b(\w+)\[(\d+)\]'
        regex_matches = list(re.finditer(regex_pattern, text_for_lt))
        for match_re in regex_matches:
            word = match_re.group(1)
            number = match_re.group(2)
            issues.append({
                "message": f"Missing space before '[' in '{word}[{number}]'. Suggestion: '{word} [{number}]'.",
                "context": text_for_lt[max(match_re.start() - 40, 0):min(match_re.end() + 40, len(text_for_lt))].strip(),
                "error_text_segment": match_re.group(0),
                "suggestions": [f"{word} [{number}]"],
                "category": "Formatting",
                "rule_id": "MISSING_SPACE_BEFORE_BRACKET_CITATION",
                "offset": match_re.start(),
                "length": match_re.end() - match_re.start(),
            })

        return {"total_issues": len(issues), "issues": issues}
    except ConnectionRefusedError:
        error_msg = ("LanguageTool Error: Connection to LT server (e.g., http://localhost:8081) refused. "
                     "Ensure it's running, or configure LT_PATH for local JAR usage.")
        print(error_msg)
        return {"error": error_msg, "issues": []}
    except Exception as e:
        error_msg = f"Error checking language issues: {type(e).__name__} - {e}"
        print(error_msg)
        # print(traceback.format_exc())  # For server-side debugging
        return {"error": error_msg, "issues": []}
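# Minimal usage sketch for the check above (illustrative only; assumes LanguageTool is
# reachable as configured in check_language_issues):
#   report = check_language_issues("This are a sample sentence with a error .")
#   for issue in report.get("issues", []):
#       print(issue["rule_id"], issue["message"], issue["suggestions"])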
def check_figure_table_order(full_text: str) -> Dict[str, Any]:
    fig_pattern = r'(?:Fig(?:ure)?\.?|Figure)\s*(\d+)'
    fig_refs_in_order = [int(num) for num in re.findall(fig_pattern, full_text, re.IGNORECASE)]
    tbl_pattern = r'Table\s*(\d+)'
    tbl_refs_in_order = [int(num) for num in re.findall(tbl_pattern, full_text, re.IGNORECASE)]

    def analyze_numbering(refs_list, item_type="Item"):
        issues = []
        key_prefix = item_type.lower()
        if not refs_list:
            return {
                f"{key_prefix}_references_in_order_of_appearance": [],
                f"{key_prefix}_numbering_issues": ["Not mentioned."]
            }
        unique_sorted_refs = sorted(set(refs_list))
        max_ref_num = unique_sorted_refs[-1] if unique_sorted_refs else 0
        # Check for missing numbers in the sequence up to max_ref_num
        expected_sequence = list(range(1, max_ref_num + 1))
        missing_numbers = [num for num in expected_sequence if num not in unique_sorted_refs]
        if missing_numbers:
            issues.append(f"Missing {key_prefix}(s) in sequence up to {max_ref_num}: {missing_numbers}")
        # Check whether first mentions are sequential (e.g. Fig 1 before Fig 2, not Fig 2 before Fig 1).
        # This is a simplified check: collect the first appearance of each unique number and
        # flag the item type if that sequence is not already sorted.
        first_occurrence_map = {}
        unique_refs_in_appearance_order = []
        for ref_num in refs_list:
            if ref_num not in first_occurrence_map:
                first_occurrence_map[ref_num] = True
                unique_refs_in_appearance_order.append(ref_num)
        if unique_refs_in_appearance_order != sorted(unique_refs_in_appearance_order):
            issues.append(f"{item_type}s may not be first mentioned in strict numerical order. "
                          f"Sequence of first mentions: {unique_refs_in_appearance_order}")
        return {
            f"{key_prefix}_references_in_order_of_appearance": refs_list,
            f"{key_prefix}_numbering_issues": issues if issues else ["Appears OK based on simple checks."]
        }

    # Item-specific keys are used so the figure results are not overwritten by the
    # table results when the two dicts are merged below.
    fig_analysis = analyze_numbering(fig_refs_in_order, "Figure")
    tbl_analysis = analyze_numbering(tbl_refs_in_order, "Table")
    return {**fig_analysis, **tbl_analysis}
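# Worked example of the heuristic above (illustrative only): if a manuscript mentions
# Figure 2, then Figure 1, then Figure 4 (and never Figure 3), fig_refs_in_order is
# [2, 1, 4], so missing_numbers == [3] and a first-mention order warning is added,
# because [2, 1, 4] is not sorted.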
# ------------------------------
# Highlighting Function
# ------------------------------

def highlight_issues_in_pdf(
    pdf_file_or_stream,
    word_coords_data: List[Dict[str, Any]],
    language_issues_list: List[Dict[str, Any]]
    # text_for_lt is implicitly used via the offsets stored in language_issues_list
) -> Tuple[List[Dict[str, Any]], bytes]:
    doc = None
    try:
        if isinstance(pdf_file_or_stream, str) and os.path.exists(pdf_file_or_stream):
            doc = fitz.open(pdf_file_or_stream)
        elif hasattr(pdf_file_or_stream, 'read'):
            pdf_file_or_stream.seek(0)
            doc = fitz.open(stream=pdf_file_or_stream.read(), filetype="pdf")
        elif isinstance(pdf_file_or_stream, bytes):
            doc = fitz.open(stream=pdf_file_or_stream, filetype="pdf")
        else:
            print(f"Unsupported PDF input type in highlight_issues_in_pdf: {type(pdf_file_or_stream)}")
            return language_issues_list, b""  # Return original issues, no PDF bytes
    except Exception as e:
        print(f"Error opening PDF in highlight_issues_in_pdf: {e}")
        return language_issues_list, b""

    issues_with_coords_and_page = []
    for issue_details in language_issues_list:
        issue_offset = issue_details["offset"]
        issue_length = issue_details["length"]
        error_text_to_search = issue_details["error_text_segment"]

        current_issue_output = issue_details.copy()
        current_issue_output["page"] = 0
        current_issue_output["coordinates"] = []  # [x0, y0, x1, y1]

        # Collect the PDF words whose offset ranges overlap the issue's offset range.
        candidate_pdf_words_info = []
        for word_info in word_coords_data:
            word_start_offset = word_info['start_offset']
            word_end_offset = word_start_offset + len(word_info['text'])
            if word_start_offset < (issue_offset + issue_length) and issue_offset < word_end_offset:
                candidate_pdf_words_info.append(word_info)

        if not candidate_pdf_words_info:
            issues_with_coords_and_page.append(current_issue_output)
            continue

        page_num_for_issue = candidate_pdf_words_info[0]["page_num"]
        page_to_search_on = doc[page_num_for_issue]

        clip_search_rect = fitz.Rect(candidate_pdf_words_info[0]['rect'])
        for i in range(1, len(candidate_pdf_words_info)):
            clip_search_rect.include_rect(candidate_pdf_words_info[i]['rect'])
        clip_search_rect.x0 -= 3  # Small padding for search_for
        clip_search_rect.y0 -= 3
        clip_search_rect.x1 += 3
        clip_search_rect.y1 += 3
        clip_search_rect.normalize()

        found_rects_on_page = []
        if error_text_to_search.strip():
            try:
                # search_for looks for the LT error segment within the padded clip region;
                # the LT error_text_segment usually preserves the original casing.
                found_rects_on_page = page_to_search_on.search_for(error_text_to_search, clip=clip_search_rect, quads=False)
            except Exception as search_e:
                print(f"PyMuPDF search_for error: '{search_e}' for text '{error_text_to_search}' "
                      f"on page {page_num_for_issue + 1}. Skipping this highlight.")
Skipping this highlight.") if found_rects_on_page: current_issue_output["page"] = page_num_for_issue + 1 overall_bounds = fitz.Rect(found_rects_on_page[0]) for r_idx in range(1, len(found_rects_on_page)): overall_bounds.include_rect(found_rects_on_page[r_idx]) current_issue_output["coordinates"] = [ round(overall_bounds.x0, 2), round(overall_bounds.y0, 2), round(overall_bounds.x1, 2), round(overall_bounds.y1, 2) ] for rect_to_highlight in found_rects_on_page: if not rect_to_highlight.is_empty and rect_to_highlight.width > 0.1 and rect_to_highlight.height > 0.1: # Min width/height highlight_annot = page_to_search_on.add_highlight_annot(rect_to_highlight) if highlight_annot: highlight_annot.set_colors(stroke=(1, 1, 0)) # Yellow highlight_annot.update(opacity=0.4) # Make highlight slightly transparent issues_with_coords_and_page.append(current_issue_output) output_pdf_bytes = io.BytesIO() try: doc.save(output_pdf_bytes, garbage=3, deflate=True) # Options for smaller size annotated_pdf_bytes_content = output_pdf_bytes.getvalue() except Exception as e: print(f"Error saving annotated PDF: {e}") annotated_pdf_bytes_content = b"" finally: doc.close() output_pdf_bytes.close() return issues_with_coords_and_page, annotated_pdf_bytes_content # ------------------------------ # Main Analysis Function # ------------------------------ def analyze_pdf(pdf_input_data) -> Tuple[Dict[str, Any], bytes]: results = {"language_issues": [], "general_document_checks": {}, "analysis_errors": []} annotated_pdf_bytes = None # Ensure pdf_input_data can be read multiple times if it's a stream input_bytes_content = None if hasattr(pdf_input_data, 'read'): pdf_input_data.seek(0) input_bytes_content = pdf_input_data.read() # For functions below, create new BytesIO if they expect a stream elif isinstance(pdf_input_data, bytes): input_bytes_content = pdf_input_data elif isinstance(pdf_input_data, str) and os.path.exists(pdf_input_data): # Path with open(pdf_input_data, "rb") as f_path: input_bytes_content = f_path.read() else: results["analysis_errors"].append(f"Invalid PDF input data type: {type(pdf_input_data)}") return results, None if not input_bytes_content: results["analysis_errors"].append("PDF input data is empty or unreadable.") return results, None try: # General checks use PyMuPDF4LLM text pdf_stream_for_general = io.BytesIO(input_bytes_content) full_text_for_general_checks = extract_pdf_text_for_general_checks(pdf_stream_for_general) pdf_stream_for_general.close() if full_text_for_general_checks: results["general_document_checks"] = { "metadata": check_metadata(full_text_for_general_checks), "disclosures": check_disclosures(full_text_for_general_checks), "figures_tables_overview": check_figures_and_tables_overview(full_text_for_general_checks), "references_overview": check_references_overview(full_text_for_general_checks), "structure_overview": check_structure_overview(full_text_for_general_checks), "figure_table_order": check_figure_table_order(full_text_for_general_checks), } else: results["analysis_errors"].append("Failed to extract text using PyMuPDF4LLM for general checks.") # Language checks and highlighting use word-based extraction pdf_stream_for_lt = io.BytesIO(input_bytes_content) text_for_lt, word_coords_data = extract_word_data_and_text_for_lt(pdf_stream_for_lt) pdf_stream_for_lt.close() if not text_for_lt and not word_coords_data: results["analysis_errors"].append("Could not extract word data for language analysis and highlighting.") else: language_issues_result = check_language_issues(text_for_lt) # 
            if "error" in language_issues_result:
                results["analysis_errors"].append(f"Language check error: {language_issues_result['error']}")

            lt_issues_list = language_issues_result.get("issues", [])
            if lt_issues_list:
                pdf_stream_for_highlighting = io.BytesIO(input_bytes_content)
                updated_lt_issues_list, annotated_pdf_bytes = highlight_issues_in_pdf(
                    pdf_stream_for_highlighting, word_coords_data, lt_issues_list
                )
                pdf_stream_for_highlighting.close()
                results["language_issues"] = updated_lt_issues_list
            else:
                # No issues found, or check_language_issues failed (any LT error has
                # already been recorded in analysis_errors above).
                results["language_issues"] = lt_issues_list

        if not results["analysis_errors"]:
            del results["analysis_errors"]

        # Safety net: rename "issues" to "language_issues" in the top-level results for clarity
        if "issues" in results and "language_issues" not in results:  # Should be handled by now
            results["language_issues"] = results.pop("issues")

        return results, annotated_pdf_bytes
    except Exception as e:
        error_msg = f"Critical error in analyze_pdf: {type(e).__name__} - {e}"
        print(error_msg)
        # print(traceback.format_exc())  # Server-side debug
        current_errors = results.get("analysis_errors", [])
        current_errors.append(error_msg)
        results["analysis_errors"] = current_errors
        return results, None
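# Standalone usage sketch (hypothetical file names, outside the Gradio flow):
#   report, annotated = analyze_pdf("manuscript.pdf")
#   print(json.dumps(report, indent=2, ensure_ascii=False))
#   if annotated:
#       with open("manuscript_annotated.pdf", "wb") as fh:
#           fh.write(annotated)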
# ------------------------------
# Gradio Interface
# ------------------------------

def process_upload(file_bytes_from_gradio):
    if file_bytes_from_gradio is None:
        return json.dumps({"error_message": "No file uploaded"}, indent=2), None
    try:
        # analyze_pdf handles bytes, streams, or file paths
        results, annotated_pdf_output_bytes = analyze_pdf(file_bytes_from_gradio)

        # Sanitize results for JSON (e.g., convert fitz.Rect if any slipped through).
        # This should ideally be handled within each check function if it returns
        # complex objects not meant for JSON. For now, assume results are JSON-serializable.
        results_json = json.dumps(results, indent=2, ensure_ascii=False)

        if annotated_pdf_output_bytes:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_annotated_pdf_file:
                tmp_annotated_pdf_file.write(annotated_pdf_output_bytes)
                return results_json, tmp_annotated_pdf_file.name  # Gradio needs a path for gr.File output
        else:
            return results_json, None
    except Exception as e:
        error_msg = f"Error processing file in Gradio interface: {type(e).__name__} - {e}"
        print(error_msg)
        # print(traceback.format_exc())  # Server-side debug
        return json.dumps({"error_message": error_msg}, indent=2), None


def create_interface():
    with gr.Blocks(title="PDF Analyzer", theme=gr.themes.Glass()) as interface:
        gr.Markdown("# PDF Document Analyzer")
        gr.Markdown(
            "Upload a PDF to check for common manuscript issues. "
            "Language checks use LanguageTool (EN-US). Ensure your LanguageTool setup is correct "
            "(e.g., local server on port 8081, or LT_PATH environment variable for local JAR)."
        )
        with gr.Row():
            file_input = gr.File(
                label="Upload PDF Document",
                file_types=[".pdf"],
                type="binary"  # Receives bytes
            )
            analyze_btn = gr.Button("Analyze PDF", variant="primary", scale=0)  # scale=0 for a smaller button
        gr.Markdown("## Analysis Results")
        with gr.Tabs():
            with gr.TabItem("Detailed Report"):
                results_output = gr.JSON(label="JSON Report", scale=2)  # Increased scale for more space
            with gr.TabItem("Annotated PDF"):
                # gr.File is used for download; a direct PDF viewer is not standard in Gradio.
                pdf_output_display = gr.File(label="Download Annotated PDF (if issues were highlighted)", interactive=False)
        analyze_btn.click(
            fn=process_upload,
            inputs=[file_input],
            outputs=[results_output, pdf_output_display]
        )
        gr.Markdown("---")
        gr.Markdown("Developed with PyMuPDF, LanguageTool, and Gradio. Alpha version.")
    return interface


if __name__ == "__main__":
    print("PDF Analyzer launching...")
    print("Ensure LanguageTool is accessible (e.g., server at http://localhost:8081 or LT_PATH set).")
    # Example: to run an LT server:
    #   java -cp languagetool-server.jar org.languagetool.server.HTTPServer --port 8081 --allow-origin "*"
    # Example: os.environ['LT_PATH'] = '/path/to/languagetool-6.X/' (if you have the full distribution)
    interface = create_interface()
    interface.launch(
        share=True,  # Create a public share link
        # server_name="0.0.0.0",  # To allow access from the network
        # server_port=7860
    )
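# Assumed runtime dependencies (package names are an assumption; verify against your
# environment): pip install pymupdf pymupdf4llm language-tool-python gradio
# LanguageTool itself additionally needs Java 11+ unless a remote LT server is used.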