import json
import os
from pathlib import Path
import shutil
import warnings

from PIL import Image
from dawsonia import io
from dawsonia import digitize
from dawsonia.ml import ml
import gradio as gr
import numpy as np
from numpy.typing import NDArray
import pandas as pd
import pooch

from .visualizer_functions import Page, TableCell

# Max number of images a user can upload at once
MAX_IMAGES = int(os.environ.get("MAX_IMAGES", 5))

# Set up the cache directory to point to the directory where the example images
# are located. The images must lie in the cache directory, because otherwise they
# have to be re-uploaded when drag-and-dropped onto the input image widget.
GRADIO_CACHE = os.getenv("GRADIO_CACHE_DIR", ".gradio_cache")
DATA_CACHE = os.path.join(GRADIO_CACHE, "data")
EXAMPLES_DIRECTORY = os.path.join(os.getcwd(), "examples")

# Example books
PIPELINES: dict[str, dict[str, str]] = {
    "bjuröklubb": dict(
        url="https://git.smhi.se/ai-for-obs/data/-/raw/688c04f13e8e946962792fe4b4e0ded98800b154/raw_zarr/BJUR%C3%96KLUBB/DAGBOK_Bjur%C3%B6klubb_Station_Jan-Dec_1928.zarr.zip",
        known_hash="sha256:6d87b7f79836ae6373cfab11260fe28787d93fe16199fefede6697ccd750f71a",
    ),
    "härnösand": dict(
        url="https://git.smhi.se/ai-for-obs/data/-/raw/688c04f13e8e946962792fe4b4e0ded98800b154/raw_zarr/H%C3%84RN%C3%96SAND/DAGBOK_H%C3%A4rn%C3%B6sand_Station_1934.zarr.zip",
        known_hash="sha256:a58fdb6521214d0bd569c9325ce78d696738de28ce6ec869cde0d46616b697f2",
    ),
}


def run_dawsonia(
    table_fmt_config_override,
    first_page,
    last_page,
    prob_thresh,
    book: io.Book,
    book_path,
    gallery,
    progress=gr.Progress(),
):
    if book is None:
        raise ValueError("You need to select / upload the pages to digitize")

    progress(0, desc="Dawsonia: starting")

    model_path = Path("data/models/dawsonia/2024-07-02")
    output_path = Path("output")
    output_path.mkdir(exist_ok=True)

    print("Dawsonia: digitizing", book)

    table_fmt = book.table_format
    final_output_path_book = output_path / book.station_name
    output_path_book = Path(book_path).parent / "output"
    output_path_book.mkdir(exist_ok=True, parents=True)
    (output_path_book / "probablities").mkdir(exist_ok=True)

    init_data: list[dict[str, NDArray]] = [
        {
            key: np.empty(len(table_fmt.rows), dtype="O")
            for key in table_fmt.columns[table_idx]
        }
        for table_idx in table_fmt.preproc.idx_tables_size_verify
    ]

    collection = []
    images = []

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", FutureWarning)
        for page_number in range(first_page, last_page):
            output_path_page = output_path_book / str(page_number)
            gr.Info(f"Digitizing {page_number = }")

            if (
                not (output_path_book / str(page_number))
                .with_suffix(".parquet")
                .exists()
            ):
                digitize.digitize_page_and_write_output(
                    book,
                    init_data,
                    page_number=page_number,
                    date_str=f"0000-page-{page_number}",
                    model_path=model_path,
                    model_predict=ml.model_predict,
                    prob_thresh=prob_thresh,
                    output_path_page=output_path_page,
                    output_text_fmt=False,
                    debug=False,
                )
            _synctree(output_path_book, final_output_path_book)
            progress_value = (page_number - first_page) / max(1, last_page - first_page)

    # if final_output_path_book.exists():
    #     shutil.rmtree(final_output_path_book)
    # shutil.copytree(output_path_book, final_output_path_book)

    for page_number, im_from_gallery in zip(range(first_page, last_page), gallery):
        if results := read_page(
            final_output_path_book,
            str(page_number),
            prob_thresh,
            progress,
            1.0,
            table_fmt.preproc.idx_tables_size_verify,
        ):  # , im_from_gallery[0])
            page, im = results
            collection.append(page)
            images.append(im)
            yield collection, gr.skip()
        else:
            gr.Info(f"No tables detected in {page_number = }")
gr.Info("Pages were succesfully digitized ✨") | |
    # yield collection, images
    yield collection, gr.skip()
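

# run_dawsonia is a generator: it yields the growing list of Page objects after
# each digitized page so the Gradio UI can refresh while later pages are still
# being processed. A minimal wiring sketch under that assumption (the component
# names below are hypothetical, not defined in this module):
#
#     run_button.click(
#         run_dawsonia,
#         inputs=[table_fmt_text, first_page_slider, last_page_slider,
#                 prob_thresh_slider, book_state, book_path_state, gallery],
#         outputs=[pages_state, gallery],
#     )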


def _synctree(source_dir, dest_dir):
    source_dir = Path(source_dir)
    dest_dir = Path(dest_dir)

    if not dest_dir.exists():
        dest_dir.mkdir(parents=True)

    for root, _, files in os.walk(source_dir):
        root = Path(root)
        relative_root = root.relative_to(source_dir)

        # Create subdirectories in the destination directory
        dest_subdir_path = dest_dir / relative_root
        if not dest_subdir_path.exists():
            dest_subdir_path.mkdir(parents=True, exist_ok=True)

        for file_ in files:
            source_file_path = root / file_
            dest_file_path = dest_subdir_path / file_

            # Copy only if the file does not already exist or is newer
            if (
                not dest_file_path.exists()
                or (source_file_path.stat().st_mtime - dest_file_path.stat().st_mtime) > 0
            ):
                shutil.copy2(source_file_path, dest_file_path)
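

# Minimal usage sketch for _synctree (paths are illustrative only): mirror the
# per-book working directory into the persistent output directory, copying only
# files that are missing from the destination or newer in the source.
#
#     _synctree(Path("uploads/output"), Path("output/BJURÖKLUBB"))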


def read_page(
    output_path_book: Path,
    prefix: str,
    prob_thresh: float,
    progress,
    progress_value,
    idx_tables_size_verify: list[int],
    im_path_from_gallery: str = "",
):
    stats = digitize.Statistics.from_json(
        (output_path_book / "statistics" / prefix).with_suffix(".json")
    )
    print(stats)
    progress(progress_value, desc=f"Dawsonia: {stats!s:.50}")

    if stats.tables_detected > 0:
        values_df = pd.read_parquet((output_path_book / prefix).with_suffix(".parquet"))
        prob_df = pd.read_parquet(
            (output_path_book / "probablities" / prefix).with_suffix(".parquet")
        )
        table_meta = json.loads(
            (output_path_book / "table_meta" / prefix).with_suffix(".json").read_text()
        )
        with Image.open(
            image_path := (output_path_book / "pages" / prefix).with_suffix(".webp")
        ) as im:
            width = im.width
            height = im.height

        values_array = values_df.values.flatten()
        prob_array = prob_df.values.flatten()

        # FIXME: hardcoded to get up to 2 tables. Use idx_tables_size_verify and reconstruct bbox_array
        try:
            bbox_array = np.hstack(table_meta["table_positions"][:2]).reshape(-1, 4)
        except ValueError:
            bbox_array = np.reshape(table_meta["table_positions"][0], (-1, 4))

        cells = [
            make_cell(value, bbox)
            for value, prob, bbox in zip(values_array, prob_array, bbox_array)
            if prob > prob_thresh
        ]
        return Page(width, height, cells, im_path_from_gallery or str(image_path)), im


def make_cell(value: str, bbox: NDArray[np.int64]):
    y, x, h, w = bbox
    xmin, ymin = x - w // 2, y - h // 2
    xmax, ymax = x + w // 2, y + h // 2
    polygon = (xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax), (xmin, ymin)
    return TableCell(polygon, text_x=x - w // 4, text_y=y, text=value)
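

# Worked example for make_cell (numbers are illustrative only): a centre-format
# bbox (y, x, h, w) = (100, 200, 40, 60) is centred at (x, y) = (200, 100), so
# the corners become (170, 80), (230, 80), (230, 120), (170, 120), closing back
# at (170, 80), and the text anchor is placed at (x - w // 4, y) = (185, 100).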


def all_example_images() -> list[str]:
    """
    Get paths to all example images.
    """
    examples = [
        os.path.join(EXAMPLES_DIRECTORY, f"{pipeline}.png") for pipeline in PIPELINES
    ]
    return examples


def get_selected_example_image(
    first_page, last_page, event: gr.SelectData
) -> tuple[list[Image.Image], io.Book, str, str, str] | None:
    """
    Get the name of the pipeline that corresponds to the selected image.
    """
    orig_name = event.value["image"]["orig_name"]
    # for name, details in PIPELINES.items():
    orig_path = Path(orig_name)
    name = orig_path.name
    for suffix in orig_path.suffixes[::-1]:
        name = name.removesuffix(suffix)
    station_tf = Path("table_formats", name).with_suffix(".toml")

    if (last_page - first_page) > MAX_IMAGES:
        error = f"You can digitize at most {MAX_IMAGES} images at a time"
        gr.Warning(error)
        raise ValueError(error)

    if name in PIPELINES:
        book_path = pooch.retrieve(**PIPELINES[name], path=DATA_CACHE)
        first, last, book = io.read_book(book_path)
        book._name = name
        book.size_cell = [1.0, 1.0, 1.0, 1.0]
        return (
            [book.read_image(pg) for pg in range(first_page, last_page)],
            book,
            book_path,
            station_tf.name,
            station_tf.read_text(),
        )


def move_uploaded_file(uploaded, table_fmt_filename):
    current_directory = Path(uploaded).parent
    # Define the target directory where you want to save the uploaded files
    target_directory = current_directory / table_fmt_filename.removesuffix(".toml")
    os.makedirs(target_directory, exist_ok=True)

    # Move the uploaded file to the target directory
    true_path = Path(target_directory / Path(uploaded).name)
    # if true_path.exists():
    #     true_path.unlink()
    shutil.copy2(uploaded, true_path)
print(f"Copy created", true_path) | |
return str(true_path) | |


def get_uploaded_image(
    first_page: int, last_page: int, table_fmt_filename: str, filename: str
) -> tuple[list[NDArray], io.Book, str, str] | None:
    orig_path = Path(filename)
    name = orig_path.name
    for suffix in orig_path.suffixes[::-1]:
        name = name.removesuffix(suffix)

    station_tf = Path("table_formats", table_fmt_filename)
    if not station_tf.exists():
        station_tf = Path("table_formats", "bjuröklubb.toml")

    first, last, book = io.read_book(Path(filename))
    book._name = name
    book.size_cell = [1.0, 1.0, 1.0, 1.0]
    return (
        [book.read_page(pg) for pg in range(first_page, last_page)],
        book,
        filename,
        station_tf.read_text(),
    )


def overwrite_table_format_file(book: io.Book, book_path, table_fmt: str):
    name = book.station_name
    table_fmt_dir = Path("table_formats")
    (table_fmt_dir / name).with_suffix(".toml").write_text(table_fmt)
    book.table_format = io.read_specific_table_format(table_fmt_dir, Path(book_path))
gr.Info(f"Overwritten table format file for {name}") | |
return book | |