# keylock/core.py import io import json import os import struct import logging import traceback import base64 from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.exceptions import InvalidTag, InvalidSignature, InvalidKey from PIL import Image, ImageDraw, ImageFont import numpy as np logger = logging.getLogger(__name__) # Configure logger if not already configured if not logger.hasHandlers(): handler = logging.StreamHandler() # Using the format from the original code for consistency formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.DEBUG) # Keep DEBUG level for now to aid troubleshooting # Constants from the original code (kept for clarity, though some are specific to older crypto) SALT_SIZE = 16 # Not used in hybrid AESGCM NONCE_SIZE = 12 # Matches AES_GCM_NONCE_SIZE_CRYPTO TAG_SIZE = 16 # Matches AES_GCM_TAG_SIZE_CRYPTO PBKDF2_ITERATIONS = 390_000 # Not used in hybrid RSA-AES LENGTH_HEADER_SIZE = 4 # The 4-byte header added by the LSB embedder, indicates size of crypto payload *following* this header # Constants for the NEW hybrid encryption payload structure (what gets embedded AFTER the LSB header) # Crypto Payload Structure: # [Encrypted AES Key Length (4 bytes)] [Encrypted AES Key (RSA_Key_Size_Bytes)] [AES Nonce (12 bytes)] [AES GCM Ciphertext] [AES GCM Tag (16 bytes)] ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO = 4 # Bytes (Size of the length field *within* the crypto payload) AES_GCM_NONCE_SIZE_CRYPTO = 12 # Bytes (Size of the Nonce *within* the crypto payload) AES_GCM_TAG_SIZE_CRYPTO = 16 # Bytes (Size of the Tag *within* the crypto payload) AES_KEY_SIZE_CRYPTO = 32 # Bytes (for AES-256, the size of the key that gets RSA encrypted) RSA_KEY_SIZE_DEFAULT = 2048 # Bits (Default size for RSA key generation) PREFERRED_FONTS = ["Verdana", "Arial", "DejaVu Sans", "Calibri", "Helvetica", "Roboto-Regular", "sans-serif"] MAX_KEYS_TO_DISPLAY_OVERLAY = 12 def _get_font(preferred_fonts, base_size): """ Attempts to load a TrueType font from a list of preferred fonts. Falls back to PIL's default font if preferred fonts are not found. Ensures the font is loaded at the specified base_size. """ fp = None # Ensure base_size is a positive integer for ImageFont.truetype safe_base_size = int(base_size) if safe_base_size <= 0: safe_base_size = 10 # Default to a small positive size # First, try to find a usable font path or name for n in preferred_fonts: try: # Test loading with a small size to find the file path without large memory allocation # The actual font object will be created with safe_base_size later. ImageFont.truetype(n.lower()+".ttf", 10) # Check .ttf first fp = n.lower() + ".ttf" break except IOError: try: ImageFont.truetype(n, 10) # Check bare name (might be system font) fp = n break except IOError: continue # Try next font # If a font path was found, attempt to load it with the desired size if fp: try: logger.debug(f"Found font path/name '{fp}'. Attempting to load with size {safe_base_size}.") return ImageFont.truetype(fp, safe_base_size) except IOError as e: logger.warning(f"Font '{fp}' load failed with size {safe_base_size}: {e}. Attempting default font.") except Exception as e: logger.warning(f"Unexpected error loading font '{fp}' with size {safe_base_size}: {e}. Attempting default font.", exc_info=True) # Fallback to PIL's default font logger.warning("Could not load preferred or system font. Using PIL's load_default.") try: # Try to load default with size if Pillow version supports it (Pillow 10+) try: default_font = ImageFont.load_default(size=safe_base_size) logger.debug(f"Loaded default font with size {safe_base_size}.") return default_font except TypeError: # Older Pillow doesn't support 'size' argument for load_default logger.warning("Pillow version does not support sizing load_default. Using unsized default.") default_font = ImageFont.load_default() # Note: Unsized default font is often a small bitmap font. return default_font except Exception as e: logger.error(f"Failed to load PIL's default font: {e}", exc_info=True) # As a last resort, raise an error as text drawing is likely impossible raise IOError("Failed to load any font for image overlay.") def set_pil_image_format_to_png(image:Image.Image)->Image.Image: """Ensures image format is set to PNG by saving/reloading. PNG preserves LSBs.""" buf=io.BytesIO() try: # Save the image to an in-memory buffer as PNG image.save(buf, format='PNG') buf.seek(0) # Rewind the buffer to the beginning # Open the image from the buffer reloaded = Image.open(buf) # Explicitly set format attribute, although Image.open often sets it reloaded.format = "PNG" logger.debug(f"Converted image to PNG format. Original mode: {image.mode}, New mode: {reloaded.mode}") return reloaded except Exception as e: logger.error(f"Failed to convert image to PNG format: {e}", exc_info=True) # Re-raise the exception as PNG conversion is crucial for LSB preservation raise RuntimeError(f"Failed to process image for PNG conversion: {e}") # --- RSA Key Management --- def generate_rsa_key_pair(key_size_bits: int = RSA_KEY_SIZE_DEFAULT) -> tuple[bytes, bytes]: """Generates a new RSA private and public key pair in PEM format.""" logger.debug(f"Generating RSA key pair with size: {key_size_bits} bits.") private_key = rsa.generate_private_key( public_exponent=65537, key_size=key_size_bits ) public_key = private_key.public_key() private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, # PKCS8 is recommended encryption_algorithm=serialization.NoEncryption() # No password for simplicity in this app's context; user must secure file/paste. ) public_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) logger.debug("RSA key pair generated successfully.") return private_pem, public_pem def _clean_pem_bytes(pem_data: bytes | str) -> bytes: """Strips leading/trailing whitespace and converts string to bytes if needed.""" if isinstance(pem_data, str): # Remove common surrounding strings like backticks if present (heuristic) pem_data = pem_data.strip().strip('```') pem_data = pem_data.encode('utf-8') else: pem_data = pem_data.strip() return pem_data def load_rsa_private_key_from_pem(pem_bytes: bytes | str) -> rsa.RSAPrivateKey: """Loads an RSA private key from PEM bytes, stripping surrounding data/whitespace.""" try: # Clean input bytes/string cleaned_pem = _clean_pem_bytes(pem_bytes) logger.debug(f"Cleaned private key PEM data (first 50 bytes): {cleaned_pem[:50]}...") # Attempt to load the key try: private_key = serialization.load_pem_private_key( cleaned_pem, password=None # Assumes key file is not password-protected ) except (TypeError, ValueError, InvalidKey) as e: # Catch cryptography errors specifically related to format or encryption err_str = str(e).lower() if "password" in err_str or "decryption failed" in err_str: logger.error(f"Failed to load RSA private key: It appears password-protected. {e}") raise ValueError("Private key appears to be password-protected. KeyLock requires an unencrypted key.") if "malformed" in err_str or "invalid pem" in err_str or "unsupported key type" in err_str: logger.error(f"Failed to parse RSA private key (parsing error): {e}", exc_info=True) raise ValueError(f"Failed to parse RSA private key. Ensure it is a valid, unencrypted PKCS8 or traditional PEM format key: {e}") logger.error(f"Failed to load RSA private key (cryptography error): {e}", exc_info=True) raise ValueError(f"Failed to load RSA private key due to a cryptography error: {e}") except Exception as e: logger.error(f"Unexpected error loading RSA private key: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred loading the private key: {e}") if not isinstance(private_key, rsa.RSAPrivateKey): logger.error("Loaded object is not an RSA private key instance.") raise TypeError("Loaded key is not a valid RSA private key.") logger.debug(f"RSA private key loaded successfully (size: {private_key.key_size} bits).") return private_key except (ValueError, TypeError, RuntimeError, InvalidKey) as e: # Re-raise exceptions with specific messages if they originated from this function if isinstance(e, (ValueError, TypeError, RuntimeError)) and ( "password" in str(e).lower() or "parse" in str(e).lower() or "private key appears" in str(e) or "cryptography error" in str(e) or "Loaded key is not a valid RSA private key" in str(e) ): raise e # Catch anything else and wrap in a generic error logger.error(f"Final exception during private key load: {e}", exc_info=True) raise ValueError(f"Failed to load RSA private key: {e}") def load_rsa_public_key_from_pem(pem_bytes: bytes | str) -> rsa.RSAPublicKey: """Loads an RSA public key from PEM bytes, stripping surrounding data/whitespace.""" try: # Clean input bytes/string cleaned_pem = _clean_pem_bytes(pem_bytes) logger.debug(f"Cleaned public key PEM data (first 50 bytes): {cleaned_pem[:50]}...") # Attempt to load the key try: public_key = serialization.load_pem_public_key(cleaned_pem) except (ValueError, InvalidKey) as e: # Catch cryptography errors specifically related to format err_str = str(e).lower() if "malformed" in err_str or "invalid pem" in err_str or "unsupported key type" in err_str: logger.error(f"Failed to parse RSA public key (parsing error): {e}", exc_info=True) raise ValueError(f"Failed to parse RSA public key. Ensure it is a valid PEM format key: {e}") logger.error(f"Failed to load RSA public key (cryptography error): {e}", exc_info=True) raise ValueError(f"Failed to load RSA public key due to a cryptography error: {e}") except Exception as e: logger.error(f"Unexpected error loading RSA public key: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred loading the public key: {e}") if not isinstance(public_key, rsa.RSAPublicKey): logger.error("Loaded object is not an RSA public key instance.") raise TypeError("Loaded key is not a valid RSA public key.") logger.debug(f"RSA public key loaded successfully (size: {public_key.key_size} bits).") return public_key except (ValueError, TypeError, RuntimeError, InvalidKey) as e: # Re-raise exceptions with specific messages if they originated from this function if isinstance(e, (ValueError, TypeError, RuntimeError)) and ( "parse" in str(e).lower() or "cryptography error" in str(e) or "Loaded key is not a valid RSA public key" in str(e) ): raise e # Catch anything else and wrap in a generic error logger.error(f"Final exception during public key load: {e}", exc_info=True) raise ValueError(f"Failed to load RSA public key: {e}") # --- Hybrid Encryption/Decryption --- def encrypt_data_hybrid(data: bytes, recipient_public_key_pem: bytes | str) -> bytes: """ Encrypts data using AES-GCM with a random key, then encrypts the AES key using the recipient's RSA public key. Returns the combined crypto payload: [Encrypted AES Key Length (4 bytes)] [Encrypted AES Key] [AES Nonce (12 bytes)] [AES GCM Ciphertext] [AES GCM Tag (16 bytes)] """ logger.debug(f"Starting hybrid encryption of {len(data)} bytes.") try: # Ensure public key is loaded correctly, handling potential string input and cleaning public_key = load_rsa_public_key_from_pem(recipient_public_key_pem) logger.debug(f"Using RSA public key size: {public_key.key_size} bits.") # 1. Generate a random AES key (AES-256 requires 32 bytes) aes_key = os.urandom(AES_KEY_SIZE_CRYPTO) logger.debug(f"Generated random AES key ({AES_KEY_SIZE_CRYPTO} bytes).") # 2. Encrypt data using AES-GCM with the random key aesgcm = AESGCM(aes_key) # Generate a unique nonce for this encryption operation nonce = os.urandom(AES_GCM_NONCE_SIZE_CRYPTO) logger.debug(f"Generated AES-GCM nonce ({AES_GCM_NONCE_SIZE_CRYPTO} bytes).") # AESGCM.encrypt(nonce, plaintext, associated_data=None) returns ciphertext + tag ciphertext_with_tag = aesgcm.encrypt(nonce, data, None) # No AAD used here logger.debug(f"AES encrypted data (ciphertext+tag) size: {len(ciphertext_with_tag)} bytes.") # 3. Encrypt the random AES key using the recipient's RSA public key try: rsa_encrypted_aes_key = public_key.encrypt( aes_key, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None # Optional: Could add a label here if needed, but requires recipient knowing it ) ) logger.debug(f"RSA encrypted AES key size: {len(rsa_encrypted_aes_key)} bytes.") except Exception as e: logger.error(f"RSA encryption of AES key failed: {e}", exc_info=True) # Catch specific RSA errors during encryption (less common than decryption failures) if "data too large" in str(e).lower(): # OAEP padding adds overhead. Key size must be large enough for AES_KEY_SIZE_CRYPTO + padding. # RSA key size in bits must be > (AES_KEY_SIZE_CRYPTO + 2*SHA256_digest_size + 2) * 8 # For AES-256 (32B) and SHA256 (32B digest), this is > (32 + 2*32 + 2) * 8 = (32 + 64 + 2) * 8 = 98 * 8 = 784 bits. # 2048 bits (256 bytes) is ample. This error is more likely if the RSA key size is somehow misreported or corrupted. raise ValueError(f"RSA encryption of symmetric key failed: The symmetric key size ({AES_KEY_SIZE_CRYPTO}B) might be too large for the RSA key size ({public_key.key_size} bits) with OAEP padding. Use a larger RSA key. Original error: {e}") raise ValueError(f"RSA encryption of the symmetric key failed: {e}. Check public key validity or RSA key size.") # 4. Combine the crypto payload structure for embedding # Structure: [Encrypted AES Key Length (4 bytes)] [Encrypted AES Key] [AES Nonce (12 bytes)] [AES GCM Ciphertext + AES GCM Tag (16 bytes)] encrypted_aes_key_len_bytes = struct.pack('>I', len(rsa_encrypted_aes_key)) logger.debug(f"Packed encrypted AES key length ({len(rsa_encrypted_aes_key)} bytes). Header bytes size: {len(encrypted_aes_key_len_bytes)}.") # The combined payload includes: # 1. Length of the RSA encrypted key (4 bytes) # 2. The RSA encrypted key bytes # 3. The AES Nonce (12 bytes) -> Use the 'nonce' variable directly # 4. The AES Ciphertext + Tag bytes -> Use 'ciphertext_with_tag' combined_crypto_payload = encrypted_aes_key_len_bytes + rsa_encrypted_aes_key + nonce + ciphertext_with_tag logger.info(f"Hybrid encryption successful. Total combined crypto payload size: {len(combined_crypto_payload)} bytes.") return combined_crypto_payload except (ValueError, RuntimeError, InvalidKey) as e: logger.error(f"Hybrid encryption error (caught specific exception): {e}", exc_info=True) raise e # Re-raise value/runtime/invalidkey errors from sub-functions except Exception as e: # Catch any other unexpected errors logger.error(f"Unexpected hybrid encryption error: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred during encryption: {e}") def decrypt_data_hybrid(crypto_payload_from_lsb: bytes, recipient_private_key_pem: bytes | str) -> bytes: """ Decrypts the crypto payload (extracted from LSB, *without* its header) using the recipient's RSA private key. """ logger.debug(f"Starting hybrid decryption. Received crypto_payload_from_lsb size: {len(crypto_payload_from_lsb)} bytes.") # The input 'crypto_payload_from_lsb' is expected to be the raw bytes extracted # by the LSB layer *after* its 4-byte header. # This payload should have the structure: # [Encrypted AES Key Length (4 bytes)] [Encrypted AES Key] [AES Nonce (12 bytes)] [AES GCM Ciphertext] [AES GCM Tag (16 bytes)] try: # Ensure private key is loaded correctly, handling potential string input and cleaning private_key = load_rsa_private_key_from_pem(recipient_private_key_pem) logger.debug(f"Using RSA private key with size: {private_key.key_size} bits for decryption.") # 1. Parse the combined crypto payload # Minimum size check needs to be based on the *crypto* payload structure now # It needs 4 bytes for encrypted_aes_key_len + min RSA output size + AES nonce + AES tag # The encrypted key size will be exactly the RSA key size in bytes (e.g., 256 bytes for 2048 bits). rsa_key_size_bytes = private_key.key_size // 8 # The minimum payload must contain the len field, the encrypted key, the nonce, and the tag. min_expected_crypto_payload_size = ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO + rsa_key_size_bytes + AES_GCM_NONCE_SIZE_CRYPTO + AES_GCM_TAG_SIZE_CRYPTO logger.debug(f"Expected min crypto payload size based on private key ({rsa_key_size_bytes}B RSA key): {min_expected_crypto_payload_size} bytes.") if len(crypto_payload_from_lsb) < min_expected_crypto_payload_size: logger.error(f"Crypto payload too short. Size: {len(crypto_payload_from_lsb)}, Expected min: {min_expected_crypto_payload_size}") raise ValueError(f"Crypto payload extracted from image is too short ({len(crypto_payload_from_lsb)} bytes). Expected at least {min_expected_crypto_payload_size} bytes. Image likely corrupted or does not contain a valid RSA-encrypted payload.") # Extract the length of the encrypted AES key (first 4 bytes of the crypto payload) encrypted_aes_key_len_bytes = crypto_payload_from_lsb[:ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO] try: encrypted_aes_key_len = struct.unpack('>I', encrypted_aes_key_len_bytes)[0] except struct.error as e: logger.error(f"Failed to unpack encrypted AES key length header: {e}", exc_info=True) raise ValueError(f"Failed to read encrypted AES key length header from crypto payload: {e}. Payload corrupted.") logger.debug(f"Parsed encrypted AES key length: {encrypted_aes_key_len} bytes.") # Validate the indicated encrypted_aes_key_len # It should be equal to the RSA key size in bytes used during encryption expected_rsa_encrypted_aes_key_size = private_key.key_size // 8 if encrypted_aes_key_len != expected_rsa_encrypted_aes_key_size: logger.error(f"Parsed encrypted AES key length ({encrypted_aes_key_len}B) does not match private key size in bytes ({expected_rsa_encrypted_aes_key_size}B).") # This check is crucial and can catch key mismatches or corruption early raise ValueError(f"Encrypted AES key length mismatch ({encrypted_aes_key_len} bytes). Expected {expected_rsa_encrypted_aes_key_size} bytes based on private key size ({private_key.key_size} bits). Indicates incorrect key pair or data corruption.") # Calculate offsets for parsing the rest of the payload rsa_encrypted_aes_key_start = ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO # 4 rsa_encrypted_aes_key_end = rsa_encrypted_aes_key_start + encrypted_aes_key_len aes_nonce_start = rsa_encrypted_aes_key_end aes_nonce_end = aes_nonce_start + AES_GCM_NONCE_SIZE_CRYPTO # 12 bytes after encrypted key aes_ciphertext_with_tag_start = aes_nonce_end logger.debug(f"Calculated offsets for parsing crypto payload: RSA Encrypted Key starts at {rsa_encrypted_aes_key_start}, ends at {rsa_encrypted_aes_key_end}. AES Nonce starts at {aes_nonce_start}, ends at {aes_nonce_end}. AES Ciphertext+Tag starts at {aes_ciphertext_with_tag_start}.") # Check if the payload is long enough for the declared encrypted key length + nonce + tag # Note: Ciphertext can be 0 length, but tag is always 16 bytes for AES-GCM required_min_payload_after_encrypted_key = AES_GCM_NONCE_SIZE_CRYPTO + AES_GCM_TAG_SIZE_CRYPTO if len(crypto_payload_from_lsb) < rsa_encrypted_aes_key_end + required_min_payload_after_encrypted_key: logger.error(f"Crypto payload too short based on parsed lengths and minimum requirements. Size: {len(crypto_payload_from_lsb)}, Required min based on parsed length: {rsa_encrypted_aes_key_end + required_min_payload_after_encrypted_key} ({ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO}B len + {encrypted_aes_key_len}B RSA key + 12B Nonce + 16B Tag).") raise ValueError("Crypto payload extracted from image is truncated or corrupted based on parsed lengths.") # Extract the RSA-encrypted AES key rsa_encrypted_aes_key = crypto_payload_from_lsb[rsa_encrypted_aes_key_start : rsa_encrypted_aes_key_end] logger.debug(f"Extracted RSA encrypted AES key. Size: {len(rsa_encrypted_aes_key)} bytes.") # Extract the AES Nonce aes_nonce = crypto_payload_from_lsb[aes_nonce_start : aes_nonce_end] logger.debug(f"Extracted AES Nonce. Size: {len(aes_nonce)} bytes.") # Explicit check for nonce size (Added for robustness) if len(aes_nonce) != AES_GCM_NONCE_SIZE_CRYPTO: logger.error(f"Extracted AES nonce has incorrect size: {len(aes_nonce)} bytes. Expected: {AES_GCM_NONCE_SIZE_CRYPTO} bytes.") raise ValueError(f"Extracted AES nonce has incorrect size ({len(aes_nonce)} bytes). Expected {AES_GCM_NONCE_SIZE_CRYPTO} bytes. Indicates payload corruption or incorrect parsing.") # Extract the AES ciphertext and tag aes_ciphertext_with_tag = crypto_payload_from_lsb[aes_ciphertext_with_tag_start:] logger.debug(f"Extracted AES ciphertext+tag. Size: {len(aes_ciphertext_with_tag)} bytes.") # Check minimum size for tag (ciphertext can be empty, but tag is always 16B) if len(aes_ciphertext_with_tag) < AES_GCM_TAG_SIZE_CRYPTO: logger.error(f"AES ciphertext+tag part too short ({len(aes_ciphertext_with_tag)}B). Expected min {AES_GCM_TAG_SIZE_CRYPTO}B (tag).") raise ValueError("AES ciphertext+tag part is too short (missing tag).") # 2. Decrypt the encrypted AES key using the recipient's RSA private key try: recovered_aes_key = private_key.decrypt( rsa_encrypted_aes_key, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) logger.debug(f"RSA decryption of AES key successful. Recovered key size: {len(recovered_aes_key)} bytes.") except Exception as e: logger.error(f"RSA decryption of AES key failed: {e}", exc_info=True) # Specific error for RSA decryption failure # Catch common RSA decryption errors and map to a helpful message err_msg = str(e).lower() if "decryption failed" in err_msg or "data too large" in err_msg or "oaep" in err_msg or "private key" in err_msg or "signature" in err_msg: raise ValueError(f"RSA decryption of the symmetric key failed. This typically means the incorrect private key was used, or the embedded encrypted key data is corrupted. Original error: {e}") else: raise RuntimeError(f"An unexpected error occurred during RSA decryption: {e}") # Check the length of the recovered key - it must be AES_KEY_SIZE_CRYPTO if len(recovered_aes_key) != AES_KEY_SIZE_CRYPTO: logger.error(f"RSA decryption resulted in key of unexpected length ({len(recovered_aes_key)}B). Expected {AES_KEY_SIZE_CRYPTO}B.") # This is a strong indicator of a key pair mismatch or severe corruption raise ValueError(f"RSA decryption produced an AES key of unexpected length ({len(recovered_aes_key)} bytes). This strongly suggests the private key used does not match the public key used for encryption, or the data is corrupted.") # 3. Decrypt the data using the recovered AES key and nonce/tag logger.debug("Attempting AES-GCM decryption of data...") aesgcm = AESGCM(recovered_aes_key) # Note: cryptography's AESGCM.decrypt expects nonce, ciphertext+tag, aad try: decrypted_data = aesgcm.decrypt(aes_nonce, aes_ciphertext_with_tag, None) # No AAD used here except InvalidTag: # This is the primary way AES-GCM indicates authentication/decryption failure logger.error("AES-GCM decryption failed: Invalid tag.", exc_info=True) raise ValueError("Decryption failed (InvalidTag): The private key used is likely incorrect or the image data is corrupted.") except Exception as e: logger.error(f"Unexpected error during AES-GCM decryption: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred during AES-GCM decryption: {e}") logger.info("Hybrid decryption process completed successfully.") return decrypted_data except (ValueError, InvalidTag, InvalidKey, TypeError, RuntimeError) as e: # Catch specific exceptions and re-raise with more context logger.error(f"Hybrid decryption error (caught specific exception): {e}", exc_info=True) # Specific error messages should already be set if they originated from sub-functions we wrapped raise e # Re-raise the exception as is except Exception as e: # Catch any other unexpected errors logger.error(f"Unexpected hybrid decryption error: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred during decryption: {e}") # --- LSB Steganography --- def _d2b(d:bytes)->str: """Convert bytes to a binary string.""" return ''.join(format(b,'08b') for b in d) def _b2B(b:str)->bytes: """Convert a binary string to bytes.""" # The binary string should ideally be a multiple of 8 bits. # If not, it indicates corruption or an issue with extraction. # We will NOT pad here during conversion, as it would hide truncation errors. if len(b)%8!=0: logger.error(f"Binary string length ({len(b)}) is not a multiple of 8 during _b2B conversion.") # Raise an error instead of padding, as padding could mask corruption raise ValueError(f"Binary string length ({len(b)}) is not a multiple of 8. Data corruption suspected.") try: return bytes(int(b[i:i+8],2) for i in range(0,len(b),8)) except ValueError as e: logger.error(f"Failed to convert binary string to bytes: {e}. Input binary string length: {len(b)}") # Provide part of the binary string for debugging log_b = b[:100] + ('...' if len(b)>100 else '') logger.debug(f"Input binary string (first 100 chars): {log_b}") raise ValueError(f"Failed to convert binary string to bytes: {e}. Data might be corrupted.") def embed_data_in_image(img_obj:Image.Image,data:bytes)->Image.Image: """ Embeds raw data bytes (`data`) into the LSB of the image's RGB channels, prefixed with a 4-byte length header. The `data` should be the output of the crypto layer (e.g., the combined_crypto_payload from encrypt_data_hybrid). """ logger.debug(f"LSB Embedding: Received {len(data)} bytes of data from crypto layer (the crypto payload).") # Create a copy and ensure image is RGB # Using 'RGB' mode ensures 3 channels. If input is RGBA, A channel is dropped. # This is crucial for predictable LSB embedding across different input image types. try: img = img_obj.convert("RGB") except Exception as e: logger.error(f"Failed to convert image to RGB before LSB embedding: {e}", exc_info=True) raise RuntimeError(f"Image processing failed: Could not convert image to RGB for embedding: {e}") try: px = np.array(img) fpx = px.ravel() # Flatten pixel array (R, G, B, R, G, B, ...) except Exception as e: logger.error(f"Failed to convert image to numpy array: {e}", exc_info=True) raise RuntimeError(f"Image processing failed: Could not convert image to numpy array for embedding: {e}") # Convert the data length to a 4-byte binary header # This header indicates the length of the `data` block being embedded. data_length = len(data) # Check if data_length exceeds max value of an unsigned 32-bit integer (2^32 - 1) max_uint32 = (2**32) - 1 if data_length > max_uint32: # This case is highly unlikely given typical image capacity, but worth checking defensively logger.error(f"LSB Embedding: Data length ({data_length}) exceeds maximum value representable by 4-byte unsigned integer header ({max_uint32}).") raise ValueError(f"Data length ({data_length} bytes) exceeds maximum size embeddable with the 4-byte header.") data_length_header = struct.pack('>I', data_length) # 4 bytes = 32 bits data_length_binary = _d2b(data_length_header) # 32 bits logger.debug(f"LSB Embedding: Data length: {data_length} bytes. Header bytes size: {len(data_length_header)}. Header binary size: {len(data_length_binary)} bits.") # The full binary payload to embed is the header bits + data bits data_binary = _d2b(data) full_binary_payload = data_length_binary + data_binary total_payload_bits = len(full_binary_payload) total_payload_bytes = total_payload_bits // 8 # Should be LENGTH_HEADER_SIZE + len(data) logger.debug(f"LSB Embedding: Total binary payload size for embedding: {total_payload_bits} bits / {total_payload_bytes} bytes.") # Check capacity (1 bit per pixel channel in RGB) # Total available LSBs = total number of elements in the flattened pixel array (width * height * 3) total_capacity_bits = len(fpx) if total_payload_bits > total_capacity_bits: logger.error(f"LSB Embedding: Data too large for image capacity. Needed: {total_payload_bits} bits, Available: {total_capacity_bits} bits ({total_capacity_bits // 8} bytes). Image WxH*3 LSBs.") raise ValueError(f"Data too large for image capacity: {total_payload_bits} bits ({total_payload_bytes} bytes) needed, {total_capacity_bits} bits available ({total_capacity_bits // 8} bytes - Image WxH*3 LSBs).") logger.debug(f"LSB Embedding: Embedding {total_payload_bits} bits into {total_capacity_bits} available pixel channels.") # Embed the full binary payload into the LSBs try: for i in range(total_payload_bits): # Clear the last bit (AND with 0xFE, which is 11111110) # Set the last bit to the i-th bit of the binary payload (OR with 0 or 1) # Ensure pixel value is treated as int before bitwise operations fpx[i] = (int(fpx[i]) & 0xFE) | int(full_binary_payload[i]) except IndexError: # Should be caught by explicit capacity check, but defensive programming logger.error("LSB Embedding: Index error during embedding loop. Payload size exceeds image capacity.", exc_info=True) raise RuntimeError("LSB Embedding failed: Attempted to write beyond image capacity.") except Exception as e: logger.error(f"Unexpected error during LSB embedding loop: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred during LSB embedding: {e}") # Reshape flattened pixel array back to image dimensions and create new Image object try: stego_pixels = fpx.reshape(px.shape) stego_img = Image.fromarray(stego_pixels.astype(np.uint8),'RGB') except Exception as e: logger.error(f"Failed to reshape or create image from modified numpy array: {e}", exc_info=True) raise RuntimeError(f"Image processing failed: Could not create image from stego pixels: {e}") logger.debug("LSB Embedding: Data embedding complete.") # Ensure output is PNG format to preserve LSBs return set_pil_image_format_to_png(stego_img) # RESTORED TO ORIGINAL EXTRACT LOGIC def extract_data_from_image(img_obj:Image.Image)->bytes: """ Extracts raw data bytes from the LSB of the image's RGB channels, using the leading 4-byte length header. Returns *only* the data payload that followed the header. """ logger.debug("LSB Extraction: Starting data extraction from image LSB.") # Ensure image is RGB and convert to numpy array try: img = img_obj.convert("RGB") except Exception as e: logger.error(f"Failed to convert image to RGB before LSB extraction: {e}", exc_info=True) raise RuntimeError(f"Image processing failed: Could not convert image to RGB for extraction: {e}") try: px = np.array(img) fpx = px.ravel() # Flatten pixel array (R, G, B, R, G, B, ...) total_image_pixels = len(fpx) # Total number of LSBs available (W * H * 3) except Exception as e: logger.error(f"Failed to convert image to numpy array: {e}", exc_info=True) raise RuntimeError(f"Image processing failed: Could not convert image to numpy array for extraction: {e}") logger.debug(f"LSB Extraction: Flattened image pixel array size: {total_image_pixels}.") # Extract the data length header (first 4 bytes = 32 bits) header_bits_count = LENGTH_HEADER_SIZE * 8 # 32 bits if total_image_pixels < header_bits_count: logger.error(f"LSB Extraction: Image too small for LSB header. Size: {total_image_pixels} LSB bits, Header needs {header_bits_count} bits ({LENGTH_HEADER_SIZE} bytes).") raise ValueError(f"Image is too small ({total_image_pixels} LSB bits available) to contain the LSB data length header ({LENGTH_HEADER_SIZE} bytes / {header_bits_count} bits).") logger.debug(f"LSB Extraction: Extracting LSB header ({header_bits_count} bits) from first {header_bits_count} pixel channels LSBs.") try: data_length_binary = "".join(str(int(fpx[i]) & 1) for i in range(header_bits_count)) # Ensure int() cast for safety except IndexError: # Should be caught by explicit size check, but defensive logger.error("LSB Extraction: Index error during header extraction loop.", exc_info=True) raise RuntimeError("LSB Extraction failed: Image size inconsistent during header read.") except Exception as e: logger.error(f"Unexpected error during LSB header extraction loop: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred during LSB header extraction: {e}") try: # The header stores the *length of the data FOLLOWING the header* data_length_header_bytes = _b2B(data_length_binary) if len(data_length_header_bytes) != LENGTH_HEADER_SIZE: # Should not happen if header_bits_count is correct and _b2B gets 32 bits, but a safety check logger.error(f"LSB Extraction: Decoded header bytes have incorrect size: {len(data_length_header_bytes)}. Expected {LENGTH_HEADER_SIZE}.") raise ValueError(f"Decoded LSB header has incorrect byte size ({len(data_length_header_bytes)}). Expected {LENGTH_HEADER_SIZE}. Data corruption or not a KeyLock image.") data_length = struct.unpack('>I', data_length_header_bytes)[0] logger.debug(f"LSB Extraction: Parsed data length from LSB header: {data_length} bytes (This is the size of the crypto payload).") except ValueError as e: # Re-raise value errors from _b2B or struct.unpack raise ValueError(f"Failed to decode data length header from image LSB: {e}. Image may not be a valid KeyLock file or is corrupted.") except Exception as e: logger.error(f"LSB Extraction: Failed to decode data length header: {e}", exc_info=True) raise ValueError(f"Failed to decode data length header from image LSB: {e}. Image may not be a valid KeyLock file or is corrupted.") # Check if the indicated data length is reasonable within the image capacity # Remaining capacity after the header in bits remaining_capacity_bits = total_image_pixels - header_bits_count max_possible_data_bits = remaining_capacity_bits max_possible_data_bytes = max_possible_data_bits // 8 if data_length > max_possible_data_bytes: logger.error(f"LSB Extraction: Indicated data length ({data_length}B) exceeds remaining image capacity ({max_possible_data_bytes}B).") raise ValueError(f"Indicated data length ({data_length} bytes) in LSB header exceeds remaining image capacity ({max_possible_data_bytes} bytes after header). Image likely truncated or corrupted.") if data_length == 0: logger.info("LSB Extraction: LSB header indicates zero data length embedded. Returning empty bytes.") return b"" # Return empty bytes as per original logic for zero length # Extract the actual data bits based on the length from the header data_bits_count = data_length * 8 start_offset = header_bits_count # Start extraction *after* the header bits end_offset = start_offset + data_bits_count # End after the data bits logger.debug(f"LSB Extraction: Extracting data bits. Start offset: {start_offset} (after {LENGTH_HEADER_SIZE}B header), Data bits: {data_bits_count}, End offset: {end_offset}. Total image LSB bits: {total_image_pixels}.") if total_image_pixels < end_offset: logger.error(f"LSB Extraction: Image truncated or corrupted. Cannot extract full data. Image LSB bits: {total_image_pixels}, Required end offset: {end_offset}.") raise ValueError("Image is truncated or corrupted. Cannot extract full data based on header length.") try: data_binary = "".join(str(int(fpx[i]) & 1) for i in range(start_offset, end_offset)) # Ensure int() cast for safety except IndexError: # Should be caught by the total_image_pixels < end_offset check, but defensive logger.error("LSB Extraction: Index error during data extraction loop. Data length inconsistent with image size.", exc_info=True) raise RuntimeError("LSB Extraction failed: Data length inconsistent with image capacity.") except Exception as e: logger.error(f"Unexpected error during LSB data extraction loop: {e}", exc_info=True) raise RuntimeError(f"An unexpected error occurred during LSB data extraction: {e}") # Convert binary string back to bytes try: extracted_bytes = _b2B(data_binary) if len(extracted_bytes) != data_length: # This should only happen if data_bits_count wasn't a multiple of 8, or _b2B failed logger.error(f"LSB Extraction: Extracted bytes length ({len(extracted_bytes)}) does not match expected data length ({data_length}) from header. Binary string length: {len(data_binary)} bits.") # This indicates _b2B or bit extraction issue, likely corruption raise ValueError(f"Extracted bytes length ({len(extracted_bytes)}) does not match expected length ({data_length}) from header. Data corruption.") logger.debug(f"LSB Extraction: Successfully extracted {len(extracted_bytes)} bytes of data (the crypto payload).") # Return *only* the extracted data bytes, NOT including the LSB header. return extracted_bytes except ValueError as e: # Re-raise value errors from _b2B or length mismatch check raise e except Exception as e: logger.error(f"LSB Extraction: Failed to convert extracted bits to bytes: {e}", exc_info=True) raise ValueError(f"Failed to convert extracted bits to bytes: {e}") # --- Utility Functions --- def parse_kv_string_to_dict(kv_str:str)->dict: """Parses a string of key:value or key=value pairs into a dictionary.""" if not kv_str or not kv_str.strip(): logger.debug("Input KV string is empty or whitespace. Returning empty dict.") return {} dd={} for ln,ol in enumerate(kv_str.splitlines(),1): l=ol.strip() if not l or l.startswith('#'): # Skip empty lines or comments continue lc=l.split('#',1)[0].strip() # Remove inline comments if not lc: # Skip line if only contained comment/whitespace after comment removal continue # Use partition to handle values containing '=' or ':' if '=' in lc: p = lc.partition('=') elif ':' in lc: p = lc.partition(':') else: # If no separator is found, consider it an invalid format as per original logic logger.warning(f"L{ln}: Invalid format '{ol}'. Must contain '=' or ':'.") raise ValueError(f"Line {ln}: Invalid format '{ol}'. Each line must contain '=' or ':'.") k, sep, v = p # sep will be '=' or ':' k, v = k.strip(), v.strip() if not k: logger.warning(f"L{ln}: Empty key found in '{ol}'.") raise ValueError(f"Line {ln}: Empty key found in '{ol}'.") # Remove surrounding quotes if present, *only* if they are a matched pair if len(v) >= 2 and v[0] == v[-1] and v[0] in ("'",'"'): v=v[1:-1] # If a line was "key:", the value will be empty string, which is fine. dd[k]=v logger.debug(f"Parsed KV pair: '{k}'='{v}' from line {ln}") return dd def generate_keylock_carrier_image(w=800,h=600,msg="KeyLock Wallet")->Image.Image: """Generates a simple gradient image with a KeyLock logo.""" # Ensure width and height are positive integers w = max(100, int(w)) h = max(100, int(h)) cs,ce=(30,40,50),(70,80,90) # Start and end colors for gradient img=Image.new("RGB",(w,h),cs) # Create base image with start color draw=ImageDraw.Draw(img) # Draw gradient lines for y_ in range(h): # Calculate interpolation factor (0 at top, 1 at bottom) i = y_ / (h - 1) if h > 1 else 0.5 # Handle h=1 edge case # Interpolate colors for the current line r_,g_,b_ = (int(s_ * (1 - i) + e_ * i) for s_,e_ in zip(cs,ce)) # Draw a horizontal line draw.line([(0,y_),(w,y_)], fill=(r_,g_,b_)) # Draw KeyLock logo (circle and rectangle) ib = min(w,h) // 7 # Base size for logo elements icx,icy = w // 2, h // 3 # Center coordinates for the circle (top part of logo) cr = ib // 2 # Circle radius # Circle bounding box (left, top, right, bottom) cb = [(icx - cr, icy - cr), (icx + cr, icy + cr)] rw = ib // 4 # Rectangle width rh = ib // 2 # Rectangle height rty = icy + int(cr * 0.2) # Top Y for rectangle (positioned below circle center) # Rectangle bounding box (left, top, right, bottom) rb = [(icx - rw // 2, rty), (icx + rw // 2, rty + rh)] kc,ko = (190,195,200),(120,125,130) # Key color and outline color ow = max(1, int(ib / 30)) # Outline width, ensure minimum 1px # Draw circle and rectangle with outline draw.ellipse(cb, fill=kc, outline=ko, width=ow) draw.rectangle(rb, fill=kc, outline=ko, width=ow) # Draw message text below the logo # Adjust font size calculation slightly for potentially larger text # Base calculation: max(18 minimum, scale based on width/height ratios) fs = max(18, min(int(w/15), h//7)) # Increase min size and adjust ratios slightly fnt = _get_font(PREFERRED_FONTS, fs) # Get the font object tc = (225,230,235) # Text color sc = (max(0,s_-20) for s_ in cs) # Shadow color (slightly darker than start gradient color) tx,ty = w/2, h*.68 # Target center position for the text so = max(1, int(fs/25)) # Shadow offset, ensure minimum 1px try: # Use anchor="mm" (middle, middle) to position the text centered at (tx, ty) draw.text((tx+so,ty+so), msg, font=fnt, fill=tuple(sc), anchor="mm") # Draw shadow text draw.text((tx,ty), msg, font=fnt, fill=tc, anchor="mm") # Draw main text logger.debug(f"Drew message text '{msg}' with anchor='mm'. Font size: {fs}") except AttributeError: # Fallback for older PIL versions without anchor support logger.warning("PIL version does not support text anchor. Using manual centering fallback.") try: # Need to calculate text position manually for centering around (tx, ty) # Use _get_text_measurement for robust size calculation tw, th = _get_text_measurement(draw, msg, fnt) # Calculate top-left corner (ax, ay) to center the text block (tw, th) around (tx, ty) ax, ay = tx - tw / 2.0, ty - th / 2.0 draw.text((ax + so, ay + so), msg, font=fnt, fill=tuple(sc)) # Draw shadow text draw.text((ax, ay), msg, font=fnt, fill=tc) # Draw main text logger.debug(f"Drew message text '{msg}' with manual centering. Font size: {fs}") except Exception as e: logger.error(f"Fallback manual text centering failed: {e}", exc_info=True) # As a last resort, draw at a fixed position or raise error try: # Try drawing at a standard top-left position with padding as an absolute fallback logger.warning("Manual centering failed, attempting fallback text draw at margin position.") draw.text((margin, margin), msg, font=fnt, fill=tc) except Exception as draw_e: logger.error(f"Failed to draw text overlay even at fixed position: {draw_e}", exc_info=True) # Image generation might proceed without text if drawing fails completely return img def _get_text_measurement(draw_obj, text_str, font_obj): """ Returns (width, height) of text using the best available Pillow method. Handles potential errors and provides fallbacks. """ if not text_str: return 0, 0 try: # textbbox is most accurate for TrueType fonts (Pillow 8.0.0+) # Returns (x1, y1, x2, y2) bounding box relative to origin (0,0) bbox = draw_obj.textbbox((0, 0), text_str, font=font_obj) width = bbox[2] - bbox[0] height = bbox[3] - bbox[1] # Basic sanity check: if width or height are zero for non-empty string, use fallback if width > 0 and height > 0: return width, height else: logger.debug(f"textbbox returned zero dimensions for '{text_str[:20]}...'. Attempting fallback.") raise ValueError("textbbox returned zero dimensions.") # Force fallback except Exception: try: # textsize is another method (often less accurate than textbbox) - Older Pillow versions width, height = draw_obj.textsize(text_str, font=font_obj) if width > 0 and height > 0: return width, height else: logger.debug(f"textsize returned zero dimensions for '{text_str[:20]}...'. Attempting fallback.") raise ValueError("textsize returned zero dimensions.") # Force fallback except (AttributeError, ValueError): try: # getsize is also an older method - potentially on the font object itself width, height = font_obj.getsize(text_str) if width > 0 and height > 0: return width, height else: logger.debug(f"getsize returned zero dimensions for '{text_str[:20]}...'. Attempting fallback.") raise ValueError("getsize returned zero dimensions.") # Force fallback except (AttributeError, ValueError): try: # Fallback to simple estimation based on font size (very rough) # This assumes monospaced or average character width char_width_approx = font_obj.size * 0.6 # Estimate character width char_height_approx = font_obj.size # Estimate line height width_est = int(len(text_str) * char_width_approx) height_est = int(char_height_approx) logger.warning(f"Using estimated text size for '{text_str[:20]}...': ({width_est}, {height_est}). Font: {getattr(font_obj, 'path', 'default')}, Size: {font_obj.size}") # Ensure estimate is positive for non-empty string if width_est > 0 and height_est > 0: return width_est, height_est else: logger.debug(f"Estimation returned zero dimensions for '{text_str[:20]}...'. Using final hardcoded fallback.") raise ValueError("Estimation returned zero dimensions.") # Force final fallback except Exception: logger.warning("Failed to estimate text size using any method for '{text_str[:20]}...'. Returning hardcoded default.") # Final hardcoded fallback if all else fails # Return minimum 1 for dimensions for non-empty string return max(1, len(text_str) * 8), max(1, 10 + font_obj.size // 5) def draw_key_list_dropdown_overlay(image: Image.Image, keys: list[str] = None, title: str = "Data Embedded (RSA Encrypted)") -> Image.Image: """ Draws a visual overlay on the image, including a title and optional list of keys. Uses the top-right corner. """ # Return original image if nothing to draw if not title and (keys is None or not keys): logger.debug("No title or keys to display overlay. Returning original image.") return set_pil_image_format_to_png(image.copy()) # Create a copy and convert to RGBA for alpha blending img_overlayed = image.copy() if img_overlayed.mode != 'RGBA': img_overlayed = img_overlayed.convert('RGBA') draw = ImageDraw.Draw(img_overlayed) margin = 10 # Margin from image edges # Padding inside overlay boxes (pixels) padding = { 'title_h_pad': 6, # Vertical padding inside title bar 'title_w_pad': 10, # Horizontal padding inside title bar 'key_h_pad': 5, # Vertical padding around each key line 'key_w_pad': 10 # Horizontal padding around each key line } line_spacing = 4 # Extra space between key lines in the list title_bg_color=(60,60,60,180) # Dark grey, semi-transparent (RGBA) title_text_color=(230,230,90) # Yellowish key_list_bg_color=(50,50,50,160) # Slightly darker grey, semi-transparent (RGBA) key_text_color=(210,210,210) # Light grey ellipsis_color=(170,170,170) # Grey for ellipsis # Calculate overlay box width based on image width, within min/max limits OVERLAY_TARGET_WIDTH_RATIO = 0.30 MIN_OVERLAY_WIDTH_PX = 180 MAX_OVERLAY_WIDTH_PX = 500 final_overlay_box_width = int(image.width * OVERLAY_TARGET_WIDTH_RATIO) final_overlay_box_width = max(MIN_OVERLAY_WIDTH_PX, final_overlay_box_width) final_overlay_box_width = min(MAX_OVERLAY_WIDTH_PX, final_overlay_box_width) final_overlay_box_width = min(final_overlay_box_width, image.width - 2 * margin) # Ensure it fits with margins if final_overlay_box_width <= 2 * max(padding['title_w_pad'], padding['key_w_pad']): # Ensure enough space for padding logger.warning("Calculated overlay box width is too small for horizontal padding. Cannot draw overlay.") if img_overlayed.mode == 'RGBA': img_overlayed = img_overlayed.convert('RGB') return set_pil_image_format_to_png(img_overlayed) logger.debug(f"Overlay box width calculated: {final_overlay_box_width} px.") # Calculate font sizes based on image/overlay dimensions TITLE_FONT_HEIGHT_RATIO = 0.030 # Relative to image height MIN_TITLE_FONT_SIZE = 14 MAX_TITLE_FONT_SIZE = 28 title_font_size = int(image.height * TITLE_FONT_HEIGHT_RATIO) # Refine based on required characters per line in the overlay width if title is very long if title: # Estimate average character width relative to font size (e.g., 0.6) estimated_char_width_ratio = 0.6 # Calculate required font size if title text must fit horizontally within padding available_width_for_title_text = final_overlay_box_width - 2 * padding['title_w_pad'] # Avoid division by zero if len(title) is 0 or est_char_width_ratio is 0 if len(title) > 0 and estimated_char_width_ratio > 0: required_font_size_for_width = available_width_for_title_text / (len(title) * estimated_char_width_ratio) title_font_size = min(title_font_size, int(required_font_size_for_width)) title_font_size = max(MIN_TITLE_FONT_SIZE, title_font_size) title_font_size = min(MAX_TITLE_FONT_SIZE, title_font_size) title_font = _get_font(PREFERRED_FONTS, title_font_size) logger.debug(f"Title font size selected: {title_font_size}") KEY_FONT_HEIGHT_RATIO = 0.025 # Relative to image height MIN_KEY_FONT_SIZE = 12 MAX_KEY_FONT_SIZE = 22 key_font_size = int(image.height * KEY_FONT_HEIGHT_RATIO) # Refine key font size based on average expected character width relative to overlay width key_font_size_from_overlay_width = int(final_overlay_box_width * 0.07) # heuristic ratio key_font_size = min(key_font_size, key_font_size_from_overlay_width) key_font_size = max(MIN_KEY_FONT_SIZE, key_font_size) key_font_size = min(MAX_KEY_FONT_SIZE, key_font_size) key_font = _get_font(PREFERRED_FONTS, key_font_size) logger.debug(f"Key list font size selected: {key_font_size}") # Calculate text dimensions for initial layout planning actual_title_w, actual_title_h = _get_text_measurement(draw, title, title_font) logger.debug(f"Measured title size: ({actual_title_w}, {actual_title_h})") disp_keys, actual_key_text_widths, key_line_heights = [], [], [] total_keys_render_h_estimate = 0 # Estimate total height needed *before* truncation logic if keys: # Limit keys to display (includes space for ellipsis line if needed) limited_keys = keys[:MAX_KEYS_TO_DISPLAY_OVERLAY-1] if len(keys) > MAX_KEYS_TO_DISPLAY_OVERLAY else keys for kt in limited_keys: disp_keys.append(kt) kw, kh = _get_text_measurement(draw, kt, key_font) actual_key_text_widths.append(kw) key_line_heights.append(kh) # Store height based on original text total_keys_render_h_estimate += kh # Add ellipsis line text if keys were truncated if len(keys) > MAX_KEYS_TO_DISPLAY_OVERLAY: ellipsis_text = f"... ({len(keys)-(MAX_KEYS_TO_DISPLAY_OVERLAY-1)} more)" disp_keys.append(ellipsis_text) ew, eh = _get_text_measurement(draw, ellipsis_text, key_font) actual_key_text_widths.append(ew) key_line_heights.append(eh) # Store height for ellipsis line total_keys_render_h_estimate += eh # Add line spacing height estimate if len(disp_keys) > 1: total_keys_render_h_estimate += line_spacing * (len(disp_keys) - 1) # --- Draw Title Bar (Top-Right) --- title_bar_h = actual_title_h + 2 * padding['title_h_pad'] title_bar_x1 = image.width - margin # Right edge title_bar_x0 = title_bar_x1 - final_overlay_box_width # Left edge using final_overlay_box_width title_bar_y0 = margin # Top edge title_bar_y1 = title_bar_y0 + title_bar_h # Bottom edge # Ensure title bar has positive dimensions if title_bar_x0 >= title_bar_x1 or title_bar_y0 >= title_bar_y1: logger.warning("Calculated title bar dimensions are invalid. Cannot draw title overlay.") else: draw.rectangle([(title_bar_x0,title_bar_y0),(title_bar_x1,title_bar_y1)],fill=title_bg_color) # Draw title text (centered horizontally in the available space) available_width_for_title_text = final_overlay_box_width - 2 * padding['title_w_pad'] title_text_draw_x = title_bar_x0 + padding['title_w_pad'] + max(0, (available_width_for_title_text - actual_title_w) / 2) title_text_draw_y = title_bar_y0 + padding['title_h_pad'] draw.text((title_text_draw_x, title_text_draw_y), title, font=title_font, fill=title_text_color) logger.debug(f"Drew title '{title}' at ({title_text_draw_x}, {title_text_draw_y})") # --- Draw Key List Box --- if disp_keys: key_list_box_h_ideal = total_keys_render_h_estimate + 2 * padding['key_h_pad'] key_list_x0, key_list_x1 = title_bar_x0, title_bar_x1 # Align key list box with title bar key_list_y0 = title_bar_y1 # Start immediately below title bar # Calculate the actual height needed considering image bottom boundary max_key_list_box_h = image.height - margin - key_list_y0 current_key_list_box_h = min(key_list_box_h_ideal, max_key_list_box_h) key_list_y1 = key_list_y0 + current_key_list_box_h # Check if there's enough vertical space to draw at least the first key line (including padding) min_required_key_list_height = (key_line_heights[0] if key_line_heights else 0) + 2 * padding['key_h_pad'] if current_key_list_box_h < min_required_key_list_height and len(disp_keys) > 0: logger.warning(f"Not enough vertical space ({current_key_list_box_h}px) to draw key list overlay (needed min {min_required_key_list_height}px).") # Convert back to RGB if it was converted if img_overlayed.mode == 'RGBA': img_overlayed = img_overlayed.convert('RGB') return set_pil_image_format_to_png(img_overlayed) # Ensure key list box has positive dimensions if key_list_x0 >= key_list_x1 or key_list_y0 >= key_list_y1: logger.warning("Calculated key list box dimensions are invalid. Cannot draw key list overlay.") else: draw.rectangle([(key_list_x0,key_list_y0),(key_list_x1,key_list_y1)],fill=key_list_bg_color) logger.debug(f"Drew key list box from ({key_list_x0},{key_list_y0}) to ({key_list_x1},{key_list_y1})") current_text_y = key_list_y0 + padding['key_h_pad'] available_text_width_for_keys = final_overlay_box_width - 2 * padding['key_w_pad'] for i, key_text_item in enumerate(disp_keys): # Get height of the *original* item text for initial vertical check # This uses the pre-calculated height which should be accurate based on the original string estimated_line_h = key_line_heights[i] if i < len(key_line_heights) else _get_text_measurement(draw, key_text_item, key_font)[1] if estimated_line_h <= 0 and len(key_text_item) > 0: estimated_line_h = key_font.size # Fallback height # Calculate the space needed *including* line spacing for the *next* line IF there is one space_needed_for_this_line_and_next_gap = estimated_line_h + (line_spacing if i < len(disp_keys)-1 else 0) # Check vertical fit *before* potentially drawing this line # This checks if adding this line and the potential space after it would exceed the bounds if current_text_y + space_needed_for_this_line_and_next_gap > key_list_y1 - padding['key_h_pad']: # This line and the gap *after* it won't fit. # Check if we can draw *just* an ellipsis for truncation indication at the current Y. if not key_text_item.endswith("..."): # Avoid drawing ellipsis twice if it's already the final "..." line ellipsis_str = "..." ellipsis_w, ellipsis_h = _get_text_measurement(draw, ellipsis_str, key_font) # Check if just the ellipsis fits vertically within the remaining space if current_text_y + ellipsis_h <= key_list_y1 - padding['key_h_pad']: # Draw the ellipsis centered horizontally ellipsis_draw_x = key_list_x0 + padding['key_w_pad'] + max(0, (available_text_width_for_keys - ellipsis_w) / 2) draw.text((ellipsis_draw_x, current_text_y), ellipsis_str, font=key_font, fill=ellipsis_color) logger.debug(f"Drew final ellipsis at ({ellipsis_draw_x}, {current_text_y}) due to vertical space limit.") break # Stop processing/drawing further lines # If it fits vertically: Determine text to draw with horizontal truncation. # --- Determine text_to_draw and its dimensions AFTER horizontal truncation --- original_key_text_w = actual_key_text_widths[i] if i < len(actual_key_text_widths) else _get_text_measurement(draw, key_text_item, key_font)[0] text_to_draw = key_text_item # Start assuming no truncation if original_key_text_w > available_text_width_for_keys and not key_text_item.startswith("..."): temp_text = key_text_item ellipsis_str = "..." # Recalculate ellipsis width based on key_font ellipsis_w, _ = _get_text_measurement(draw, ellipsis_str, key_font) # Ensure space_for_text is not negative space_for_text = max(0, available_text_width_for_keys - ellipsis_w) truncated = False # Truncate from the end until `temp_text + ellipsis_str` fits horizontally # Use _get_text_measurement for width calculation while _get_text_measurement(draw, temp_text + ellipsis_str, key_font)[0] > available_text_width_for_keys and len(temp_text) > 0: temp_text = temp_text[:-1] truncated = True # Add ellipsis if truncation happened text_to_draw = temp_text + ellipsis_str if truncated else temp_text # Re-calculate actual dimensions *after* truncation for drawing and Y update final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font) # Aggressive fallback truncation if needed (less likely now with robust _get_text_measurement) # Check again if the truncated text + ellipsis fits horizontally if final_key_text_w > available_text_width_for_keys: logger.warning(f"Aggressive truncation needed for key: '{key_text_item[:20]}...'. Final text '{text_to_draw[:20]}...' still too wide ({final_key_text_w}px > {available_text_width_for_keys}px). Re-truncating.") # Estimate max characters that fit using a rough ratio est_chars_that_fit = int(available_text_width_for_keys / max(1, key_font_size * 0.5)) # Use a slightly smaller ratio for safety text_to_draw = key_text_item[:max(0, est_chars_that_fit - len(ellipsis_str))] + ellipsis_str # Recalculate dimensions after aggressive truncation final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font) else: # No horizontal truncation needed for this specific item or it's an ellipsis line text_to_draw = key_text_item # Use the original text item # Get actual dimensions for this line (might be different from estimated if _get_text_measurement varies) final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font) # Ensure actual height is positive for non-empty string if current_line_actual_h <= 0 and len(text_to_draw) > 0: logger.warning(f"Measured height zero for text '{text_to_draw[:20]}...'. Using estimated height: {key_font.size}") current_line_actual_h = key_font.size # Fallback to font size as height estimate # --- DRAW THE TEXT --- # Center text horizontally within the available padding space key_text_draw_x = key_list_x0 + padding['key_w_pad'] + max(0, (available_text_width_for_keys - final_key_text_w) / 2) # Determine text color: ellipsis color if it's the final "more" ellipsis line # or if truncation resulted in adding "..." to a regular key name. text_color_to_use = key_text_color if key_text_item.startswith("...") or text_to_draw.endswith("...") and text_to_draw != key_text_item: text_color_to_use = ellipsis_color draw.text((key_text_draw_x, current_text_y), text_to_draw, font=key_font, fill=text_color_to_use) logger.debug(f"Drew key item '{text_to_draw}' at ({key_text_draw_x}, {current_text_y}). Measured height: {current_line_actual_h}") # --- UPDATE Y POSITION FOR NEXT LINE --- # Move down by the *actual* height of the line just drawn current_text_y += current_line_actual_h # Add spacing only if there are more lines to draw AND we haven't hit the vertical boundary # The boundary check at the start of the loop should prevent drawing the spacing if the next line won't fit if i < len(disp_keys)-1: current_text_y += line_spacing # Convert back to RGB if it was converted to RGBA # This flattens the layers, making the alpha blending permanent. if img_overlayed.mode == 'RGBA': try: img_overlayed = img_overlayed.convert('RGB') logger.debug("Converted overlayed image back to RGB.") except Exception as e: logger.error(f"Failed to convert overlayed image back to RGB: {e}", exc_info=True) # Decide how to handle this failure - maybe return RGBA or raise error # For now, let's proceed with RGBA if RGB conversion fails, though it might not be desired. pass # Keep as RGBA if conversion fails # Final step: Ensure the output is in PNG format to preserve LSBs # This step also handles the RGB -> PNG conversion robustly. return set_pil_image_format_to_png(img_overlayed)