Spaces:
Sleeping
Sleeping
# keylock/core.py | |
import io | |
import json | |
import os | |
import struct | |
import logging | |
import traceback | |
import base64 | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import rsa, padding | |
from cryptography.exceptions import InvalidTag, InvalidSignature, InvalidKey | |
from PIL import Image, ImageDraw, ImageFont | |
import numpy as np | |
logger = logging.getLogger(__name__)

# Only install a console handler when neither this logger nor any of its
# ancestors already has one, so an existing logging setup is left alone.
if not logger.hasHandlers():
    # Format retained from the original code for consistency.
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # DEBUG is kept for now to aid troubleshooting.
    logger.setLevel(logging.DEBUG)
# --- Legacy constants --------------------------------------------------------
# Carried over from the original code for clarity; some only applied to the
# older crypto scheme.
SALT_SIZE = 16  # Not used in hybrid AESGCM
PBKDF2_ITERATIONS = 390_000  # Not used in hybrid RSA-AES
NONCE_SIZE = 12  # Matches AES_GCM_NONCE_SIZE_CRYPTO
TAG_SIZE = 16  # Matches AES_GCM_TAG_SIZE_CRYPTO

# The 4-byte header added by the LSB embedder; it records the size of the
# crypto payload that *follows* the header.
LENGTH_HEADER_SIZE = 4

# --- Hybrid encryption payload (embedded AFTER the LSB header) ---------------
# Crypto payload structure:
#   [Encrypted AES Key Length (4 bytes)]
#   [Encrypted AES Key (RSA_Key_Size_Bytes)]
#   [AES Nonce (12 bytes)]
#   [AES GCM Ciphertext]
#   [AES GCM Tag (16 bytes)]
ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO = 4  # Bytes: length field *within* the crypto payload
AES_GCM_NONCE_SIZE_CRYPTO = 12  # Bytes: nonce *within* the crypto payload
AES_GCM_TAG_SIZE_CRYPTO = 16  # Bytes: tag *within* the crypto payload
AES_KEY_SIZE_CRYPTO = 32  # Bytes: AES-256 key, the value that gets RSA encrypted
RSA_KEY_SIZE_DEFAULT = 2048  # Bits: default size for RSA key generation

# --- Image overlay settings --------------------------------------------------
PREFERRED_FONTS = ["Verdana", "Arial", "DejaVu Sans", "Calibri", "Helvetica", "Roboto-Regular", "sans-serif"]
MAX_KEYS_TO_DISPLAY_OVERLAY = 12
def _get_font(preferred_fonts, base_size):
    """
    Return a font object for drawing text at roughly `base_size`.

    Each name in `preferred_fonts` is probed twice: first as
    "<lowercased name>.ttf", then as the bare name. The first candidate that
    loads is re-opened at the requested size. If none can be used, PIL's
    default font is returned (sized when the installed Pillow supports it).

    Raises:
        IOError: if even PIL's default font cannot be loaded.
    """
    # ImageFont.truetype needs a positive integer size.
    safe_base_size = int(base_size)
    if safe_base_size <= 0:
        safe_base_size = 10  # Default to a small positive size

    # Probe at a small fixed size just to discover which name/path resolves;
    # the real font object is created afterwards at safe_base_size.
    located = None
    for font_name in preferred_fonts:
        for candidate in (font_name.lower() + ".ttf", font_name):
            try:
                ImageFont.truetype(candidate, 10)
            except IOError:
                continue  # Try the next spelling / next font
            located = candidate
            break
        if located is not None:
            break

    if located:
        try:
            logger.debug(f"Found font path/name '{located}'. Attempting to load with size {safe_base_size}.")
            return ImageFont.truetype(located, safe_base_size)
        except IOError as e:
            logger.warning(f"Font '{located}' load failed with size {safe_base_size}: {e}. Attempting default font.")
        except Exception as e:
            logger.warning(f"Unexpected error loading font '{located}' with size {safe_base_size}: {e}. Attempting default font.", exc_info=True)

    # Nothing usable was found: fall back to PIL's built-in font.
    logger.warning("Could not load preferred or system font. Using PIL's load_default.")
    try:
        try:
            # The 'size' argument is only accepted by newer Pillow (10+).
            sized_default = ImageFont.load_default(size=safe_base_size)
        except TypeError:
            logger.warning("Pillow version does not support sizing load_default. Using unsized default.")
            # Note: the unsized default is often a small bitmap font.
            return ImageFont.load_default()
        logger.debug(f"Loaded default font with size {safe_base_size}.")
        return sized_default
    except Exception as e:
        logger.error(f"Failed to load PIL's default font: {e}", exc_info=True)
        # Last resort: text drawing is likely impossible without any font.
        raise IOError("Failed to load any font for image overlay.")
def set_pil_image_format_to_png(image: Image.Image) -> Image.Image:
    """
    Round-trip `image` through an in-memory PNG and return the reloaded image.

    PNG preserves LSBs, so the returned image (with ``format == "PNG"``) is
    safe to carry LSB-embedded data.

    Raises:
        RuntimeError: if saving to or reloading from the PNG buffer fails.
    """
    png_buffer = io.BytesIO()
    try:
        image.save(png_buffer, format='PNG')
        png_buffer.seek(0)  # Rewind so Image.open reads from the start
        png_image = Image.open(png_buffer)
        # Image.open usually sets this already; make it explicit.
        png_image.format = "PNG"
        logger.debug(f"Converted image to PNG format. Original mode: {image.mode}, New mode: {png_image.mode}")
    except Exception as e:
        logger.error(f"Failed to convert image to PNG format: {e}", exc_info=True)
        # PNG conversion is crucial for LSB preservation, so fail loudly.
        raise RuntimeError(f"Failed to process image for PNG conversion: {e}")
    return png_image
# --- RSA Key Management --- | |
def generate_rsa_key_pair(key_size_bits: int = RSA_KEY_SIZE_DEFAULT) -> tuple[bytes, bytes]:
    """
    Create a fresh RSA key pair and return it PEM-encoded.

    Args:
        key_size_bits: modulus size in bits for the new key.

    Returns:
        ``(private_pem, public_pem)``. The private key is unencrypted PKCS8;
        the public key is SubjectPublicKeyInfo.
    """
    logger.debug(f"Generating RSA key pair with size: {key_size_bits} bits.")
    key = rsa.generate_private_key(public_exponent=65537, key_size=key_size_bits)

    # No password is applied for simplicity in this app's context; the user
    # must secure the resulting file/paste themselves.
    private_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_pem = key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    logger.debug("RSA key pair generated successfully.")
    return private_pem, public_pem
def _clean_pem_bytes(pem_data: bytes | str) -> bytes: | |
"""Strips leading/trailing whitespace and converts string to bytes if needed.""" | |
if isinstance(pem_data, str): | |
# Remove common surrounding strings like backticks if present (heuristic) | |
pem_data = pem_data.strip().strip('```') | |
pem_data = pem_data.encode('utf-8') | |
else: | |
pem_data = pem_data.strip() | |
return pem_data | |
def load_rsa_private_key_from_pem(pem_bytes: bytes | str) -> rsa.RSAPrivateKey:
    """
    Parse an unencrypted RSA private key from PEM text or bytes.

    The input is cleaned with `_clean_pem_bytes` before parsing.

    Raises:
        ValueError: the key is password-protected, cannot be parsed, or any
            other load failure that is not one of the recognised messages.
        TypeError: the PEM decodes to a key that is not an RSA private key.
    """
    try:
        cleaned_pem = _clean_pem_bytes(pem_bytes)
        logger.debug(f"Cleaned private key PEM data (first 50 bytes): {cleaned_pem[:50]}...")

        try:
            # password=None: the key is assumed not to be password-protected.
            private_key = serialization.load_pem_private_key(cleaned_pem, password=None)
        except (TypeError, ValueError, InvalidKey) as exc:
            # Classify the cryptography error by its message text.
            detail = str(exc).lower()
            if "password" in detail or "decryption failed" in detail:
                logger.error(f"Failed to load RSA private key: It appears password-protected. {exc}")
                raise ValueError("Private key appears to be password-protected. KeyLock requires an unencrypted key.")
            if any(marker in detail for marker in ("malformed", "invalid pem", "unsupported key type")):
                logger.error(f"Failed to parse RSA private key (parsing error): {exc}", exc_info=True)
                raise ValueError(f"Failed to parse RSA private key. Ensure it is a valid, unencrypted PKCS8 or traditional PEM format key: {exc}")
            logger.error(f"Failed to load RSA private key (cryptography error): {exc}", exc_info=True)
            raise ValueError(f"Failed to load RSA private key due to a cryptography error: {exc}")
        except Exception as exc:
            logger.error(f"Unexpected error loading RSA private key: {exc}", exc_info=True)
            raise RuntimeError(f"An unexpected error occurred loading the private key: {exc}")

        if not isinstance(private_key, rsa.RSAPrivateKey):
            logger.error("Loaded object is not an RSA private key instance.")
            raise TypeError("Loaded key is not a valid RSA private key.")

        logger.debug(f"RSA private key loaded successfully (size: {private_key.key_size} bits).")
        return private_key
    except (ValueError, TypeError, RuntimeError, InvalidKey) as e:
        # Errors already carrying one of this function's specific messages
        # are passed through unchanged.
        text = str(e)
        lowered = text.lower()
        has_known_message = (
            "password" in lowered
            or "parse" in lowered
            or "private key appears" in text
            or "cryptography error" in text
            or "Loaded key is not a valid RSA private key" in text
        )
        if has_known_message and isinstance(e, (ValueError, TypeError, RuntimeError)):
            raise e
        # Everything else is wrapped in a generic error.
        logger.error(f"Final exception during private key load: {e}", exc_info=True)
        raise ValueError(f"Failed to load RSA private key: {e}")
def load_rsa_public_key_from_pem(pem_bytes: bytes | str) -> rsa.RSAPublicKey:
    """
    Parse an RSA public key from PEM text or bytes.

    The input is cleaned with `_clean_pem_bytes` before parsing.

    Raises:
        ValueError: the key cannot be parsed, or any other load failure that
            is not one of the recognised messages.
        TypeError: the PEM decodes to a key that is not an RSA public key.
    """
    try:
        cleaned_pem = _clean_pem_bytes(pem_bytes)
        logger.debug(f"Cleaned public key PEM data (first 50 bytes): {cleaned_pem[:50]}...")

        try:
            public_key = serialization.load_pem_public_key(cleaned_pem)
        except (ValueError, InvalidKey) as exc:
            # Classify the cryptography error by its message text.
            detail = str(exc).lower()
            if any(marker in detail for marker in ("malformed", "invalid pem", "unsupported key type")):
                logger.error(f"Failed to parse RSA public key (parsing error): {exc}", exc_info=True)
                raise ValueError(f"Failed to parse RSA public key. Ensure it is a valid PEM format key: {exc}")
            logger.error(f"Failed to load RSA public key (cryptography error): {exc}", exc_info=True)
            raise ValueError(f"Failed to load RSA public key due to a cryptography error: {exc}")
        except Exception as exc:
            logger.error(f"Unexpected error loading RSA public key: {exc}", exc_info=True)
            raise RuntimeError(f"An unexpected error occurred loading the public key: {exc}")

        if not isinstance(public_key, rsa.RSAPublicKey):
            logger.error("Loaded object is not an RSA public key instance.")
            raise TypeError("Loaded key is not a valid RSA public key.")

        logger.debug(f"RSA public key loaded successfully (size: {public_key.key_size} bits).")
        return public_key
    except (ValueError, TypeError, RuntimeError, InvalidKey) as e:
        # Errors already carrying one of this function's specific messages
        # are passed through unchanged.
        text = str(e)
        has_known_message = (
            "parse" in text.lower()
            or "cryptography error" in text
            or "Loaded key is not a valid RSA public key" in text
        )
        if has_known_message and isinstance(e, (ValueError, TypeError, RuntimeError)):
            raise e
        # Everything else is wrapped in a generic error.
        logger.error(f"Final exception during public key load: {e}", exc_info=True)
        raise ValueError(f"Failed to load RSA public key: {e}")
# --- Hybrid Encryption/Decryption --- | |
def encrypt_data_hybrid(data: bytes, recipient_public_key_pem: bytes | str) -> bytes:
    """
    Hybrid-encrypt `data` for the holder of the given RSA public key.

    A random AES-256 key encrypts the data with AES-GCM (no associated data);
    that AES key is then encrypted with RSA-OAEP (SHA-256, MGF1-SHA-256, no
    label). The returned crypto payload is laid out as:

        [Encrypted AES Key Length (4 bytes, big-endian)]
        [Encrypted AES Key]
        [AES Nonce (12 bytes)]
        [AES GCM Ciphertext] [AES GCM Tag (16 bytes)]

    Raises:
        ValueError: the public key cannot be loaded or RSA encryption of the
            AES key fails.
        RuntimeError: any other unexpected failure.
    """
    logger.debug(f"Starting hybrid encryption of {len(data)} bytes.")
    try:
        # Loading also handles string input and cleaning of the PEM text.
        public_key = load_rsa_public_key_from_pem(recipient_public_key_pem)
        logger.debug(f"Using RSA public key size: {public_key.key_size} bits.")

        # 1. Fresh random AES-256 key for this message.
        session_key = os.urandom(AES_KEY_SIZE_CRYPTO)
        logger.debug(f"Generated random AES key ({AES_KEY_SIZE_CRYPTO} bytes).")

        # 2. AES-GCM encryption with a fresh nonce; the result is ciphertext + tag.
        cipher = AESGCM(session_key)
        nonce = os.urandom(AES_GCM_NONCE_SIZE_CRYPTO)
        logger.debug(f"Generated AES-GCM nonce ({AES_GCM_NONCE_SIZE_CRYPTO} bytes).")
        sealed = cipher.encrypt(nonce, data, None)  # No AAD used here
        logger.debug(f"AES encrypted data (ciphertext+tag) size: {len(sealed)} bytes.")

        # 3. Wrap the AES key with the recipient's RSA public key.
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,  # A label would have to be known to the recipient as well
        )
        try:
            wrapped_key = public_key.encrypt(session_key, oaep)
        except Exception as e:
            logger.error(f"RSA encryption of AES key failed: {e}", exc_info=True)
            if "data too large" in str(e).lower():
                # OAEP adds overhead: the RSA modulus must exceed
                # (AES key + 2 * SHA-256 digest + 2) bytes = 98 bytes = 784 bits.
                # 2048-bit keys are ample, so this points at an odd or corrupted key.
                raise ValueError(f"RSA encryption of symmetric key failed: The symmetric key size ({AES_KEY_SIZE_CRYPTO}B) might be too large for the RSA key size ({public_key.key_size} bits) with OAEP padding. Use a larger RSA key. Original error: {e}")
            raise ValueError(f"RSA encryption of the symmetric key failed: {e}. Check public key validity or RSA key size.")
        else:
            logger.debug(f"RSA encrypted AES key size: {len(wrapped_key)} bytes.")

        # 4. Assemble: length prefix, wrapped key, nonce, ciphertext + tag.
        wrapped_key_len_field = struct.pack('>I', len(wrapped_key))
        logger.debug(f"Packed encrypted AES key length ({len(wrapped_key)} bytes). Header bytes size: {len(wrapped_key_len_field)}.")
        combined_crypto_payload = b"".join((wrapped_key_len_field, wrapped_key, nonce, sealed))

        logger.info(f"Hybrid encryption successful. Total combined crypto payload size: {len(combined_crypto_payload)} bytes.")
        return combined_crypto_payload
    except (ValueError, RuntimeError, InvalidKey) as e:
        logger.error(f"Hybrid encryption error (caught specific exception): {e}", exc_info=True)
        raise e  # Re-raise value/runtime/invalidkey errors from sub-functions
    except Exception as e:
        logger.error(f"Unexpected hybrid encryption error: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during encryption: {e}")
def decrypt_data_hybrid(crypto_payload_from_lsb: bytes, recipient_private_key_pem: bytes | str) -> bytes:
    """
    Decrypt a hybrid RSA/AES-GCM crypto payload with the recipient's private key.

    `crypto_payload_from_lsb` is the payload extracted by the LSB layer
    *without* that layer's own 4-byte header. Expected layout:

        [Encrypted AES Key Length (4 bytes, big-endian)]
        [Encrypted AES Key]
        [AES Nonce (12 bytes)]
        [AES GCM Ciphertext] [AES GCM Tag (16 bytes)]

    Args:
        crypto_payload_from_lsb: the raw crypto payload bytes.
        recipient_private_key_pem: unencrypted RSA private key in PEM form.

    Returns:
        The decrypted plaintext bytes.

    Raises:
        ValueError: the payload is too short, the encrypted-key length does
            not match the private key, RSA decryption fails, or the AES-GCM
            tag does not verify.
        RuntimeError: an unexpected failure during decryption.
        TypeError: propagated unchanged if raised while loading the key or
            handling the payload.
    """
    logger.debug(f"Starting hybrid decryption. Received crypto_payload_from_lsb size: {len(crypto_payload_from_lsb)} bytes.")
    try:
        # Loading also handles string input and cleaning of the PEM text.
        private_key = load_rsa_private_key_from_pem(recipient_private_key_pem)
        logger.debug(f"Using RSA private key with size: {private_key.key_size} bits for decryption.")

        # 1. Validate and parse the payload.
        # An RSA ciphertext is as long as the modulus in whole octets, so the
        # bit size is rounded UP; floor division would reject valid payloads
        # for keys whose size is not a multiple of 8 bits.
        rsa_key_size_bytes = (private_key.key_size + 7) // 8
        min_expected_crypto_payload_size = (
            ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO
            + rsa_key_size_bytes
            + AES_GCM_NONCE_SIZE_CRYPTO
            + AES_GCM_TAG_SIZE_CRYPTO
        )
        logger.debug(f"Expected min crypto payload size based on private key ({rsa_key_size_bytes}B RSA key): {min_expected_crypto_payload_size} bytes.")
        if len(crypto_payload_from_lsb) < min_expected_crypto_payload_size:
            logger.error(f"Crypto payload too short. Size: {len(crypto_payload_from_lsb)}, Expected min: {min_expected_crypto_payload_size}")
            raise ValueError(f"Crypto payload extracted from image is too short ({len(crypto_payload_from_lsb)} bytes). Expected at least {min_expected_crypto_payload_size} bytes. Image likely corrupted or does not contain a valid RSA-encrypted payload.")

        # The length check above guarantees the 4-byte length field is present.
        encrypted_aes_key_len = struct.unpack('>I', crypto_payload_from_lsb[:ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO])[0]
        logger.debug(f"Parsed encrypted AES key length: {encrypted_aes_key_len} bytes.")

        # The declared length must equal the RSA ciphertext size for this key;
        # a mismatch catches wrong key pairs or corruption early.
        if encrypted_aes_key_len != rsa_key_size_bytes:
            logger.error(f"Parsed encrypted AES key length ({encrypted_aes_key_len}B) does not match private key size in bytes ({rsa_key_size_bytes}B).")
            raise ValueError(f"Encrypted AES key length mismatch ({encrypted_aes_key_len} bytes). Expected {rsa_key_size_bytes} bytes based on private key size ({private_key.key_size} bits). Indicates incorrect key pair or data corruption.")

        rsa_encrypted_aes_key_start = ENCRYPTED_AES_KEY_LEN_SIZE_CRYPTO
        rsa_encrypted_aes_key_end = rsa_encrypted_aes_key_start + encrypted_aes_key_len
        aes_nonce_start = rsa_encrypted_aes_key_end
        aes_nonce_end = aes_nonce_start + AES_GCM_NONCE_SIZE_CRYPTO
        aes_ciphertext_with_tag_start = aes_nonce_end
        logger.debug(f"Calculated offsets for parsing crypto payload: RSA Encrypted Key starts at {rsa_encrypted_aes_key_start}, ends at {rsa_encrypted_aes_key_end}. AES Nonce starts at {aes_nonce_start}, ends at {aes_nonce_end}. AES Ciphertext+Tag starts at {aes_ciphertext_with_tag_start}.")

        # Because the declared key length equals rsa_key_size_bytes and the
        # payload met the minimum size, the slices below always contain a full
        # encrypted key, a 12-byte nonce and at least the 16-byte tag; no
        # further length checks are needed.
        rsa_encrypted_aes_key = crypto_payload_from_lsb[rsa_encrypted_aes_key_start:rsa_encrypted_aes_key_end]
        logger.debug(f"Extracted RSA encrypted AES key. Size: {len(rsa_encrypted_aes_key)} bytes.")
        aes_nonce = crypto_payload_from_lsb[aes_nonce_start:aes_nonce_end]
        logger.debug(f"Extracted AES Nonce. Size: {len(aes_nonce)} bytes.")
        aes_ciphertext_with_tag = crypto_payload_from_lsb[aes_ciphertext_with_tag_start:]
        logger.debug(f"Extracted AES ciphertext+tag. Size: {len(aes_ciphertext_with_tag)} bytes.")

        # 2. Unwrap the AES key with the RSA private key (same OAEP parameters
        # as used for encryption).
        try:
            recovered_aes_key = private_key.decrypt(
                rsa_encrypted_aes_key,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None,
                ),
            )
            logger.debug(f"RSA decryption of AES key successful. Recovered key size: {len(recovered_aes_key)} bytes.")
        except Exception as e:
            logger.error(f"RSA decryption of AES key failed: {e}", exc_info=True)
            # Map the usual RSA decryption failures to a helpful message.
            err_msg = str(e).lower()
            if "decryption failed" in err_msg or "data too large" in err_msg or "oaep" in err_msg or "private key" in err_msg or "signature" in err_msg:
                raise ValueError(f"RSA decryption of the symmetric key failed. This typically means the incorrect private key was used, or the embedded encrypted key data is corrupted. Original error: {e}") from e
            raise RuntimeError(f"An unexpected error occurred during RSA decryption: {e}") from e

        # A recovered key of the wrong size strongly indicates a key pair
        # mismatch or severe corruption.
        if len(recovered_aes_key) != AES_KEY_SIZE_CRYPTO:
            logger.error(f"RSA decryption resulted in key of unexpected length ({len(recovered_aes_key)}B). Expected {AES_KEY_SIZE_CRYPTO}B.")
            raise ValueError(f"RSA decryption produced an AES key of unexpected length ({len(recovered_aes_key)} bytes). This strongly suggests the private key used does not match the public key used for encryption, or the data is corrupted.")

        # 3. AES-GCM decryption; AESGCM.decrypt takes nonce, ciphertext+tag, aad.
        logger.debug("Attempting AES-GCM decryption of data...")
        aesgcm = AESGCM(recovered_aes_key)
        try:
            decrypted_data = aesgcm.decrypt(aes_nonce, aes_ciphertext_with_tag, None)  # No AAD used here
        except InvalidTag as e:
            # InvalidTag is how AES-GCM reports an authentication failure.
            logger.error("AES-GCM decryption failed: Invalid tag.", exc_info=True)
            raise ValueError("Decryption failed (InvalidTag): The private key used is likely incorrect or the image data is corrupted.") from e
        except Exception as e:
            logger.error(f"Unexpected error during AES-GCM decryption: {e}", exc_info=True)
            raise RuntimeError(f"An unexpected error occurred during AES-GCM decryption: {e}") from e

        logger.info("Hybrid decryption process completed successfully.")
        return decrypted_data
    except (ValueError, InvalidTag, InvalidKey, TypeError, RuntimeError) as e:
        # These already carry specific messages; log and pass them on as-is.
        logger.error(f"Hybrid decryption error (caught specific exception): {e}", exc_info=True)
        raise
    except Exception as e:
        logger.error(f"Unexpected hybrid decryption error: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during decryption: {e}") from e
# --- LSB Steganography --- | |
def _d2b(d:bytes)->str: | |
"""Convert bytes to a binary string.""" | |
return ''.join(format(b,'08b') for b in d) | |
def _b2B(b:str)->bytes: | |
"""Convert a binary string to bytes.""" | |
# The binary string should ideally be a multiple of 8 bits. | |
# If not, it indicates corruption or an issue with extraction. | |
# We will NOT pad here during conversion, as it would hide truncation errors. | |
if len(b)%8!=0: | |
logger.error(f"Binary string length ({len(b)}) is not a multiple of 8 during _b2B conversion.") | |
# Raise an error instead of padding, as padding could mask corruption | |
raise ValueError(f"Binary string length ({len(b)}) is not a multiple of 8. Data corruption suspected.") | |
try: | |
return bytes(int(b[i:i+8],2) for i in range(0,len(b),8)) | |
except ValueError as e: | |
logger.error(f"Failed to convert binary string to bytes: {e}. Input binary string length: {len(b)}") | |
# Provide part of the binary string for debugging | |
log_b = b[:100] + ('...' if len(b)>100 else '') | |
logger.debug(f"Input binary string (first 100 chars): {log_b}") | |
raise ValueError(f"Failed to convert binary string to bytes: {e}. Data might be corrupted.") | |
def embed_data_in_image(img_obj: Image.Image, data: bytes) -> Image.Image:
    """
    Embed `data` into the least significant bits of an image's RGB channels.

    The image is converted to RGB (an alpha channel, if any, is dropped) and
    the payload is written one bit per channel value, in flattened
    R, G, B, R, G, B, ... order, most significant bit of each byte first.
    The payload is a 4-byte big-endian length header followed by `data`.
    `data` should be the crypto layer's output (e.g. the payload returned by
    `encrypt_data_hybrid`).

    Args:
        img_obj: the carrier image; it is not modified.
        data: the raw bytes to embed.

    Returns:
        A new PNG-format image carrying the payload.

    Raises:
        ValueError: `data` is longer than the 4-byte header can describe, or
            the payload does not fit in the image.
        RuntimeError: an image conversion or embedding step fails.
    """
    logger.debug(f"LSB Embedding: Received {len(data)} bytes of data from crypto layer (the crypto payload).")

    # RGB mode gives exactly 3 channels regardless of the input image type,
    # which keeps the embedding layout predictable.
    try:
        img = img_obj.convert("RGB")
    except Exception as e:
        logger.error(f"Failed to convert image to RGB before LSB embedding: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to RGB for embedding: {e}") from e

    try:
        px = np.array(img)
        fpx = px.ravel()  # Flattened channel values (R, G, B, R, G, B, ...)
    except Exception as e:
        logger.error(f"Failed to convert image to numpy array: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to numpy array for embedding: {e}") from e

    # The header stores len(data) as an unsigned 32-bit integer.
    data_length = len(data)
    max_uint32 = (2**32) - 1
    if data_length > max_uint32:
        # Highly unlikely given typical image capacity, but checked defensively.
        logger.error(f"LSB Embedding: Data length ({data_length}) exceeds maximum value representable by 4-byte unsigned integer header ({max_uint32}).")
        raise ValueError(f"Data length ({data_length} bytes) exceeds maximum size embeddable with the 4-byte header.")

    data_length_header = struct.pack('>I', data_length)
    logger.debug(f"LSB Embedding: Data length: {data_length} bytes. Header bytes size: {len(data_length_header)}. Header binary size: {len(data_length_header) * 8} bits.")

    full_payload = data_length_header + bytes(data)
    total_payload_bytes = len(full_payload)  # LENGTH_HEADER_SIZE + len(data)
    total_payload_bits = total_payload_bytes * 8
    logger.debug(f"LSB Embedding: Total binary payload size for embedding: {total_payload_bits} bits / {total_payload_bytes} bytes.")

    # Capacity is one bit per channel value: width * height * 3.
    total_capacity_bits = len(fpx)
    if total_payload_bits > total_capacity_bits:
        logger.error(f"LSB Embedding: Data too large for image capacity. Needed: {total_payload_bits} bits, Available: {total_capacity_bits} bits ({total_capacity_bits // 8} bytes). Image WxH*3 LSBs.")
        raise ValueError(f"Data too large for image capacity: {total_payload_bits} bits ({total_payload_bytes} bytes) needed, {total_capacity_bits} bits available ({total_capacity_bits // 8} bytes - Image WxH*3 LSBs).")

    logger.debug(f"LSB Embedding: Embedding {total_payload_bits} bits into {total_capacity_bits} available pixel channels.")

    try:
        # unpackbits yields each byte MSB-first by default, matching the
        # header-then-data bit order. One vectorised assignment clears each
        # target LSB (& 0xFE) and sets it to the payload bit, replacing a
        # per-bit Python loop.
        payload_bits = np.unpackbits(np.frombuffer(full_payload, dtype=np.uint8))
        target = fpx[:total_payload_bits]
        fpx[:total_payload_bits] = (target & 0xFE) | payload_bits
    except Exception as e:
        logger.error(f"Unexpected error during LSB embedding loop: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during LSB embedding: {e}") from e

    # Restore the image shape and build the stego image.
    try:
        stego_pixels = fpx.reshape(px.shape)
        stego_img = Image.fromarray(stego_pixels.astype(np.uint8), 'RGB')
    except Exception as e:
        logger.error(f"Failed to reshape or create image from modified numpy array: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not create image from stego pixels: {e}") from e

    logger.debug("LSB Embedding: Data embedding complete.")
    # PNG output preserves the LSBs.
    return set_pil_image_format_to_png(stego_img)
# RESTORED TO ORIGINAL EXTRACT LOGIC | |
def _lsb_bits_as_text(flat_channels, start, stop):
    """Return the least-significant bits of ``flat_channels[start:stop]`` as a '0'/'1' string."""
    # Vectorised replacement for a per-element Python loop: mask the low bit,
    # shift it onto the ASCII codes for '0'/'1', then decode the buffer once.
    lsb = (np.asarray(flat_channels[start:stop]) & 1).astype(np.uint8)
    return (lsb + np.uint8(ord("0"))).tobytes().decode("ascii")


def extract_data_from_image(img_obj: Image.Image) -> bytes:
    """
    Extract the payload stored in the LSBs of an image's RGB channels.

    The first ``LENGTH_HEADER_SIZE`` bytes (one bit per channel value) form a
    big-endian unsigned length header; that many payload bytes follow it.
    Only the payload is returned -- the header itself is stripped.

    Args:
        img_obj: Image to read. It is converted to RGB before extraction.

    Returns:
        The payload bytes that followed the header (``b""`` when the header
        announces a zero-length payload).

    Raises:
        RuntimeError: If the image cannot be converted to RGB / a numpy array,
            or the bit extraction itself fails unexpectedly.
        ValueError: If the image is too small for the header, the header
            cannot be decoded, the announced length exceeds the image
            capacity, or the extracted bits cannot be converted to bytes.
    """
    logger.debug("LSB Extraction: Starting data extraction from image LSB.")

    # Ensure image is RGB and convert to numpy array
    try:
        img = img_obj.convert("RGB")
    except Exception as e:
        logger.error(f"Failed to convert image to RGB before LSB extraction: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to RGB for extraction: {e}") from e

    try:
        px = np.array(img)
        fpx = px.ravel()  # Flattened channel values (R, G, B, R, G, B, ...)
        total_image_pixels = len(fpx)  # One LSB per channel value (W * H * 3)
    except Exception as e:
        logger.error(f"Failed to convert image to numpy array: {e}", exc_info=True)
        raise RuntimeError(f"Image processing failed: Could not convert image to numpy array for extraction: {e}") from e
    logger.debug(f"LSB Extraction: Flattened image pixel array size: {total_image_pixels}.")

    # --- Length header -------------------------------------------------------
    header_bits_count = LENGTH_HEADER_SIZE * 8
    if total_image_pixels < header_bits_count:
        logger.error(f"LSB Extraction: Image too small for LSB header. Size: {total_image_pixels} LSB bits, Header needs {header_bits_count} bits ({LENGTH_HEADER_SIZE} bytes).")
        raise ValueError(f"Image is too small ({total_image_pixels} LSB bits available) to contain the LSB data length header ({LENGTH_HEADER_SIZE} bytes / {header_bits_count} bits).")

    logger.debug(f"LSB Extraction: Extracting LSB header ({header_bits_count} bits) from first {header_bits_count} pixel channels LSBs.")
    try:
        data_length_binary = _lsb_bits_as_text(fpx, 0, header_bits_count)
    except Exception as e:
        logger.error(f"Unexpected error during LSB header extraction loop: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during LSB header extraction: {e}") from e

    try:
        # The header stores the *length of the data FOLLOWING the header*.
        # _b2B is defined elsewhere in this file; it is fed the '0'/'1' string
        # exactly as before.
        data_length_header_bytes = _b2B(data_length_binary)
        if len(data_length_header_bytes) != LENGTH_HEADER_SIZE:
            logger.error(f"LSB Extraction: Decoded header bytes have incorrect size: {len(data_length_header_bytes)}. Expected {LENGTH_HEADER_SIZE}.")
            raise ValueError(f"Decoded LSB header has incorrect byte size ({len(data_length_header_bytes)}). Expected {LENGTH_HEADER_SIZE}. Data corruption or not a KeyLock image.")
        data_length = struct.unpack('>I', data_length_header_bytes)[0]
        logger.debug(f"LSB Extraction: Parsed data length from LSB header: {data_length} bytes (This is the size of the crypto payload).")
    except ValueError as e:
        # Wrap value errors (size check above, _b2B) with context for the caller.
        raise ValueError(f"Failed to decode data length header from image LSB: {e}. Image may not be a valid KeyLock file or is corrupted.") from e
    except Exception as e:
        logger.error(f"LSB Extraction: Failed to decode data length header: {e}", exc_info=True)
        raise ValueError(f"Failed to decode data length header from image LSB: {e}. Image may not be a valid KeyLock file or is corrupted.") from e

    # --- Capacity validation -------------------------------------------------
    # Whole bytes that fit into the LSBs remaining after the header.
    max_possible_data_bytes = (total_image_pixels - header_bits_count) // 8
    if data_length > max_possible_data_bytes:
        logger.error(f"LSB Extraction: Indicated data length ({data_length}B) exceeds remaining image capacity ({max_possible_data_bytes}B).")
        raise ValueError(f"Indicated data length ({data_length} bytes) in LSB header exceeds remaining image capacity ({max_possible_data_bytes} bytes after header). Image likely truncated or corrupted.")

    if data_length == 0:
        logger.info("LSB Extraction: LSB header indicates zero data length embedded. Returning empty bytes.")
        return b""

    # --- Payload -------------------------------------------------------------
    data_bits_count = data_length * 8
    start_offset = header_bits_count  # Payload starts right after the header bits
    end_offset = start_offset + data_bits_count
    logger.debug(f"LSB Extraction: Extracting data bits. Start offset: {start_offset} (after {LENGTH_HEADER_SIZE}B header), Data bits: {data_bits_count}, End offset: {end_offset}. Total image LSB bits: {total_image_pixels}.")

    # Defensive: already implied by the capacity check above.
    if total_image_pixels < end_offset:
        logger.error(f"LSB Extraction: Image truncated or corrupted. Cannot extract full data. Image LSB bits: {total_image_pixels}, Required end offset: {end_offset}.")
        raise ValueError("Image is truncated or corrupted. Cannot extract full data based on header length.")

    try:
        data_binary = _lsb_bits_as_text(fpx, start_offset, end_offset)
    except Exception as e:
        logger.error(f"Unexpected error during LSB data extraction loop: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred during LSB data extraction: {e}") from e

    # Convert binary string back to bytes
    try:
        extracted_bytes = _b2B(data_binary)
    except ValueError:
        # Value errors from _b2B are passed through unchanged.
        raise
    except Exception as e:
        logger.error(f"LSB Extraction: Failed to convert extracted bits to bytes: {e}", exc_info=True)
        raise ValueError(f"Failed to convert extracted bits to bytes: {e}") from e

    if len(extracted_bytes) != data_length:
        # Indicates a bit-extraction or _b2B problem, most likely corruption.
        logger.error(f"LSB Extraction: Extracted bytes length ({len(extracted_bytes)}) does not match expected data length ({data_length}) from header. Binary string length: {len(data_binary)} bits.")
        raise ValueError(f"Extracted bytes length ({len(extracted_bytes)}) does not match expected length ({data_length}) from header. Data corruption.")

    logger.debug(f"LSB Extraction: Successfully extracted {len(extracted_bytes)} bytes of data (the crypto payload).")
    # Return *only* the extracted data bytes, NOT including the LSB header.
    return extracted_bytes
# --- Utility Functions --- | |
def parse_kv_string_to_dict(kv_str: str) -> dict:
    """
    Parse newline-separated ``key=value`` / ``key:value`` pairs into a dict.

    Rules:
      * Blank lines and lines starting with ``#`` are skipped.
      * Everything after the first ``#`` on a line is treated as an inline
        comment and discarded (so values cannot contain ``#``).
      * Each line is split at whichever separator (``=`` or ``:``) occurs
        FIRST, so the value may itself contain either character, e.g.
        ``url: http://host/?a=b`` yields key ``url``.
      * Keys and values are whitespace-stripped; one matched pair of
        surrounding single or double quotes is removed from the value.
      * Later duplicates of a key overwrite earlier ones.

    Args:
        kv_str: The text to parse. Empty / whitespace-only input yields ``{}``.

    Returns:
        Mapping of key to value (both ``str``).

    Raises:
        ValueError: If a non-comment line has no separator or an empty key.
    """
    if not kv_str or not kv_str.strip():
        logger.debug("Input KV string is empty or whitespace. Returning empty dict.")
        return {}

    parsed = {}
    for line_no, raw_line in enumerate(kv_str.splitlines(), 1):
        line = raw_line.strip()
        if not line or line.startswith('#'):  # Skip empty lines or comments
            continue
        content = line.split('#', 1)[0].strip()  # Remove inline comments
        if not content:
            continue

        # Split at the earliest separator so that a value containing the
        # *other* separator is kept intact.
        separator_positions = [pos for pos in (content.find('='), content.find(':')) if pos != -1]
        if not separator_positions:
            # Line content is deliberately kept out of the log: it may be a secret.
            logger.warning(f"L{line_no}: Invalid format. Must contain '=' or ':'.")
            raise ValueError(f"Line {line_no}: Invalid format '{raw_line}'. Each line must contain '=' or ':'.")
        split_at = min(separator_positions)
        key = content[:split_at].strip()
        value = content[split_at + 1:].strip()

        if not key:
            logger.warning(f"L{line_no}: Empty key found.")
            raise ValueError(f"Line {line_no}: Empty key found in '{raw_line}'.")

        # Remove surrounding quotes if present, *only* if they are a matched pair
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]

        # A line such as "key:" yields an empty-string value, which is fine.
        parsed[key] = value
        # Values are typically secrets; log the key only.
        logger.debug(f"Parsed KV pair with key '{key}' from line {line_no} (value not logged)")
    return parsed
def generate_keylock_carrier_image(w=800, h=600, msg="KeyLock Wallet") -> Image.Image:
    """
    Generate a vertical-gradient carrier image with a simple key logo and a caption.

    Args:
        w: Requested width in pixels (coerced with ``int`` and clamped to >= 100).
        h: Requested height in pixels (coerced with ``int`` and clamped to >= 100).
        msg: Caption drawn below the logo.

    Returns:
        A new RGB image. If every attempt to draw the caption fails the image
        is still returned, just without the text.
    """
    w = max(100, int(w))
    h = max(100, int(h))

    cs, ce = (30, 40, 50), (70, 80, 90)  # Start and end colors for gradient
    img = Image.new("RGB", (w, h), cs)
    draw = ImageDraw.Draw(img)

    # Gradient: one horizontal line per row, interpolated from cs (top) to ce (bottom).
    for y_ in range(h):
        i = y_ / (h - 1)  # h >= 100 after clamping, so the divisor is never zero
        row_color = tuple(int(s_ * (1 - i) + e_ * i) for s_, e_ in zip(cs, ce))
        draw.line([(0, y_), (w, y_)], fill=row_color)

    # KeyLock logo: a circle with a rectangle hanging below its center.
    ib = min(w, h) // 7  # Base size for logo elements
    icx, icy = w // 2, h // 3  # Circle center
    cr = ib // 2  # Circle radius
    cb = [(icx - cr, icy - cr), (icx + cr, icy + cr)]  # Circle bounding box
    rw = ib // 4  # Rectangle width
    rh = ib // 2  # Rectangle height
    rty = icy + int(cr * 0.2)  # Rectangle top, slightly below the circle center
    rb = [(icx - rw // 2, rty), (icx + rw // 2, rty + rh)]  # Rectangle bounding box
    kc, ko = (190, 195, 200), (120, 125, 130)  # Key color and outline color
    ow = max(1, int(ib / 30))  # Outline width, at least 1px
    draw.ellipse(cb, fill=kc, outline=ko, width=ow)
    draw.rectangle(rb, fill=kc, outline=ko, width=ow)

    # Caption below the logo.
    fs = max(18, min(int(w / 15), h // 7))  # Font size scaled to the image, min 18
    fnt = _get_font(PREFERRED_FONTS, fs)
    tc = (225, 230, 235)  # Text color
    # Materialised as a tuple: a generator here would be exhausted by the first
    # draw call and leave the fallback path with an empty fill color.
    sc = tuple(max(0, s_ - 20) for s_ in cs)  # Shadow color
    tx, ty = w / 2, h * .68  # Target center position for the text
    so = max(1, int(fs / 25))  # Shadow offset, at least 1px
    margin = 10  # Top-left position used by the last-resort text fallback

    try:
        # anchor="mm" centers the text on (tx, ty)
        draw.text((tx + so, ty + so), msg, font=fnt, fill=sc, anchor="mm")  # Shadow
        draw.text((tx, ty), msg, font=fnt, fill=tc, anchor="mm")  # Main text
        logger.debug(f"Drew message text '{msg}' with anchor='mm'. Font size: {fs}")
    except AttributeError:  # Fallback for older PIL versions without anchor support
        logger.warning("PIL version does not support text anchor. Using manual centering fallback.")
        try:
            # Center the measured text block (tw x th) around (tx, ty) by hand.
            tw, th = _get_text_measurement(draw, msg, fnt)
            ax, ay = tx - tw / 2.0, ty - th / 2.0
            draw.text((ax + so, ay + so), msg, font=fnt, fill=sc)  # Shadow
            draw.text((ax, ay), msg, font=fnt, fill=tc)  # Main text
            logger.debug(f"Drew message text '{msg}' with manual centering. Font size: {fs}")
        except Exception as e:
            logger.error(f"Fallback manual text centering failed: {e}", exc_info=True)
            try:
                # Last resort: draw at a fixed top-left position.
                logger.warning("Manual centering failed, attempting fallback text draw at margin position.")
                draw.text((margin, margin), msg, font=fnt, fill=tc)
            except Exception as draw_e:
                # Best effort only: the image is still usable without the caption.
                logger.error(f"Failed to draw text overlay even at fixed position: {draw_e}", exc_info=True)
    return img
def _get_text_measurement(draw_obj, text_str, font_obj):
    """
    Return ``(width, height)`` of ``text_str`` using the best available Pillow method.

    Attempts, in order:
      1. ``draw_obj.textbbox`` (any failure falls through),
      2. ``draw_obj.textsize`` (legacy API),
      3. ``font_obj.getsize`` (legacy API),
      4. an estimate derived from ``font_obj.size``,
      5. a hardcoded default that never raises.

    For the two legacy APIs only ``AttributeError`` / ``ValueError`` trigger
    the next fallback; other exceptions propagate to the caller.

    Args:
        draw_obj: Drawing context used for measuring.
        text_str: Text to measure. Empty / falsy text yields ``(0, 0)``.
        font_obj: Font used for measuring.

    Returns:
        A ``(width, height)`` tuple; both are positive for non-empty text.
    """
    if not text_str:
        return 0, 0

    # 1. textbbox returns (x1, y1, x2, y2) relative to the given origin.
    try:
        bbox = draw_obj.textbbox((0, 0), text_str, font=font_obj)
        width = bbox[2] - bbox[0]
        height = bbox[3] - bbox[1]
        if width > 0 and height > 0:
            return width, height
        logger.debug(f"textbbox returned zero dimensions for '{text_str[:20]}...'. Attempting fallback.")
    except Exception:
        # Deliberate best effort: any textbbox failure just moves on to the older APIs.
        pass

    # 2./3. Legacy measuring APIs (absent in recent Pillow -> AttributeError).
    legacy_methods = (
        ("textsize", lambda: draw_obj.textsize(text_str, font=font_obj)),
        ("getsize", lambda: font_obj.getsize(text_str)),
    )
    for method_name, measure in legacy_methods:
        try:
            width, height = measure()
        except (AttributeError, ValueError):
            continue
        if width > 0 and height > 0:
            return width, height
        logger.debug(f"{method_name} returned zero dimensions for '{text_str[:20]}...'. Attempting fallback.")

    # 4. Rough estimate from the font size (assumes an average glyph width).
    try:
        char_width_approx = font_obj.size * 0.6
        char_height_approx = font_obj.size
        width_est = int(len(text_str) * char_width_approx)
        height_est = int(char_height_approx)
        logger.warning(f"Using estimated text size for '{text_str[:20]}...': ({width_est}, {height_est}). Font: {getattr(font_obj, 'path', 'default')}, Size: {font_obj.size}")
        if width_est > 0 and height_est > 0:
            return width_est, height_est
        logger.debug(f"Estimation returned zero dimensions for '{text_str[:20]}...'. Using final hardcoded fallback.")
    except Exception:
        # Typically the font has no usable ``size``; fall through to the default.
        pass

    # 5. Hardcoded default. It must not touch ``font_obj.size`` unguarded,
    # because a missing ``size`` is the usual reason for getting here.
    logger.warning(f"Failed to estimate text size using any method for '{text_str[:20]}...'. Returning hardcoded default.")
    fallback_font_size = getattr(font_obj, "size", None)
    if not isinstance(fallback_font_size, (int, float)):
        fallback_font_size = 10
    return max(1, len(text_str) * 8), max(1, 10 + int(fallback_font_size) // 5)
def draw_key_list_dropdown_overlay(image: Image.Image, keys: list[str] = None, title: str = "Data Embedded (RSA Encrypted)") -> Image.Image:
    """
    Draw a title bar and an optional list of key names in the top-right corner.

    The input image is never modified; drawing happens on an RGB copy. The
    overlay boxes are semi-transparent: they are drawn through an ``"RGBA"``
    drawing context on the RGB copy, which blends the fill colors' alpha into
    the existing pixels.

    Args:
        image: Source image.
        keys: Key names to list under the title. At most
            ``MAX_KEYS_TO_DISPLAY_OVERLAY`` lines are shown; when there are
            more, the last line becomes a "... (N more)" marker. Entries that
            are too wide are truncated with "...".
        title: Text for the title bar. When empty (and keys are given) an
            empty title bar is still drawn.

    Returns:
        The result of ``set_pil_image_format_to_png`` applied to the overlaid
        RGB copy. When there is neither a title nor any key, the same helper
        is applied to an unmodified copy of ``image``.
    """
    # Return original image if nothing to draw
    if not title and not keys:
        logger.debug("No title or keys to display overlay. Returning original image.")
        return set_pil_image_format_to_png(image.copy())

    # Work on an RGB copy. Drawing with RGBA fills directly onto an RGBA image
    # would overwrite pixels (alpha included) instead of blending, and the
    # alpha would be discarded on conversion to RGB, leaving opaque boxes.
    img_overlayed = image.copy()
    if img_overlayed.mode != 'RGB':
        img_overlayed = img_overlayed.convert('RGB')
    draw = ImageDraw.Draw(img_overlayed, 'RGBA')

    margin = 10  # Margin from image edges
    # Padding inside overlay boxes (pixels)
    padding = {
        'title_h_pad': 6,   # Vertical padding inside title bar
        'title_w_pad': 10,  # Horizontal padding inside title bar
        'key_h_pad': 5,     # Vertical padding around each key line
        'key_w_pad': 10,    # Horizontal padding around each key line
    }
    line_spacing = 4  # Extra space between key lines in the list
    title_bg_color = (60, 60, 60, 180)     # Dark grey, semi-transparent (RGBA)
    title_text_color = (230, 230, 90)      # Yellowish
    key_list_bg_color = (50, 50, 50, 160)  # Slightly darker grey, semi-transparent (RGBA)
    key_text_color = (210, 210, 210)       # Light grey
    ellipsis_color = (170, 170, 170)       # Grey for ellipsis

    # --- Overlay box width: a share of the image width within min/max limits ---
    OVERLAY_TARGET_WIDTH_RATIO = 0.30
    MIN_OVERLAY_WIDTH_PX = 180
    MAX_OVERLAY_WIDTH_PX = 500
    final_overlay_box_width = int(image.width * OVERLAY_TARGET_WIDTH_RATIO)
    final_overlay_box_width = max(MIN_OVERLAY_WIDTH_PX, final_overlay_box_width)
    final_overlay_box_width = min(MAX_OVERLAY_WIDTH_PX, final_overlay_box_width)
    final_overlay_box_width = min(final_overlay_box_width, image.width - 2 * margin)  # Must fit with margins
    if final_overlay_box_width <= 2 * max(padding['title_w_pad'], padding['key_w_pad']):
        logger.warning("Calculated overlay box width is too small for horizontal padding. Cannot draw overlay.")
        return set_pil_image_format_to_png(img_overlayed)
    logger.debug(f"Overlay box width calculated: {final_overlay_box_width} px.")

    available_width_for_title_text = final_overlay_box_width - 2 * padding['title_w_pad']
    available_text_width_for_keys = final_overlay_box_width - 2 * padding['key_w_pad']

    # --- Title font size ---
    TITLE_FONT_HEIGHT_RATIO = 0.030  # Relative to image height
    MIN_TITLE_FONT_SIZE = 14
    MAX_TITLE_FONT_SIZE = 28
    title_font_size = int(image.height * TITLE_FONT_HEIGHT_RATIO)
    if title:
        # Shrink for long titles, assuming an average glyph width of
        # 0.6 * font size, so the text fits inside the horizontal padding.
        estimated_char_width_ratio = 0.6
        required_font_size_for_width = available_width_for_title_text / (len(title) * estimated_char_width_ratio)
        title_font_size = min(title_font_size, int(required_font_size_for_width))
    title_font_size = max(MIN_TITLE_FONT_SIZE, title_font_size)
    title_font_size = min(MAX_TITLE_FONT_SIZE, title_font_size)
    title_font = _get_font(PREFERRED_FONTS, title_font_size)
    logger.debug(f"Title font size selected: {title_font_size}")

    # --- Key font size ---
    KEY_FONT_HEIGHT_RATIO = 0.025  # Relative to image height
    MIN_KEY_FONT_SIZE = 12
    MAX_KEY_FONT_SIZE = 22
    key_font_size = int(image.height * KEY_FONT_HEIGHT_RATIO)
    key_font_size_from_overlay_width = int(final_overlay_box_width * 0.07)  # heuristic ratio
    key_font_size = min(key_font_size, key_font_size_from_overlay_width)
    key_font_size = max(MIN_KEY_FONT_SIZE, key_font_size)
    key_font_size = min(MAX_KEY_FONT_SIZE, key_font_size)
    key_font = _get_font(PREFERRED_FONTS, key_font_size)
    logger.debug(f"Key list font size selected: {key_font_size}")
    # Height used when a measurement comes back as zero. Fonts without a
    # ``size`` attribute fall back to the size that was requested.
    fallback_key_line_h = getattr(key_font, 'size', key_font_size)

    # --- Measure text for layout planning ---
    actual_title_w, actual_title_h = _get_text_measurement(draw, title, title_font)
    logger.debug(f"Measured title size: ({actual_title_w}, {actual_title_h})")

    # disp_keys, actual_key_text_widths and key_line_heights are parallel lists.
    disp_keys, actual_key_text_widths, key_line_heights = [], [], []
    total_keys_render_h_estimate = 0  # Height needed *before* any truncation
    if keys:
        keys_truncated = len(keys) > MAX_KEYS_TO_DISPLAY_OVERLAY
        # When truncating, reserve the last visible line for the "more" marker.
        lines_to_show = list(keys[:MAX_KEYS_TO_DISPLAY_OVERLAY - 1]) if keys_truncated else list(keys)
        if keys_truncated:
            lines_to_show.append(f"... ({len(keys)-(MAX_KEYS_TO_DISPLAY_OVERLAY-1)} more)")
        for line_text in lines_to_show:
            line_w, line_h = _get_text_measurement(draw, line_text, key_font)
            disp_keys.append(line_text)
            actual_key_text_widths.append(line_w)
            key_line_heights.append(line_h)
            total_keys_render_h_estimate += line_h
        if len(disp_keys) > 1:
            total_keys_render_h_estimate += line_spacing * (len(disp_keys) - 1)

    # --- Draw Title Bar (Top-Right) ---
    title_bar_h = actual_title_h + 2 * padding['title_h_pad']
    title_bar_x1 = image.width - margin                     # Right edge
    title_bar_x0 = title_bar_x1 - final_overlay_box_width   # Left edge
    title_bar_y0 = margin                                   # Top edge
    title_bar_y1 = title_bar_y0 + title_bar_h               # Bottom edge

    if title_bar_x0 >= title_bar_x1 or title_bar_y0 >= title_bar_y1:
        logger.warning("Calculated title bar dimensions are invalid. Cannot draw title overlay.")
    else:
        draw.rectangle([(title_bar_x0, title_bar_y0), (title_bar_x1, title_bar_y1)], fill=title_bg_color)
        if title:
            # Center the title horizontally inside the padded area.
            title_text_draw_x = title_bar_x0 + padding['title_w_pad'] + max(0, (available_width_for_title_text - actual_title_w) / 2)
            title_text_draw_y = title_bar_y0 + padding['title_h_pad']
            draw.text((title_text_draw_x, title_text_draw_y), title, font=title_font, fill=title_text_color)
            logger.debug(f"Drew title '{title}' at ({title_text_draw_x}, {title_text_draw_y})")

    # --- Draw Key List Box ---
    if disp_keys:
        key_list_box_h_ideal = total_keys_render_h_estimate + 2 * padding['key_h_pad']
        key_list_x0, key_list_x1 = title_bar_x0, title_bar_x1  # Align with title bar
        key_list_y0 = title_bar_y1  # Start immediately below title bar

        # Clip the box to the space left above the bottom margin.
        max_key_list_box_h = image.height - margin - key_list_y0
        current_key_list_box_h = min(key_list_box_h_ideal, max_key_list_box_h)
        key_list_y1 = key_list_y0 + current_key_list_box_h

        # Require room for at least the first key line plus its padding.
        min_required_key_list_height = key_line_heights[0] + 2 * padding['key_h_pad']
        if current_key_list_box_h < min_required_key_list_height:
            logger.warning(f"Not enough vertical space ({current_key_list_box_h}px) to draw key list overlay (needed min {min_required_key_list_height}px).")
            return set_pil_image_format_to_png(img_overlayed)

        if key_list_x0 >= key_list_x1 or key_list_y0 >= key_list_y1:
            logger.warning("Calculated key list box dimensions are invalid. Cannot draw key list overlay.")
        else:
            draw.rectangle([(key_list_x0, key_list_y0), (key_list_x1, key_list_y1)], fill=key_list_bg_color)
            logger.debug(f"Drew key list box from ({key_list_x0},{key_list_y0}) to ({key_list_x1},{key_list_y1})")

            current_text_y = key_list_y0 + padding['key_h_pad']
            text_bottom_limit = key_list_y1 - padding['key_h_pad']
            last_index = len(disp_keys) - 1
            ellipsis_str = "..."

            for i, (key_text_item, estimated_line_h, original_key_text_w) in enumerate(
                zip(disp_keys, key_line_heights, actual_key_text_widths)
            ):
                if estimated_line_h <= 0 and len(key_text_item) > 0:
                    estimated_line_h = fallback_key_line_h

                # Vertical fit check: this line plus the gap that would follow it.
                space_needed_for_this_line_and_next_gap = estimated_line_h + (line_spacing if i < last_index else 0)
                if current_text_y + space_needed_for_this_line_and_next_gap > text_bottom_limit:
                    # Out of room: show a bare "..." instead if that still fits,
                    # then stop drawing further lines.
                    if not key_text_item.endswith("..."):
                        ellipsis_w, ellipsis_h = _get_text_measurement(draw, ellipsis_str, key_font)
                        if current_text_y + ellipsis_h <= text_bottom_limit:
                            ellipsis_draw_x = key_list_x0 + padding['key_w_pad'] + max(0, (available_text_width_for_keys - ellipsis_w) / 2)
                            draw.text((ellipsis_draw_x, current_text_y), ellipsis_str, font=key_font, fill=ellipsis_color)
                            logger.debug(f"Drew final ellipsis at ({ellipsis_draw_x}, {current_text_y}) due to vertical space limit.")
                    break

                # Horizontal fit: truncate over-wide keys (never the "more" marker).
                text_to_draw = key_text_item
                if original_key_text_w > available_text_width_for_keys and not key_text_item.startswith("..."):
                    temp_text = key_text_item
                    truncated = False
                    # Drop trailing characters until `temp_text + "..."` fits.
                    while _get_text_measurement(draw, temp_text + ellipsis_str, key_font)[0] > available_text_width_for_keys and len(temp_text) > 0:
                        temp_text = temp_text[:-1]
                        truncated = True
                    text_to_draw = temp_text + ellipsis_str if truncated else temp_text
                    final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font)

                    # Still too wide (e.g. the ellipsis alone does not fit):
                    # cut by an estimated character budget instead.
                    if final_key_text_w > available_text_width_for_keys:
                        logger.warning(f"Aggressive truncation needed for key: '{key_text_item[:20]}...'. Final text '{text_to_draw[:20]}...' still too wide ({final_key_text_w}px > {available_text_width_for_keys}px). Re-truncating.")
                        est_chars_that_fit = int(available_text_width_for_keys / max(1, key_font_size * 0.5))
                        text_to_draw = key_text_item[:max(0, est_chars_that_fit - len(ellipsis_str))] + ellipsis_str
                        final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font)
                else:
                    final_key_text_w, current_line_actual_h = _get_text_measurement(draw, text_to_draw, key_font)

                if current_line_actual_h <= 0 and len(text_to_draw) > 0:
                    logger.warning(f"Measured height zero for text '{text_to_draw[:20]}...'. Using estimated height: {fallback_key_line_h}")
                    current_line_actual_h = fallback_key_line_h

                # Center the text horizontally inside the padded area.
                key_text_draw_x = key_list_x0 + padding['key_w_pad'] + max(0, (available_text_width_for_keys - final_key_text_w) / 2)

                # Ellipsis color for the "more" marker and for truncated keys.
                is_more_marker = key_text_item.startswith("...")
                was_truncated = text_to_draw.endswith("...") and text_to_draw != key_text_item
                text_color_to_use = ellipsis_color if (is_more_marker or was_truncated) else key_text_color

                draw.text((key_text_draw_x, current_text_y), text_to_draw, font=key_font, fill=text_color_to_use)
                logger.debug(f"Drew key item '{text_to_draw}' at ({key_text_draw_x}, {current_text_y}). Measured height: {current_line_actual_h}")

                # Advance by the *actual* height of the line just drawn, plus
                # the inter-line gap when more lines follow.
                current_text_y += current_line_actual_h
                if i < last_index:
                    current_text_y += line_spacing

    # The working copy is already RGB, so no alpha flattening is needed here.
    return set_pil_image_format_to_png(img_overlayed)