# coding: utf-8 # Copyright (c) 2025 inclusionAI. import json import os import base64 import tempfile import subprocess from typing import Any, Dict, Tuple from urllib.parse import urlparse from pydantic import BaseModel from aworld.config import ToolConfig from examples.tools.tool_action import DocumentExecuteAction from aworld.core.common import Observation, ActionModel, ActionResult from aworld.core.tool.base import ToolFactory, Tool from aworld.logs.util import logger from examples.tools.document.utils import encode_image_from_file, encode_image_from_url from aworld.utils import import_package, import_packages from aworld.tools.utils import build_observation class InputDocument(BaseModel): document_path: str | None = None @ToolFactory.register(name="document_analysis", desc="document analysis", supported_action=DocumentExecuteAction, conf_file_name=f'document_analysis_tool.yaml') class DocumentTool(Tool): def __init__(self, conf: ToolConfig, **kwargs) -> None: """Init document tool.""" import_package('cv2', install_name='opencv-python') import_packages(['xmltodict', 'pandas', 'docx2markdown', 'PyPDF2', 'numpy']) super(DocumentTool, self).__init__(conf, **kwargs) self.cur_observation = None self.content = None self.keyframes = [] self.init() self.step_finished = True def reset(self, *, seed: int | None = None, options: Dict[str, str] | None = None) -> Tuple[ Observation, dict[str, Any]]: super().reset(seed=seed, options=options) self.close() self.step_finished = True return build_observation(observer=self.name(), ability=DocumentExecuteAction.DOCUMENT_ANALYSIS.value.name), {} def init(self) -> None: self.initialized = True def close(self) -> None: pass def finished(self) -> bool: return self.step_finished def do_step(self, actions: list[ActionModel], **kwargs) -> Tuple[Observation, float, bool, bool, Dict[str, Any]]: self.step_finished = False reward = 0. fail_error = "" observation = build_observation(observer=self.name(), ability=DocumentExecuteAction.DOCUMENT_ANALYSIS.value.name) info = {} try: if not actions: raise ValueError("actions is empty") action = actions[0] document_path = action.params.get("document_path", "") if not document_path: raise ValueError("document path invalid") output, keyframes, error = self.document_analysis(document_path) observation.content = output observation.action_result.append( ActionResult(is_done=True, success=False if error else True, content=f"{output}", error=f"{error}", keep=False)) info['key_frame'] = f"{keyframes}" reward = 1. except Exception as e: fail_error = str(e) finally: self.step_finished = True info["exception"] = fail_error info.update(kwargs) return (observation, reward, kwargs.get("terminated", False), kwargs.get("truncated", False), info) def document_analysis(self, document_path): import xmltodict error = None # Initialize content to empty list to avoid None return self.content = [] try: if any(document_path.endswith(ext) for ext in [".jpg", ".jpeg", ".png"]): parsed_url = urlparse(document_path) is_url = all([parsed_url.scheme, parsed_url.netloc]) if not is_url: base64_image = encode_image_from_file(document_path) else: base64_image = encode_image_from_url(document_path) self.content = f"data:image/jpeg;base64,{base64_image}" if any(document_path.endswith(ext) for ext in ["xls", "xlsx"]): try: try: import pandas as pd except ImportError: error = "pandas library not found. Please install pandas: pip install pandas" return self.content, self.keyframes, error excel_data = {} with pd.ExcelFile(document_path) as xls: sheet_names = xls.sheet_names for sheet_name in sheet_names: df = pd.read_excel(xls, sheet_name=sheet_name) sheet_data = df.to_dict(orient='records') excel_data[sheet_name] = sheet_data self.content = json.dumps(excel_data, ensure_ascii=False) logger.info(f"Successfully processed Excel file: {document_path}") logger.info(f"Found {len(sheet_names)} sheets: {', '.join(sheet_names)}") except Exception as excel_error: error = str(excel_error) if any(document_path.endswith(ext) for ext in ["json", "jsonl", "jsonld"]): with open(document_path, "r", encoding="utf-8") as f: self.content = json.load(f) f.close() if any(document_path.endswith(ext) for ext in ["xml"]): data = None with open(document_path, "r", encoding="utf-8") as f: data = f.read() f.close() try: self.content = xmltodict.parse(data) logger.info(f"The extracted xml data is: {self.content}") except Exception as e: logger.info(f"The raw xml data is: {data}") error = str(e) self.content = data if any(document_path.endswith(ext) for ext in ["doc", "docx"]): from docx2markdown._docx_to_markdown import docx_to_markdown file_name = os.path.basename(document_path) md_file_path = f"{file_name}.md" docx_to_markdown(document_path, md_file_path) with open(md_file_path, "r") as f: self.content = f.read() f.close() if any(document_path.endswith(ext) for ext in ["pdf"]): # try using pypdf to extract text from pdf try: from PyPDF2 import PdfReader # Open file in binary mode for PdfReader f = open(document_path, "rb") reader = PdfReader(f) extracted_text = "" for page in reader.pages: extracted_text += page.extract_text() self.content = extracted_text f.close() except Exception as pdf_error: error = str(pdf_error) # audio if any(document_path.endswith(ext.lower()) for ext in [".mp3", ".wav", ".wave"]): try: # audio-> base64 with open(document_path, "rb") as audio_file: audio_bytes = audio_file.read() audio_base64 = base64.b64encode(audio_bytes).decode('utf-8') # ext ext = os.path.splitext(document_path)[1].lower() mime_type = "audio/mpeg" if ext == ".mp3" else "audio/wav" # data URI self.content = f"data:{mime_type};base64,{audio_base64}" except Exception as audio_error: error = str(audio_error) logger.error(f"Error processing audio file: {error}") # video if any(document_path.endswith(ext.lower()) for ext in [".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv"]): try: try: import cv2 import numpy as np except ImportError: error = "Required libraries not found. Please install opencv-python: pip install opencv-python" return None, None, error # create temp dir temp_dir = tempfile.mkdtemp() # 1.get audio -> base64 audio_path = os.path.join(temp_dir, "extracted_audio.mp3") # get audio by ffmpeg try: subprocess.run([ "ffmpeg", "-i", document_path, "-q:a", "0", "-map", "a", audio_path, "-y" ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # audio->base64 with open(audio_path, "rb") as audio_file: audio_bytes = audio_file.read() audio_base64 = base64.b64encode(audio_bytes).decode('utf-8') audio_data_uri = f"data:audio/mpeg;base64,{audio_base64}" except (subprocess.SubprocessError, FileNotFoundError) as e: logger.warning(f"Failed to extract audio: {str(e)}") audio_data_uri = None # 2. get keyframes cap = cv2.VideoCapture(document_path) if not cap.isOpened(): raise ValueError(f"Could not open video file: {document_path}") # get video message fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = frame_count / fps if fps > 0 else 0 # keyframes policy- per duration/10s,max 10 keyframes_count = min(10, int(frame_count)) frames_interval = max(1, int(frame_count / keyframes_count)) self.keyframes = [] frame_index = 0 while True: ret, frame = cap.read() if not ret: break # per frames_interval save if frame_index % frames_interval == 0: # save JPEG -> base64 _, buffer = cv2.imencode(".jpg", frame) img_base64 = base64.b64encode(buffer).decode('utf-8') time_position = frame_index / fps if fps > 0 else 0 self.keyframes.append(f"data:image/jpeg;base64,{img_base64}") if len(self.keyframes) >= keyframes_count: break frame_index += 1 cap.release() self.content = audio_data_uri logger.info(f"Successfully processed video file: {document_path}") logger.info(f"Extracted {len(self.keyframes)} keyframes and audio track") # clean tmp files try: os.remove(audio_path) os.rmdir(temp_dir) except Exception as cleanup_error: logger.warning(f"Error cleaning up temp files: {str(cleanup_error)}") except Exception as video_error: error = str(video_error) logger.error(f"Error processing video file: {error}") if any(document_path.endswith(ext) for ext in ["pptx"]): try: # Initialize content list and empty keyframes self.content = [] self.keyframes = [] # Check if file exists if not os.path.exists(document_path): error = f"File does not exist: {document_path}" return self.content, self.keyframes, error # Check if file is readable if not os.access(document_path, os.R_OK): error = f"File is not readable: {document_path}" return self.content, self.keyframes, error # Check file size try: file_size = os.path.getsize(document_path) if file_size == 0: error = "File is empty" return self.content, self.keyframes, error except Exception as size_error: logger.warning(f"Cannot get file size: {str(size_error)}") try: # Import required libraries from pptx import Presentation from PIL import Image, ImageDraw, ImageFont import io except ImportError as import_error: error = f"Missing required libraries: {str(import_error)}. Please install: pip install python-pptx Pillow" return self.content, self.keyframes, error # Create temporary directory for images try: temp_dir = tempfile.mkdtemp() except Exception as temp_dir_error: error = f"Failed to create temporary directory: {str(temp_dir_error)}" return self.content, self.keyframes, error # Open presentation try: presentation = Presentation(document_path) # Get total slides count total_slides = len(presentation.slides) if total_slides == 0: error = "PPTX file does not contain any slides" return self.content, self.keyframes, error # Process each slide for i, slide in enumerate(presentation.slides): # Generate temporary file path for current slide img_path = os.path.join(temp_dir, f"slide_{i + 1}.jpg") # Get slide dimensions try: slide_width = presentation.slide_width slide_height = presentation.slide_height # PPTX dimensions are in EMU (English Metric Unit) # 1 inch = 914400 EMU, 1 cm = 360000 EMU # Convert to pixels (assuming 96 DPI) slide_width_px = int(slide_width / 914400 * 96 * 10) slide_height_px = int(slide_height / 914400 * 96 * 10) # Ensure dimensions are reasonable positive integers slide_width_px = max(1, min(slide_width_px, 4000)) # Limit max width to 4000px slide_height_px = max(1, min(slide_height_px, 3000)) # Limit max height to 3000px except Exception as size_error: # Use default dimensions slide_width_px = 960 # Default width 960px slide_height_px = 720 # Default height 720px # Create blank image try: # Log operation start # Create blank image try: slide_img = Image.new('RGB', (slide_width_px, slide_height_px), 'white') draw = ImageDraw.Draw(slide_img) except Exception as img_create_error: logger.error( f"Slide {i + 1} blank image creation failed: {str(img_create_error) or 'Unknown error'}") raise # Draw slide number try: font = ImageFont.load_default() draw.text((20, 20), f"Slide {i + 1}/{total_slides}", fill="black", font=font) except Exception as font_error: logger.warning(f"Failed to draw slide number: {str(font_error) or 'Unknown error'}") # Record shape count try: shape_count = len(slide.shapes) except Exception as shape_count_error: logger.warning( f"Failed to get slide {i + 1} shape count: {str(shape_count_error) or 'Unknown error'}") shape_count = 0 # Try to render shapes on image shape_success_count = 0 shape_fail_count = 0 try: for j, shape in enumerate(slide.shapes): try: shape_type = type(shape).__name__ # Process images if hasattr(shape, 'image') and shape.image: try: # Extract image from shape image_stream = io.BytesIO(shape.image.blob) img = Image.open(image_stream) # Calculate position left = shape.left top = shape.top # Paste image onto slide slide_img.paste(img, (left, top)) shape_success_count += 1 except Exception as img_error: logger.warning( f"Failed to process image {j + 1} in slide {i + 1}: {str(img_error) or 'Unknown error'}") if not str(img_error): import traceback logger.warning( f"Image processing stack: {traceback.format_exc()}") shape_fail_count += 1 # Process text elif hasattr(shape, 'text') and shape.text: try: text = shape.text[:30] + "..." if len( shape.text) > 30 else shape.text # Simple text rendering text_left = shape.left text_top = shape.top draw.text((text_left, text_top), shape.text, fill="black", font=font) shape_success_count += 1 except Exception as text_error: logger.warning( f"Failed to process text {j + 1} in slide {i + 1}: {str(text_error) or 'Unknown error'}") if not str(text_error): import traceback logger.warning( f"Text processing stack: {traceback.format_exc()}") shape_fail_count += 1 else: logger.info( f"Shape {j + 1} in slide {i + 1} is neither image nor text, skipping") except Exception as shape_error: if not str(shape_error): import traceback logger.warning(f"Shape processing stack: {traceback.format_exc()}") shape_fail_count += 1 except Exception as shapes_iteration_error: logger.error( f"Failed while iterating through shapes in slide {i + 1}: {str(shapes_iteration_error) or 'Unknown error'}") if not str(shapes_iteration_error): import traceback logger.error(f"Shape iteration stack: {traceback.format_exc()}") # Save slide image try: slide_img.save(img_path, 'JPEG') # Check if image was saved successfully if not os.path.exists(img_path): raise ValueError(f"Saved image file does not exist: {img_path}") file_size = os.path.getsize(img_path) if file_size == 0: raise ValueError( f"Saved image file is empty: {img_path}, size: {file_size} bytes") # Convert to base64 try: base64_image = encode_image_from_file(img_path) self.content.append(f"data:image/jpeg;base64,{base64_image}") except Exception as base64_error: error_msg = str(base64_error) or "Unknown base64 conversion error" if not str(base64_error): import traceback logger.error(f"Base64 conversion stack: {traceback.format_exc()}") raise ValueError(f"Base64 conversion error: {error_msg}") except Exception as save_error: error_msg = str(save_error) or "Unknown save error" logger.error(f"Failed to save slide {i + 1} as image: {error_msg}") if not str(save_error): import traceback logger.error(f"Image save stack: {traceback.format_exc()}") raise ValueError(f"Image save error: {error_msg}") except Exception as slide_render_error: error_msg = str(slide_render_error) or "Unknown rendering error" logger.error(f"Failed to render slide {i + 1}: {error_msg}") if not str(slide_render_error): import traceback logger.error(f"Slide rendering stack: {traceback.format_exc()}") # Continue processing next slide, don't interrupt the entire process continue except Exception as pptx_error: error = f"Failed to process PPTX file: {str(pptx_error)}" import traceback # Clean up temporary files try: for file in os.listdir(temp_dir): try: file_path = os.path.join(temp_dir, file) os.remove(file_path) except Exception as file_error: logger.warning(f"Failed to delete temporary file: {str(file_error)}") os.rmdir(temp_dir) except Exception as cleanup_error: logger.warning(f"Failed to clean up temporary files: {str(cleanup_error)}") if len(self.content) > 0: logger.info(f"Extracted {len(self.content)} slides") else: error = error or "Could not extract any slides from PPTX file" logger.error(error) except Exception as outer_error: error = f"Error occurred during PPTX file processing: {str(outer_error)}" import traceback return self.content, self.keyframes, error finally: pass return self.content, self.keyframes, error