Spaces:
Sleeping
Sleeping
import io | |
import json | |
import os | |
import struct | |
import logging | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import rsa, padding | |
from PIL import Image, ImageDraw, ImageFont | |
import numpy as np | |
# --- Configure Logging --- | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# --- Constants --- | |
AES_KEY_SIZE_CRYPTO = 32 | |
AES_GCM_NONCE_SIZE_CRYPTO = 12 | |
LENGTH_HEADER_SIZE = 4 | |
def parse_kv_string_to_dict(kv_str: str) -> dict: | |
"""Parses a string of key:value or key=value pairs into a dictionary.""" | |
data_dict = {} | |
for line in kv_str.splitlines(): | |
line = line.strip() | |
if not line or line.startswith('#'): | |
continue | |
if '=' in line: | |
key, value = line.split('=', 1) | |
elif ':' in line: | |
key, value = line.split(':', 1) | |
else: | |
raise ValueError(f"Invalid format on line: '{line}'. Use 'key: value' or 'key = value'.") | |
data_dict[key.strip()] = value.strip().strip("'\"") | |
return data_dict | |
def encrypt_data_hybrid(data: bytes, public_key_pem: str) -> bytes: | |
"""Encrypts data using RSA-OAEP + AES-GCM hybrid encryption.""" | |
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8')) | |
aes_key = os.urandom(AES_KEY_SIZE_CRYPTO) | |
nonce = os.urandom(AES_GCM_NONCE_SIZE_CRYPTO) | |
ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, data, None) | |
rsa_encrypted_aes_key = public_key.encrypt( | |
aes_key, | |
padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None) | |
) | |
encrypted_aes_key_len_bytes = struct.pack('>I', len(rsa_encrypted_aes_key)) | |
return encrypted_aes_key_len_bytes + rsa_encrypted_aes_key + nonce + ciphertext_with_tag | |
def embed_data_in_image(img_obj: Image.Image, data: bytes) -> Image.Image: | |
"""Embeds data into the LSB of an image.""" | |
img_rgb = img_obj.convert("RGB") | |
pixel_data = np.array(img_rgb).ravel() | |
data_length_header = struct.pack('>I', len(data)) | |
binary_payload = ''.join(format(byte, '08b') for byte in data_length_header + data) | |
if len(binary_payload) > pixel_data.size: | |
raise ValueError(f"Data too large for image capacity. Needs {len(binary_payload)} bits, image has {pixel_data.size}.") | |
for i in range(len(binary_payload)): | |
pixel_data[i] = (pixel_data[i] & 0xFE) | int(binary_payload[i]) | |
stego_pixels = pixel_data.reshape(img_rgb.size[1], img_rgb.size[0], 3) | |
return Image.fromarray(stego_pixels, 'RGB') | |
def create_carrier_image(width=800, height=600) -> Image.Image: | |
"""Generates a simple carrier image.""" | |
img = Image.new('RGB', (width, height), color = (73, 109, 137)) | |
d = ImageDraw.Draw(img) | |
try: | |
font = ImageFont.truetype("DejaVuSans.ttf", 40) | |
except IOError: | |
font = ImageFont.load_default() | |
d.text((width/2, height/2), "KeyLock Encrypted Data", fill=(255,255,0), font=font, anchor="ms") | |
return img | |
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image: | |
""" | |
High-level orchestrator function to create the final encrypted image. | |
""" | |
logger.info("Starting image creation process...") | |
# 1. Parse and validate secret data | |
if not secret_data_str.strip(): | |
raise ValueError("Secret data cannot be empty.") | |
secret_dict = parse_kv_string_to_dict(secret_data_str) | |
if not secret_dict: | |
raise ValueError("No valid key-value pairs found in secret data.") | |
# 2. Serialize data to JSON bytes | |
json_bytes = json.dumps(secret_dict).encode('utf-8') | |
logger.info(f"Serialized {len(json_bytes)} bytes of data.") | |
# 3. Encrypt the data | |
encrypted_payload = encrypt_data_hybrid(json_bytes, public_key_pem) | |
logger.info(f"Encrypted payload created with size: {len(encrypted_payload)} bytes.") | |
# 4. Create a carrier image and embed the data | |
carrier_image = create_carrier_image() | |
stego_image = embed_data_in_image(carrier_image, encrypted_payload) | |
logger.info("Data successfully embedded into carrier image.") | |
return stego_image | |