# Secure KeyLock Decoder API - Gradio application (Hugging Face Space).
import base64
import io
import json
import logging
import os
import struct

import gradio as gr
import numpy as np
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from PIL import Image
# --- Constants ---
HEADER_BITS = 32  # 32 bits (4 bytes) to store the length of the crypto payload.
AES_GCM_NONCE_SIZE = 12  # AES-GCM uses a 12-byte (96-bit) nonce as recommended by NIST.

# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Load Keys and Config ---
# Load the private key from an environment variable for security.
# The value is None when the variable is unset; decode_data() checks for that.
KEYLOCK_PRIV_KEY = os.environ.get('KEYLOCK_PRIV_KEY')

# Load the public key from a file to display it in the documentation.
# On any failure the variable holds a human-readable error message instead of
# the key, so the UI can still be built and shows what went wrong.
PUBLIC_KEY_PEM_STRING = ""
try:
    # PEM is ASCII text; an explicit encoding avoids depending on the locale.
    with open("keylock_pub.pem", "r", encoding="utf-8") as f:
        PUBLIC_KEY_PEM_STRING = f.read()
    logger.info("Successfully loaded public key from keylock_pub.pem for display.")
except FileNotFoundError:
    logger.error("FATAL: keylock_pub.pem not found. The UI will show an error.")
    PUBLIC_KEY_PEM_STRING = "Error: keylock_pub.pem file not found on the server. Make sure it's in your repository."
except Exception as e:
    # Best-effort by design: the key is only needed for display, so start-up
    # must not crash on an unreadable file.
    logger.error("Error loading public key: %s", e)
    PUBLIC_KEY_PEM_STRING = f"Error loading public key: {e}"
# --- Core Decryption Function ---
def _extract_lsb_bits(pixel_data: np.ndarray, start_bit: int, bit_count: int) -> bytes:
    """Pack the least-significant bits of a run of channel values into bytes (MSB first)."""
    # bit_count is expected to be a multiple of 8; np.packbits zero-pads the
    # final byte otherwise.
    bits = (pixel_data[start_bit:start_bit + bit_count] & 1).astype(np.uint8)
    return np.packbits(bits).tobytes()


def decode_data(image_base64_string: str) -> dict:
    """
    Decrypt a JSON payload hidden in an image via LSB steganography and hybrid encryption.

    This function is the main API logic. It expects a base64-encoded image whose
    channel LSBs carry a payload encrypted for this server's RSA key.

    Args:
        image_base64_string: A string containing the base64-encoded image.

    Returns:
        The decrypted, JSON-decoded data. An empty dict is returned when the
        embedded length header is zero.

    Raises:
        gr.Error: If any part of the process fails, from decoding to decryption.
            The message is user-facing and intended for the API consumer.

    Binary data structure expected in the image's LSBs (one bit per RGB channel
    value, most significant bit first):

        | Header: payload length in bytes (32 bits) | payload ... |

    where the payload is:

        | Encrypted AES key length (4 bytes, big-endian) | Encrypted AES key |
        | AES nonce (12 bytes) | AES-GCM ciphertext + tag (remaining bytes) |
    """
    if not KEYLOCK_PRIV_KEY:
        error_msg = "Server Error: The API is not configured with a private key. Please contact the administrator."
        logger.error(error_msg)
        raise gr.Error(error_msg)

    aes_gcm_tag_size = 16  # AESGCM appends a 16-byte authentication tag to the ciphertext.

    try:
        # 1. Decode Base64 and load image data
        image_buffer = base64.b64decode(image_base64_string)
        try:
            img = Image.open(io.BytesIO(image_buffer)).convert("RGB")
        except OSError as e:
            # Pillow reports unreadable/unidentified images as OSError; that is
            # bad client input, not a server fault.
            raise ValueError("The supplied data is not a readable image.") from e
        pixel_data = np.array(img).ravel()

        # 2. Extract data length from the LSB header
        if pixel_data.size < HEADER_BITS:
            raise ValueError(f"Image is too small to contain a valid header. Minimum pixel count: {HEADER_BITS}")
        data_length = int.from_bytes(_extract_lsb_bits(pixel_data, 0, HEADER_BITS), byteorder='big')
        if data_length == 0:
            logger.info("Image decoded with zero data length. Returning empty dict.")
            return {}

        # 3. Extract the full crypto payload from LSBs
        data_bits_count = data_length * 8
        end_offset = HEADER_BITS + data_bits_count
        if pixel_data.size < end_offset:
            raise ValueError("Image data corrupt or truncated. The actual image size is smaller than specified in the header.")
        crypto_payload = _extract_lsb_bits(pixel_data, HEADER_BITS, data_bits_count)

        # 4. Load private key and unpack the crypto payload
        private_key = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY.encode(), password=None)
        offset = 4  # First 4 bytes define the length of the encrypted AES key
        encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
        rsa_key_size_bytes = private_key.key_size // 8
        if encrypted_aes_key_len != rsa_key_size_bytes:
            raise ValueError(f"Key mismatch: The data was encrypted with a {encrypted_aes_key_len*8}-bit key, but the server uses a {private_key.key_size}-bit key.")
        # Fail early with a clear message instead of slicing short fields and
        # letting the crypto layer report an unrelated error.
        minimum_payload_len = offset + encrypted_aes_key_len + AES_GCM_NONCE_SIZE + aes_gcm_tag_size
        if len(crypto_payload) < minimum_payload_len:
            raise ValueError(f"Payload too short: expected at least {minimum_payload_len} bytes, got {len(crypto_payload)}.")
        encrypted_aes_key = crypto_payload[offset:offset + encrypted_aes_key_len]
        offset += encrypted_aes_key_len
        nonce = crypto_payload[offset:offset + AES_GCM_NONCE_SIZE]
        offset += AES_GCM_NONCE_SIZE
        ciphertext_with_tag = crypto_payload[offset:]

        # 5. Decrypt the data
        recovered_aes_key = private_key.decrypt(
            encrypted_aes_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
        )
        aesgcm = AESGCM(recovered_aes_key)
        decrypted_bytes = aesgcm.decrypt(nonce, ciphertext_with_tag, None)
        logger.info("Successfully decoded and decrypted a payload from an image.")
        return json.loads(decrypted_bytes.decode('utf-8'))
    except (ValueError, InvalidTag, TypeError, struct.error) as e:
        # InvalidTag carries no message; fall back to the class name so the
        # client-visible details are never empty.
        details = str(e) or type(e).__name__
        logger.error("Decryption failed: %s", details)
        raise gr.Error(f"Decryption failed. The image may be corrupt, was not created by a compatible client, or was encrypted with the wrong public key. Details: {details}") from e
    except Exception as e:
        logger.error("An unexpected error occurred during decoding: %s", e, exc_info=True)
        raise gr.Error(f"An unexpected server error occurred. Please check server logs. Details: {e}") from e
# ==============================================================================
# GRADIO INTERFACE (UI and API Endpoint Definition)
# ==============================================================================
# Builds the documentation UI (two tabs) and registers decode_data as the
# "keylock-auth-decoder" API endpoint.
# NOTE(review): the emoji in the title and the first tab label are best-guess
# replacements for mis-encoded characters - confirm against the intended text.
with gr.Blocks(title="Secure Decoder API", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔐 Secure KeyLock Decoder API")
    gr.Markdown("This application provides a secure API endpoint to decrypt and extract JSON data hidden within PNG images. **See the deployment guide at the top of `app.py` for setup instructions.**")
    with gr.Tabs():
        with gr.TabItem("🚀 Quick Start & Documentation"):
            gr.Markdown("## What Is This?")
            gr.Markdown(
                "This service decodes messages that have been securely hidden inside images. It's designed for use cases where you need to transmit a small, sensitive JSON payload (like authentication tokens, user data, or configuration) "
                "without it being easily detectable or readable. The process uses a combination of **steganography** (hiding data in the image's pixels) and **hybrid encryption** (the security of RSA combined with the speed of AES)."
            )
            with gr.Accordion("How It Works", open=False):
                gr.Markdown(
                    """
                    The security relies on a well-established cryptographic pattern called **Hybrid Encryption**. Here’s the step-by-step process a client must follow to create a compatible image:
                    1. **Client Generates a One-Time Key**: The client creates a random, single-use 32-byte symmetric key for AES-GCM encryption.
                    2. **Client Encrypts Data**: The client takes the JSON payload and encrypts it using the one-time AES key.
                    3. **Client Encrypts the One-Time Key**: The client encrypts the AES key using this server's **Public RSA Key** (provided below).
                    4. **Client Creates the Payload**: The client bundles the encrypted AES key, nonce, and ciphertext into a single binary payload.
                    5. **Client Hides the Payload**: The client uses **Least Significant Bit (LSB) steganography** to hide the payload in a template image.
                    6. **Client Sends Image**: The final image is sent to this API endpoint.
                    7. **Server Decodes**: This server reverses the process: it extracts the payload, uses its private RSA key to decrypt the AES key, and then decrypts the final message.
                    """
                )
            gr.Markdown("---")
            gr.Markdown("## How to Use This API")
            gr.Markdown("### Step 1: Get the Server's Public Key")
            gr.Markdown("You must use this public key to encrypt data intended for this server. Save it to a file (e.g., `server_pub.pem`).")
            # The PEM text is not Python source, so no syntax highlighting is requested.
            gr.Code(value=PUBLIC_KEY_PEM_STRING, language=None, label="Server Public Key")
            with gr.Accordion("Step 2: Create the Encrypted Image (Client-Side Python Example)", open=True):
                gr.Markdown("Use the following Python code in your client application to create a KeyLock image.")
                # The script below is shown verbatim to users, so it is kept at
                # column 0 inside the string literal.
                gr.Code(
                    language="python",
                    label="Client-Side Image Creation Script (create_keylock_image.py)",
                    value="""
import json; import os; import struct; from PIL import Image; import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
def create_keylock_image(public_key_pem: str, json_data: dict, template_image_path: str, output_path: str):
    public_key = serialization.load_pem_public_key(public_key_pem.encode())
    data_to_encrypt = json.dumps(json_data).encode('utf-8')
    aes_key = AESGCM.generate_key(bit_length=256)
    nonce = os.urandom(12)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, data_to_encrypt, None)
    encrypted_aes_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
    payload = struct.pack('>I', len(encrypted_aes_key)) + encrypted_aes_key + nonce + ciphertext_with_tag
    img = Image.open(template_image_path).convert("RGB")
    pixel_data = np.array(img)
    full_binary_string = f'{len(payload):032b}' + ''.join(f'{byte:08b}' for byte in payload)
    if len(full_binary_string) > pixel_data.size:
        raise ValueError(f"Data is too large. Needs {len(full_binary_string)} pixels, image has {pixel_data.size}.")
    flat_pixels = pixel_data.ravel()
    for i in range(len(full_binary_string)):
        flat_pixels[i] = (flat_pixels[i] & 0b11111110) | int(full_binary_string[i])
    Image.fromarray(pixel_data).save(output_path, "PNG")
    print(f"Successfully created encrypted image at {output_path}")
# --- Example Usage ---
if __name__ == '__main__':
    SERVER_PUBLIC_KEY = \"\"\"-----BEGIN PUBLIC KEY-----
YOUR_SERVER_PUBLIC_KEY_HERE
-----END PUBLIC KEY-----\"\"\"
    my_secret_data = {"user_id": "user-12345", "token": "abcdef-123456"}
    create_keylock_image(SERVER_PUBLIC_KEY, my_secret_data, "template.png", "encrypted_image.png")
"""
                )
            gr.Markdown("### Step 3: Call the API Endpoint")
            gr.Markdown(
                "- **Method**: `POST`\n"
                "- **Endpoint**: `/run/keylock-auth-decoder`\n"
                "- **Body**: A JSON object containing a base64-encoded string of the image."
            )
            gr.Code(language="json", label="JSON Payload Schema", value='{\n "data": [\n "<base64_encoded_image_string>"\n ]\n}')
            with gr.Accordion("Python `requests` Example", open=True):
                gr.Code(
                    language="python",
                    label="Example API Call with Python `requests`",
                    value="""
import requests
import base64
IMAGE_PATH = "encrypted_image.png"
API_URL = "https://YOUR-SPACE-NAME.hf.space/run/keylock-auth-decoder" # <-- Change this URL
with open(IMAGE_PATH, "rb") as f:
    b64_string = base64.b64encode(f.read()).decode('utf-8')
response = requests.post(API_URL, json={"data": [b64_string]})
if response.status_code == 200:
    print("Success! Decrypted data:", response.json()['data'][0])
else:
    print(f"Error: {response.status_code}", response.text)
"""
                )
        with gr.TabItem("⚙️ Server Status & Error Guide"):
            gr.Markdown("## Server Status")
            # Evaluated once at start-up from the module-level private-key setting.
            key_status = "✅ Loaded successfully from secrets." if KEYLOCK_PRIV_KEY else "❌ NOT FOUND. The API is non-functional. The administrator must set the `KEYLOCK_PRIV_KEY` secret in the Space settings. See the deployment guide in `app.py`."
            gr.Textbox(label="Private Key Status", value=key_status, interactive=False, lines=3)
            gr.Markdown("---")
            gr.Markdown("## Error Handling Guide")
            gr.Markdown(
                """
                - **`"Decryption failed. ... InvalidTag"`**: The most common error. It means the image was encrypted with a **different public key** or the image file was corrupted.
                - **`"Decryption failed. ... Key mismatch"`**: The RSA key size used to encrypt the data does not match the server's key size. Use the public key provided by this service.
                - **`"Image data corrupt or truncated..."`**: The image file is incomplete or damaged.
                - **`"Server Error: ... not configured"`**: A server-side problem. The administrator has not set the `KEYLOCK_PRIV_KEY` secret correctly.
                """
            )
    # --- This hidden component creates the API endpoint ---
    # NOTE(review): the Interface is kept inside the hidden Row so that no
    # decoder form is shown in the UI - confirm this matches the intended layout.
    with gr.Row(visible=False):
        api_input = gr.Textbox()
        api_output = gr.JSON()
        gr.Interface(
            fn=decode_data,
            inputs=api_input,
            outputs=api_output,
            api_name="keylock-auth-decoder",
        )
if __name__ == "__main__":
    # To run on a server, you might use: demo.launch(server_name="0.0.0.0")
    # See the deployment guide at the top of this file for more details.
    # mcp_server=True is passed through to Gradio's launch().
    demo.launch(mcp_server=True)