|
from PIL import Image, ImageFilter |
|
import cv2 |
|
import pytesseract |
|
from pytesseract import Output |
|
from os import listdir |
|
from os.path import isfile, join |
|
import numpy as np |
|
import json |
|
import matplotlib.pyplot as plt |
|
from pdf2image import convert_from_path |
|
from matplotlib import pyplot as plt |
|
import re |
|
|
|
# Pixel box (left, upper, right, lower) on a rendered page where the postal
# address block is expected to appear.
_ADDRESS_BOX = (750, 150, 1654, 850)


def _crop_address_region(page_image):
    """Return the grayscale crop of the expected address region of a page."""
    return page_image.convert("L").crop(_ADDRESS_BOX)


def _threshold_mask(cropped, size):
    """Downscale, binarize and morphologically filter an address crop.

    Produces a coarse mask in which dark blobs correspond to candidate
    address text.  `size` is the full-resolution crop size; the mask is
    built at quarter resolution.
    """
    return cropped.resize(
        (size[0] // 4, size[1] // 4)
    ).point(
        # Hard threshold: everything darker than 220 becomes black.
        lambda px: 0 if px < 220 else 255
    ).filter(
        ImageFilter.MedianFilter(5)
    ).filter(
        ImageFilter.MinFilter(15)
    )


def _clean_mask(mask_arr, strip_size=7, vertical_threshold=200):
    """Whiten horizontal and vertical strips that look like noise.

    Horizontal strips whose mean intensity falls outside [49, 160] (almost
    entirely dark, or too bright to be text) are blanked; vertical strips
    that are mostly white (mean > vertical_threshold) are blanked too.
    Returns a cleaned copy of `mask_arr`.
    """
    cleaned = mask_arr.copy()
    rows, cols = mask_arr.shape

    for row_idx in range(rows // strip_size):
        top = row_idx * strip_size
        strip_mean = int(cleaned[top:top + strip_size, :].mean())
        if strip_mean < 49 or strip_mean > 160:
            cleaned[top:top + strip_size, :] = 255

    for col_idx in range(cols // strip_size + 1):
        left = col_idx * strip_size
        if left >= cols:
            # Guard: when cols is an exact multiple of strip_size the last
            # iteration would take the mean of an empty slice (NaN -> crash).
            break
        right = min(left + strip_size, cols)
        if int(cleaned[:, left:right].mean()) > vertical_threshold:
            cleaned[:, left:right] = 255

    return cleaned


def processFiles(pdfs, verbose = False) :
    """OCR a batch of scanned PDF files and split each into documents.

    Each PDF is rendered to page images.  A fixed region of every page
    (`_ADDRESS_BOX`) is inspected for a postal address block; a page that
    contains one starts a new document.  A document whose address mentions
    the Central Bank of Russia is classified as 'Обращение' (appeal),
    otherwise as 'Сопроводительное письмо' (cover letter).

    Parameters
    ----------
    pdfs : list of paths to PDF files (names are expected to end in '.pdf').
    verbose : when True, print per-page diagnostics and display the
        detection masks with matplotlib.

    Returns
    -------
    One list per input file; each inner list holds one dict per detected
    document with keys 'Тип документа' and 'Текст документа'.
    """
    pages_per_pdf = [convert_from_path(path) for path in pdfs]

    page_images = []       # flat list of all page images, across all files
    docfilenames = []      # input file names without the 4-char '.pdf' suffix
    pagenames = []         # '<file>_page_<n>' label per flat page index
    fileindices = []       # per input file: flat indices of its pages
    for path, pages in zip(pdfs, pages_per_pdf):
        stem = path[:-4]   # strip the '.pdf' suffix
        docfilenames.append(stem)
        indices = []
        for pagenum, page in enumerate(pages):
            page_images.append(page)
            pagenames.append(stem + '_page_' + str(pagenum))
            indices.append(len(pagenames) - 1)
        fileindices.append(indices)

    if not page_images:
        # Nothing was rendered (empty input or all-empty PDFs); previously
        # this crashed with IndexError further down.
        return [[] for _ in pdfs]

    cropped_images = [_crop_address_region(img) for img in page_images]

    # OCR the address crop (used for routing) and the full page (output text).
    texts = [pytesseract.image_to_string(img, lang='rus') for img in cropped_images]
    fulltexts = [pytesseract.image_to_string(img, lang='rus') for img in page_images]

    # Build and clean a detection mask per page.  All crops share one size,
    # so the first crop's size is representative.
    init_size = cropped_images[0].size
    masks = []
    masks_bw = []
    for cropped in cropped_images:
        raw = _threshold_mask(cropped, init_size)
        cleaned = _clean_mask(np.array(raw))
        mask = Image.fromarray(cleaned).filter(
            ImageFilter.MedianFilter(13)
        ).filter(
            ImageFilter.MinFilter(25)
        )
        masks.append(mask)
        masks_bw.append(mask.convert('1'))

    # A page "has an address" when its cleaned mask keeps any dark pixel.
    addressexists = [bool((~np.array(bw)).sum()) for bw in masks_bw]

    # Spellings under which the Central Bank of Russia may appear in the
    # OCRed address text (lowercase; matched against text.lower()).
    CBnames = [
        'цб рф',
        'центральный банк',
        'центрального банка',
        'банк россии',
        'банка россии',
    ]

    # A page is addressed to the CB when it has an address block whose OCR
    # text mentions any of the known spellings.
    toCB = [
        has_addr and any(cb in text.lower() for cb in CBnames)
        for has_addr, text in zip(addressexists, texts)
    ]

    # Split each file's page sequence into documents: every address page
    # starts a new document; its toCB flag becomes the document's type.
    docindices = []
    doctypes = []
    for indices in fileindices:
        docs = []
        types = []
        pages = []
        doctype = False
        for index in indices:
            if addressexists[index]:
                if pages:
                    docs.append(pages)
                    types.append(doctype)
                pages = []
                doctype = toCB[index]
            pages.append(index)
        # Flush the trailing document (possibly empty for a zero-page file,
        # matching the original behavior).
        docs.append(pages)
        types.append(doctype)
        docindices.append(docs)
        doctypes.append(types)

    if verbose:
        # Overlay detected mask contours on each crop for visual inspection.
        orig_size = cropped_images[0].size
        for i, mask in enumerate(masks):
            overlay = np.array(mask.convert('L').resize(orig_size))
            out = np.array(cropped_images[i])

            bw = cv2.inRange(overlay, 0, 12)
            contours, hierarchy = cv2.findContours(bw, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
            annotated = cv2.drawContours(out, contours, -1, (0, 255, 0), 5, cv2.LINE_AA, hierarchy, 1)

            print()
            print(pagenames[i])
            print('Address exists :', addressexists[i])
            print('To CB :', toCB[i])

            plt.imshow(Image.fromarray(annotated))
            plt.show()

    # Assemble the per-document output records.
    docs_info = []
    for i, file_docs in enumerate(docindices):
        docs = []
        if verbose:
            print('File =', docfilenames[i])

        for j, page_indices in enumerate(file_docs):
            doctype = 'Обращение' if doctypes[i][j] else 'Сопроводительное письмо'
            if verbose:
                print('Doc =', j, 'Type =', doctype)

            text = ''
            for index in page_indices:
                text += fulltexts[index]
                if verbose:
                    print('Page =', pagenames[index])
                    print(fulltexts[index])
                    print('--- end of page ---')
                    print()

            # Collapse indentation after newlines, then runs of newlines.
            text = re.sub(r'\n +', r'\n', text)
            text = re.sub(r'\n+', r'\n', text)
            docs.append({'Тип документа': doctype, 'Текст документа': text})

        docs_info.append(docs)

    if verbose:
        # Dump the address-crop OCR text of every page routed to the CB.
        for file_docs in docindices:
            for page_indices in file_docs:
                for index in page_indices:
                    if toCB[index]:
                        print('Page =', pagenames[index])
                        print(texts[index].strip())
                        print('------------------------')
                        print()

    return docs_info
|
|
|
def processSingleFile(file, verbose = False) :
    """Process a single PDF file.

    Thin wrapper around processFiles.  The `verbose` flag is forwarded
    (previously it could not be passed through at all).  Returns the full
    processFiles result: a one-element list holding the file's documents.
    """
    return processFiles([file], verbose = verbose)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|