Spaces:
Sleeping
Sleeping
""" | |
🤗 SkladBot Free AI Microservice | |
Hugging Face Spaces микросервис для БЕСПЛАТНОЙ обработки складских документов | |
Возможности: | |
- TrOCR для печатного и рукописного текста | |
- LayoutLM для понимания структуры документов | |
- Table Transformer для обработки таблиц | |
- Gradio API для REST запросов | |
- 100% БЕСПЛАТНО - 20k запросов/месяц | |
""" | |
import gradio as gr | |
import torch | |
import numpy as np | |
from PIL import Image | |
import io | |
import base64 | |
import json | |
import re | |
from datetime import datetime | |
from typing import Dict, List, Any, Optional | |
# Transformers models | |
from transformers import ( | |
TrOCRProcessor, VisionEncoderDecoderModel, | |
pipeline, | |
AutoTokenizer, AutoModelForTokenClassification | |
) | |
# Импортируем наш кастомный токенайзер | |
from custom_tokenizers import Byt5LangTokenizer | |
# Регистрируем кастомный токенайзер в transformers | |
from transformers import AutoConfig, AutoTokenizer | |
from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING | |
from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE | |
# Регистрация кастомного токенайзера | |
if 'Byt5LangTokenizer' not in dir(): | |
try: | |
# Добавляем токенайзер в систему автоматического обнаружения transformers | |
from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING, TOKENIZER_MAPPING_NAMES | |
from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE | |
print("🔄 Регистрируем кастомный токенайзер Byt5LangTokenizer...") | |
except ImportError as e: | |
print(f"⚠️ Предупреждение при импорте модулей трансформеров: {e}") | |
from surya.table_rec import TableRecPredictor | |
class FreeAIOrchestrator: | |
"""Координатор БЕСПЛАТНЫХ AI сервисов для складских документов""" | |
def __init__(self): | |
print("🚀 Инициализация SkladBot Free AI...") | |
# TrOCR для печатного текста (БЕСПЛАТНО) | |
self.printed_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-printed") | |
self.printed_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-printed") | |
# TrOCR для рукописного текста (БЕСПЛАТНО) | |
self.handwritten_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten") | |
self.handwritten_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-handwritten") | |
# LayoutLM для понимания документов (БЕСПЛАТНО) | |
self.document_qa = pipeline( | |
"document-question-answering", | |
model="impira/layoutlm-document-qa" | |
) | |
# Table Transformer для таблиц (БЕСПЛАТНО) | |
self.table_detector = pipeline( | |
"object-detection", | |
model="microsoft/table-transformer-structure-recognition" | |
) | |
# NEW: Добавляем интеграцию с Surya Table (БЕСПЛАТНО) | |
try: | |
# Регистрируем кастомный токенайзер перед загрузкой модели | |
print("🔄 Инициализация кастомного токенайзера для Surya Table...") | |
# Используем пайплайн с указанием нашего токенайзера | |
self.surya_table_model = TableRecPredictor() | |
print("✅ Surya Table модель загружена успешно") | |
self.surya_table_available = True | |
except Exception as e: | |
print(f"⚠️ Не удалось загрузить Surya Table: {e}") | |
self.surya_table_available = False | |
self.stats = { | |
"total_requests": 0, | |
"successful_extractions": 0, | |
"avg_confidence": 0.0, | |
"start_time": datetime.now() | |
} | |
print("✅ SkladBot Free AI готов к работе!") | |
async def extract_warehouse_data(self, image, document_type="auto"): | |
"""Главная функция - извлечение данных из складских документов""" | |
self.stats["total_requests"] += 1 | |
try: | |
# Конвертация изображения | |
if isinstance(image, str): | |
# Base64 строка | |
image_data = base64.b64decode(image) | |
image = Image.open(io.BytesIO(image_data)) | |
# 1. Определение типа документа | |
doc_type = await self.classify_document_type(image) | |
if document_type != "auto": | |
doc_type = document_type | |
print(f"🔍 Определен тип документа: {doc_type}") | |
# 2. Выбор стратегии обработки | |
extraction_results = [] | |
# Специальная обработка для таблиц | |
if doc_type == "table" or document_type == "table": | |
print("📊 Обнаружена таблица, запускаем специальную обработку...") | |
table_result = await self.extract_table_data(image) | |
# Подробное логирование результатов обработки таблицы | |
print(f"📋 Результат обработки таблицы: {json.dumps(table_result, indent=2)}") | |
# Добавляем результат в общий список | |
extraction_results.append({ | |
"method": table_result.get("model", "table_recognition"), | |
"data": table_result, | |
"confidence": table_result.get("confidence", 0.9) | |
}) | |
# Для таблиц возвращаем результат сразу | |
# Преобразуем строки таблицы в извлеченные элементы | |
items = [] | |
if "rows" in table_result and table_result["rows"]: | |
for row in table_result["rows"]: | |
item = { | |
"name": row.get("name", ""), | |
"quantity": row.get("quantity", 0), | |
"article": row.get("article", ""), | |
"price": row.get("price", 0), | |
"confidence": 0.9 | |
} | |
items.append(item) | |
# Если строки извлечены успешно, возвращаем результат | |
if items: | |
print(f"✅ Извлечено {len(items)} товаров из таблицы") | |
return { | |
"success": True, | |
"document_type": "table", | |
"extracted_items": items, | |
"suggestions": [], | |
"raw_extractions": extraction_results, | |
"confidence": table_result.get("confidence", 0.9), | |
"processing_time": f"{datetime.now().timestamp():.2f}s", | |
"cost": 0.0 # ВСЕГДА БЕСПЛАТНО! | |
} | |
# Стандартная обработка для других типов документов | |
# TrOCR для печатного текста | |
printed_text = await self.extract_printed_text(image) | |
extraction_results.append({ | |
"method": "trocr_printed", | |
"text": printed_text, | |
"confidence": 0.85 | |
}) | |
# TrOCR для рукописного текста (если нужно) | |
if doc_type in ["handwritten", "mixed"]: | |
handwritten_text = await self.extract_handwritten_text(image) | |
extraction_results.append({ | |
"method": "trocr_handwritten", | |
"text": handwritten_text, | |
"confidence": 0.80 | |
}) | |
# LayoutLM для структурированного понимания | |
if doc_type in ["invoice", "table", "form"]: | |
structured_data = await self.extract_structured_data(image, doc_type) | |
extraction_results.append({ | |
"method": "layoutlm", | |
"data": structured_data, | |
"confidence": 0.90 | |
}) | |
# 3. Объединение и обработка результатов | |
final_result = await self.merge_extraction_results(extraction_results, doc_type) | |
# 4. Парсинг складских команд | |
warehouse_commands = await self.parse_warehouse_commands(final_result) | |
# 5. Генерация предложений | |
suggestions = await self.generate_smart_suggestions(warehouse_commands) | |
self.stats["successful_extractions"] += 1 | |
return { | |
"success": True, | |
"document_type": doc_type, | |
"extracted_items": warehouse_commands, | |
"suggestions": suggestions, | |
"raw_extractions": extraction_results, | |
"confidence": self.calculate_overall_confidence(extraction_results), | |
"processing_time": f"{datetime.now().timestamp():.2f}s", | |
"cost": 0.0 # ВСЕГДА БЕСПЛАТНО! | |
} | |
except Exception as e: | |
print(f"❌ Ошибка обработки документа: {str(e)}") | |
import traceback | |
traceback.print_exc() | |
return { | |
"success": False, | |
"error": str(e), | |
"document_type": "unknown", | |
"extracted_items": [], | |
"suggestions": [], | |
"confidence": 0.0, | |
"cost": 0.0 | |
} | |
async def classify_document_type(self, image): | |
"""Определение типа документа по его содержимому""" | |
try: | |
# Проверяем, не является ли документ таблицей | |
if self._is_likely_table(image): | |
print("📊 Изображение похоже на таблицу на основе анализа структуры") | |
return "table" | |
# Анализируем изображение на наличие рукописного текста | |
# Это приблизительная оценка, можно улучшить | |
# Проверяем наличие печатного текста | |
printed_text = await self.extract_printed_text(image) | |
if printed_text and len(printed_text) > 50: | |
# Проверяем наличие характерных для накладных или счетов ключевых слов | |
invoice_keywords = ["счет", "накладная", "фактура", "товар", "поставщик", "итого", "сумма", "количество"] | |
if any(keyword in printed_text.lower() for keyword in invoice_keywords): | |
return "invoice" | |
# Проверяем наличие ключевых слов для других типов документов | |
form_keywords = ["форма", "заявка", "анкета", "заполните"] | |
if any(keyword in printed_text.lower() for keyword in form_keywords): | |
return "form" | |
# По умолчанию для документов с текстом | |
return "document" | |
# По умолчанию, если не можем определить тип | |
return "unknown" | |
except Exception as e: | |
print(f"⚠️ Ошибка при определении типа документа: {e}") | |
return "unknown" | |
def _is_likely_table(self, image): | |
"""Определение вероятности, что изображение содержит таблицу""" | |
try: | |
# Конвертируем изображение в numpy array | |
import numpy as np | |
from PIL import Image | |
# Если это не PIL Image, конвертируем | |
if not isinstance(image, Image.Image): | |
if isinstance(image, str): | |
# Обработка base64 | |
if image.startswith('data:image'): | |
image_data = base64.b64decode(image.split(',')[1]) | |
image = Image.open(io.BytesIO(image_data)) | |
else: | |
# Обработка пути к файлу | |
image = Image.open(image) | |
else: | |
# Если передан bytes | |
image = Image.open(io.BytesIO(image)) | |
# Конвертируем в grayscale | |
if image.mode != 'L': | |
img_gray = image.convert('L') | |
else: | |
img_gray = image | |
# Преобразуем в numpy array | |
img_array = np.array(img_gray) | |
# 1. Проверка на наличие горизонтальных и вертикальных линий | |
# Используем простой градиентный детектор | |
# Горизонтальный градиент (для вертикальных линий) | |
h_gradient = np.abs(np.diff(img_array, axis=1)) | |
h_lines = np.sum(h_gradient > 50, axis=1) # Подсчет точек с большим градиентом | |
# Вертикальный градиент (для горизонтальных линий) | |
v_gradient = np.abs(np.diff(img_array, axis=0)) | |
v_lines = np.sum(v_gradient > 50, axis=0) # Подсчет точек с большим градиентом | |
# 2. Анализ равномерности распределения линий | |
# Таблицы обычно имеют регулярно расположенные линии | |
# Находим пики в распределении линий (возможные линии таблицы) | |
h_peaks = np.sum(h_lines > np.mean(h_lines) + np.std(h_lines)) | |
v_peaks = np.sum(v_lines > np.mean(v_lines) + np.std(v_lines)) | |
# 3. Проверка на наличие достаточного количества линий | |
min_lines_for_table = 3 # Минимум 3 линии для таблицы | |
# Если обнаружено достаточное количество как горизонтальных, так и вертикальных линий | |
if h_peaks >= min_lines_for_table and v_peaks >= min_lines_for_table: | |
print(f"📊 Обнаружены линии таблицы: {h_peaks} горизонтальных, {v_peaks} вертикальных") | |
return True | |
# 4. Проверка на регулярную структуру (равномерные промежутки между линиями) | |
# Это более сложный анализ, но можно упростить | |
# 5. Соотношение сторон документа - многие таблицы имеют характерные пропорции | |
aspect_ratio = image.width / image.height | |
if 0.7 <= aspect_ratio <= 1.5: # Таблицы часто примерно квадратные или слегка прямоугольные | |
# Дополнительный фактор в пользу таблицы | |
if h_peaks >= min_lines_for_table - 1 or v_peaks >= min_lines_for_table - 1: | |
print(f"📊 Соотношение сторон ({aspect_ratio:.2f}) и линии указывают на таблицу") | |
return True | |
return False | |
except Exception as e: | |
print(f"⚠️ Ошибка при анализе таблицы: {e}") | |
return False | |
async def extract_printed_text(self, image): | |
"""Извлечение печатного текста через TrOCR""" | |
try: | |
pixel_values = self.printed_processor(image, return_tensors="pt").pixel_values | |
generated_ids = self.printed_model.generate(pixel_values) | |
generated_text = self.printed_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
return generated_text | |
except Exception as e: | |
print(f"❌ Ошибка TrOCR печатный: {e}") | |
return "" | |
async def extract_handwritten_text(self, image): | |
"""Извлечение рукописного текста через TrOCR""" | |
try: | |
pixel_values = self.handwritten_processor(image, return_tensors="pt").pixel_values | |
generated_ids = self.handwritten_model.generate(pixel_values) | |
generated_text = self.handwritten_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
return generated_text | |
except Exception as e: | |
print(f"❌ Ошибка TrOCR рукописный: {e}") | |
return "" | |
async def extract_structured_data(self, image, doc_type): | |
"""Структурированное понимание документа через LayoutLM""" | |
try: | |
# Определяем вопросы на основе типа документа | |
questions = self.get_document_questions(doc_type) | |
results = {} | |
for question in questions: | |
try: | |
result = self.document_qa(image=image, question=question) | |
results[question] = result["answer"] | |
except: | |
results[question] = "" | |
return results | |
except Exception as e: | |
print(f"❌ Ошибка LayoutLM: {e}") | |
return {} | |
async def extract_table_data(self, image): | |
"""Извлечение табличных данных через специализированные модели""" | |
try: | |
# Проверка наличия модели Surya Table | |
if hasattr(self, 'surya_table_available') and self.surya_table_available: | |
try: | |
# Попытка использования Surya Table для структурированного распознавания таблиц | |
print("🔍 Используем Surya Table для структурированного распознавания таблицы...") | |
# Преобразуем PIL Image в формат, необходимый для модели | |
if isinstance(image, str): | |
# Если передан путь или base64 | |
if image.startswith('data:image'): | |
# Обработка base64 | |
image_data = base64.b64decode(image.split(',')[1]) | |
pil_image = Image.open(io.BytesIO(image_data)) | |
else: | |
# Обработка пути к файлу | |
pil_image = Image.open(image) | |
elif isinstance(image, Image.Image): | |
pil_image = image | |
else: | |
# Если передан bytes | |
pil_image = Image.open(io.BytesIO(image)) | |
# Предобработка изображения для улучшения распознавания | |
pil_image = self._preprocess_image_for_tables(pil_image) | |
# Распознаем таблицу через Surya Table | |
table_result = self.surya_table_model([pil_image]) | |
# Выводим подробную информацию о результате для отладки | |
print(f"📋 Результат Surya Table: {table_result}") | |
# Подробная отладочная информация | |
if isinstance(table_result, list): | |
print(f"📊 Длина результата: {len(table_result)}") | |
if len(table_result) > 0: | |
print(f"📊 Тип первого элемента: {type(table_result[0])}") | |
if isinstance(table_result[0], dict): | |
print(f"📊 Ключи первого элемента: {table_result[0].keys()}") | |
# Преобразуем результат в структурированный формат | |
try: | |
structured_rows = [] | |
# Результат может быть в разных форматах | |
if isinstance(table_result, list) and len(table_result) > 0: | |
if isinstance(table_result[0], dict): | |
if 'cells' in table_result[0]: | |
structured_rows = self._parse_surya_table(table_result[0]) | |
print(f"✅ Извлечено {len(structured_rows)} строк из таблицы") | |
# Если у нас есть строки, возвращаем результат | |
if structured_rows: | |
return { | |
"success": True, | |
"type": "table", | |
"model": "surya_table", | |
"rows": structured_rows, | |
"raw_text": self._extract_text_from_rows(structured_rows), | |
"confidence": 0.95 | |
} | |
else: | |
print("⚠️ Surya Table не смогла извлечь строки таблицы") | |
except Exception as parse_error: | |
print(f"⚠️ Ошибка парсинга результата Surya Table: {parse_error}") | |
# Продолжаем с запасным вариантом | |
except Exception as surya_error: | |
print(f"⚠️ Ошибка Surya Table, используем запасной вариант: {surya_error}") | |
# Запасной вариант: Table Transformer для определения местоположения таблиц | |
print("🔍 Используем Table Transformer для определения местоположения таблиц...") | |
table_detection = self.table_detector(image) | |
# Если обнаружены таблицы, используем TrOCR для извлечения текста из них | |
table_data = [] | |
for detection in table_detection: | |
if detection["label"] == "table" and detection["score"] > 0.7: | |
# Вырезаем область таблицы | |
table_data.append({ | |
"box": detection["box"], | |
"score": detection["score"], | |
"type": "table" | |
}) | |
# Если таблицы обнаружены, возвращаем данные о них | |
if table_data: | |
return { | |
"success": True, | |
"type": "table", | |
"model": "table_transformer", | |
"rows": [], # Пока нет извлеченных строк | |
"detection": table_data, | |
"confidence": 0.75 | |
} | |
# Если таблицы не обнаружены, возвращаем пустой результат | |
return { | |
"success": True, | |
"type": "table", | |
"model": "fallback", | |
"rows": [], | |
"confidence": 0.5 | |
} | |
except Exception as e: | |
print(f"❌ Ошибка распознавания таблицы: {e}") | |
return { | |
"success": False, | |
"error": str(e), | |
"type": "table", | |
"model": "error", | |
"rows": [], | |
"confidence": 0.0 | |
} | |
def _preprocess_image_for_tables(self, image): | |
"""Предобработка изображения для улучшения распознавания таблиц""" | |
try: | |
# Конвертируем в RGB если нужно | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
# Применяем повышение контраста для лучшего распознавания линий таблицы | |
import numpy as np | |
from PIL import ImageEnhance | |
# Увеличиваем контраст | |
enhancer = ImageEnhance.Contrast(image) | |
image = enhancer.enhance(1.5) # Увеличиваем контраст на 50% | |
# Увеличиваем резкость | |
enhancer = ImageEnhance.Sharpness(image) | |
image = enhancer.enhance(1.5) # Увеличиваем резкость на 50% | |
return image | |
except Exception as e: | |
print(f"⚠️ Ошибка предобработки изображения: {e}") | |
return image | |
def _parse_surya_table(self, surya_result): | |
"""Парсинг результата Surya в структурированные данные""" | |
rows = [] | |
headers = {} | |
# Печатаем полную структуру результата для отладки | |
print(f"🔍 Анализ структуры Surya результата: {surya_result.keys()}") | |
# Проверяем наличие ключа cells | |
if 'cells' not in surya_result: | |
print("⚠️ В результате Surya нет ключа 'cells'") | |
return rows | |
cells = surya_result.get('cells', []) | |
print(f"📊 Количество ячеек: {len(cells)}") | |
if len(cells) > 0: | |
# Печатаем структуру первой ячейки для отладки | |
print(f"📊 Пример ячейки: {cells[0]}") | |
# First pass: get headers | |
for cell in cells: | |
if cell.get('is_header', False): | |
col_id = cell.get('col_id', 0) | |
header_text = cell.get('text', '').lower() | |
headers[col_id] = header_text | |
print(f"📋 Найден заголовок: col_id={col_id}, text={header_text}") | |
# Если заголовки не найдены, используем первую строку как заголовки | |
if not headers: | |
# Находим все ячейки в первой строке | |
first_row_cells = [c for c in cells if c.get('row_id', 0) == 0] | |
for cell in first_row_cells: | |
col_id = cell.get('col_id', 0) | |
header_text = cell.get('text', '').lower() | |
headers[col_id] = header_text | |
print(f"📋 Используем первую строку как заголовок: col_id={col_id}, text={header_text}") | |
# Second pass: get rows | |
row_dict = {} | |
for cell in cells: | |
row_id = cell.get('row_id', 0) | |
# Пропускаем строки заголовков, если они есть | |
if headers and cell.get('is_header', False): | |
continue | |
if row_id not in row_dict: | |
row_dict[row_id] = {} | |
col_id = cell.get('col_id', 0) | |
header = headers.get(col_id, str(col_id)) | |
value = cell.get('text', '') | |
# Логируем обработку ячеек | |
print(f"📊 Обработка ячейки: row_id={row_id}, col_id={col_id}, header={header}, value={value}") | |
# Преобразуем заголовки к стандартным полям | |
if any(keyword in header for keyword in ['товар', 'название', 'наимен']): | |
row_dict[row_id]['name'] = value | |
elif any(keyword in header for keyword in ['кол', 'шт', 'количество']): | |
try: | |
# Извлекаем числовое значение | |
quantity = re.search(r'(\d+(?:\.\d+)?)', value) | |
if quantity: | |
row_dict[row_id]['quantity'] = float(quantity.group(1)) | |
else: | |
row_dict[row_id]['quantity'] = value | |
except: | |
row_dict[row_id]['quantity'] = value | |
elif any(keyword in header for keyword in ['арт', 'артикул']): | |
row_dict[row_id]['article'] = value | |
elif any(keyword in header for keyword in ['цен', 'стоим', 'сумм']): | |
# Извлекаем числовое значение цены | |
price = re.search(r'(\d+(?:\.\d+)?)', value) | |
if price: | |
row_dict[row_id]['price'] = float(price.group(1)) | |
else: | |
row_dict[row_id]['price'] = value | |
else: | |
# Для прочих колонок используем оригинальное название | |
row_dict[row_id][header] = value | |
# Собираем все строки | |
rows = list(row_dict.values()) | |
# Отладочная информация о результатах | |
print(f"📊 Извлечено {len(rows)} строк из таблицы") | |
if rows: | |
print(f"📊 Пример первой строки: {rows[0]}") | |
return rows | |
def _extract_text_from_rows(self, rows): | |
"""Извлечение текстового представления из структурированных строк таблицы""" | |
text = "" | |
for row in rows: | |
row_text = " ".join([f"{k}: {v}" for k, v in row.items()]) | |
text += row_text + "\n" | |
return text | |
def _extract_columns(self, line): | |
"""Извлечение колонок из строки таблицы""" | |
# Простое разделение по табуляции или нескольким пробелам | |
return re.split(r'\t| +', line.strip()) | |
async def merge_extraction_results(self, extraction_results, doc_type): | |
"""Объединение результатов разных AI методов""" | |
merged_text = "" | |
structured_data = {} | |
for result in extraction_results: | |
if "text" in result: | |
merged_text += f"{result['text']} " | |
if "data" in result and isinstance(result["data"], dict): | |
structured_data.update(result["data"]) | |
return { | |
"combined_text": merged_text.strip(), | |
"structured_data": structured_data, | |
"document_type": doc_type | |
} | |
async def parse_warehouse_commands(self, extraction_result): | |
"""Парсинг складских команд из извлеченного текста""" | |
text = extraction_result.get("combined_text", "") | |
# Используем NER для извлечения сущностей | |
try: | |
entities = self.ner_pipeline(text) | |
except: | |
entities = [] | |
# Регулярные выражения для складских данных | |
warehouse_items = [] | |
# Поиск артикулов (F186, ST9, F186ST9) | |
article_pattern = r'\b(?:F\d+(?:ST\d+)?|ST\d+)\b' | |
articles = re.findall(article_pattern, text, re.IGNORECASE) | |
# Поиск количеств | |
quantity_pattern = r'\b(\d+)\s*(?:шт|лист|листов|кг|м2|м²)\b' | |
quantities = re.findall(quantity_pattern, text, re.IGNORECASE) | |
# Поиск цен | |
price_pattern = r'\b(\d+(?:\.\d+)?)\s*(?:руб|₽|р)\b' | |
prices = re.findall(price_pattern, text, re.IGNORECASE) | |
# Объединение найденных данных | |
max_items = max(len(articles), len(quantities), 1) | |
for i in range(max_items): | |
item = { | |
"article": articles[i] if i < len(articles) else "", | |
"quantity": int(quantities[i]) if i < len(quantities) else 0, | |
"price": float(prices[i]) if i < len(prices) else 0.0, | |
"name": self.extract_product_name(text, i), | |
"confidence": 0.8 | |
} | |
if item["article"] or item["quantity"] > 0: | |
warehouse_items.append(item) | |
return warehouse_items | |
def extract_product_name(self, text, index=0): | |
"""Извлечение названия товара из текста""" | |
# Простая эвристика для извлечения названий | |
words = text.split() | |
# Ищем слова после артикулов или количеств | |
product_keywords = ["лдсп", "мдф", "фанера", "дуб", "бук", "ясень", "орех", "чикаго"] | |
for word in words: | |
if any(keyword in word.lower() for keyword in product_keywords): | |
return word.title() | |
return "Товар" | |
async def generate_smart_suggestions(self, warehouse_items): | |
"""Генерация умных предложений""" | |
suggestions = [] | |
for item in warehouse_items: | |
if not item["article"]: | |
suggestions.append({ | |
"type": "missing_article", | |
"message": f"Не найден артикул для товара '{item['name']}'", | |
"action": "manual_input", | |
"priority": "high" | |
}) | |
if item["quantity"] == 0: | |
suggestions.append({ | |
"type": "missing_quantity", | |
"message": f"Не найдено количество для '{item['article'] or item['name']}'", | |
"action": "manual_input", | |
"priority": "medium" | |
}) | |
if item["price"] == 0: | |
suggestions.append({ | |
"type": "missing_price", | |
"message": f"Не найдена цена для '{item['article'] or item['name']}'", | |
"action": "suggest_price", | |
"priority": "low" | |
}) | |
return suggestions | |
def calculate_overall_confidence(self, extraction_results): | |
"""Расчет общей уверенности""" | |
if not extraction_results: | |
return 0.0 | |
total_confidence = sum(result.get("confidence", 0) for result in extraction_results) | |
return round(total_confidence / len(extraction_results), 2) | |
def get_stats(self): | |
"""Статистика работы микросервиса""" | |
return { | |
"quota_used": f"{self.stats['total_requests']}/20000", | |
"uptime_hours": (datetime.now() - self.stats["start_time"]).total_seconds() / 3600, | |
"models_loaded": ["TrOCR", "LayoutLM", "TableTransformer", "RuBERT-NER", "SuryaTable"], | |
"success_rate": self._calculate_success_rate() | |
} | |
def _calculate_success_rate(self): | |
"""Расчет успешного процента""" | |
if self.stats["total_requests"] == 0: | |
return 0.0 | |
return round(self.stats["successful_extractions"] / self.stats["total_requests"] * 100, 1) | |
# Инициализация AI | |
ai_orchestrator = FreeAIOrchestrator() | |
# Gradio интерфейс | |
def process_warehouse_document(image, document_type): | |
"""Обработка складского документа через Gradio""" | |
try: | |
import asyncio | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
result = loop.run_until_complete( | |
ai_orchestrator.extract_warehouse_data(image, document_type) | |
) | |
return json.dumps(result, ensure_ascii=False, indent=2) | |
except Exception as e: | |
return json.dumps({ | |
"success": False, | |
"error": f"Ошибка обработки: {str(e)}", | |
"cost": 0.0 | |
}, ensure_ascii=False, indent=2) | |
def get_service_stats(): | |
"""Получение статистики сервиса""" | |
stats = ai_orchestrator.get_stats() | |
return json.dumps(stats, ensure_ascii=False, indent=2) | |
# Gradio интерфейс | |
with gr.Blocks(title="SkladBot Free AI") as app: | |
gr.Markdown("# 🤖 SkladBot Free AI Microservice") | |
gr.Markdown("**БЕСПЛАТНАЯ** обработка складских документов через AI") | |
with gr.Tab("Обработка документов"): | |
image_input = gr.Image(type="pil", label="Загрузите изображение документа") | |
doc_type = gr.Dropdown( | |
choices=["auto", "invoice", "table", "form", "handwritten"], | |
value="auto", | |
label="Тип документа" | |
) | |
process_btn = gr.Button("🔍 Обработать документ", variant="primary") | |
result_output = gr.Textbox( | |
label="Результат обработки", | |
lines=20, | |
max_lines=30 | |
) | |
process_btn.click( | |
process_warehouse_document, | |
inputs=[image_input, doc_type], | |
outputs=result_output | |
) | |
with gr.Tab("Статистика"): | |
stats_btn = gr.Button("📊 Обновить статистику") | |
stats_output = gr.Textbox( | |
label="Статистика сервиса", | |
lines=10 | |
) | |
stats_btn.click( | |
get_service_stats, | |
outputs=stats_output | |
) | |
gr.Markdown("---") | |
gr.Markdown("💰 **Стоимость**: $0 (100% бесплатно)") | |
gr.Markdown("📊 **Лимит**: 20,000 запросов/месяц") | |
gr.Markdown("🧠 **AI модели**: TrOCR, LayoutLM, Table Transformer, RuBERT, SuryaTable") | |
if __name__ == "__main__": | |
app.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=True, | |
show_error=True | |
) | |