|
import shutil |
|
from custom_logger import logger_config |
|
import os |
|
from pathlib import Path |
|
|
|
class MainBase: |
|
def __init__(self): |
|
self.max_file_size = 10 * 1024 * 1024 |
|
|
|
def upload_validate(self, image): |
|
|
|
image.file.seek(0) |
|
contents = image.file.read() |
|
|
|
image.file.seek(0) |
|
|
|
if len(contents) > self.max_file_size: |
|
raise ValueError(f"{image.filename} is too large. Max size: {self.max_file_size // (1024*1024)}MB.") |
|
|
|
def upload(self, id, image): |
|
self.upload_validate(image) |
|
|
|
file_path = os.path.join(self.input_dir, id) |
|
|
|
with open(file_path, "wb") as buffer: |
|
shutil.copyfileobj(image.file, buffer) |
|
|
|
logger_config.success(f"{image.filename} uploaded successfully to {file_path}") |
|
|
|
def _generate_output_path(self, output_format=None) -> str: |
|
if output_format: |
|
new_ext = self.supported_formats.get(output_format, output_format).lower() |
|
name_without_ext = os.path.splitext(self.input_file_name)[0] |
|
return f"{self.output_dir}/{name_without_ext}.{new_ext}" |
|
return f"{self.output_dir}/{self.input_file_name}" |
|
|
|
def download_url(self, file_id): |
|
self.delete([file_id], only_input=True) |
|
|
|
file_path = f'{self.output_dir}/{file_id}' |
|
if os.path.exists(file_path): |
|
return file_path |
|
raise ValueError("File not found") |
|
|
|
def delete(self, ids, only_input=False): |
|
for file_id in ids: |
|
name_without_ext = os.path.splitext(file_id)[0] |
|
for filename in os.listdir(self.input_dir): |
|
if name_without_ext in filename: |
|
file_path = os.path.join(self.input_dir, filename) |
|
path_obj = Path(file_path) |
|
path_obj.unlink(missing_ok=True) |
|
|
|
if not only_input: |
|
for filename in os.listdir(self.output_dir): |
|
if name_without_ext in filename: |
|
file_path = os.path.join(self.output_dir, filename) |
|
path_obj = Path(file_path) |
|
path_obj.unlink(missing_ok=True) |