import io
import json
import os
import struct
import logging

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from PIL import Image, ImageDraw, ImageFont
import numpy as np

# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Constants ---
AES_KEY_SIZE_CRYPTO = 32
AES_GCM_NONCE_SIZE_CRYPTO = 12
LENGTH_HEADER_SIZE = 4

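def parse_kv_string_to_dict(kv_str: str) -> dict:
    """Parses lines of 'key: value' or 'key = value' pairs into a dictionary.

    Blank lines and lines starting with '#' are skipped. Each line is split at
    whichever separator ('=' or ':') appears first, so a value may itself
    contain the other separator (e.g. 'url: https://example.com/?a=b').
    """
    data_dict = {}
    for line in kv_str.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        # Split at the earliest separator rather than always preferring '='.
        separator_positions = [pos for pos in (line.find('='), line.find(':')) if pos != -1]
        if not separator_positions:
            raise ValueError(f"Invalid format on line: '{line}'. Use 'key: value' or 'key = value'.")
        split_at = min(separator_positions)
        key, value = line[:split_at], line[split_at + 1:]
        data_dict[key.strip()] = value.strip().strip("'\"")
    return data_dict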

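def encrypt_data_hybrid(data: bytes, public_key_pem: str) -> bytes:
    """Encrypts data using RSA-OAEP + AES-256-GCM hybrid encryption.

    Returned layout: 4-byte big-endian length of the RSA-encrypted AES key,
    the RSA-encrypted AES key, the 12-byte GCM nonce, then the ciphertext
    with its 16-byte authentication tag appended.
    """
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    if not isinstance(public_key, rsa.RSAPublicKey):
        raise ValueError("Public key must be an RSA key for RSA-OAEP encryption.")

    # A fresh random AES key and nonce are generated for every message.
    aes_key = os.urandom(AES_KEY_SIZE_CRYPTO)
    nonce = os.urandom(AES_GCM_NONCE_SIZE_CRYPTO)

    # AESGCM.encrypt returns the ciphertext with the authentication tag appended.
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, data, None)

    # Wrap the AES key with the recipient's RSA public key.
    rsa_encrypted_aes_key = public_key.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
    )

    encrypted_aes_key_len_bytes = struct.pack('>I', len(rsa_encrypted_aes_key))
    return encrypted_aes_key_len_bytes + rsa_encrypted_aes_key + nonce + ciphertext_with_tag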
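
# --- Illustrative sketch (not part of the original module) ---
# A hypothetical counterpart to encrypt_data_hybrid, assuming the payload layout
# produced above: [4-byte big-endian key length][RSA-encrypted AES key]
# [12-byte nonce][AES-GCM ciphertext + tag]. The function name and the use of an
# unencrypted PEM private key are assumptions made for illustration only.
def decrypt_data_hybrid(payload: bytes, private_key_pem: str) -> bytes:
    """Reverses encrypt_data_hybrid, given an unencrypted PEM private key (sketch)."""
    private_key = serialization.load_pem_private_key(private_key_pem.encode('utf-8'), password=None)
    (key_len,) = struct.unpack('>I', payload[:LENGTH_HEADER_SIZE])
    key_end = LENGTH_HEADER_SIZE + key_len
    nonce_end = key_end + AES_GCM_NONCE_SIZE_CRYPTO
    if len(payload) < nonce_end:
        raise ValueError("Payload is too short for the declared key length and nonce.")
    aes_key = private_key.decrypt(
        payload[LENGTH_HEADER_SIZE:key_end],
        padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
    )
    # AESGCM.decrypt raises cryptography.exceptions.InvalidTag if the data was modified.
    return AESGCM(aes_key).decrypt(payload[key_end:nonce_end], payload[nonce_end:], None)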

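def embed_data_in_image(img_obj: Image.Image, data: bytes) -> Image.Image:
    """Embeds data into the least significant bits (LSBs) of an image.

    A 4-byte big-endian length header is written first, followed by the data,
    one bit per RGB channel value. Save the result in a lossless format such
    as PNG; lossy compression would destroy the embedded bits.
    """
    img_rgb = img_obj.convert("RGB")
    pixel_data = np.array(img_rgb).ravel()

    data_length_header = struct.pack('>I', len(data))
    # Expand header + data into individual bits, most significant bit first.
    payload_bits = np.unpackbits(np.frombuffer(data_length_header + data, dtype=np.uint8))

    if payload_bits.size > pixel_data.size:
        raise ValueError(f"Data too large for image capacity. Needs {payload_bits.size} bits, image has {pixel_data.size}.")

    # Clear each target LSB, then set it to the corresponding payload bit.
    pixel_data[:payload_bits.size] = (pixel_data[:payload_bits.size] & 0xFE) | payload_bits

    # A uint8 array of shape (height, width, 3) is interpreted as RGB.
    stego_pixels = pixel_data.reshape(img_rgb.size[1], img_rgb.size[0], 3)
    return Image.fromarray(stego_pixels)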
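
# --- Illustrative sketch (not part of the original module) ---
# A hypothetical inverse of embed_data_in_image, assuming the layout written above:
# a 4-byte big-endian length header followed by the payload, one bit per RGB
# channel value, most significant bit first. The function name is an assumption,
# and the image must have been stored losslessly (e.g. PNG) for the LSBs to survive.
def extract_data_from_image(img_obj: Image.Image) -> bytes:
    """Reads a length-prefixed payload back out of an image's LSBs (sketch)."""
    lsb_bits = np.array(img_obj.convert("RGB")).ravel() & 1
    header_bits = LENGTH_HEADER_SIZE * 8
    if lsb_bits.size < header_bits:
        raise ValueError("Image is too small to contain a length header.")
    (data_length,) = struct.unpack('>I', np.packbits(lsb_bits[:header_bits]).tobytes())
    payload_end = header_bits + data_length * 8
    if payload_end > lsb_bits.size:
        raise ValueError("Length header exceeds image capacity; no valid payload found.")
    return np.packbits(lsb_bits[header_bits:payload_end]).tobytes()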

def create_carrier_image(width=800, height=600) -> Image.Image:
    """Generates a simple carrier image."""
    img = Image.new('RGB', (width, height), color = (73, 109, 137))
    d = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except OSError:
        # Font file not available on this system; fall back to Pillow's built-in font.
        font = ImageFont.load_default()
    d.text((width/2, height/2), "KeyLock Encrypted Data", fill=(255,255,0), font=font, anchor="ms")
    return img

def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """
    High-level orchestrator function to create the final encrypted image.
    """
    logger.info("Starting image creation process...")
    
    # 1. Parse and validate secret data
    if not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    secret_dict = parse_kv_string_to_dict(secret_data_str)
    if not secret_dict:
        raise ValueError("No valid key-value pairs found in secret data.")
    
    # 2. Serialize data to JSON bytes
    json_bytes = json.dumps(secret_dict).encode('utf-8')
    logger.info(f"Serialized {len(json_bytes)} bytes of data.")

    # 3. Encrypt the data
    encrypted_payload = encrypt_data_hybrid(json_bytes, public_key_pem)
    logger.info(f"Encrypted payload created with size: {len(encrypted_payload)} bytes.")

    # 4. Create a carrier image and embed the data
    carrier_image = create_carrier_image()
    stego_image = embed_data_in_image(carrier_image, encrypted_payload)
    logger.info("Data successfully embedded into carrier image.")
    
    return stego_image
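
# --- Illustrative sketch (not part of the original module) ---
# create_encrypted_image expects a PEM-encoded RSA public key. For local testing,
# a throwaway key pair could be generated as below. The helper name and the
# 2048-bit default are assumptions; real deployments should manage keys separately.
def generate_test_keypair_pem(key_size: int = 2048) -> tuple:
    """Returns (private_key_pem, public_key_pem) for a fresh RSA key pair (sketch)."""
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode('utf-8')
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode('utf-8')
    return private_pem, public_pem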