# broadfield-dev's picture
# Update core.py
# 4210680 verified
import io
import json
import os
import struct
import logging
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from PIL import Image, ImageDraw, ImageFont
import numpy as np
# --- Configure Logging ---
# NOTE: this configures the root logger as an import-time side effect of
# loading this module (INFO level, timestamped format).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-scoped logger used by the functions below.
logger = logging.getLogger(__name__)
# --- Constants ---
# Number of random bytes drawn for the AES key (32 bytes = 256 bits).
AES_KEY_SIZE_CRYPTO = 32
# Number of random bytes drawn for the AES-GCM nonce.
AES_GCM_NONCE_SIZE_CRYPTO = 12
# NOTE(review): not referenced in the visible code; presumably the byte width
# of the struct '>I' length prefixes written below -- confirm against the
# decoder before relying on it.
LENGTH_HEADER_SIZE = 4
def parse_kv_string_to_dict(kv_str: str) -> dict:
    """Parse lines of ``key: value`` or ``key = value`` pairs into a dictionary.

    Blank lines and lines starting with ``#`` are skipped. Each remaining line
    is split at the first ``=`` or ``:`` it contains, whichever comes first,
    so the value may itself contain either character (for example
    ``token: dGVzdA==`` or ``url = http://host:80``). Keys and values are
    stripped of surrounding whitespace, and values additionally have leading
    and trailing quote characters (``'`` or ``"``) removed. A key that appears
    more than once keeps its last value.

    Args:
        kv_str: Multi-line text containing one pair per line.

    Returns:
        A dict mapping each key to its string value.

    Raises:
        ValueError: If a non-blank, non-comment line contains neither
            ``=`` nor ``:``.
    """
    data_dict = {}
    for raw_line in kv_str.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        # Split at the earliest separator so that a '=' or ':' inside the
        # value is never mistaken for the key/value delimiter.
        separator_positions = [
            pos for pos in (line.find('='), line.find(':')) if pos != -1
        ]
        if not separator_positions:
            raise ValueError(f"Invalid format on line: '{line}'. Use 'key: value' or 'key = value'.")
        split_at = min(separator_positions)
        key = line[:split_at]
        value = line[split_at + 1:]
        data_dict[key.strip()] = value.strip().strip("'\"")
    return data_dict
def encrypt_data_hybrid(data: bytes, public_key_pem: str) -> bytes:
    """Encrypt ``data`` using RSA-OAEP + AES-GCM hybrid encryption.

    A fresh random AES key and nonce are generated for every call. The data is
    encrypted with AES-GCM, and the AES key is wrapped with the RSA public key
    using OAEP (SHA-256 for both the hash and MGF1).

    The returned blob is laid out as::

        [4-byte big-endian length of wrapped key][wrapped key][nonce][ciphertext + tag]

    Args:
        data: Plaintext bytes to encrypt.
        public_key_pem: PEM-encoded public key text.

    Returns:
        The concatenated payload described above.

    Raises:
        ValueError: If the PEM decodes to a key that is not an RSA public key.
            Errors raised by the PEM loader itself propagate unchanged.
    """
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    # The loader accepts any supported key type; only RSA keys offer the
    # OAEP ``encrypt`` call used below, so reject everything else up front
    # with a clear message instead of failing obscurely later.
    if not isinstance(public_key, rsa.RSAPublicKey):
        raise ValueError(
            f"Expected an RSA public key, got {type(public_key).__name__}."
        )

    aes_key = os.urandom(AES_KEY_SIZE_CRYPTO)
    nonce = os.urandom(AES_GCM_NONCE_SIZE_CRYPTO)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, data, None)

    oaep_padding = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    rsa_encrypted_aes_key = public_key.encrypt(aes_key, oaep_padding)

    # Length prefix lets a reader locate the end of the variable-size wrapped key.
    encrypted_aes_key_len_bytes = struct.pack('>I', len(rsa_encrypted_aes_key))
    return b"".join(
        (encrypted_aes_key_len_bytes, rsa_encrypted_aes_key, nonce, ciphertext_with_tag)
    )
def embed_data_in_image(img_obj: Image.Image, data: bytes) -> Image.Image:
    """Embed ``data`` into the least-significant bits of an image's RGB values.

    The image is converted to RGB and flattened to a sequence of channel
    values. The payload is ``data`` prefixed with its length as a 4-byte
    big-endian unsigned integer; each payload bit (most-significant bit of
    each byte first) replaces the lowest bit of one channel value, starting
    from the first value. Channel values beyond the payload are left as-is.

    Args:
        img_obj: Carrier image; it is not modified.
        data: Bytes to hide.

    Returns:
        A new RGB image carrying the payload.

    Raises:
        ValueError: If the payload needs more bits than the image has
            channel values.
    """
    img_rgb = img_obj.convert("RGB")
    pixels = np.array(img_rgb)
    channel_values = pixels.ravel()

    framed_payload = struct.pack('>I', len(data)) + data
    # unpackbits emits each byte MSB-first, matching an '08b' rendering.
    payload_bits = np.unpackbits(np.frombuffer(framed_payload, dtype=np.uint8))

    if payload_bits.size > channel_values.size:
        raise ValueError(f"Data too large for image capacity. Needs {payload_bits.size} bits, image has {channel_values.size}.")

    # Clear the lowest bit of each carrier value and write the payload bit in
    # a single vectorized step rather than one Python iteration per bit.
    carrier = channel_values[:payload_bits.size]
    channel_values[:payload_bits.size] = (carrier & 0xFE) | payload_bits

    # Reshape from the flat buffer so the result is correct whether ravel()
    # returned a view or a copy; uint8 (H, W, 3) data is read as RGB.
    stego_pixels = channel_values.reshape(pixels.shape)
    return Image.fromarray(stego_pixels)
def create_carrier_image(width=800, height=600) -> Image.Image:
    """Build a plain carrier image with a centered caption.

    The image is a solid-color RGB canvas of the requested size with the text
    "KeyLock Encrypted Data" drawn at its center. DejaVuSans at size 40 is
    used when it can be loaded; otherwise Pillow's default font is used.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        The generated RGB image.
    """
    background_rgb = (73, 109, 137)
    caption_rgb = (255, 255, 0)

    canvas = Image.new('RGB', (width, height), color=background_rgb)
    painter = ImageDraw.Draw(canvas)

    try:
        caption_font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except IOError:
        # Font file not available on this system; fall back to the built-in one.
        caption_font = ImageFont.load_default()

    center_point = (width/2, height/2)
    painter.text(
        center_point,
        "KeyLock Encrypted Data",
        fill=caption_rgb,
        font=caption_font,
        anchor="ms",
    )
    return canvas
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """Turn key-value secret text into an image carrying the encrypted data.

    Pipeline: parse the text into a dict, serialize it as UTF-8 JSON, encrypt
    the bytes with ``encrypt_data_hybrid``, then embed the encrypted payload
    into a freshly generated carrier image.

    Args:
        secret_data_str: Text with one ``key: value`` or ``key = value``
            pair per line.
        public_key_pem: PEM-encoded public key used for encryption.

    Returns:
        The carrier image with the encrypted payload embedded.

    Raises:
        ValueError: If the text is empty or whitespace-only, or if parsing
            yields no pairs. Errors from the parsing, encryption and
            embedding steps propagate unchanged.
    """
    logger.info("Starting image creation process...")

    # Step 1: reject empty input, then parse into a dict.
    has_content = bool(secret_data_str.strip())
    if not has_content:
        raise ValueError("Secret data cannot be empty.")

    parsed_secrets = parse_kv_string_to_dict(secret_data_str)
    if not parsed_secrets:
        raise ValueError("No valid key-value pairs found in secret data.")

    # Step 2: serialize to JSON and encode as UTF-8.
    json_text = json.dumps(parsed_secrets)
    json_bytes = json_text.encode('utf-8')
    logger.info(f"Serialized {len(json_bytes)} bytes of data.")

    # Step 3: hybrid-encrypt the serialized bytes.
    encrypted_payload = encrypt_data_hybrid(json_bytes, public_key_pem)
    logger.info(f"Encrypted payload created with size: {len(encrypted_payload)} bytes.")

    # Step 4: hide the payload inside a newly generated carrier image.
    stego_image = embed_data_in_image(create_carrier_image(), encrypted_payload)
    logger.info("Data successfully embedded into carrier image.")
    return stego_image