# KeyLock-RSA-BU / core.py
# Uploaded by broadfield-dev - commit message: "Update core.py"
# Commit c0c053d (verified)
# keylock/core.py
import io
import json
import os
import struct
import logging
import traceback
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.exceptions import InvalidTag, InvalidSignature, InvalidKey
from PIL import Image, ImageDraw, ImageFont
import numpy as np
# Module-level logger; the name follows the importing module's __name__.
logger = logging.getLogger(__name__)
# Configure logger if not already configured.
# hasHandlers() also looks at ancestor loggers, so nothing is attached here when the
# application (or the root logger) already has a handler installed.
if not logger.hasHandlers():
    handler = logging.StreamHandler()
    # Using the format from the original code for consistency
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # NOTE(review): setLevel is assumed to belong to this guarded block - confirm
    # against the upstream file before relying on the level being left untouched
    # when a handler is already configured.
    logger.setLevel(logging.DEBUG) # Keep DEBUG level for now to aid troubleshooting
# Constants from the original code (kept for clarity, though some are specific to older crypto)
SALT_SIZE = 16 # Not used in hybrid AESGCM
NONCE_SIZE = 12 # Matches AES_GCM_NONCE_SIZE_CRYPTO
TAG_SIZE = 16 # Matches AES_GCM_TAG_SIZE_CRYPTO
PBKDF2_ITERATIONS = 390_000 # Not used in hybrid RSA-AES
LENGTH_HEADER_SIZE = 4 # The 4-byte header added by the LSB embedder, indicates size of crypto payload *following* this header
# Constants for the NEW hybrid encryption payload structure (what gets embedded AFTER the LSB header)
# Crypto Payload Structure (fields are concatenated in this order):
# [Encrypted AES Key Length (4 bytes)] [Encrypted AES Key (RSA_Key_Size_Bytes)] [AES Nonce (12 bytes)] [AES GCM Ciphertext] [AES GCM Tag (16 bytes)]
# The length field is packed big-endian ('>I') by encrypt_data_hybrid.
ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO = 4 # Bytes (Size of the length field *within* the crypto payload)
AES_GCM_NONCE_SIZE_CRYPTO = 12 # Bytes (Size of the Nonce *within* the crypto payload)
AES_GCM_TAG_SIZE_CRYPTO = 16 # Bytes (Size of the Tag *within* the crypto payload)
AES_KEY_SIZE_CRYPTO = 32 # Bytes (for AES-256, the size of the key that gets RSA encrypted)
RSA_KEY_SIZE_DEFAULT = 2048 # Bits (Default size for RSA key generation)
# Font names in order of preference; presumably handed to _get_font by the overlay
# code outside this view - confirm against callers.
PREFERRED_FONTS = ["Verdana", "Arial", "DejaVu Sans", "Calibri", "Helvetica", "Roboto-Regular", "sans-serif"]
# NOTE(review): looks like a cap on how many key names the image overlay lists -
# the consumer is not visible in this part of the file.
MAX_KEYS_TO_DISPLAY_OVERLAY = 12
def _get_font(preferred_fonts, base_size):
    """
    Resolve a font object for drawing text at ``base_size``.

    Every name in ``preferred_fonts`` is probed first as ``<lowercased name>.ttf``
    and then as the bare name. The first candidate that loads is reloaded at the
    requested size. When no candidate works, PIL's built-in default font is
    returned (sized if the installed Pillow accepts a ``size`` argument).

    Raises IOError if even the default font cannot be loaded.
    """
    # ImageFont.truetype needs a positive integer size; substitute 10 otherwise.
    point_size = int(base_size)
    if point_size <= 0:
        point_size = 10

    # Probe with a small fixed size purely to discover which name/path resolves;
    # the real font object is created afterwards at point_size.
    resolved = None
    for font_name in preferred_fonts:
        for candidate in (font_name.lower() + ".ttf", font_name):
            try:
                ImageFont.truetype(candidate, 10)
            except IOError:
                continue  # This spelling is unavailable; try the next one
            resolved = candidate
            break
        if resolved is not None:
            break

    if resolved:
        try:
            logger.debug(f"Found font path/name '{resolved}'. Attempting to load with size {point_size}.")
            return ImageFont.truetype(resolved, point_size)
        except IOError as e:
            logger.warning(f"Font '{resolved}' load failed with size {point_size}: {e}. Attempting default font.")
        except Exception as e:
            logger.warning(f"Unexpected error loading font '{resolved}' with size {point_size}: {e}. Attempting default font.", exc_info=True)

    # Nothing usable was found (or the sized load failed): use PIL's default font.
    logger.warning("Could not load preferred or system font. Using PIL's load_default.")
    try:
        try:
            # Newer Pillow releases accept a size for the default font.
            sized_default = ImageFont.load_default(size=point_size)
            logger.debug(f"Loaded default font with size {point_size}.")
            return sized_default
        except TypeError:
            # Older Pillow: load_default() takes no 'size' argument.
            logger.warning("Pillow version does not support sizing load_default. Using unsized default.")
            return ImageFont.load_default()
    except Exception as e:
        logger.error(f"Failed to load PIL's default font: {e}", exc_info=True)
        # Without any font, drawing text is not possible.
        raise IOError("Failed to load any font for image overlay.")
def set_pil_image_format_to_png(image:Image.Image)->Image.Image:
    """
    Round-trip ``image`` through an in-memory PNG and return the reloaded image.

    PNG is lossless, so the pixel LSBs survive the round trip. The returned
    image has its ``format`` attribute set to "PNG".

    Raises RuntimeError if saving or reloading fails.
    """
    png_buffer = io.BytesIO()
    try:
        image.save(png_buffer, format='PNG')
        # Rewind so Image.open reads from the start of what was just written.
        png_buffer.seek(0)
        png_image = Image.open(png_buffer)
        # Image.open normally sets this already; assign it explicitly anyway.
        png_image.format = "PNG"
        logger.debug(f"Converted image to PNG format. Original mode: {image.mode}, New mode: {png_image.mode}")
        return png_image
    except Exception as e:
        logger.error(f"Failed to convert image to PNG format: {e}", exc_info=True)
        # Callers rely on PNG output for LSB preservation, so fail loudly.
        raise RuntimeError(f"Failed to process image for PNG conversion: {e}")
# --- RSA Key Management ---
def generate_rsa_key_pair(key_size_bits: int = RSA_KEY_SIZE_DEFAULT) -> tuple[bytes, bytes]:
    """
    Create a fresh RSA key pair and return it PEM-encoded.

    Args:
        key_size_bits: RSA modulus size in bits.

    Returns:
        ``(private_pem, public_pem)``: the private key as unencrypted PKCS8 PEM
        and the public key as SubjectPublicKeyInfo PEM.
    """
    logger.debug(f"Generating RSA key pair with size: {key_size_bits} bits.")
    key = rsa.generate_private_key(public_exponent=65537, key_size=key_size_bits)

    pem_encoding = serialization.Encoding.PEM
    # The private key is written without a passphrase; protecting the PEM is
    # left to whoever stores it.
    private_pem = key.private_bytes(
        encoding=pem_encoding,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_pem = key.public_key().public_bytes(
        encoding=pem_encoding,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    logger.debug("RSA key pair generated successfully.")
    return private_pem, public_pem
def _clean_pem_bytes(pem_data: bytes | str) -> bytes:
"""Strips leading/trailing whitespace and converts string to bytes if needed."""
if isinstance(pem_data, str):
# Remove common surrounding strings like backticks if present (heuristic)
pem_data = pem_data.strip().strip('```')
pem_data = pem_data.encode('utf-8')
else:
pem_data = pem_data.strip()
return pem_data
def load_rsa_private_key_from_pem(pem_bytes: bytes | str) -> rsa.RSAPrivateKey:
    """
    Load an unencrypted RSA private key from PEM text or bytes.

    The input is first normalised with ``_clean_pem_bytes``. Key material is
    never written to the log; only the cleaned length is reported.

    Args:
        pem_bytes: PEM-encoded private key (``str`` or ``bytes``).

    Returns:
        The parsed ``rsa.RSAPrivateKey``.

    Raises:
        ValueError: the key is password-protected, cannot be parsed or loaded,
            or an otherwise unrecognised failure occurred (wrapped).
        TypeError: the PEM decoded to a key that is not an RSA private key.
        RuntimeError: an unexpected loader error whose message happens to match
            one of the recognised markers (otherwise it is wrapped in ValueError).
    """
    try:
        cleaned_pem = _clean_pem_bytes(pem_bytes)
        # Do not log any part of the PEM body: this is private key material.
        logger.debug(f"Cleaned private key PEM data: {len(cleaned_pem)} bytes (content withheld from logs).")
        try:
            private_key = serialization.load_pem_private_key(
                cleaned_pem,
                password=None # Assumes key file is not password-protected
            )
        except (TypeError, ValueError, InvalidKey) as e:
            # Classify the library error by its message text.
            err_str = str(e).lower()
            if "password" in err_str or "decryption failed" in err_str:
                logger.error(f"Failed to load RSA private key: It appears password-protected. {e}")
                raise ValueError("Private key appears to be password-protected. KeyLock requires an unencrypted key.") from e
            if "malformed" in err_str or "invalid pem" in err_str or "unsupported key type" in err_str:
                logger.error(f"Failed to parse RSA private key (parsing error): {e}", exc_info=True)
                raise ValueError(f"Failed to parse RSA private key. Ensure it is a valid, unencrypted PKCS8 or traditional PEM format key: {e}") from e
            logger.error(f"Failed to load RSA private key (cryptography error): {e}", exc_info=True)
            raise ValueError(f"Failed to load RSA private key due to a cryptography error: {e}") from e
        except Exception as e:
            logger.error(f"Unexpected error loading RSA private key: {e}", exc_info=True)
            raise RuntimeError(f"An unexpected error occurred loading the private key: {e}") from e
        if not isinstance(private_key, rsa.RSAPrivateKey):
            logger.error("Loaded object is not an RSA private key instance.")
            raise TypeError("Loaded key is not a valid RSA private key.")
        logger.debug(f"RSA private key loaded successfully (size: {private_key.key_size} bits).")
        return private_key
    except (ValueError, TypeError, RuntimeError, InvalidKey) as e:
        # Errors that already carry one of this function's own messages pass
        # through untouched; anything else is wrapped in a generic ValueError.
        if isinstance(e, (ValueError, TypeError, RuntimeError)) and (
            "password" in str(e).lower() or
            "parse" in str(e).lower() or
            "private key appears" in str(e) or
            "cryptography error" in str(e) or
            "Loaded key is not a valid RSA private key" in str(e)
        ):
            raise
        logger.error(f"Final exception during private key load: {e}", exc_info=True)
        raise ValueError(f"Failed to load RSA private key: {e}") from e
def load_rsa_public_key_from_pem(pem_bytes: bytes | str) -> rsa.RSAPublicKey:
    """
    Parse a PEM-encoded RSA public key.

    The input is normalised with ``_clean_pem_bytes`` before being handed to the
    PEM loader.

    Raises:
        ValueError: the PEM could not be parsed or loaded, or an unrecognised
            failure was wrapped by the outer handler.
        TypeError: the PEM decoded to a key that is not an RSA public key.
        RuntimeError: an unexpected loader error whose message matches one of
            the recognised markers (otherwise it is wrapped in ValueError).
    """
    try:
        pem = _clean_pem_bytes(pem_bytes)
        logger.debug(f"Cleaned public key PEM data (first 50 bytes): {pem[:50]}...")

        try:
            key = serialization.load_pem_public_key(pem)
        except (ValueError, InvalidKey) as e:
            # Distinguish format problems from other library failures by message.
            lowered = str(e).lower()
            format_markers = ("malformed", "invalid pem", "unsupported key type")
            if any(marker in lowered for marker in format_markers):
                logger.error(f"Failed to parse RSA public key (parsing error): {e}", exc_info=True)
                raise ValueError(f"Failed to parse RSA public key. Ensure it is a valid PEM format key: {e}")
            logger.error(f"Failed to load RSA public key (cryptography error): {e}", exc_info=True)
            raise ValueError(f"Failed to load RSA public key due to a cryptography error: {e}")
        except Exception as e:
            logger.error(f"Unexpected error loading RSA public key: {e}", exc_info=True)
            raise RuntimeError(f"An unexpected error occurred loading the public key: {e}")

        if isinstance(key, rsa.RSAPublicKey):
            logger.debug(f"RSA public key loaded successfully (size: {key.key_size} bits).")
            return key

        logger.error("Loaded object is not an RSA public key instance.")
        raise TypeError("Loaded key is not a valid RSA public key.")
    except (ValueError, TypeError, RuntimeError, InvalidKey) as e:
        # Pass through errors that already carry one of this function's messages.
        text = str(e)
        recognised = (
            "parse" in text.lower()
            or "cryptography error" in text
            or "Loaded key is not a valid RSA public key" in text
        )
        if recognised and isinstance(e, (ValueError, TypeError, RuntimeError)):
            raise
        # Everything else is wrapped in a generic ValueError.
        logger.error(f"Final exception during public key load: {e}", exc_info=True)
        raise ValueError(f"Failed to load RSA public key: {e}")
# --- Hybrid Encryption/Decryption ---
def encrypt_data_hybrid(data: bytes, recipient_public_key_pem: bytes | str) -> bytes:
    """
    Hybrid-encrypt ``data`` for the holder of the matching RSA private key.

    A random AES-256 key encrypts ``data`` with AES-GCM (no associated data);
    that key is then wrapped with RSA-OAEP (SHA-256 for both MGF1 and the hash,
    no label) under the recipient's public key.

    Returns the concatenation:
    [Encrypted AES Key Length (4 bytes, big-endian)] [Encrypted AES Key]
    [AES Nonce (12 bytes)] [AES GCM Ciphertext] [AES GCM Tag (16 bytes)]

    Raises:
        ValueError: the public key could not be loaded or RSA wrapping failed.
        RuntimeError: any other failure, wrapped.
    """
    logger.debug(f"Starting hybrid encryption of {len(data)} bytes.")
    try:
        public_key = load_rsa_public_key_from_pem(recipient_public_key_pem)
        logger.debug(f"Using RSA public key size: {public_key.key_size} bits.")

        # Step 1: one-time symmetric key.
        session_key = os.urandom(AES_KEY_SIZE_CRYPTO)
        logger.debug(f"Generated random AES key ({AES_KEY_SIZE_CRYPTO} bytes).")

        # Step 2: AES-GCM over the plaintext with a fresh nonce.
        cipher = AESGCM(session_key)
        nonce = os.urandom(AES_GCM_NONCE_SIZE_CRYPTO)
        logger.debug(f"Generated AES-GCM nonce ({AES_GCM_NONCE_SIZE_CRYPTO} bytes).")
        # encrypt() returns the ciphertext with the tag appended.
        sealed = cipher.encrypt(nonce, data, None)
        logger.debug(f"AES encrypted data (ciphertext+tag) size: {len(sealed)} bytes.")

        # Step 3: wrap the symmetric key with RSA-OAEP.
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        try:
            wrapped_key = public_key.encrypt(session_key, oaep)
            logger.debug(f"RSA encrypted AES key size: {len(wrapped_key)} bytes.")
        except Exception as e:
            logger.error(f"RSA encryption of AES key failed: {e}", exc_info=True)
            if "data too large" in str(e).lower():
                # OAEP overhead is 2 * digest size + 2 bytes, so a 32-byte key with
                # SHA-256 needs an RSA modulus larger than 98 bytes (784 bits).
                raise ValueError(f"RSA encryption of symmetric key failed: The symmetric key size ({AES_KEY_SIZE_CRYPTO}B) might be too large for the RSA key size ({public_key.key_size} bits) with OAEP padding. Use a larger RSA key. Original error: {e}")
            raise ValueError(f"RSA encryption of the symmetric key failed: {e}. Check public key validity or RSA key size.")

        # Step 4: assemble length prefix + wrapped key + nonce + ciphertext/tag.
        length_prefix = struct.pack('>I', len(wrapped_key))
        logger.debug(f"Packed encrypted AES key length ({len(wrapped_key)} bytes). Header bytes size: {len(length_prefix)}.")
        payload = b"".join((length_prefix, wrapped_key, nonce, sealed))
        logger.info(f"Hybrid encryption successful. Total combined crypto payload size: {len(payload)} bytes.")
        return payload
    except (ValueError, RuntimeError, InvalidKey) as e:
        # Already-descriptive errors are logged and passed through unchanged.
        logger.error(f"Hybrid encryption error (caught specific exception): {e}", exc_info=True)
        raise
    except Exception as e:
        logger.error(f"Unexpected hybrid encryption error: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during encryption: {e}")
def decrypt_data_hybrid(crypto_payload_from_lsb: bytes, recipient_private_key_pem: bytes | str) -> bytes:
    """
    Reverse ``encrypt_data_hybrid`` using the recipient's RSA private key.

    ``crypto_payload_from_lsb`` is the payload with the LSB layer's own 4-byte
    header already removed, laid out as:
    [Encrypted AES Key Length (4 bytes)] [Encrypted AES Key] [AES Nonce (12 bytes)]
    [AES GCM Ciphertext] [AES GCM Tag (16 bytes)]

    Returns the decrypted plaintext bytes.

    Raises:
        ValueError: the payload is too short or malformed, the declared key
            length does not match the private key size, RSA unwrapping failed in
            a recognised way, or the AES-GCM tag did not verify.
        RuntimeError: an unexpected failure during RSA or AES decryption.
    """
    payload = crypto_payload_from_lsb
    payload_size = len(payload)
    logger.debug(f"Starting hybrid decryption. Received crypto_payload_from_lsb size: {payload_size} bytes.")
    try:
        private_key = load_rsa_private_key_from_pem(recipient_private_key_pem)
        logger.debug(f"Using RSA private key with size: {private_key.key_size} bits for decryption.")

        # --- 1. Parse the payload -------------------------------------------
        # An RSA ciphertext is as long as the modulus, so the smallest valid
        # payload is: length field + modulus bytes + nonce + tag.
        modulus_bytes = private_key.key_size // 8
        smallest_valid = (
            ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO
            + modulus_bytes
            + AES_GCM_NONCE_SIZE_CRYPTO
            + AES_GCM_TAG_SIZE_CRYPTO
        )
        logger.debug(f"Expected min crypto payload size based on private key ({modulus_bytes}B RSA key): {smallest_valid} bytes.")
        if payload_size < smallest_valid:
            logger.error(f"Crypto payload too short. Size: {payload_size}, Expected min: {smallest_valid}")
            raise ValueError(f"Crypto payload extracted from image is too short ({payload_size} bytes). Expected at least {smallest_valid} bytes. Image likely corrupted or does not contain a valid RSA-encrypted payload.")

        # Big-endian length of the wrapped AES key.
        length_field = payload[:ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO]
        try:
            (wrapped_len,) = struct.unpack('>I', length_field)
        except struct.error as e:
            logger.error(f"Failed to unpack encrypted AES key length header: {e}", exc_info=True)
            raise ValueError(f"Failed to read encrypted AES key length header from crypto payload: {e}. Payload corrupted.")
        logger.debug(f"Parsed encrypted AES key length: {wrapped_len} bytes.")

        # The declared length must equal the modulus size of the supplied key;
        # a mismatch catches wrong key pairs or corruption early.
        expected_wrapped_len = private_key.key_size // 8
        if wrapped_len != expected_wrapped_len:
            logger.error(f"Parsed encrypted AES key length ({wrapped_len}B) does not match private key size in bytes ({expected_wrapped_len}B).")
            raise ValueError(f"Encrypted AES key length mismatch ({wrapped_len} bytes). Expected {expected_wrapped_len} bytes based on private key size ({private_key.key_size} bits). Indicates incorrect key pair or data corruption.")

        # Field boundaries within the payload.
        key_begin = ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO
        key_stop = key_begin + wrapped_len
        nonce_begin = key_stop
        nonce_stop = nonce_begin + AES_GCM_NONCE_SIZE_CRYPTO
        sealed_begin = nonce_stop
        logger.debug(f"Calculated offsets for parsing crypto payload: RSA Encrypted Key starts at {key_begin}, ends at {key_stop}. AES Nonce starts at {nonce_begin}, ends at {nonce_stop}. AES Ciphertext+Tag starts at {sealed_begin}.")

        # The ciphertext may be empty, but nonce and tag must both be present.
        trailer_minimum = AES_GCM_NONCE_SIZE_CRYPTO + AES_GCM_TAG_SIZE_CRYPTO
        if payload_size < key_stop + trailer_minimum:
            logger.error(f"Crypto payload too short based on parsed lengths and minimum requirements. Size: {payload_size}, Required min based on parsed length: {key_stop + trailer_minimum} ({ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO}B len + {wrapped_len}B RSA key + 12B Nonce + 16B Tag).")
            raise ValueError("Crypto payload extracted from image is truncated or corrupted based on parsed lengths.")

        wrapped_key = payload[key_begin:key_stop]
        logger.debug(f"Extracted RSA encrypted AES key. Size: {len(wrapped_key)} bytes.")

        nonce = payload[nonce_begin:nonce_stop]
        logger.debug(f"Extracted AES Nonce. Size: {len(nonce)} bytes.")
        if len(nonce) != AES_GCM_NONCE_SIZE_CRYPTO:
            logger.error(f"Extracted AES nonce has incorrect size: {len(nonce)} bytes. Expected: {AES_GCM_NONCE_SIZE_CRYPTO} bytes.")
            raise ValueError(f"Extracted AES nonce has incorrect size ({len(nonce)} bytes). Expected {AES_GCM_NONCE_SIZE_CRYPTO} bytes. Indicates payload corruption or incorrect parsing.")

        sealed = payload[sealed_begin:]
        logger.debug(f"Extracted AES ciphertext+tag. Size: {len(sealed)} bytes.")
        if len(sealed) < AES_GCM_TAG_SIZE_CRYPTO:
            logger.error(f"AES ciphertext+tag part too short ({len(sealed)}B). Expected min {AES_GCM_TAG_SIZE_CRYPTO}B (tag).")
            raise ValueError("AES ciphertext+tag part is too short (missing tag).")

        # --- 2. Unwrap the AES key with RSA-OAEP ------------------------------
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        try:
            session_key = private_key.decrypt(wrapped_key, oaep)
            logger.debug(f"RSA decryption of AES key successful. Recovered key size: {len(session_key)} bytes.")
        except Exception as e:
            logger.error(f"RSA decryption of AES key failed: {e}", exc_info=True)
            # Map recognisable RSA failures to a user-facing explanation.
            err_msg = str(e).lower()
            known_markers = ("decryption failed", "data too large", "oaep", "private key", "signature")
            if any(marker in err_msg for marker in known_markers):
                raise ValueError(f"RSA decryption of the symmetric key failed. This typically means the incorrect private key was used, or the embedded encrypted key data is corrupted. Original error: {e}")
            raise RuntimeError(f"An unexpected error occurred during RSA decryption: {e}")

        # A key of any other size points to a key-pair mismatch or corruption.
        if len(session_key) != AES_KEY_SIZE_CRYPTO:
            logger.error(f"RSA decryption resulted in key of unexpected length ({len(session_key)}B). Expected {AES_KEY_SIZE_CRYPTO}B.")
            raise ValueError(f"RSA decryption produced an AES key of unexpected length ({len(session_key)} bytes). This strongly suggests the private key used does not match the public key used for encryption, or the data is corrupted.")

        # --- 3. AES-GCM decrypt and authenticate ------------------------------
        logger.debug("Attempting AES-GCM decryption of data...")
        cipher = AESGCM(session_key)
        try:
            # decrypt() takes the nonce, ciphertext with appended tag, and AAD (none).
            plaintext = cipher.decrypt(nonce, sealed, None)
        except InvalidTag:
            logger.error("AES-GCM decryption failed: Invalid tag.", exc_info=True)
            raise ValueError("Decryption failed (InvalidTag): The private key used is likely incorrect or the image data is corrupted.")
        except Exception as e:
            logger.error(f"Unexpected error during AES-GCM decryption: {e}", exc_info=True)
            raise RuntimeError(f"An unexpected error occurred during AES-GCM decryption: {e}")

        logger.info("Hybrid decryption process completed successfully.")
        return plaintext
    except (ValueError, InvalidTag, InvalidKey, TypeError, RuntimeError) as e:
        # These already carry specific messages; log and pass them through.
        logger.error(f"Hybrid decryption error (caught specific exception): {e}", exc_info=True)
        raise
    except Exception as e:
        logger.error(f"Unexpected hybrid decryption error: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during decryption: {e}")
# --- LSB Steganography ---
def _d2b(d:bytes)->str:
"""Convert bytes to a binary string."""
return ''.join(format(b,'08b') for b in d)
def _b2B(b:str)->bytes:
"""Convert a binary string to bytes."""
# The binary string should ideally be a multiple of 8 bits.
# If not, it indicates corruption or an issue with extraction.
# We will NOT pad here during conversion, as it would hide truncation errors.
if len(b)%8!=0:
logger.error(f"Binary string length ({len(b)}) is not a multiple of 8 during _b2B conversion.")
# Raise an error instead of padding, as padding could mask corruption
raise ValueError(f"Binary string length ({len(b)}) is not a multiple of 8. Data corruption suspected.")
try:
return bytes(int(b[i:i+8],2) for i in range(0,len(b),8))
except ValueError as e:
logger.error(f"Failed to convert binary string to bytes: {e}. Input binary string length: {len(b)}")
# Provide part of the binary string for debugging
log_b = b[:100] + ('...' if len(b)>100 else '')
logger.debug(f"Input binary string (first 100 chars): {log_b}")
raise ValueError(f"Failed to convert binary string to bytes: {e}. Data might be corrupted.")
def embed_data_in_image(img_obj:Image.Image,data:bytes)->Image.Image:
    """
    Hide ``data`` in the least-significant bits of an image's RGB channels.

    The image is converted to RGB (any alpha channel is dropped). A 4-byte
    big-endian length header followed by ``data`` is written one bit per channel
    value, most significant bit of each byte first, starting at the first
    channel of the flattened pixel array. ``data`` is expected to be the output
    of the crypto layer (e.g. ``encrypt_data_hybrid``).

    Args:
        img_obj: Carrier image; it is not modified.
        data: Raw bytes to embed.

    Returns:
        A new image carrying the payload, passed through
        ``set_pil_image_format_to_png``.

    Raises:
        ValueError: ``data`` does not fit in the 4-byte header or in the image.
        RuntimeError: image conversion, embedding or reconstruction failed.
    """
    logger.debug(f"LSB Embedding: Received {len(data)} bytes of data from crypto layer (the crypto payload).")
    # Force three channels so the bit layout is the same for every input mode.
    try:
        img = img_obj.convert("RGB")
    except Exception as e:
        logger.error(f"Failed to convert image to RGB before LSB embedding: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to RGB for embedding: {e}") from e
    try:
        px = np.array(img)  # Fresh, writable copy of the pixel data
        fpx = px.ravel()  # Flattened channel values (R, G, B, R, G, B, ...)
    except Exception as e:
        logger.error(f"Failed to convert image to numpy array: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to numpy array for embedding: {e}") from e

    # The header stores len(data) as an unsigned 32-bit big-endian integer.
    data_length = len(data)
    max_uint32 = (2**32) - 1
    if data_length > max_uint32:
        logger.error(f"LSB Embedding: Data length ({data_length}) exceeds maximum value representable by 4-byte unsigned integer header ({max_uint32}).")
        raise ValueError(f"Data length ({data_length} bytes) exceeds maximum size embeddable with the 4-byte header.")
    data_length_header = struct.pack('>I', data_length)
    header_bit_count = len(data_length_header) * 8
    logger.debug(f"LSB Embedding: Data length: {data_length} bytes. Header bytes size: {len(data_length_header)}. Header binary size: {header_bit_count} bits.")

    total_payload_bytes = len(data_length_header) + data_length
    total_payload_bits = total_payload_bytes * 8
    logger.debug(f"LSB Embedding: Total binary payload size for embedding: {total_payload_bits} bits / {total_payload_bytes} bytes.")

    # One payload bit per channel value. Check capacity before expanding the
    # payload into bits so oversized input is rejected without allocating.
    total_capacity_bits = len(fpx)
    if total_payload_bits > total_capacity_bits:
        logger.error(f"LSB Embedding: Data too large for image capacity. Needed: {total_payload_bits} bits, Available: {total_capacity_bits} bits ({total_capacity_bits // 8} bytes). Image WxH*3 LSBs.")
        raise ValueError(f"Data too large for image capacity: {total_payload_bits} bits ({total_payload_bytes} bytes) needed, {total_capacity_bits} bits available ({total_capacity_bits // 8} bytes - Image WxH*3 LSBs).")
    logger.debug(f"LSB Embedding: Embedding {total_payload_bits} bits into {total_capacity_bits} available pixel channels.")

    # unpackbits yields bits most-significant-first per byte, i.e. the same
    # order as formatting each byte with '08b'.
    payload_bits = np.unpackbits(
        np.frombuffer(data_length_header + bytes(data), dtype=np.uint8)
    )
    try:
        # Clear each target LSB (AND 0xFE) and set it to the payload bit, in one
        # vectorised assignment instead of a per-element Python loop.
        target = fpx[:total_payload_bits]
        fpx[:total_payload_bits] = (target & np.uint8(0xFE)) | payload_bits
    except Exception as e:
        logger.error(f"Unexpected error during LSB embedding loop: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during LSB embedding: {e}") from e

    # Restore the original array shape and build the output image.
    try:
        stego_pixels = fpx.reshape(px.shape)
        stego_img = Image.fromarray(stego_pixels.astype(np.uint8),'RGB')
    except Exception as e:
        logger.error(f"Failed to reshape or create image from modified numpy array: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not create image from stego pixels: {e}") from e
    logger.debug("LSB Embedding: Data embedding complete.")
    # PNG is lossless, so the embedded LSBs survive serialisation.
    return set_pil_image_format_to_png(stego_img)
# RESTORED TO ORIGINAL EXTRACT LOGIC
def extract_data_from_image(img_obj:Image.Image)->bytes:
    """
    Extracts raw data bytes from the LSB of the image's RGB channels,
    using the leading length header. Returns *only* the data payload
    that followed the header.

    The image is converted to RGB and flattened to a 1-D sequence of channel
    values (R, G, B, R, G, B, ...). The least-significant bits of the first
    ``LENGTH_HEADER_SIZE * 8`` values are decoded as a big-endian unsigned
    integer giving the payload size in bytes; that many bytes are then read
    from the LSBs that immediately follow the header.

    Args:
        img_obj: Image to read the embedded data from.

    Returns:
        The payload bytes (header excluded), or ``b""`` when the header
        encodes a length of zero.

    Raises:
        ValueError: If the image is too small to hold the header, the header
            cannot be decoded, the encoded length exceeds the remaining
            capacity, or the decoded payload does not have the expected size.
        RuntimeError: If the image cannot be converted or the LSBs cannot be
            read.
    """
    logger.debug("LSB Extraction: Starting data extraction from image LSB.")

    # Ensure image is RGB and convert to numpy array
    try:
        img = img_obj.convert("RGB")
    except Exception as e:
        logger.error(f"Failed to convert image to RGB before LSB extraction: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to RGB for extraction: {e}") from e

    try:
        fpx = np.array(img).ravel()  # Flatten pixel array (R, G, B, R, G, B, ...)
        total_image_pixels = len(fpx)  # Total number of LSBs available (W * H * 3)
    except Exception as e:
        logger.error(f"Failed to convert image to numpy array: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to numpy array for extraction: {e}") from e
    logger.debug(f"LSB Extraction: Flattened image pixel array size: {total_image_pixels}.")

    def _lsb_bit_string(start: int, stop: int) -> str:
        # Mask the whole slice in NumPy and convert once; this yields the same
        # '0'/'1' string as indexing and masking every element in a Python loop.
        return "".join(map(str, (fpx[start:stop] & 1).tolist()))

    # Extract the data length header
    header_bits_count = LENGTH_HEADER_SIZE * 8
    if total_image_pixels < header_bits_count:
        logger.error(f"LSB Extraction: Image too small for LSB header. Size: {total_image_pixels} LSB bits, Header needs {header_bits_count} bits ({LENGTH_HEADER_SIZE} bytes).")
        raise ValueError(f"Image is too small ({total_image_pixels} LSB bits available) to contain the LSB data length header ({LENGTH_HEADER_SIZE} bytes / {header_bits_count} bits).")
    logger.debug(f"LSB Extraction: Extracting LSB header ({header_bits_count} bits) from first {header_bits_count} pixel channels LSBs.")
    try:
        data_length_binary = _lsb_bit_string(0, header_bits_count)
    except Exception as e:
        logger.error(f"Unexpected error during LSB header extraction loop: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during LSB header extraction: {e}") from e

    try:
        # The header stores the *length of the data FOLLOWING the header*
        data_length_header_bytes = _b2B(data_length_binary)
        if len(data_length_header_bytes) != LENGTH_HEADER_SIZE:
            # Should not happen when exactly header_bits_count bits are decoded; safety check
            logger.error(f"LSB Extraction: Decoded header bytes have incorrect size: {len(data_length_header_bytes)}. Expected {LENGTH_HEADER_SIZE}.")
            raise ValueError(f"Decoded LSB header has incorrect byte size ({len(data_length_header_bytes)}). Expected {LENGTH_HEADER_SIZE}. Data corruption or not a KeyLock image.")
        data_length = struct.unpack('>I', data_length_header_bytes)[0]
        logger.debug(f"LSB Extraction: Parsed data length from LSB header: {data_length} bytes (This is the size of the crypto payload).")
    except ValueError as e:
        # Wrap value errors (including the size check above) with file-level context
        raise ValueError(f"Failed to decode data length header from image LSB: {e}. Image may not be a valid KeyLock file or is corrupted.") from e
    except Exception as e:
        logger.error(f"LSB Extraction: Failed to decode data length header: {e}", exc_info=True)
        raise ValueError(f"Failed to decode data length header from image LSB: {e}. Image may not be a valid KeyLock file or is corrupted.") from e

    # Check that the indicated data length fits in the LSBs left after the header
    max_possible_data_bytes = (total_image_pixels - header_bits_count) // 8
    if data_length > max_possible_data_bytes:
        logger.error(f"LSB Extraction: Indicated data length ({data_length}B) exceeds remaining image capacity ({max_possible_data_bytes}B).")
        raise ValueError(f"Indicated data length ({data_length} bytes) in LSB header exceeds remaining image capacity ({max_possible_data_bytes} bytes after header). Image likely truncated or corrupted.")
    if data_length == 0:
        logger.info("LSB Extraction: LSB header indicates zero data length embedded. Returning empty bytes.")
        return b""

    # Extract the actual data bits based on the length from the header
    data_bits_count = data_length * 8
    start_offset = header_bits_count  # Start extraction *after* the header bits
    end_offset = start_offset + data_bits_count
    logger.debug(f"LSB Extraction: Extracting data bits. Start offset: {start_offset} (after {LENGTH_HEADER_SIZE}B header), Data bits: {data_bits_count}, End offset: {end_offset}. Total image LSB bits: {total_image_pixels}.")
    if total_image_pixels < end_offset:
        # Already implied by the capacity check above; kept as a defensive guard
        logger.error(f"LSB Extraction: Image truncated or corrupted. Cannot extract full data. Image LSB bits: {total_image_pixels}, Required end offset: {end_offset}.")
        raise ValueError("Image is truncated or corrupted. Cannot extract full data based on header length.")
    try:
        data_binary = _lsb_bit_string(start_offset, end_offset)
    except Exception as e:
        logger.error(f"Unexpected error during LSB data extraction loop: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during LSB data extraction: {e}") from e

    # Convert binary string back to bytes
    try:
        extracted_bytes = _b2B(data_binary)
        if len(extracted_bytes) != data_length:
            logger.error(f"LSB Extraction: Extracted bytes length ({len(extracted_bytes)}) does not match expected data length ({data_length}) from header. Binary string length: {len(data_binary)} bits.")
            raise ValueError(f"Extracted bytes length ({len(extracted_bytes)}) does not match expected length ({data_length}) from header. Data corruption.")
        logger.debug(f"LSB Extraction: Successfully extracted {len(extracted_bytes)} bytes of data (the crypto payload).")
        # Return *only* the extracted data bytes, NOT including the LSB header.
        return extracted_bytes
    except ValueError:
        # Propagate value errors from _b2B or the length mismatch check unchanged
        raise
    except Exception as e:
        logger.error(f"LSB Extraction: Failed to convert extracted bits to bytes: {e}", exc_info=True)
        raise ValueError(f"Failed to convert extracted bits to bytes: {e}") from e
# --- Utility Functions ---
def parse_kv_string_to_dict(kv_str:str)->dict:
    """
    Parses a string of key:value or key=value pairs into a dictionary.

    One pair is expected per line. Blank lines are skipped and everything from
    the first '#' on a line is treated as a comment. When a line contains '=',
    it is split at the first '='; otherwise it is split at the first ':'. Keys
    and values are stripped of surrounding whitespace, and a value wrapped in a
    matching pair of single or double quotes has those quotes removed.

    Raises:
        ValueError: If a non-comment line has neither separator or its key is
            empty.
    """
    if not (kv_str and kv_str.strip()):
        logger.debug("Input KV string is empty or whitespace. Returning empty dict.")
        return {}

    parsed = {}
    for line_no, raw_line in enumerate(kv_str.splitlines(), 1):
        # Dropping the comment part first also covers blank and comment-only lines.
        content = raw_line.strip().split('#', 1)[0].strip()
        if not content:
            continue

        # '=' takes priority over ':' whenever both appear on the line.
        separator = next((candidate for candidate in ('=', ':') if candidate in content), None)
        if separator is None:
            logger.warning(f"L{line_no}: Invalid format '{raw_line}'. Must contain '=' or ':'.")
            raise ValueError(f"Line {line_no}: Invalid format '{raw_line}'. Each line must contain '=' or ':'.")

        # partition splits at the first separator only, so the value may contain more.
        raw_key, _, raw_value = content.partition(separator)
        key = raw_key.strip()
        value = raw_value.strip()
        if not key:
            logger.warning(f"L{line_no}: Empty key found in '{raw_line}'.")
            raise ValueError(f"Line {line_no}: Empty key found in '{raw_line}'.")

        # Unwrap the value only when it is enclosed in a matched pair of quotes.
        is_quoted = len(value) >= 2 and value[0] in ("'", '"') and value[-1] == value[0]
        if is_quoted:
            value = value[1:-1]

        # A line such as "key:" yields an empty-string value, which is accepted.
        parsed[key] = value
        logger.debug(f"Parsed KV pair: '{key}'='{value}' from line {line_no}")
    return parsed
def generate_keylock_carrier_image(w=800,h=600,msg="KeyLock Wallet")->Image.Image:
    """
    Generates a simple gradient image with a KeyLock logo and a caption.

    The image is an RGB canvas with a vertical gradient, a logo made of a
    circle and a rectangle in the upper third, and ``msg`` drawn with a drop
    shadow centred at roughly two thirds of the height.

    Args:
        w: Requested width in pixels; coerced with ``int`` and raised to at
            least 100.
        h: Requested height in pixels; coerced with ``int`` and raised to at
            least 100.
        msg: Caption text drawn below the logo.

    Returns:
        The generated RGB image. If every text-drawing attempt fails the
        image is still returned, without the caption.
    """
    # Ensure width and height are positive integers
    w = max(100, int(w))
    h = max(100, int(h))

    cs, ce = (30, 40, 50), (70, 80, 90)  # Start and end colors for gradient
    img = Image.new("RGB", (w, h), cs)
    draw = ImageDraw.Draw(img)

    # Draw the gradient one horizontal line at a time (0 at top, 1 at bottom)
    for y_ in range(h):
        i = y_ / (h - 1) if h > 1 else 0.5
        r_, g_, b_ = (int(s_ * (1 - i) + e_ * i) for s_, e_ in zip(cs, ce))
        draw.line([(0, y_), (w, y_)], fill=(r_, g_, b_))

    # Draw KeyLock logo (circle and rectangle)
    ib = min(w, h) // 7  # Base size for logo elements
    icx, icy = w // 2, h // 3  # Center of the circle (top part of logo)
    cr = ib // 2  # Circle radius
    cb = [(icx - cr, icy - cr), (icx + cr, icy + cr)]  # Circle bounding box
    rw = ib // 4  # Rectangle width
    rh = ib // 2  # Rectangle height
    rty = icy + int(cr * 0.2)  # Rectangle top, slightly below the circle center
    rb = [(icx - rw // 2, rty), (icx + rw // 2, rty + rh)]  # Rectangle bounding box
    kc, ko = (190, 195, 200), (120, 125, 130)  # Key color and outline color
    ow = max(1, int(ib / 30))  # Outline width, ensure minimum 1px
    draw.ellipse(cb, fill=kc, outline=ko, width=ow)
    draw.rectangle(rb, fill=kc, outline=ko, width=ow)

    # Draw message text below the logo
    fs = max(18, min(int(w / 15), h // 7))  # Font size scaled to the image, minimum 18
    fnt = _get_font(PREFERRED_FONTS, fs)
    tc = (225, 230, 235)  # Text color
    # Shadow color (slightly darker than the gradient start). Built as a tuple so it
    # can be reused by every drawing attempt below; a generator would be exhausted
    # after its first use and leave the fallbacks with an empty fill.
    sc = tuple(max(0, s_ - 20) for s_ in cs)
    tx, ty = w / 2, h * .68  # Target center position for the text
    so = max(1, int(fs / 25))  # Shadow offset, ensure minimum 1px
    margin = 10  # Corner offset used by the last-resort text placement

    try:
        # anchor="mm" (middle, middle) centers the text at (tx, ty)
        draw.text((tx + so, ty + so), msg, font=fnt, fill=sc, anchor="mm")  # Shadow
        draw.text((tx, ty), msg, font=fnt, fill=tc, anchor="mm")  # Main text
        logger.debug(f"Drew message text '{msg}' with anchor='mm'. Font size: {fs}")
    except AttributeError:  # Fallback for older PIL versions without anchor support
        logger.warning("PIL version does not support text anchor. Using manual centering fallback.")
        try:
            # Compute the top-left corner that centers the measured text block on (tx, ty)
            tw, th = _get_text_measurement(draw, msg, fnt)
            ax, ay = tx - tw / 2.0, ty - th / 2.0
            draw.text((ax + so, ay + so), msg, font=fnt, fill=sc)  # Shadow
            draw.text((ax, ay), msg, font=fnt, fill=tc)  # Main text
            logger.debug(f"Drew message text '{msg}' with manual centering. Font size: {fs}")
        except Exception as e:
            logger.error(f"Fallback manual text centering failed: {e}", exc_info=True)
            try:
                # Last resort: draw at a fixed top-left position with a small margin
                logger.warning("Manual centering failed, attempting fallback text draw at margin position.")
                draw.text((margin, margin), msg, font=fnt, fill=tc)
            except Exception as draw_e:
                # Best effort only: the image is still returned, without text
                logger.error(f"Failed to draw text overlay even at fixed position: {draw_e}", exc_info=True)
    return img
def _get_text_measurement(draw_obj, text_str, font_obj):
    """
    Returns (width, height) of text using the best available Pillow method.

    Strategies are tried in order, and a strategy is skipped when it is
    unavailable, fails, or reports a non-positive dimension:

    1. ``draw_obj.textbbox`` (any exception falls through),
    2. ``draw_obj.textsize`` (``AttributeError``/``ValueError`` fall through),
    3. ``font_obj.getsize`` (``AttributeError``/``ValueError`` fall through),
    4. a rough estimate derived from ``font_obj.size``,
    5. a hardcoded default that never fails, even for fonts without ``size``.

    Args:
        draw_obj: Drawing object expected to offer ``textbbox`` or ``textsize``.
        text_str: Text to measure.
        font_obj: Font object passed to the measuring calls.

    Returns:
        A ``(width, height)`` pair; ``(0, 0)`` for empty text, otherwise both
        values are positive.
    """
    if not text_str:
        return 0, 0

    preview = text_str[:20]

    # 1) textbbox is the most accurate method for TrueType fonts (Pillow 8.0.0+).
    try:
        bbox = draw_obj.textbbox((0, 0), text_str, font=font_obj)
        width = bbox[2] - bbox[0]
        height = bbox[3] - bbox[1]
        if width > 0 and height > 0:
            return width, height
        logger.debug(f"textbbox returned zero dimensions for '{preview}...'. Attempting fallback.")
    except Exception:
        # Method missing or measurement failed: move on to the next strategy.
        pass

    # 2) textsize exists on older Pillow versions only.
    try:
        width, height = draw_obj.textsize(text_str, font=font_obj)
        if width > 0 and height > 0:
            return width, height
        logger.debug(f"textsize returned zero dimensions for '{preview}...'. Attempting fallback.")
    except (AttributeError, ValueError):
        pass

    # 3) getsize is an older method that lives on the font object itself.
    try:
        width, height = font_obj.getsize(text_str)
        if width > 0 and height > 0:
            return width, height
        logger.debug(f"getsize returned zero dimensions for '{preview}...'. Attempting fallback.")
    except (AttributeError, ValueError):
        pass

    # 4) Very rough estimate that assumes an average character width.
    try:
        char_width_approx = font_obj.size * 0.6
        char_height_approx = font_obj.size
        width_est = int(len(text_str) * char_width_approx)
        height_est = int(char_height_approx)
        logger.warning(f"Using estimated text size for '{preview}...': ({width_est}, {height_est}). Font: {getattr(font_obj, 'path', 'default')}, Size: {font_obj.size}")
        if width_est > 0 and height_est > 0:
            return width_est, height_est
        logger.debug(f"Estimation returned zero dimensions for '{preview}...'. Using final hardcoded fallback.")
    except Exception:
        pass

    # 5) Final hardcoded fallback. It must not raise, so a font without a usable
    #    ``size`` attribute contributes 0 instead of triggering AttributeError.
    logger.warning(f"Failed to estimate text size using any method for '{preview}...'. Returning hardcoded default.")
    font_size = getattr(font_obj, "size", 0) or 0
    return max(1, len(text_str) * 8), max(1, 10 + font_size // 5)
def draw_key_list_dropdown_overlay(image: Image.Image, keys: list[str] = None, title: str = "Data Embedded (RSA Encrypted)") -> Image.Image:
    """
    Draws a visual overlay on a copy of the image, including a title and an
    optional list of keys. Uses the top-right corner.

    A semi-transparent title bar is drawn first; when ``keys`` is non-empty a
    semi-transparent list box is drawn directly below it. At most
    ``MAX_KEYS_TO_DISPLAY_OVERLAY`` lines are listed, the last one being a
    "... (N more)" summary when keys were left out. Key names that are too
    wide are shortened with a trailing "...", and the list stops early when it
    would run past the bottom margin.

    Args:
        image: Source image; it is not modified.
        keys: Key names to list below the title, or ``None``/empty for none.
        title: Text for the title bar; ``None`` is treated as empty.

    Returns:
        The result of ``set_pil_image_format_to_png`` applied to the overlaid
        RGB copy, or to a plain copy of ``image`` when there is nothing to draw.
    """
    title = title or ""

    # Return original image if nothing to draw
    if not title and not keys:
        logger.debug("No title or keys to display overlay. Returning original image.")
        return set_pil_image_format_to_png(image.copy())

    # Work on an RGB copy and draw in 'RGBA' mode: Pillow then alpha-blends fills
    # that carry an alpha value into the RGB pixels. Drawing onto an RGBA image
    # merely overwrites the pixels, so the boxes ended up fully opaque once the
    # alpha channel was dropped by the conversion back to RGB.
    img_overlayed = image.copy() if image.mode == 'RGB' else image.convert('RGB')
    draw = ImageDraw.Draw(img_overlayed, 'RGBA')

    margin = 10  # Margin from image edges
    # Padding inside overlay boxes (pixels)
    padding = {
        'title_h_pad': 6,   # Vertical padding inside title bar
        'title_w_pad': 10,  # Horizontal padding inside title bar
        'key_h_pad': 5,     # Vertical padding around the key list
        'key_w_pad': 10,    # Horizontal padding around each key line
    }
    line_spacing = 4  # Extra space between key lines in the list

    title_bg_color = (60, 60, 60, 180)     # Dark grey, semi-transparent (RGBA)
    title_text_color = (230, 230, 90)      # Yellowish
    key_list_bg_color = (50, 50, 50, 160)  # Slightly darker grey, semi-transparent (RGBA)
    key_text_color = (210, 210, 210)       # Light grey
    ellipsis_color = (170, 170, 170)       # Grey for ellipsis

    # Calculate overlay box width based on image width, within min/max limits
    OVERLAY_TARGET_WIDTH_RATIO = 0.30
    MIN_OVERLAY_WIDTH_PX = 180
    MAX_OVERLAY_WIDTH_PX = 500
    final_overlay_box_width = int(image.width * OVERLAY_TARGET_WIDTH_RATIO)
    final_overlay_box_width = max(MIN_OVERLAY_WIDTH_PX, final_overlay_box_width)
    final_overlay_box_width = min(MAX_OVERLAY_WIDTH_PX, final_overlay_box_width)
    final_overlay_box_width = min(final_overlay_box_width, image.width - 2 * margin)  # Ensure it fits with margins
    if final_overlay_box_width <= 2 * max(padding['title_w_pad'], padding['key_w_pad']):
        logger.warning("Calculated overlay box width is too small for horizontal padding. Cannot draw overlay.")
        return set_pil_image_format_to_png(img_overlayed)
    logger.debug(f"Overlay box width calculated: {final_overlay_box_width} px.")

    # --- Title font: scaled to image height, shrunk so a long title fits the bar ---
    TITLE_FONT_HEIGHT_RATIO = 0.030  # Relative to image height
    MIN_TITLE_FONT_SIZE = 14
    MAX_TITLE_FONT_SIZE = 28
    title_font_size = int(image.height * TITLE_FONT_HEIGHT_RATIO)
    if title:
        estimated_char_width_ratio = 0.6  # Assumed average character width per unit of font size
        available_width_for_title_text = final_overlay_box_width - 2 * padding['title_w_pad']
        required_font_size_for_width = available_width_for_title_text / (len(title) * estimated_char_width_ratio)
        title_font_size = min(title_font_size, int(required_font_size_for_width))
    title_font_size = max(MIN_TITLE_FONT_SIZE, title_font_size)
    title_font_size = min(MAX_TITLE_FONT_SIZE, title_font_size)
    title_font = _get_font(PREFERRED_FONTS, title_font_size)
    logger.debug(f"Title font size selected: {title_font_size}")

    # --- Key font: scaled to image height and capped by a share of the box width ---
    KEY_FONT_HEIGHT_RATIO = 0.025  # Relative to image height
    MIN_KEY_FONT_SIZE = 12
    MAX_KEY_FONT_SIZE = 22
    key_font_size = int(image.height * KEY_FONT_HEIGHT_RATIO)
    key_font_size_from_overlay_width = int(final_overlay_box_width * 0.07)  # heuristic ratio
    key_font_size = min(key_font_size, key_font_size_from_overlay_width)
    key_font_size = max(MIN_KEY_FONT_SIZE, key_font_size)
    key_font_size = min(MAX_KEY_FONT_SIZE, key_font_size)
    key_font = _get_font(PREFERRED_FONTS, key_font_size)
    logger.debug(f"Key list font size selected: {key_font_size}")
    # Height used when a measurement reports zero; not every font object has `.size`
    fallback_line_h = getattr(key_font, 'size', key_font_size)

    # Calculate text dimensions for initial layout planning
    actual_title_w, actual_title_h = _get_text_measurement(draw, title, title_font)
    logger.debug(f"Measured title size: ({actual_title_w}, {actual_title_h})")

    disp_keys, actual_key_text_widths, key_line_heights = [], [], []
    total_keys_render_h_estimate = 0  # Height needed *before* any truncation logic
    if keys:
        # Limit keys to display (reserving one line for the summary if needed)
        keys_truncated = len(keys) > MAX_KEYS_TO_DISPLAY_OVERLAY
        limited_keys = keys[:MAX_KEYS_TO_DISPLAY_OVERLAY - 1] if keys_truncated else keys
        for kt in limited_keys:
            disp_keys.append(kt)
            kw, kh = _get_text_measurement(draw, kt, key_font)
            actual_key_text_widths.append(kw)
            key_line_heights.append(kh)
            total_keys_render_h_estimate += kh
        # Add summary line if keys were left out
        if keys_truncated:
            ellipsis_text = f"... ({len(keys)-(MAX_KEYS_TO_DISPLAY_OVERLAY-1)} more)"
            disp_keys.append(ellipsis_text)
            ew, eh = _get_text_measurement(draw, ellipsis_text, key_font)
            actual_key_text_widths.append(ew)
            key_line_heights.append(eh)
            total_keys_render_h_estimate += eh
        if len(disp_keys) > 1:
            total_keys_render_h_estimate += line_spacing * (len(disp_keys) - 1)

    # --- Draw Title Bar (Top-Right) ---
    title_bar_h = actual_title_h + 2 * padding['title_h_pad']
    title_bar_x1 = image.width - margin                     # Right edge
    title_bar_x0 = title_bar_x1 - final_overlay_box_width   # Left edge
    title_bar_y0 = margin                                   # Top edge
    title_bar_y1 = title_bar_y0 + title_bar_h               # Bottom edge
    if title_bar_x0 >= title_bar_x1 or title_bar_y0 >= title_bar_y1:
        logger.warning("Calculated title bar dimensions are invalid. Cannot draw title overlay.")
    else:
        draw.rectangle([(title_bar_x0, title_bar_y0), (title_bar_x1, title_bar_y1)], fill=title_bg_color)
        # Center the title horizontally in the space left by the padding
        available_width_for_title_text = final_overlay_box_width - 2 * padding['title_w_pad']
        title_text_draw_x = title_bar_x0 + padding['title_w_pad'] + max(0, (available_width_for_title_text - actual_title_w) / 2)
        title_text_draw_y = title_bar_y0 + padding['title_h_pad']
        draw.text((title_text_draw_x, title_text_draw_y), title, font=title_font, fill=title_text_color)
        logger.debug(f"Drew title '{title}' at ({title_text_draw_x}, {title_text_draw_y})")

    # --- Draw Key List Box ---
    if disp_keys:
        key_list_box_h_ideal = total_keys_render_h_estimate + 2 * padding['key_h_pad']
        key_list_x0, key_list_x1 = title_bar_x0, title_bar_x1  # Align key list box with title bar
        key_list_y0 = title_bar_y1  # Start immediately below title bar
        # Clip the box height so it stays above the bottom margin
        max_key_list_box_h = image.height - margin - key_list_y0
        current_key_list_box_h = min(key_list_box_h_ideal, max_key_list_box_h)
        key_list_y1 = key_list_y0 + current_key_list_box_h

        # Require room for at least the first key line (including padding)
        min_required_key_list_height = (key_line_heights[0] if key_line_heights else 0) + 2 * padding['key_h_pad']
        if current_key_list_box_h < min_required_key_list_height:
            logger.warning(f"Not enough vertical space ({current_key_list_box_h}px) to draw key list overlay (needed min {min_required_key_list_height}px).")
            return set_pil_image_format_to_png(img_overlayed)

        if key_list_x0 >= key_list_x1 or key_list_y0 >= key_list_y1:
            logger.warning("Calculated key list box dimensions are invalid. Cannot draw key list overlay.")
        else:
            draw.rectangle([(key_list_x0, key_list_y0), (key_list_x1, key_list_y1)], fill=key_list_bg_color)
            logger.debug(f"Drew key list box from ({key_list_x0},{key_list_y0}) to ({key_list_x1},{key_list_y1})")

            current_text_y = key_list_y0 + padding['key_h_pad']
            available_text_width_for_keys = final_overlay_box_width - 2 * padding['key_w_pad']
            text_bottom_limit = key_list_y1 - padding['key_h_pad']
            last_index = len(disp_keys) - 1
            ellipsis_str = "..."

            for i, (key_text_item, original_key_text_w, estimated_line_h) in enumerate(
                zip(disp_keys, actual_key_text_widths, key_line_heights)
            ):
                if estimated_line_h <= 0 and len(key_text_item) > 0:
                    estimated_line_h = fallback_line_h

                # Vertical fit check: this line plus the gap before the next one, if any
                space_needed_for_this_line_and_next_gap = estimated_line_h + (line_spacing if i < last_index else 0)
                if current_text_y + space_needed_for_this_line_and_next_gap > text_bottom_limit:
                    # The line does not fit; mark the cut with a bare ellipsis if that fits
                    if not key_text_item.endswith("..."):
                        ellipsis_w, ellipsis_h = _get_text_measurement(draw, ellipsis_str, key_font)
                        if current_text_y + ellipsis_h <= text_bottom_limit:
                            ellipsis_draw_x = key_list_x0 + padding['key_w_pad'] + max(0, (available_text_width_for_keys - ellipsis_w) / 2)
                            draw.text((ellipsis_draw_x, current_text_y), ellipsis_str, font=key_font, fill=ellipsis_color)
                            logger.debug(f"Drew final ellipsis at ({ellipsis_draw_x}, {current_text_y}) due to vertical space limit.")
                    break  # Stop processing/drawing further lines

                # --- Horizontal truncation (the "... (N more)" summary line is never shortened) ---
                text_to_draw = key_text_item
                if original_key_text_w > available_text_width_for_keys and not key_text_item.startswith("..."):
                    temp_text = key_text_item
                    truncated = False
                    # Drop trailing characters until `temp_text + "..."` fits
                    while temp_text and _get_text_measurement(draw, temp_text + ellipsis_str, key_font)[0] > available_text_width_for_keys:
                        temp_text = temp_text[:-1]
                        truncated = True
                    if truncated:
                        text_to_draw = temp_text + ellipsis_str
                    final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font)
                    if final_key_text_w > available_text_width_for_keys:
                        # Still too wide: cut to an estimated character count instead
                        logger.warning(f"Aggressive truncation needed for key: '{key_text_item[:20]}...'. Final text '{text_to_draw[:20]}...' still too wide ({final_key_text_w}px > {available_text_width_for_keys}px). Re-truncating.")
                        est_chars_that_fit = int(available_text_width_for_keys / max(1, key_font_size * 0.5))
                        text_to_draw = key_text_item[:max(0, est_chars_that_fit - len(ellipsis_str))] + ellipsis_str
                        final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font)
                else:
                    final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font)

                # Ensure actual height is positive for non-empty string
                if current_line_actual_h <= 0 and len(text_to_draw) > 0:
                    logger.warning(f"Measured height zero for text '{text_to_draw[:20]}...'. Using estimated height: {fallback_line_h}")
                    current_line_actual_h = fallback_line_h

                # --- DRAW THE TEXT (centered horizontally within the padding) ---
                key_text_draw_x = key_list_x0 + padding['key_w_pad'] + max(0, (available_text_width_for_keys - final_key_text_w) / 2)
                # Grey out the summary line and any key that was shortened with "..."
                was_shortened = text_to_draw.endswith("...") and text_to_draw != key_text_item
                text_color_to_use = ellipsis_color if (key_text_item.startswith("...") or was_shortened) else key_text_color
                draw.text((key_text_draw_x, current_text_y), text_to_draw, font=key_font, fill=text_color_to_use)
                logger.debug(f"Drew key item '{text_to_draw}' at ({key_text_draw_x}, {current_text_y}). Measured height: {current_line_actual_h}")

                # --- UPDATE Y POSITION FOR NEXT LINE ---
                current_text_y += current_line_actual_h
                if i < last_index:
                    current_text_y += line_spacing

    # The working copy is RGB throughout, so no mode conversion is needed here.
    # Final step: ensure the output is in PNG format.
    return set_pil_image_format_to_png(img_overlayed)