Spaces:
Sleeping
Sleeping
File size: 14,299 Bytes
d123ded 5523299 d123ded c72b017 5523299 414ff7f 5523299 414ff7f 5523299 414ff7f 5523299 9844f6e 5523299 414ff7f 762b856 c72b017 5523299 c72b017 414ff7f 5523299 9844f6e d123ded 5523299 c72b017 414ff7f 5523299 9844f6e 414ff7f 5523299 414ff7f 5523299 9844f6e 414ff7f 9844f6e 5523299 9844f6e 5523299 414ff7f 9844f6e 5523299 414ff7f 9844f6e 5523299 9844f6e 414ff7f 5523299 414ff7f 5523299 9844f6e 5523299 414ff7f 5523299 c72b017 5523299 d123ded c72b017 5523299 d123ded c72b017 d123ded 5523299 9844f6e d123ded 5523299 9844f6e 5523299 1a20ae9 5523299 9844f6e 5523299 414ff7f 5523299 698f752 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 9844f6e 5523299 414ff7f c72b017 5523299 9844f6e 5523299 414ff7f 9844f6e 1a20ae9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
import os
import io
import json
import base64
import struct
import logging
import gradio as gr
from PIL import Image
import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidTag
# --- Constants ---
# Number of leading LSBs (32 bits = 4 bytes) that store the length of the crypto payload.
HEADER_BITS = 32
# AES-GCM uses a 12-byte (96-bit) nonce as recommended by NIST.
AES_GCM_NONCE_SIZE = 12

# --- Configure Logging ---
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Load Keys and Config ---
# The private key comes from an environment variable so it never lives in the repository.
KEYLOCK_PRIV_KEY = os.environ.get('KEYLOCK_PRIV_KEY')


def _read_public_key_for_display() -> str:
    """Return the text of keylock_pub.pem, or a human-readable error message.

    The result is only shown in the documentation UI, so failures are logged
    and turned into a message string instead of being raised.
    """
    try:
        with open("keylock_pub.pem", "r") as pem_file:
            pem_text = pem_file.read()
        logger.info("Successfully loaded public key from keylock_pub.pem for display.")
        return pem_text
    except FileNotFoundError:
        logger.error("FATAL: keylock_pub.pem not found. The UI will show an error.")
        return "Error: keylock_pub.pem file not found on the server. Make sure it's in your repository."
    except Exception as load_error:
        failure_text = f"Error loading public key: {load_error}"
        logger.error(failure_text)
        return failure_text


# Public key text (or an error message) displayed in the documentation tab.
PUBLIC_KEY_PEM_STRING = _read_public_key_for_display()
# --- Core Decryption Function ---
def decode_data(image_base64_string: str) -> dict:
    """
    Decrypts a payload hidden in a PNG image via LSB steganography and hybrid encryption.

    This function serves as the main API logic. It expects a base64-encoded string
    of an image that was created with the corresponding public key.

    Args:
        image_base64_string: A string containing the base64-encoded PNG image.

    Returns:
        The decrypted payload parsed with ``json.loads``. An empty dict is
        returned when the header declares a zero-length payload.

    Raises:
        gr.Error: If any part of the process fails, from decoding to decryption.
            The error message is user-friendly and intended for the API consumer.

    Binary Data Structure expected in the image's LSBs:
    ------------------------------------------------------------------------------------------
    | Header (32 bits) | Encrypted AES Key Length (4 bytes) | Encrypted AES Key (variable) ...
    ------------------------------------------------------------------------------------------
    ... | AES Nonce (12 bytes) | AES-GCM Ciphertext + Tag (remaining bytes) |
    ------------------------------------------------------------------------
    """
    if not KEYLOCK_PRIV_KEY:
        error_msg = "Server Error: The API is not configured with a private key. Please contact the administrator."
        logger.error(error_msg)
        raise gr.Error(error_msg)

    try:
        # 1. Decode Base64 and load image data as one flat array of channel values.
        image_buffer = base64.b64decode(image_base64_string)
        img = Image.open(io.BytesIO(image_buffer)).convert("RGB")
        pixel_data = np.array(img).ravel()

        # 2. Extract data length from the LSB header (big-endian, one bit per channel value).
        if pixel_data.size < HEADER_BITS:
            raise ValueError(f"Image is too small to contain a valid header. Minimum pixel count: {HEADER_BITS}")
        header_binary_string = "".join(str(pixel & 1) for pixel in pixel_data[:HEADER_BITS])
        data_length = int(header_binary_string, 2)
        if data_length == 0:
            logger.info("Image decoded with zero data length. Returning empty dict.")
            return {}

        # 3. Extract the full crypto payload from LSBs.
        end_offset = HEADER_BITS + data_length * 8
        if pixel_data.size < end_offset:
            raise ValueError("Image data corrupt or truncated. The actual image size is smaller than specified in the header.")
        # packbits packs MSB-first, which matches the big-endian bit order the
        # payload was written in; the bit count is a multiple of 8 so no padding occurs.
        payload_bits = (pixel_data[HEADER_BITS:end_offset] & 1).astype(np.uint8)
        crypto_payload = np.packbits(payload_bits).tobytes()

        # 4. Load private key and unpack the crypto payload.
        private_key = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY.encode(), password=None)
        offset = 4  # First 4 bytes define the length of the encrypted AES key
        encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
        rsa_key_size_bytes = private_key.key_size // 8
        if encrypted_aes_key_len != rsa_key_size_bytes:
            raise ValueError(f"Key mismatch: The data was encrypted with a {encrypted_aes_key_len*8}-bit key, but the server uses a {private_key.key_size}-bit key.")
        encrypted_aes_key = crypto_payload[offset : offset + encrypted_aes_key_len]
        offset += encrypted_aes_key_len
        nonce = crypto_payload[offset : offset + AES_GCM_NONCE_SIZE]
        offset += AES_GCM_NONCE_SIZE
        ciphertext_with_tag = crypto_payload[offset:]

        # 5. Decrypt the data: RSA-OAEP unwraps the AES key, AES-GCM decrypts and authenticates.
        recovered_aes_key = private_key.decrypt(
            encrypted_aes_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
        )
        aesgcm = AESGCM(recovered_aes_key)
        decrypted_bytes = aesgcm.decrypt(nonce, ciphertext_with_tag, None)
        logger.info("Successfully decoded and decrypted a payload from an image.")
        return json.loads(decrypted_bytes.decode('utf-8'))
    except (ValueError, InvalidTag, TypeError, struct.error, OSError) as e:
        # OSError covers Pillow's failures on unreadable/truncated image bytes,
        # which are a client-input problem rather than a server fault.
        # InvalidTag has an empty str(), so fall back to the exception class
        # name to keep the "Details:" part of the message informative.
        detail = str(e) or type(e).__name__
        logger.error(f"Decryption failed: {detail}")
        raise gr.Error(f"Decryption failed. The image may be corrupt, was not created by a compatible client, or was encrypted with the wrong public key. Details: {detail}") from e
    except Exception as e:
        logger.error(f"An unexpected error occurred during decoding: {e}", exc_info=True)
        raise gr.Error(f"An unexpected server error occurred. Please check server logs. Details: {e}") from e
# ==============================================================================
# GRADIO INTERFACE (UI and API Endpoint Definition)
# ==============================================================================
# NOTE(review): this copy of the file had its indentation flattened and its
# non-ASCII characters mis-decoded. The nesting below is reconstructed from the
# order of the statements, and the decorative emoji that prefixed the page title
# and the two tab labels could not be recovered, so they are omitted - confirm
# both against the original file.
with gr.Blocks(title="Secure Decoder API", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Secure KeyLock Decoder API")
    gr.Markdown("This application provides a secure API endpoint to decrypt and extract JSON data hidden within PNG images. **See the deployment guide at the top of `app.py` for setup instructions.**")

    with gr.Tabs():
        with gr.TabItem("Quick Start & Documentation"):
            gr.Markdown("## What Is This?")
            gr.Markdown(
                "This service decodes messages that have been securely hidden inside images. It's designed for use cases where you need to transmit a small, sensitive JSON payload (like authentication tokens, user data, or configuration) "
                "without it being easily detectable or readable. The process uses a combination of **steganography** (hiding data in the image's pixels) and **hybrid encryption** (the security of RSA combined with the speed of AES)."
            )
            with gr.Accordion("How It Works", open=False):
                gr.Markdown(
                    """
The security relies on a well-established cryptographic pattern called **Hybrid Encryption**. Here’s the step-by-step process a client must follow to create a compatible image:
1. **Client Generates a One-Time Key**: The client creates a random, single-use 32-byte symmetric key for AES-GCM encryption.
2. **Client Encrypts Data**: The client takes the JSON payload and encrypts it using the one-time AES key.
3. **Client Encrypts the One-Time Key**: The client encrypts the AES key using this server's **Public RSA Key** (provided below).
4. **Client Creates the Payload**: The client bundles the encrypted AES key, nonce, and ciphertext into a single binary payload.
5. **Client Hides the Payload**: The client uses **Least Significant Bit (LSB) steganography** to hide the payload in a template image.
6. **Client Sends Image**: The final image is sent to this API endpoint.
7. **Server Decodes**: This server reverses the process: it extracts the payload, uses its private RSA key to decrypt the AES key, and then decrypts the final message.
"""
                )
            gr.Markdown("---")
            gr.Markdown("## How to Use This API")
            gr.Markdown("### Step 1: Get the Server's Public Key")
            gr.Markdown("You must use this public key to encrypt data intended for this server. Save it to a file (e.g., `server_pub.pem`).")
            gr.Code(value=PUBLIC_KEY_PEM_STRING, language="python", label="Server Public Key")

            with gr.Accordion("Step 2: Create the Encrypted Image (Client-Side Python Example)", open=True):
                gr.Markdown("Use the following Python code in your client application to create a KeyLock image.")
                # The example is displayed verbatim, so its own indentation must
                # be valid Python starting at column 0 of the string.
                gr.Code(
                    language="python",
                    label="Client-Side Image Creation Script (create_keylock_image.py)",
                    value="""
import json; import os; import struct; from PIL import Image; import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
def create_keylock_image(public_key_pem: str, json_data: dict, template_image_path: str, output_path: str):
    public_key = serialization.load_pem_public_key(public_key_pem.encode())
    data_to_encrypt = json.dumps(json_data).encode('utf-8')
    aes_key = AESGCM.generate_key(bit_length=256)
    nonce = os.urandom(12)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, data_to_encrypt, None)
    encrypted_aes_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
    payload = struct.pack('>I', len(encrypted_aes_key)) + encrypted_aes_key + nonce + ciphertext_with_tag
    img = Image.open(template_image_path).convert("RGB")
    pixel_data = np.array(img)
    full_binary_string = f'{len(payload):032b}' + ''.join(f'{byte:08b}' for byte in payload)
    if len(full_binary_string) > pixel_data.size:
        raise ValueError(f"Data is too large. Needs {len(full_binary_string)} pixels, image has {pixel_data.size}.")
    flat_pixels = pixel_data.ravel()
    for i in range(len(full_binary_string)):
        flat_pixels[i] = (flat_pixels[i] & 0b11111110) | int(full_binary_string[i])
    Image.fromarray(pixel_data).save(output_path, "PNG")
    print(f"Successfully created encrypted image at {output_path}")
# --- Example Usage ---
if __name__ == '__main__':
    SERVER_PUBLIC_KEY = \"\"\"-----BEGIN PUBLIC KEY-----
YOUR_SERVER_PUBLIC_KEY_HERE
-----END PUBLIC KEY-----\"\"\"
    my_secret_data = {"user_id": "user-12345", "token": "abcdef-123456"}
    create_keylock_image(SERVER_PUBLIC_KEY, my_secret_data, "template.png", "encrypted_image.png")
"""
                )

            gr.Markdown("### Step 3: Call the API Endpoint")
            gr.Markdown(
                "- **Method**: `POST`\n"
                "- **Endpoint**: `/run/keylock-auth-decoder`\n"
                "- **Body**: A JSON object containing a base64-encoded string of the image."
            )
            gr.Code(language="json", label="JSON Payload Schema", value='{\n "data": [\n "<base64_encoded_image_string>"\n ]\n}')
            with gr.Accordion("Python `requests` Example", open=True):
                gr.Code(
                    language="python",
                    label="Example API Call with Python `requests`",
                    value="""
import requests
import base64
IMAGE_PATH = "encrypted_image.png"
API_URL = "https://YOUR-SPACE-NAME.hf.space/run/keylock-auth-decoder" # <-- Change this URL
with open(IMAGE_PATH, "rb") as f:
    b64_string = base64.b64encode(f.read()).decode('utf-8')
response = requests.post(API_URL, json={"data": [b64_string]})
if response.status_code == 200:
    print("Success! Decrypted data:", response.json()['data'][0])
else:
    print(f"Error: {response.status_code}", response.text)
"""
                )

        with gr.TabItem("Server Status & Error Guide"):
            gr.Markdown("## Server Status")
            # Status text depends only on whether the private-key secret was set.
            key_status = (
                "✅ Loaded successfully from secrets."
                if KEYLOCK_PRIV_KEY
                else "❌ NOT FOUND. The API is non-functional. The administrator must set the `KEYLOCK_PRIV_KEY` secret in the Space settings. See the deployment guide in `app.py`."
            )
            gr.Textbox(label="Private Key Status", value=key_status, interactive=False, lines=3)
            gr.Markdown("---")
            gr.Markdown("## Error Handling Guide")
            gr.Markdown(
                """
- **`"Decryption failed. ... InvalidTag"`**: The most common error. It means the image was encrypted with a **different public key** or the image file was corrupted.
- **`"Decryption failed. ... Key mismatch"`**: The RSA key size used to encrypt the data does not match the server's key size. Use the public key provided by this service.
- **`"Image data corrupt or truncated..."`**: The image file is incomplete or damaged.
- **`"Server Error: ... not configured"`**: A server-side problem. The administrator has not set the `KEYLOCK_PRIV_KEY` secret correctly.
"""
            )

    # --- This hidden component creates the API endpoint ---
    # NOTE(review): gr.Interface is placed inside the hidden Row so the endpoint
    # wiring stays invisible in the UI; the flattened source cannot confirm this
    # nesting - verify against the original.
    with gr.Row(visible=False):
        api_input = gr.Textbox()
        api_output = gr.JSON()
        gr.Interface(
            fn=decode_data,
            inputs=api_input,
            outputs=api_output,
            api_name="keylock-auth-decoder",
        )

if __name__ == "__main__":
    # To run on a server, you might use: demo.launch(server_name="0.0.0.0")
    # See the deployment guide at the top of this file for more details.
    demo.launch(mcp_server=True)