| import os |
| import math |
| import hashlib |
| import numpy as np |
| import cv2 |
| from PIL import Image |
| from tqdm import tqdm |
| import gradio as gr |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
| |
| PADDING_TOKEN = b"<PAD>" |
| CHUNK_SIZE = 1000 * 1024 |
| FRAMES_DIR = './frames' |
| WIDTH, HEIGHT = 1920, 1080 |
| REQUIRED_LENGTH = WIDTH * HEIGHT |
| VIDEO_PATH = './output_video.mp4' |
|
|
| |
| if not os.path.exists(FRAMES_DIR): |
| os.makedirs(FRAMES_DIR) |
|
|
| def encode(file_path): |
| """Encode a file by splitting it into chunks, converting those chunks to images, and creating a video.""" |
| if not os.path.exists('./parts'): |
| os.makedirs('./parts') |
| file_size = os.path.getsize(file_path) |
| num_chunks = math.ceil(file_size / CHUNK_SIZE) |
| |
| with open(file_path, 'rb') as f: |
| for i in tqdm(range(num_chunks), desc="Splitting file", unit="chunk"): |
| chunk = f.read(CHUNK_SIZE) |
| |
| if len(chunk) < CHUNK_SIZE: |
| padding_size = CHUNK_SIZE - len(chunk) |
| padding = np.full(padding_size, PADDING_TOKEN, dtype='S1') |
| chunk = np.concatenate((np.frombuffer(chunk, dtype='S1'), padding)) |
| chunk_file_path = os.path.join('./parts', f"{i}.part") |
| with open(chunk_file_path, 'wb') as chunk_file: |
| chunk_file.write(chunk) |
| |
| chunk_files = [os.path.join('./parts', f"{i}.part") for i in range(num_chunks)] |
| with ThreadPoolExecutor() as executor: |
| futures = [executor.submit(convert_part_to_image, chunk_file, i) for i, chunk_file in enumerate(chunk_files)] |
| for future in tqdm(as_completed(futures), desc="Converting chunks to images", unit="chunk", total=len(chunk_files)): |
| future.result() |
| |
| create_video_from_frames(FRAMES_DIR, VIDEO_PATH) |
| return VIDEO_PATH |
|
|
| def decode(video_path, output_file_path): |
| """Decode a video back to the original file by extracting frames, converting them back to chunks, and merging them.""" |
| |
| extracted_frames_dir = './extracted_frames' |
| extract_frames_from_video(video_path, extracted_frames_dir) |
| |
| extracted_frame_files = sorted([os.path.join(extracted_frames_dir, f) for f in os.listdir(extracted_frames_dir) if f.endswith('.png')]) |
| |
| chunk_files = [] |
| with ThreadPoolExecutor() as executor: |
| futures = [executor.submit(convert_frame_to_chunk, frame_file, i) for i, frame_file in enumerate(extracted_frame_files)] |
| for future in tqdm(as_completed(futures), desc="Converting frames to chunks", unit="frame", total=len(extracted_frame_files)): |
| chunk_file_path = future.result() |
| chunk_files.append(chunk_file_path) |
| |
| merge_chunks(chunk_files, output_file_path) |
| |
| original_hash = calculate_sha256(output_file_path) |
| return output_file_path, f"SHA-256 hash of the decoded file: {original_hash}" |
|
|
| def convert_frame_to_chunk(frame_file, i): |
| binary_string = hd_rgb_image_to_binary(frame_file) |
| chunk_file_path = os.path.join('./parts', f"{i}.part") |
| with open(chunk_file_path, 'wb') as chunk_file: |
| chunk_file.write(int(binary_string, 2).to_bytes(len(binary_string) // 8, byteorder='big')) |
| return chunk_file_path |
|
|
| def binary_string_to_rgb_image(binary_string, width=1920, height=1080, output_path='output_rgb.png'): |
| """Convert a binary string to an RGB image and save it.""" |
| if len(binary_string) != REQUIRED_LENGTH * 3: |
| raise ValueError(f"Binary string must be exactly {REQUIRED_LENGTH * 3} bits for {width}x{height} resolution") |
| |
| image = Image.new('RGB', (width, height)) |
| |
| pixels = image.load() |
| for y in range(height): |
| for x in range(width): |
| index = (y * width + x) * 3 |
| r = int(binary_string[index:index+1], 2) * 255 |
| g = int(binary_string[index+1:index+2], 2) * 255 |
| b = int(binary_string[index+2:index+3], 2) * 255 |
| pixels[x, y] = (r, g, b) |
| |
| image.save(output_path) |
|
|
| def hd_rgb_image_to_binary(image_path): |
| """Convert an RGB image back to a binary string.""" |
| image = Image.open(image_path).convert('RGB') |
| width, height = image.size |
| binary_string = "" |
| pixels = image.load() |
| for y in range(height): |
| for x in range(width): |
| r, g, b = pixels[x, y] |
| binary_string += '1' if r == 255 else '0' |
| binary_string += '1' if g == 255 else '0' |
| binary_string += '1' if b == 255 else '0' |
| return binary_string |
|
|
| def convert_part_to_image(chunk_file_path, part_index, width=1920, height=1080): |
| """Convert a chunk file to an image and save it in the /frames directory.""" |
| with open(chunk_file_path, 'rb') as chunk_file: |
| chunk_data = chunk_file.read() |
| |
| binary_string = ''.join([f'{byte:08b}' for byte in chunk_data]) |
| |
| binary_string = binary_string[:REQUIRED_LENGTH * 3].ljust(REQUIRED_LENGTH * 3, '0') |
| image_path = os.path.join(FRAMES_DIR, f"frame_{part_index}.png") |
| binary_string_to_rgb_image(binary_string, width, height, image_path) |
|
|
| def merge_chunks(chunk_files, output_file): |
| """Merge chunk files into a single file, removing padding.""" |
| with open(output_file, 'wb') as f: |
| |
| for chunk_file in tqdm(chunk_files, desc="Merging chunks", unit="chunk"): |
| with open(chunk_file, 'rb') as chunk: |
| data = chunk.read() |
| |
| data = data.rstrip(PADDING_TOKEN) |
| f.write(data) |
|
|
| def calculate_sha256(file_path): |
| """Calculate the SHA-256 hash of a file.""" |
| sha256_hash = hashlib.sha256() |
| with open(file_path, 'rb') as f: |
| for byte_block in iter(lambda: f.read(4096), b""): |
| sha256_hash.update(byte_block) |
| return sha256_hash.hexdigest() |
|
|
| def create_video_from_frames(frame_folder, video_path, width=1920, height=1080, fps=30): |
| """Create a video from a folder of image frames.""" |
| frame_files = sorted([os.path.join(frame_folder, f) for f in os.listdir(frame_folder) if f.endswith('.png')]) |
| |
| fourcc = cv2.VideoWriter_fourcc(*'H264') |
| video_writer = cv2.VideoWriter(video_path, fourcc, fps, (width, height)) |
| |
| for frame_file in tqdm(frame_files, desc="Creating video", unit="frame"): |
| frame = cv2.imread(frame_file) |
| video_writer.write(frame) |
| video_writer.release() |
|
|
| def extract_frames_from_video(video_path, output_folder): |
| """Extract frames from a video and save them to the output folder.""" |
| if not os.path.exists(output_folder): |
| os.makedirs(output_folder) |
| cap = cv2.VideoCapture(video_path) |
| frame_index = 0 |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| |
| while True: |
| ret, frame = cap.read() |
| if not ret: |
| break |
| frame_path = os.path.join(output_folder, f"frame_{frame_index}.png") |
| cv2.imwrite(frame_path, frame) |
| frame_index += 1 |
| cap.release() |
|
|
| def cleanup(): |
| """Clean up temporary files and directories.""" |
| directories_to_remove = ['./parts', './frames', './extracted_frames'] |
| for directory in directories_to_remove: |
| if os.path.exists(directory): |
| for file_name in os.listdir(directory): |
| file_path = os.path.join(directory, file_name) |
| try: |
| if os.path.isfile(file_path) or os.path.islink(file_path): |
| os.unlink(file_path) |
| elif os.path.isdir(file_path): |
| os.rmdir(file_path) |
| except Exception as e: |
| print(f"Failed to delete {file_path}. Reason: {e}") |
| os.rmdir(directory) |
|
|
| def encode_interface(file): |
| video_path = encode(file.name) |
| return video_path |
|
|
| def decode_interface(video, output_file): |
| output_file_path, result = decode(video.name, output_file.name) |
| return output_file_path, result |
|
|
| |
| with gr.Blocks() as demo: |
| gr.Markdown("# File Encoding/Decoding Tool") |
|
|
| with gr.Tab("Encode"): |
| gr.Markdown("### Steps to Encode a File") |
| gr.Markdown("1. Upload the file you want to encode.") |
| gr.Markdown("2. Click the 'Encode' button.") |
| gr.Markdown("3. Download the encoded video.") |
|
|
| file_input = gr.File(label="Upload file to encode") |
| encode_button = gr.Button("Encode") |
| encode_output = gr.Video(label="Encoded Video") |
| encode_button.click(encode_interface, inputs=file_input, outputs=encode_output) |
|
|
| with gr.Tab("Decode"): |
| gr.Markdown("### Steps to Decode a Video") |
| gr.Markdown("1. Upload the video you want to decode.") |
| gr.Markdown("2. Specify the output file path.") |
| gr.Markdown("3. Click the 'Decode' button.") |
| gr.Markdown("4. Download the decoded file.") |
| gr.Markdown("5. View the decoding result.") |
|
|
| video_input = gr.File(label="Upload video to decode") |
| output_file_input = gr.File(label="Output file path") |
| decode_button = gr.Button("Decode") |
| decode_output = gr.File(label="Decoded File", interactive=True) |
| decode_result = gr.Textbox(label="Decoding Result") |
| decode_button.click(decode_interface, inputs=[video_input, output_file_input], outputs=[decode_output, decode_result]) |
|
|
| demo.launch() |