import io
import json
import logging
import os
import struct

import numpy as np
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from PIL import Image, ImageDraw, ImageFont

# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Constants ---
AES_KEY_SIZE_CRYPTO = 32  # bytes of random AES key material (AES-256)
AES_GCM_NONCE_SIZE_CRYPTO = 12  # bytes of random nonce passed to AES-GCM
# Not referenced below; equals the size of the 4-byte '>I' length prefixes
# written by encrypt_data_hybrid and embed_data_in_image.
LENGTH_HEADER_SIZE = 4


def parse_kv_string_to_dict(kv_str: str) -> dict:
    """Parse a block of ``key: value`` / ``key = value`` lines into a dict.

    Blank lines and lines starting with ``#`` are skipped. Each remaining
    line is split on the FIRST ``=`` or ``:`` it contains (whichever comes
    first), so a value may itself contain the other separator, e.g.
    ``url: https://example.com/?q=1``. Keys and values are whitespace
    stripped; leading/trailing single or double quotes are removed from
    values. Later duplicates of a key overwrite earlier ones.

    Raises:
        ValueError: if a non-comment line contains neither ``=`` nor ``:``.
    """
    data_dict = {}
    for raw_line in kv_str.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue

        # Use the earliest separator so the value can contain the other one.
        positions = [pos for pos in (line.find('='), line.find(':')) if pos != -1]
        if not positions:
            raise ValueError(
                f"Invalid format on line: '{line}'. Use 'key: value' or 'key = value'."
            )
        split_at = min(positions)
        key, value = line[:split_at], line[split_at + 1:]
        data_dict[key.strip()] = value.strip().strip("'\"")
    return data_dict


def encrypt_data_hybrid(data: bytes, public_key_pem: str) -> bytes:
    """Encrypt ``data`` with RSA-OAEP + AES-GCM hybrid encryption.

    A fresh random AES key and nonce are generated per call. The payload is
    encrypted with AES-GCM (no associated data) and the AES key is wrapped
    with RSA-OAEP (MGF1/SHA-256, SHA-256, no label).

    Returns:
        ``len(wrapped_key)`` as a 4-byte big-endian integer, followed by the
        RSA-wrapped AES key, the nonce, and the AES-GCM output.

    Raises:
        ValueError: if the PEM does not contain an RSA public key (malformed
            PEM data is reported by the loader itself).
    """
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    # The loader also accepts EC/Ed25519/... keys, which have no .encrypt();
    # fail with a clear message instead of an AttributeError further down.
    if not isinstance(public_key, rsa.RSAPublicKey):
        raise ValueError("Public key must be an RSA public key for RSA-OAEP encryption.")

    aes_key = os.urandom(AES_KEY_SIZE_CRYPTO)
    nonce = os.urandom(AES_GCM_NONCE_SIZE_CRYPTO)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, data, None)

    rsa_encrypted_aes_key = public_key.encrypt(
        aes_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )
    encrypted_aes_key_len_bytes = struct.pack('>I', len(rsa_encrypted_aes_key))
    return encrypted_aes_key_len_bytes + rsa_encrypted_aes_key + nonce + ciphertext_with_tag


def embed_data_in_image(img_obj: Image.Image, data: bytes) -> Image.Image:
    """Embed ``data`` into the least-significant bits of an image.

    The image is converted to RGB and flattened in row-major order. A 4-byte
    big-endian length header followed by ``data`` is written one bit per
    channel value, most-significant bit of each byte first. Channel values
    beyond the payload are left untouched. The input image is not modified.

    Raises:
        ValueError: if header + data need more bits than the image has
            channel values.
    """
    img_rgb = img_obj.convert("RGB")
    pixel_data = np.array(img_rgb).ravel()

    payload = struct.pack('>I', len(data)) + data
    bits_needed = 8 * len(payload)
    if bits_needed > pixel_data.size:
        raise ValueError(
            f"Data too large for image capacity. Needs {bits_needed} bits, "
            f"image has {pixel_data.size}."
        )

    # unpackbits emits each byte MSB-first, matching format(byte, '08b').
    bits = np.unpackbits(np.frombuffer(payload, dtype=np.uint8))
    pixel_data[:bits_needed] = (pixel_data[:bits_needed] & 0xFE) | bits

    # PIL's size is (width, height); the array layout is (height, width, 3).
    stego_pixels = pixel_data.reshape(img_rgb.size[1], img_rgb.size[0], 3)
    # A uint8 array of shape (H, W, 3) is interpreted as RGB.
    return Image.fromarray(stego_pixels)


def create_carrier_image(width=800, height=600) -> Image.Image:
    """Generate a simple solid-colour carrier image with a caption.

    Tries the "DejaVuSans.ttf" font at size 40 and falls back to Pillow's
    default font when it cannot be loaded.
    """
    img = Image.new('RGB', (width, height), color=(73, 109, 137))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except OSError:  # IOError is an alias of OSError
        font = ImageFont.load_default()
    draw.text(
        (width / 2, height / 2),
        "KeyLock Encrypted Data",
        fill=(255, 255, 0),
        font=font,
        anchor="ms",
    )
    return img


def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """High-level orchestrator that creates the final encrypted image.

    Steps: parse the key/value text, serialise it to JSON, encrypt it with
    ``encrypt_data_hybrid``, then embed the result into a freshly generated
    carrier image via ``embed_data_in_image``.

    Raises:
        ValueError: if ``secret_data_str`` is blank, contains a malformed
            line, or yields no key/value pairs (errors from the encryption
            and embedding steps propagate unchanged).
    """
    logger.info("Starting image creation process...")

    # 1. Parse and validate secret data
    if not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    secret_dict = parse_kv_string_to_dict(secret_data_str)
    if not secret_dict:
        raise ValueError("No valid key-value pairs found in secret data.")

    # 2. Serialize data to JSON bytes
    json_bytes = json.dumps(secret_dict).encode('utf-8')
    logger.info("Serialized %d bytes of data.", len(json_bytes))

    # 3. Encrypt the data
    encrypted_payload = encrypt_data_hybrid(json_bytes, public_key_pem)
    logger.info("Encrypted payload created with size: %d bytes.", len(encrypted_payload))

    # 4. Create a carrier image and embed the data
    carrier_image = create_carrier_image()
    stego_image = embed_data_in_image(carrier_image, encrypted_payload)
    logger.info("Data successfully embedded into carrier image.")

    return stego_image