""" 🤗 SkladBot Free AI Microservice Hugging Face Spaces микросервис для БЕСПЛАТНОЙ обработки складских документов Возможности: - TrOCR для печатного и рукописного текста - LayoutLM для понимания структуры документов - Table Transformer для обработки таблиц - Gradio API для REST запросов - 100% БЕСПЛАТНО - 20k запросов/месяц """ import gradio as gr import torch import numpy as np from PIL import Image import io import base64 import json import re from datetime import datetime from typing import Dict, List, Any, Optional # Transformers models from transformers import ( TrOCRProcessor, VisionEncoderDecoderModel, pipeline, AutoTokenizer, AutoModelForTokenClassification ) # Импортируем наш кастомный токенайзер from custom_tokenizers import Byt5LangTokenizer # Регистрируем кастомный токенайзер в transformers from transformers import AutoConfig, AutoTokenizer from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE # Регистрация кастомного токенайзера if 'Byt5LangTokenizer' not in dir(): try: # Добавляем токенайзер в систему автоматического обнаружения transformers from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING, TOKENIZER_MAPPING_NAMES from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE print("🔄 Регистрируем кастомный токенайзер Byt5LangTokenizer...") except ImportError as e: print(f"⚠️ Предупреждение при импорте модулей трансформеров: {e}") from surya.table_rec import TableRecPredictor class FreeAIOrchestrator: """Координатор БЕСПЛАТНЫХ AI сервисов для складских документов""" def __init__(self): print("🚀 Инициализация SkladBot Free AI...") # TrOCR для печатного текста (БЕСПЛАТНО) self.printed_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-printed") self.printed_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-printed") # TrOCR для рукописного текста (БЕСПЛАТНО) self.handwritten_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten") self.handwritten_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-handwritten") # LayoutLM для понимания документов (БЕСПЛАТНО) self.document_qa = pipeline( "document-question-answering", model="impira/layoutlm-document-qa" ) # Table Transformer для таблиц (БЕСПЛАТНО) self.table_detector = pipeline( "object-detection", model="microsoft/table-transformer-structure-recognition" ) # NEW: Добавляем интеграцию с Surya Table (БЕСПЛАТНО) try: # Регистрируем кастомный токенайзер перед загрузкой модели print("🔄 Инициализация кастомного токенайзера для Surya Table...") # Используем пайплайн с указанием нашего токенайзера self.surya_table_model = TableRecPredictor() print("✅ Surya Table модель загружена успешно") self.surya_table_available = True except Exception as e: print(f"⚠️ Не удалось загрузить Surya Table: {e}") self.surya_table_available = False self.stats = { "total_requests": 0, "successful_extractions": 0, "avg_confidence": 0.0, "start_time": datetime.now() } print("✅ SkladBot Free AI готов к работе!") async def extract_warehouse_data(self, image, document_type="auto"): """Главная функция - извлечение данных из складских документов""" self.stats["total_requests"] += 1 try: # Конвертация изображения if isinstance(image, str): # Base64 строка image_data = base64.b64decode(image) image = Image.open(io.BytesIO(image_data)) # 1. Определение типа документа doc_type = await self.classify_document_type(image) if document_type != "auto": doc_type = document_type print(f"🔍 Определен тип документа: {doc_type}") # 2. Выбор стратегии обработки extraction_results = [] # Специальная обработка для таблиц if doc_type == "table" or document_type == "table": print("📊 Обнаружена таблица, запускаем специальную обработку...") table_result = await self.extract_table_data(image) # Подробное логирование результатов обработки таблицы print(f"📋 Результат обработки таблицы: {json.dumps(table_result, indent=2)}") # Добавляем результат в общий список extraction_results.append({ "method": table_result.get("model", "table_recognition"), "data": table_result, "confidence": table_result.get("confidence", 0.9) }) # Для таблиц возвращаем результат сразу # Преобразуем строки таблицы в извлеченные элементы items = [] if "rows" in table_result and table_result["rows"]: for row in table_result["rows"]: item = { "name": row.get("name", ""), "quantity": row.get("quantity", 0), "article": row.get("article", ""), "price": row.get("price", 0), "confidence": 0.9 } items.append(item) # Если строки извлечены успешно, возвращаем результат if items: print(f"✅ Извлечено {len(items)} товаров из таблицы") return { "success": True, "document_type": "table", "extracted_items": items, "suggestions": [], "raw_extractions": extraction_results, "confidence": table_result.get("confidence", 0.9), "processing_time": f"{datetime.now().timestamp():.2f}s", "cost": 0.0 # ВСЕГДА БЕСПЛАТНО! } # Стандартная обработка для других типов документов # TrOCR для печатного текста printed_text = await self.extract_printed_text(image) extraction_results.append({ "method": "trocr_printed", "text": printed_text, "confidence": 0.85 }) # TrOCR для рукописного текста (если нужно) if doc_type in ["handwritten", "mixed"]: handwritten_text = await self.extract_handwritten_text(image) extraction_results.append({ "method": "trocr_handwritten", "text": handwritten_text, "confidence": 0.80 }) # LayoutLM для структурированного понимания if doc_type in ["invoice", "table", "form"]: structured_data = await self.extract_structured_data(image, doc_type) extraction_results.append({ "method": "layoutlm", "data": structured_data, "confidence": 0.90 }) # 3. Объединение и обработка результатов final_result = await self.merge_extraction_results(extraction_results, doc_type) # 4. Парсинг складских команд warehouse_commands = await self.parse_warehouse_commands(final_result) # 5. Генерация предложений suggestions = await self.generate_smart_suggestions(warehouse_commands) self.stats["successful_extractions"] += 1 return { "success": True, "document_type": doc_type, "extracted_items": warehouse_commands, "suggestions": suggestions, "raw_extractions": extraction_results, "confidence": self.calculate_overall_confidence(extraction_results), "processing_time": f"{datetime.now().timestamp():.2f}s", "cost": 0.0 # ВСЕГДА БЕСПЛАТНО! } except Exception as e: print(f"❌ Ошибка обработки документа: {str(e)}") import traceback traceback.print_exc() return { "success": False, "error": str(e), "document_type": "unknown", "extracted_items": [], "suggestions": [], "confidence": 0.0, "cost": 0.0 } async def classify_document_type(self, image): """Определение типа документа по его содержимому""" try: # Проверяем, не является ли документ таблицей if self._is_likely_table(image): print("📊 Изображение похоже на таблицу на основе анализа структуры") return "table" # Анализируем изображение на наличие рукописного текста # Это приблизительная оценка, можно улучшить # Проверяем наличие печатного текста printed_text = await self.extract_printed_text(image) if printed_text and len(printed_text) > 50: # Проверяем наличие характерных для накладных или счетов ключевых слов invoice_keywords = ["счет", "накладная", "фактура", "товар", "поставщик", "итого", "сумма", "количество"] if any(keyword in printed_text.lower() for keyword in invoice_keywords): return "invoice" # Проверяем наличие ключевых слов для других типов документов form_keywords = ["форма", "заявка", "анкета", "заполните"] if any(keyword in printed_text.lower() for keyword in form_keywords): return "form" # По умолчанию для документов с текстом return "document" # По умолчанию, если не можем определить тип return "unknown" except Exception as e: print(f"⚠️ Ошибка при определении типа документа: {e}") return "unknown" def _is_likely_table(self, image): """Определение вероятности, что изображение содержит таблицу""" try: # Конвертируем изображение в numpy array import numpy as np from PIL import Image # Если это не PIL Image, конвертируем if not isinstance(image, Image.Image): if isinstance(image, str): # Обработка base64 if image.startswith('data:image'): image_data = base64.b64decode(image.split(',')[1]) image = Image.open(io.BytesIO(image_data)) else: # Обработка пути к файлу image = Image.open(image) else: # Если передан bytes image = Image.open(io.BytesIO(image)) # Конвертируем в grayscale if image.mode != 'L': img_gray = image.convert('L') else: img_gray = image # Преобразуем в numpy array img_array = np.array(img_gray) # 1. Проверка на наличие горизонтальных и вертикальных линий # Используем простой градиентный детектор # Горизонтальный градиент (для вертикальных линий) h_gradient = np.abs(np.diff(img_array, axis=1)) h_lines = np.sum(h_gradient > 50, axis=1) # Подсчет точек с большим градиентом # Вертикальный градиент (для горизонтальных линий) v_gradient = np.abs(np.diff(img_array, axis=0)) v_lines = np.sum(v_gradient > 50, axis=0) # Подсчет точек с большим градиентом # 2. Анализ равномерности распределения линий # Таблицы обычно имеют регулярно расположенные линии # Находим пики в распределении линий (возможные линии таблицы) h_peaks = np.sum(h_lines > np.mean(h_lines) + np.std(h_lines)) v_peaks = np.sum(v_lines > np.mean(v_lines) + np.std(v_lines)) # 3. Проверка на наличие достаточного количества линий min_lines_for_table = 3 # Минимум 3 линии для таблицы # Если обнаружено достаточное количество как горизонтальных, так и вертикальных линий if h_peaks >= min_lines_for_table and v_peaks >= min_lines_for_table: print(f"📊 Обнаружены линии таблицы: {h_peaks} горизонтальных, {v_peaks} вертикальных") return True # 4. Проверка на регулярную структуру (равномерные промежутки между линиями) # Это более сложный анализ, но можно упростить # 5. Соотношение сторон документа - многие таблицы имеют характерные пропорции aspect_ratio = image.width / image.height if 0.7 <= aspect_ratio <= 1.5: # Таблицы часто примерно квадратные или слегка прямоугольные # Дополнительный фактор в пользу таблицы if h_peaks >= min_lines_for_table - 1 or v_peaks >= min_lines_for_table - 1: print(f"📊 Соотношение сторон ({aspect_ratio:.2f}) и линии указывают на таблицу") return True return False except Exception as e: print(f"⚠️ Ошибка при анализе таблицы: {e}") return False async def extract_printed_text(self, image): """Извлечение печатного текста через TrOCR""" try: pixel_values = self.printed_processor(image, return_tensors="pt").pixel_values generated_ids = self.printed_model.generate(pixel_values) generated_text = self.printed_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] return generated_text except Exception as e: print(f"❌ Ошибка TrOCR печатный: {e}") return "" async def extract_handwritten_text(self, image): """Извлечение рукописного текста через TrOCR""" try: pixel_values = self.handwritten_processor(image, return_tensors="pt").pixel_values generated_ids = self.handwritten_model.generate(pixel_values) generated_text = self.handwritten_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] return generated_text except Exception as e: print(f"❌ Ошибка TrOCR рукописный: {e}") return "" async def extract_structured_data(self, image, doc_type): """Структурированное понимание документа через LayoutLM""" try: # Определяем вопросы на основе типа документа questions = self.get_document_questions(doc_type) results = {} for question in questions: try: result = self.document_qa(image=image, question=question) results[question] = result["answer"] except: results[question] = "" return results except Exception as e: print(f"❌ Ошибка LayoutLM: {e}") return {} async def extract_table_data(self, image): """Извлечение табличных данных через специализированные модели""" try: # Проверка наличия модели Surya Table if hasattr(self, 'surya_table_available') and self.surya_table_available: try: # Попытка использования Surya Table для структурированного распознавания таблиц print("🔍 Используем Surya Table для структурированного распознавания таблицы...") # Преобразуем PIL Image в формат, необходимый для модели if isinstance(image, str): # Если передан путь или base64 if image.startswith('data:image'): # Обработка base64 image_data = base64.b64decode(image.split(',')[1]) pil_image = Image.open(io.BytesIO(image_data)) else: # Обработка пути к файлу pil_image = Image.open(image) elif isinstance(image, Image.Image): pil_image = image else: # Если передан bytes pil_image = Image.open(io.BytesIO(image)) # Предобработка изображения для улучшения распознавания pil_image = self._preprocess_image_for_tables(pil_image) # Распознаем таблицу через Surya Table table_result = self.surya_table_model([pil_image]) # Выводим подробную информацию о результате для отладки print(f"📋 Результат Surya Table: {table_result}") # Подробная отладочная информация if isinstance(table_result, list): print(f"📊 Длина результата: {len(table_result)}") if len(table_result) > 0: print(f"📊 Тип первого элемента: {type(table_result[0])}") if isinstance(table_result[0], dict): print(f"📊 Ключи первого элемента: {table_result[0].keys()}") # Преобразуем результат в структурированный формат try: structured_rows = [] # Результат может быть в разных форматах if isinstance(table_result, list) and len(table_result) > 0: if isinstance(table_result[0], dict): if 'cells' in table_result[0]: structured_rows = self._parse_surya_table(table_result[0]) print(f"✅ Извлечено {len(structured_rows)} строк из таблицы") # Если у нас есть строки, возвращаем результат if structured_rows: return { "success": True, "type": "table", "model": "surya_table", "rows": structured_rows, "raw_text": self._extract_text_from_rows(structured_rows), "confidence": 0.95 } else: print("⚠️ Surya Table не смогла извлечь строки таблицы") except Exception as parse_error: print(f"⚠️ Ошибка парсинга результата Surya Table: {parse_error}") # Продолжаем с запасным вариантом except Exception as surya_error: print(f"⚠️ Ошибка Surya Table, используем запасной вариант: {surya_error}") # Запасной вариант: Table Transformer для определения местоположения таблиц print("🔍 Используем Table Transformer для определения местоположения таблиц...") table_detection = self.table_detector(image) # Если обнаружены таблицы, используем TrOCR для извлечения текста из них table_data = [] for detection in table_detection: if detection["label"] == "table" and detection["score"] > 0.7: # Вырезаем область таблицы table_data.append({ "box": detection["box"], "score": detection["score"], "type": "table" }) # Если таблицы обнаружены, возвращаем данные о них if table_data: return { "success": True, "type": "table", "model": "table_transformer", "rows": [], # Пока нет извлеченных строк "detection": table_data, "confidence": 0.75 } # Если таблицы не обнаружены, возвращаем пустой результат return { "success": True, "type": "table", "model": "fallback", "rows": [], "confidence": 0.5 } except Exception as e: print(f"❌ Ошибка распознавания таблицы: {e}") return { "success": False, "error": str(e), "type": "table", "model": "error", "rows": [], "confidence": 0.0 } def _preprocess_image_for_tables(self, image): """Предобработка изображения для улучшения распознавания таблиц""" try: # Конвертируем в RGB если нужно if image.mode != 'RGB': image = image.convert('RGB') # Применяем повышение контраста для лучшего распознавания линий таблицы import numpy as np from PIL import ImageEnhance # Увеличиваем контраст enhancer = ImageEnhance.Contrast(image) image = enhancer.enhance(1.5) # Увеличиваем контраст на 50% # Увеличиваем резкость enhancer = ImageEnhance.Sharpness(image) image = enhancer.enhance(1.5) # Увеличиваем резкость на 50% return image except Exception as e: print(f"⚠️ Ошибка предобработки изображения: {e}") return image def _parse_surya_table(self, surya_result): """Парсинг результата Surya в структурированные данные""" rows = [] headers = {} # Печатаем полную структуру результата для отладки print(f"🔍 Анализ структуры Surya результата: {surya_result.keys()}") # Проверяем наличие ключа cells if 'cells' not in surya_result: print("⚠️ В результате Surya нет ключа 'cells'") return rows cells = surya_result.get('cells', []) print(f"📊 Количество ячеек: {len(cells)}") if len(cells) > 0: # Печатаем структуру первой ячейки для отладки print(f"📊 Пример ячейки: {cells[0]}") # First pass: get headers for cell in cells: if cell.get('is_header', False): col_id = cell.get('col_id', 0) header_text = cell.get('text', '').lower() headers[col_id] = header_text print(f"📋 Найден заголовок: col_id={col_id}, text={header_text}") # Если заголовки не найдены, используем первую строку как заголовки if not headers: # Находим все ячейки в первой строке first_row_cells = [c for c in cells if c.get('row_id', 0) == 0] for cell in first_row_cells: col_id = cell.get('col_id', 0) header_text = cell.get('text', '').lower() headers[col_id] = header_text print(f"📋 Используем первую строку как заголовок: col_id={col_id}, text={header_text}") # Second pass: get rows row_dict = {} for cell in cells: row_id = cell.get('row_id', 0) # Пропускаем строки заголовков, если они есть if headers and cell.get('is_header', False): continue if row_id not in row_dict: row_dict[row_id] = {} col_id = cell.get('col_id', 0) header = headers.get(col_id, str(col_id)) value = cell.get('text', '') # Логируем обработку ячеек print(f"📊 Обработка ячейки: row_id={row_id}, col_id={col_id}, header={header}, value={value}") # Преобразуем заголовки к стандартным полям if any(keyword in header for keyword in ['товар', 'название', 'наимен']): row_dict[row_id]['name'] = value elif any(keyword in header for keyword in ['кол', 'шт', 'количество']): try: # Извлекаем числовое значение quantity = re.search(r'(\d+(?:\.\d+)?)', value) if quantity: row_dict[row_id]['quantity'] = float(quantity.group(1)) else: row_dict[row_id]['quantity'] = value except: row_dict[row_id]['quantity'] = value elif any(keyword in header for keyword in ['арт', 'артикул']): row_dict[row_id]['article'] = value elif any(keyword in header for keyword in ['цен', 'стоим', 'сумм']): # Извлекаем числовое значение цены price = re.search(r'(\d+(?:\.\d+)?)', value) if price: row_dict[row_id]['price'] = float(price.group(1)) else: row_dict[row_id]['price'] = value else: # Для прочих колонок используем оригинальное название row_dict[row_id][header] = value # Собираем все строки rows = list(row_dict.values()) # Отладочная информация о результатах print(f"📊 Извлечено {len(rows)} строк из таблицы") if rows: print(f"📊 Пример первой строки: {rows[0]}") return rows def _extract_text_from_rows(self, rows): """Извлечение текстового представления из структурированных строк таблицы""" text = "" for row in rows: row_text = " ".join([f"{k}: {v}" for k, v in row.items()]) text += row_text + "\n" return text def _extract_columns(self, line): """Извлечение колонок из строки таблицы""" # Простое разделение по табуляции или нескольким пробелам return re.split(r'\t| +', line.strip()) async def merge_extraction_results(self, extraction_results, doc_type): """Объединение результатов разных AI методов""" merged_text = "" structured_data = {} for result in extraction_results: if "text" in result: merged_text += f"{result['text']} " if "data" in result and isinstance(result["data"], dict): structured_data.update(result["data"]) return { "combined_text": merged_text.strip(), "structured_data": structured_data, "document_type": doc_type } async def parse_warehouse_commands(self, extraction_result): """Парсинг складских команд из извлеченного текста""" text = extraction_result.get("combined_text", "") # Используем NER для извлечения сущностей try: entities = self.ner_pipeline(text) except: entities = [] # Регулярные выражения для складских данных warehouse_items = [] # Поиск артикулов (F186, ST9, F186ST9) article_pattern = r'\b(?:F\d+(?:ST\d+)?|ST\d+)\b' articles = re.findall(article_pattern, text, re.IGNORECASE) # Поиск количеств quantity_pattern = r'\b(\d+)\s*(?:шт|лист|листов|кг|м2|м²)\b' quantities = re.findall(quantity_pattern, text, re.IGNORECASE) # Поиск цен price_pattern = r'\b(\d+(?:\.\d+)?)\s*(?:руб|₽|р)\b' prices = re.findall(price_pattern, text, re.IGNORECASE) # Объединение найденных данных max_items = max(len(articles), len(quantities), 1) for i in range(max_items): item = { "article": articles[i] if i < len(articles) else "", "quantity": int(quantities[i]) if i < len(quantities) else 0, "price": float(prices[i]) if i < len(prices) else 0.0, "name": self.extract_product_name(text, i), "confidence": 0.8 } if item["article"] or item["quantity"] > 0: warehouse_items.append(item) return warehouse_items def extract_product_name(self, text, index=0): """Извлечение названия товара из текста""" # Простая эвристика для извлечения названий words = text.split() # Ищем слова после артикулов или количеств product_keywords = ["лдсп", "мдф", "фанера", "дуб", "бук", "ясень", "орех", "чикаго"] for word in words: if any(keyword in word.lower() for keyword in product_keywords): return word.title() return "Товар" async def generate_smart_suggestions(self, warehouse_items): """Генерация умных предложений""" suggestions = [] for item in warehouse_items: if not item["article"]: suggestions.append({ "type": "missing_article", "message": f"Не найден артикул для товара '{item['name']}'", "action": "manual_input", "priority": "high" }) if item["quantity"] == 0: suggestions.append({ "type": "missing_quantity", "message": f"Не найдено количество для '{item['article'] or item['name']}'", "action": "manual_input", "priority": "medium" }) if item["price"] == 0: suggestions.append({ "type": "missing_price", "message": f"Не найдена цена для '{item['article'] or item['name']}'", "action": "suggest_price", "priority": "low" }) return suggestions def calculate_overall_confidence(self, extraction_results): """Расчет общей уверенности""" if not extraction_results: return 0.0 total_confidence = sum(result.get("confidence", 0) for result in extraction_results) return round(total_confidence / len(extraction_results), 2) def get_stats(self): """Статистика работы микросервиса""" return { "quota_used": f"{self.stats['total_requests']}/20000", "uptime_hours": (datetime.now() - self.stats["start_time"]).total_seconds() / 3600, "models_loaded": ["TrOCR", "LayoutLM", "TableTransformer", "RuBERT-NER", "SuryaTable"], "success_rate": self._calculate_success_rate() } def _calculate_success_rate(self): """Расчет успешного процента""" if self.stats["total_requests"] == 0: return 0.0 return round(self.stats["successful_extractions"] / self.stats["total_requests"] * 100, 1) # Инициализация AI ai_orchestrator = FreeAIOrchestrator() # Gradio интерфейс def process_warehouse_document(image, document_type): """Обработка складского документа через Gradio""" try: import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) result = loop.run_until_complete( ai_orchestrator.extract_warehouse_data(image, document_type) ) return json.dumps(result, ensure_ascii=False, indent=2) except Exception as e: return json.dumps({ "success": False, "error": f"Ошибка обработки: {str(e)}", "cost": 0.0 }, ensure_ascii=False, indent=2) def get_service_stats(): """Получение статистики сервиса""" stats = ai_orchestrator.get_stats() return json.dumps(stats, ensure_ascii=False, indent=2) # Gradio интерфейс with gr.Blocks(title="SkladBot Free AI") as app: gr.Markdown("# 🤖 SkladBot Free AI Microservice") gr.Markdown("**БЕСПЛАТНАЯ** обработка складских документов через AI") with gr.Tab("Обработка документов"): image_input = gr.Image(type="pil", label="Загрузите изображение документа") doc_type = gr.Dropdown( choices=["auto", "invoice", "table", "form", "handwritten"], value="auto", label="Тип документа" ) process_btn = gr.Button("🔍 Обработать документ", variant="primary") result_output = gr.Textbox( label="Результат обработки", lines=20, max_lines=30 ) process_btn.click( process_warehouse_document, inputs=[image_input, doc_type], outputs=result_output ) with gr.Tab("Статистика"): stats_btn = gr.Button("📊 Обновить статистику") stats_output = gr.Textbox( label="Статистика сервиса", lines=10 ) stats_btn.click( get_service_stats, outputs=stats_output ) gr.Markdown("---") gr.Markdown("💰 **Стоимость**: $0 (100% бесплатно)") gr.Markdown("📊 **Лимит**: 20,000 запросов/месяц") gr.Markdown("🧠 **AI модели**: TrOCR, LayoutLM, Table Transformer, RuBERT, SuryaTable") if __name__ == "__main__": app.launch( server_name="0.0.0.0", server_port=7860, share=True, show_error=True )