import base64
import functools
import io
import json
import logging
import os
import struct

import gradio as gr
import numpy as np
from PIL import Image
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# --- Constants ---
HEADER_BITS = 32  # 32 bits (4 bytes) to store the length of the crypto payload.
AES_GCM_NONCE_SIZE = 12  # AES-GCM uses a 12-byte (96-bit) nonce as recommended by NIST.
AES_GCM_TAG_SIZE = 16  # AESGCM appends a 16-byte authentication tag to the ciphertext.
KEY_LENGTH_FIELD_SIZE = 4  # Bytes used by the big-endian "encrypted AES key length" field.

# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Load Keys and Config ---
# Load the private key from an environment variable for security.
KEYLOCK_PRIV_KEY = os.environ.get('KEYLOCK_PRIV_KEY')

# Load the public key from a file to display it in the documentation.
PUBLIC_KEY_PEM_STRING = ""
try:
    with open("keylock_pub.pem", "r", encoding="utf-8") as f:
        PUBLIC_KEY_PEM_STRING = f.read()
    logger.info("Successfully loaded public key from keylock_pub.pem for display.")
except FileNotFoundError:
    logger.error("FATAL: keylock_pub.pem not found. The UI will show an error.")
    PUBLIC_KEY_PEM_STRING = "Error: keylock_pub.pem file not found on the server. Make sure it's in your repository."
except Exception as e:
    # Best-effort: the key is only shown in the docs tab, so never crash startup here.
    logger.error(f"Error loading public key: {e}")
    PUBLIC_KEY_PEM_STRING = f"Error loading public key: {e}"


# --- Core Decryption Function ---
@functools.lru_cache(maxsize=1)
def _load_private_key():
    """Parse the PEM private key from KEYLOCK_PRIV_KEY once and reuse it across requests."""
    return serialization.load_pem_private_key(KEYLOCK_PRIV_KEY.encode(), password=None)


def _extract_lsb_bytes(channel_values: np.ndarray, start: int, bit_count: int) -> bytes:
    """Pack the least-significant bits of ``bit_count`` channel values into bytes, MSB first."""
    bits = (channel_values[start:start + bit_count] & 1).astype(np.uint8, copy=False)
    return np.packbits(bits).tobytes()


def decode_data(image_base64_string: str) -> dict:
    """
    Decrypts a payload hidden in a PNG image via LSB steganography and hybrid encryption.

    This function serves as the main API logic. It expects a base64-encoded string of an
    image that was created with the corresponding public key.

    Args:
        image_base64_string: A string containing the base64-encoded PNG image.

    Returns:
        A dictionary containing the decrypted JSON data. An empty dict is returned when
        the embedded header declares a zero-length payload.

    Raises:
        gr.Error: If any part of the process fails, from decoding to decryption.
            The error message is user-friendly and intended for the API consumer.

    Binary data structure expected in the image's LSBs (in order)::

        | Header (32 bits): payload length in bytes        |
        | Encrypted AES key length (4 bytes, big-endian)   |
        | Encrypted AES key (variable)                     |
        | AES nonce (12 bytes)                             |
        | AES-GCM ciphertext + tag (remaining bytes)       |
    """
    if not KEYLOCK_PRIV_KEY:
        error_msg = "Server Error: The API is not configured with a private key. Please contact the administrator."
        logger.error(error_msg)
        raise gr.Error(error_msg)

    try:
        # 1. Decode Base64 and load image data
        image_buffer = base64.b64decode(image_base64_string)
        img = Image.open(io.BytesIO(image_buffer)).convert("RGB")
        pixel_data = np.array(img).ravel()

        # 2. Extract data length from the LSB header
        if pixel_data.size < HEADER_BITS:
            raise ValueError(f"Image is too small to contain a valid header. Minimum pixel count: {HEADER_BITS}")
        data_length = int.from_bytes(_extract_lsb_bytes(pixel_data, 0, HEADER_BITS), byteorder='big')
        if data_length == 0:
            logger.info("Image decoded with zero data length. Returning empty dict.")
            return {}

        # 3. Extract the full crypto payload from LSBs
        data_bits_count = data_length * 8
        end_offset = HEADER_BITS + data_bits_count
        if pixel_data.size < end_offset:
            raise ValueError("Image data corrupt or truncated. The actual image size is smaller than specified in the header.")
        crypto_payload = _extract_lsb_bytes(pixel_data, HEADER_BITS, data_bits_count)

        # 4. Load private key and unpack the crypto payload
        private_key = _load_private_key()
        if len(crypto_payload) < KEY_LENGTH_FIELD_SIZE:
            raise ValueError("Payload is too short to contain the encrypted key length field.")
        offset = KEY_LENGTH_FIELD_SIZE  # First 4 bytes define the length of the encrypted AES key
        encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
        rsa_key_size_bytes = private_key.key_size // 8
        if encrypted_aes_key_len != rsa_key_size_bytes:
            raise ValueError(f"Key mismatch: The data was encrypted with a {encrypted_aes_key_len*8}-bit key, but the server uses a {private_key.key_size}-bit key.")

        # Reject payloads that cannot hold key + nonce + tag instead of letting the
        # slices below silently come back short.
        minimum_payload_size = offset + encrypted_aes_key_len + AES_GCM_NONCE_SIZE + AES_GCM_TAG_SIZE
        if len(crypto_payload) < minimum_payload_size:
            raise ValueError(
                f"Payload is truncated: expected at least {minimum_payload_size} bytes, got {len(crypto_payload)}."
            )

        encrypted_aes_key = crypto_payload[offset : offset + encrypted_aes_key_len]
        offset += encrypted_aes_key_len
        nonce = crypto_payload[offset : offset + AES_GCM_NONCE_SIZE]
        offset += AES_GCM_NONCE_SIZE
        ciphertext_with_tag = crypto_payload[offset:]

        # 5. Decrypt the data
        recovered_aes_key = private_key.decrypt(
            encrypted_aes_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
        )
        aesgcm = AESGCM(recovered_aes_key)
        decrypted_bytes = aesgcm.decrypt(nonce, ciphertext_with_tag, None)

        logger.info("Successfully decoded and decrypted a payload from an image.")
        return json.loads(decrypted_bytes.decode('utf-8'))

    except (ValueError, InvalidTag, TypeError, struct.error, OSError) as e:
        # OSError is included because Pillow reports unreadable image bytes through it;
        # that is a bad request, not a server fault.
        # InvalidTag carries no message, so fall back to the exception type name.
        detail = str(e) or type(e).__name__
        logger.error("Decryption failed: %s", detail)
        raise gr.Error(f"Decryption failed. The image may be corrupt, was not created by a compatible client, or was encrypted with the wrong public key. Details: {detail}") from e
    except Exception as e:
        logger.exception("An unexpected error occurred during decoding: %s", e)
        raise gr.Error(f"An unexpected server error occurred. Please check server logs. Details: {e}") from e


# ==============================================================================
# GRADIO INTERFACE (UI and API Endpoint Definition)
# ==============================================================================

# Long documentation strings are kept at module level so that the layout code
# below stays readable and the text is not affected by code indentation.
_HOW_IT_WORKS_MD = """
The security relies on a well-established cryptographic pattern called **Hybrid Encryption**. Here’s the step-by-step process a client must follow to create a compatible image:

1. **Client Generates a One-Time Key**: The client creates a random, single-use 32-byte symmetric key for AES-GCM encryption.
2. **Client Encrypts Data**: The client takes the JSON payload and encrypts it using the one-time AES key.
3. **Client Encrypts the One-Time Key**: The client encrypts the AES key using this server's **Public RSA Key** (provided below).
4. **Client Creates the Payload**: The client bundles the encrypted AES key, nonce, and ciphertext into a single binary payload.
5. **Client Hides the Payload**: The client uses **Least Significant Bit (LSB) steganography** to hide the payload in a template image.
6. **Client Sends Image**: The final image is sent to this API endpoint.
7. **Server Decodes**: This server reverses the process: it extracts the payload, uses its private RSA key to decrypt the AES key, and then decrypts the final message.
"""

_CLIENT_SCRIPT_EXAMPLE = """
import json; import os; import struct; from PIL import Image; import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding

def create_keylock_image(public_key_pem: str, json_data: dict, template_image_path: str, output_path: str):
    public_key = serialization.load_pem_public_key(public_key_pem.encode())
    data_to_encrypt = json.dumps(json_data).encode('utf-8')
    aes_key = AESGCM.generate_key(bit_length=256)
    nonce = os.urandom(12)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, data_to_encrypt, None)
    encrypted_aes_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
    payload = struct.pack('>I', len(encrypted_aes_key)) + encrypted_aes_key + nonce + ciphertext_with_tag
    img = Image.open(template_image_path).convert("RGB")
    pixel_data = np.array(img)
    full_binary_string = f'{len(payload):032b}' + ''.join(f'{byte:08b}' for byte in payload)
    if len(full_binary_string) > pixel_data.size:
        raise ValueError(f"Data is too large. Needs {len(full_binary_string)} pixels, image has {pixel_data.size}.")
    flat_pixels = pixel_data.ravel()
    for i in range(len(full_binary_string)):
        flat_pixels[i] = (flat_pixels[i] & 0b11111110) | int(full_binary_string[i])
    Image.fromarray(pixel_data).save(output_path, "PNG")
    print(f"Successfully created encrypted image at {output_path}")

# --- Example Usage ---
if __name__ == '__main__':
    SERVER_PUBLIC_KEY = \"\"\"-----BEGIN PUBLIC KEY-----
YOUR_SERVER_PUBLIC_KEY_HERE
-----END PUBLIC KEY-----\"\"\"
    my_secret_data = {"user_id": "user-12345", "token": "abcdef-123456"}
    create_keylock_image(SERVER_PUBLIC_KEY, my_secret_data, "template.png", "encrypted_image.png")
"""

_REQUESTS_EXAMPLE = """
import requests
import base64

IMAGE_PATH = "encrypted_image.png"
API_URL = "https://YOUR-SPACE-NAME.hf.space/run/keylock-auth-decoder" # <-- Change this URL

with open(IMAGE_PATH, "rb") as f:
    b64_string = base64.b64encode(f.read()).decode('utf-8')

response = requests.post(API_URL, json={"data": [b64_string]})
if response.status_code == 200:
    print("Success! Decrypted data:", response.json()['data'][0])
else:
    print(f"Error: {response.status_code}", response.text)
"""

_ERROR_GUIDE_MD = """
- **`"Decryption failed. ... InvalidTag"`**: The most common error. It means the image was encrypted with a **different public key** or the image file was corrupted.
- **`"Decryption failed. ... Key mismatch"`**: The RSA key size used to encrypt the data does not match the server's key size. Use the public key provided by this service.
- **`"Image data corrupt or truncated..."`**: The image file is incomplete or damaged.
- **`"Server Error: ... not configured"`**: A server-side problem. The administrator has not set the `KEYLOCK_PRIV_KEY` secret correctly.
"""

with gr.Blocks(title="Secure Decoder API", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔐 Secure KeyLock Decoder API")
    gr.Markdown("This application provides a secure API endpoint to decrypt and extract JSON data hidden within PNG images. **See the deployment guide at the top of `app.py` for setup instructions.**")

    with gr.Tabs():
        with gr.TabItem("🚀 Quick Start & Documentation"):
            gr.Markdown("## What Is This?")
            gr.Markdown(
                "This service decodes messages that have been securely hidden inside images. It's designed for use cases where you need to transmit a small, sensitive JSON payload (like authentication tokens, user data, or configuration) "
                "without it being easily detectable or readable. The process uses a combination of **steganography** (hiding data in the image's pixels) and **hybrid encryption** (the security of RSA combined with the speed of AES)."
            )

            with gr.Accordion("How It Works", open=False):
                gr.Markdown(_HOW_IT_WORKS_MD)

            gr.Markdown("---")
            gr.Markdown("## How to Use This API")

            gr.Markdown("### Step 1: Get the Server's Public Key")
            gr.Markdown("You must use this public key to encrypt data intended for this server. Save it to a file (e.g., `server_pub.pem`).")
            gr.Code(value=PUBLIC_KEY_PEM_STRING, language="python", label="Server Public Key")

            with gr.Accordion("Step 2: Create the Encrypted Image (Client-Side Python Example)", open=True):
                gr.Markdown("Use the following Python code in your client application to create a KeyLock image.")
                gr.Code(
                    language="python",
                    label="Client-Side Image Creation Script (create_keylock_image.py)",
                    value=_CLIENT_SCRIPT_EXAMPLE,
                )

            gr.Markdown("### Step 3: Call the API Endpoint")
            gr.Markdown(
                "- **Method**: `POST`\n"
                "- **Endpoint**: `/run/keylock-auth-decoder`\n"
                "- **Body**: A JSON object containing a base64-encoded string of the image."
            )
            gr.Code(language="json", label="JSON Payload Schema", value='{\n "data": [\n ""\n ]\n}')

            with gr.Accordion("Python `requests` Example", open=True):
                gr.Code(
                    language="python",
                    label="Example API Call with Python `requests`",
                    value=_REQUESTS_EXAMPLE,
                )

        with gr.TabItem("⚙️ Server Status & Error Guide"):
            gr.Markdown("## Server Status")
            key_status = (
                "✅ Loaded successfully from secrets."
                if KEYLOCK_PRIV_KEY
                else "❌ NOT FOUND. The API is non-functional. The administrator must set the `KEYLOCK_PRIV_KEY` secret in the Space settings. See the deployment guide in `app.py`."
            )
            gr.Textbox(label="Private Key Status", value=key_status, interactive=False, lines=3)

            gr.Markdown("---")
            gr.Markdown("## Error Handling Guide")
            gr.Markdown(_ERROR_GUIDE_MD)

    # --- This hidden component creates the API endpoint ---
    with gr.Row(visible=False):
        api_input = gr.Textbox()
        api_output = gr.JSON()
        gr.Interface(
            fn=decode_data,
            inputs=api_input,
            outputs=api_output,
            api_name="keylock-auth-decoder",
        )

if __name__ == "__main__":
    # To run on a server, you might use: demo.launch(server_name="0.0.0.0")
    # See the deployment guide at the top of this file for more details.
    demo.launch(mcp_server=True)