# keylock/app.py import gradio as gr from PIL import Image, ImageFont import tempfile import os import json import logging import traceback import base64 import io # Import specific exceptions from cryptography from cryptography.exceptions import InvalidTag, InvalidKey import core # Use relative import for core module __version__ ="v1"# Import version for footer app_logger = logging.getLogger("keylock_app") # Configure logger if not already configured if not app_logger.hasHandlers(): handler = logging.StreamHandler() # Using the format from the original code for consistency formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) app_logger.addHandler(handler) app_logger.setLevel(logging.INFO) # Keep INFO level for app logs # Theming colors (using fallback names for broader compatibility if needed) try: # Attempt to use Gradio's theme colors if available gr.themes.colors.blue # Check if colors exist blue_color = gr.themes.colors.blue sky_color = gr.themes.colors.sky slate_color = gr.themes.colors.slate cyan_color = gr.themes.colors.cyan neutral_color = gr.themes.colors.neutral except AttributeError: # Fallback for older Gradio versions or if colors structure changes app_logger.warning("gr.themes.colors not found. Using placeholder colors for themes.") class FallbackColors: # Basic fallback colors blue = "blue"; sky = "skyblue"; slate = "slategray"; cyan = "cyan"; neutral = "gray" blue_color = FallbackColors.blue sky_color = FallbackColors.sky slate_color = FallbackColors.slate cyan_color = FallbackColors.cyan neutral_color = FallbackColors.neutral ICON_EMBED = "🔒" # Changed icon to reflect encryption ICON_EXTRACT = "🔓" # Changed icon to reflect decryption ICON_KEY = "🔑" # New icon for key management def pil_to_base64_html(pil_image, max_width_px=None): buffered = io.BytesIO() # Save as PNG to preserve potential LSB data (though preview isn't used for stego) # Ensure it's RGB for saving if the original was RGBA (overlay conversion) if pil_image.mode == 'RGBA': pil_image = pil_image.convert('RGB') pil_image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() style = f"max-width:{max_width_px}px; height:auto; border:1px solid #ccc; display:block; margin-left:auto; margin-right:auto;" if max_width_px else "border:1px solid #ccc; display:block; margin-left:auto; margin-right:auto;" return f"
Stego Image Preview
" def gradio_embed_data(kv_string: str, recipient_public_key_pem: str, input_image_pil: Image.Image, generate_carrier_flag: bool, show_keys_on_image_flag: bool, output_filename_base: str): """Handles the embedding process in Gradio.""" output_html_img_str, status_msg, dl_file_path = None, "An error occurred.", None # Validate inputs if not recipient_public_key_pem or not recipient_public_key_pem.strip(): return None, "Error: Recipient's Public Key is required.", None if not kv_string or not kv_string.strip(): return None, "Error: Key-Value data cannot be empty.", None try: # Parse KV string data_dict = core.parse_kv_string_to_dict(kv_string) if not data_dict: return None, "Error: Parsed Key-Value data is empty or only contains comments/empty lines.", None serial_data = json.dumps(data_dict, indent=None).encode('utf-8') # Compact JSON for embedding # Prepare carrier image original_format_note = "" if generate_carrier_flag or input_image_pil is None: app_logger.info("Generating new carrier image.") carrier_img = core.generate_keylock_carrier_image() else: app_logger.info(f"Using uploaded carrier image (format: {getattr(input_image_pil, 'format', 'Unknown')}).") carrier_img = input_image_pil.copy() if hasattr(input_image_pil, 'format') and input_image_pil.format and input_image_pil.format.upper() != 'PNG': # Note about format conversion if not PNG original_format_note = ( f"Input carrier image was format '{input_image_pil.format}'. " f"It will be converted to RGB for LSB embedding and saved as PNG. " f"If original was lossy (e.g., JPEG), quality is preserved from upload; " f"if it had transparency (e.g., GIF, WebP with alpha), transparency will be lost." ) app_logger.warning(original_format_note) # Ensure carrier is RGB before overlay and embedding carrier_img = carrier_img.convert("RGB") # Encrypt the data using hybrid RSA/AES app_logger.info(f"Encrypting {len(serial_data)} bytes of data with hybrid encryption.") # Need bytes for public key input encrypted_payload = core.encrypt_data_hybrid(serial_data, recipient_public_key_pem.encode('utf-8')) app_logger.info(f"Encrypted payload size (from crypto layer): {len(encrypted_payload)} bytes.") # Add visual overlay keys_for_overlay = list(data_dict.keys()) if show_keys_on_image_flag else None overlay_title = "KeyLock: RSA Encrypted Data Embedded" final_carrier_with_overlay = core.draw_key_list_dropdown_overlay( carrier_img, keys=keys_for_overlay, title=overlay_title ) app_logger.info("Visual overlay added.") # Embed the encrypted payload using the LSB layer # The LSB function will add its own 4-byte header for this payload's length. app_logger.info(f"Embedding {len(encrypted_payload)} bytes (crypto payload) into image pixels.") stego_final_img = core.embed_data_in_image(final_carrier_with_overlay, encrypted_payload) # The total size embedded in the image LSBs is len(encrypted_payload) + core.LENGTH_HEADER_SIZE. app_logger.info(f"Data embedding complete. Total bytes embedded in LSB (including header): {len(encrypted_payload) + core.LENGTH_HEADER_SIZE} bytes.") # Save the stego image to a temporary file fname_base = "".join(c if c.isalnum() or c in ('_','-') else '_' for c in output_filename_base.strip()) or "keylock_img_rsa" temp_fp = None try: with tempfile.NamedTemporaryFile(prefix=fname_base+"_", suffix=".png", delete=False) as tmp: stego_final_img.save(tmp, format="PNG") temp_fp = tmp.name app_logger.info(f"Stego image saved to temp file: {temp_fp}") except Exception as e: app_logger.error(f"Error saving temp file: {e}", exc_info=True) status_msg = f"Error saving output image: {e}" return None, status_msg, None # Prepare output for Gradio output_html_img_str = pil_to_base64_html(stego_final_img, max_width_px=480) status_msg = (f"Data embedded into '{os.path.basename(temp_fp)}'.\n" f"{original_format_note}" f"Image contains visual \"{overlay_title}\" overlay " f"{'(with key list)' if show_keys_on_image_flag and keys_for_overlay else ''} " f"and your LSB-encoded encrypted data.\n" f"Secret data size: {len(serial_data)}B (raw JSON).\n" f"Crypto payload size: {len(encrypted_payload)}B (Hybrid RSA/AES).\n" f"Total bytes embedded in LSB (including header): {len(encrypted_payload) + core.LENGTH_HEADER_SIZE}B.\n" f"Recipient needs their Private Key to extract.") return output_html_img_str, status_msg, temp_fp except (ValueError, RuntimeError, TypeError, InvalidKey) as e: # Catch InvalidKey here too # Catch errors raised by core functions (e.g., invalid key, data too large, crypto errors) app_logger.error(f"Embed process error: {e}", exc_info=True) return None, f"Error during embedding: {str(e)}", None except Exception as e: # Catch any other unexpected errors app_logger.error(f"Unexpected Embed Error: {e}", exc_info=True) return None, f"An unexpected error occurred: {str(e)}", None def gradio_extract_data(stego_image_pil: Image.Image, recipient_private_key_pem: str): """Handles the extraction process in Gradio.""" if stego_image_pil is None: return "Error: No image provided.", "Error: No image." if not recipient_private_key_pem or not recipient_private_key_pem.strip(): return "Error: Your Private Key is required for decryption.", "Error: Private Key required." try: # Ensure image is RGB for extraction stego_image_rgb = stego_image_pil.convert("RGB") if hasattr(stego_image_pil, 'format') and stego_image_pil.format and stego_image_pil.format.upper() != "PNG": app_logger.warning(f"Uploaded image for extraction is format '{stego_image_pil.format}', not PNG. LSB data may be compromised if not the original KeyLock file.") # Extract the embedded payload using the LSB layer # This function (restored) reads the LSB header and returns *only* the data payload after it. app_logger.info("Extracting data from image pixels using LSB.") extracted_crypto_payload = core.extract_data_from_image(stego_image_rgb) app_logger.info(f"Extracted crypto payload size (from LSB layer): {len(extracted_crypto_payload)} bytes.") if not extracted_crypto_payload: # The original extract_data_from_image would return b"" if header length was 0. # This is handled in core's decrypt_data_hybrid, which checks min size. pass # Allow empty payload to proceed to core's decryption logic for specific errors # Decrypt the crypto payload using hybrid RSA/AES # This function now expects the raw crypto payload bytes, without the LSB header. app_logger.info("Decrypting extracted crypto payload with hybrid encryption.") # Need bytes for private key input decrypted_bytes = core.decrypt_data_hybrid(extracted_crypto_payload, recipient_private_key_pem.encode('utf-8')) app_logger.info(f"Decrypted data size (raw): {len(decrypted_bytes)} bytes.") # Attempt to parse decrypted data (usually JSON) try: data = json.loads(decrypted_bytes.decode('utf-8')) # Pretty print JSON txt, stat = json.dumps(data, indent=2), "Data extracted successfully (JSON)." app_logger.info("Successfully decrypted and parsed as JSON.") except (json.JSONDecodeError, UnicodeDecodeError): # Fallback if not valid UTF-8 JSON try: txt = "Decrypted (UTF-8, not JSON):\n" + decrypted_bytes.decode('utf-8', errors='replace') # Use replace for invalid chars stat = "Warning: Decrypted as UTF-8, but not valid JSON." app_logger.warning("Decrypted data is not valid JSON, but appears to be UTF-8.") except UnicodeDecodeError: # Fallback if not valid UTF-8 txt = "Decrypted (raw hex, not JSON/UTF-8):\n" + decrypted_bytes.hex() stat = "Warning: Decrypted bytes are not valid UTF-8 or JSON. Displaying as hex." app_logger.warning("Decrypted data is not valid UTF-8. Displaying as hex.") return txt, stat # Corrected the exception catching here to import InvalidTag and InvalidKey except (ValueError, RuntimeError, InvalidTag, InvalidKey, TypeError) as e: # Catch errors raised by core functions (e.g., extraction errors, decryption errors) app_logger.error(f"Extract process error: {e}", exc_info=True) # Pass through specific error messages from core # Use type(e).__name__ to show the actual exception type in the UI status return f"Error during extraction: {str(e)}", f"Extraction Failed: {type(e).__name__} Error." except Exception as e: # Catch any other unexpected errors app_logger.error(f"Unexpected Extract Error: {e}", exc_info=True) return f"An unexpected error occurred: {str(e)}", f"Extraction Failed: Unexpected Error." # Corrected the constant name here def gradio_generate_keys(key_size_bits: int = core.RSA_KEY_SIZE_DEFAULT): """Generates RSA keys and returns them in PEM format.""" status = "" try: private_pem, public_pem = core.generate_rsa_key_pair(key_size_bits) status = f"RSA {key_size_bits}-bit key pair generated successfully." app_logger.info(status) return private_pem.decode('utf-8'), public_pem.decode('utf-8'), status except Exception as e: app_logger.error(f"Key generation error: {e}", exc_info=True) status = f"Error generating keys: {str(e)}" return "", "", status def build_interface(): """Builds the Gradio interface with RSA key management.""" custom_theme = gr.themes.Base( primary_hue="indigo", secondary_hue="cyan", neutral_hue="zinc", text_size="md", spacing_size="md", radius_size="sm", font=["System UI", "sans-serif"] # Use system fonts first ).set( # Example adjustments for better dark theme appearance button_secondary_background_fill_hover=blue_color, # Use theme colors button_secondary_border_color_hover=blue_color, button_secondary_text_color_hover='white', # Add more theme adjustments as desired ) # Using inline CSS for custom backgrounds and colors as in original custom_css = """ body { background: linear-gradient(to right bottom, rgb(44, 62, 80), rgb(80 168 255)); /* Dark blue gradient */ color: #ecf0f1; /* Light text color */ } span { color: inherit; /* Inherit color from parent */ } .gradio-container { background: transparent !important; /* Make main container transparent */ } .gr-box, .gr-panel, .gr-pill { background-color: rgba(44, 62, 80, 0.8) !important; /* Semi-transparent dark boxes */ border-color: rgba(189, 195, 199, 0.2) !important; /* Light border */ } .gr-textbox, .gr-dropdown, .gr-button, .gr-code, .gr-chat-message, .gr-image { border-color: rgba(189, 195, 199, 0.3) !important; /* Lighter border for inputs */ background-color: rgba(52, 73, 94, 0.9) !important; /* Slightly lighter dark background for inputs */ color: #ecf0f1 !important; /* Light text color */ } /* Specific button colors using theme/fallback */ .gr-button { color: #c6c6fc !important; /* Default button text color */ } .gr-button.gr-button-primary { background-color: #1abc9c !important; /* Teal */ color: white !important; border-color: #16a085 !important; } .gr-button.gr-button-secondary { background-color: #9b59b6 !important; /* Purple */ color: white !important; border-color: #8e44ad !important; } .gr-button.gr-button-secondary:hover { background-color: #8e44ad !important; /* Darker purple on hover */ } .gr-button.gr-button-stop { background-color: #e74c3c !important; /* Red */ color: white !important; border-color: #c0392b !important; } .gr-markdown { background-color: rgba(44, 62, 80, 0.7) !important; /* Semi-transparent dark background for markdown */ padding: 10px; border-radius: 5px; } .gr-markdown h1, .gr-markdown h2, .gr-markdown h3, .gr-markdown h4, .gr-markdown h5, .gr-markdown h6 { color: #ecf0f1 !important; /* Light heading color */ border-bottom-color: rgba(189, 195, 199, 0.3) !important; /* Light border below headings */ } .gr-markdown p { /* Style for paragraph text in markdown */ color: #bdc3c7; /* Lighter gray for body text */ } .gr-markdown pre code { background-color: rgba(52, 73, 94, 0.95) !important; /* Dark code block background */ border-color: rgba(189, 195, 199, 0.3) !important; /* Light border for code */ } .gr-image div img { /* Style for image preview */ border: 1px solid #ccc; background-color: rgba(52, 73, 94, 0.9) !important; /* Dark background behind image */ } .gr-file div button { /* Style for file download button */ background-color: #1abc9c !important; /* Teal */ color: white !important; border: 1px solid #16a085 !important; } /* Style for the Key PEM textboxes to handle whitespace */ #key_pem_textbox textarea { white-space: pre !important; /* Preserve line breaks and spacing */ overflow-x: auto !important; /* Add scrollbar if lines are long */ } """ with gr.Blocks(theme=custom_theme, css=custom_css, title=f"KeyLock Steganography v{__version__}") as keylock_app_interface: gr.Markdown(f"
🔑 KeyLock v{__version__}

Portable API Key Wallet in a PNG (RSA Encrypted)

") gr.HTML("
Securely embed and extract API key-value pairs (or any text) within PNG images using LSB steganography and RSA Public-Key Encryption, allowing recipient decryption without a shared password.
") gr.HTML("
GitHub: KeyLock-API-Wallet | Decoder Module: keylock-decode
") gr.HTML("
") with gr.Tabs(): with gr.TabItem(f"{ICON_EMBED} Embed Data (Encrypt for Recipient)"): with gr.Row(): with gr.Column(scale=2): embed_kv_input = gr.Textbox( label="Secret Data (Key:Value Pairs, one per line)", placeholder="API_KEY_1: your_secret_value_1\nSERVICE_USER = 'user@example.com'\n# Lines starting with # are ignored", lines=7, info="Enter secrets as Key:Value or Key=Value. Each pair on a new line." ) embed_public_key_input = gr.Textbox( label="Recipient's Public Key (PEM Format)", placeholder="Paste the recipient's RSA public key here, or load from file...", lines=8, info="Data will be encrypted for the holder of the corresponding Private Key.", elem_id="key_pem_textbox" # Apply custom CSS ) # Optional: File upload for Public Key # embed_public_key_file = gr.File( # label="Load Public Key from .pem file", # type="filepath", # or "bytes" if reading in python # file_count="single", # # Add logic to read file content into the textbox # ) embed_output_filename_base = gr.Textbox( label="Base Name for Downloaded Stego Image", value="keylock_wallet_rsa", info="'.png' will be appended. e.g., 'project_secrets_for_john'" ) with gr.Accordion("Carrier Image Options", open=False): embed_generate_carrier_checkbox = gr.Checkbox( label="Generate new KeyLock Wallet image", value=True, info="Uncheck to upload your own PNG carrier image." ) embed_input_image_upload = gr.Image( label="Upload Your Own PNG Carrier (Optional)", type="pil", image_mode="RGB", # Will convert to RGB for LSB regardless sources=["upload","clipboard"], visible=False, # Initially hidden show_download_button=False, interactive=True ) embed_show_keys_checkbox = gr.Checkbox( label="Show list of key names on image overlay", value=True, info="Displays embedded key names (not values) on the image." ) embed_button = gr.Button("Embed Secrets "+ICON_EMBED, variant="primary") with gr.Column(scale=3): gr.Markdown("### Output Image & Status") embed_output_status = gr.Textbox( label="Embedding Status", lines=4, interactive=False, placeholder="Status messages will appear here..." ) embed_output_image_html = gr.HTML( label="Preview of Stego Image (Max 480px width)", value="
Image preview will appear here after embedding.
" ) embed_download_file = gr.File( label="Download Your KeyLock Image (PNG)", interactive=False, file_count="single" ) # --- Interactivity --- def toggle_carrier_upload(generate_flag): return gr.update(visible=not generate_flag) embed_generate_carrier_checkbox.change( fn=toggle_carrier_upload, inputs=[embed_generate_carrier_checkbox], outputs=[embed_input_image_upload] ) embed_button.click( fn=gradio_embed_data, inputs=[ embed_kv_input, embed_public_key_input, embed_input_image_upload, embed_generate_carrier_checkbox, embed_show_keys_checkbox, embed_output_filename_base ], outputs=[ embed_output_image_html, embed_output_status, embed_download_file ] ) with gr.TabItem(f"{ICON_EXTRACT} Extract Data (Decrypt with Private Key)"): with gr.Row(): with gr.Column(scale=1): extract_stego_image_upload = gr.Image( label="Upload KeyLock PNG Image", type="pil", image_mode="RGB", # Will convert to RGB for LSB regardless sources=["upload","clipboard"], show_download_button=False, interactive=True, ) extract_private_key_input = gr.Textbox( label="Your Private Key (PEM Format)", placeholder="Paste your RSA private key here, or load from file...", lines=8, info="Required to decrypt the data.", elem_id="key_pem_textbox" # Apply custom CSS ) # Optional: File upload for Private Key # extract_private_key_file = gr.File( # label="Load Private Key from .pem file", # type="filepath", # or "bytes" if reading in python # file_count="single", # # Add logic to read file content into the textbox # ) extract_button = gr.Button("Extract Secrets "+ICON_EXTRACT, variant="primary") with gr.Column(scale=2): gr.Markdown("### Extracted Data & Status") extract_output_status = gr.Textbox( label="Extraction Status", lines=2, interactive=False, placeholder="Status messages will appear here..." ) extract_output_data = gr.Textbox( label="Extracted Secret Data", lines=10, interactive=False, placeholder="Extracted data (usually JSON) will appear here...", show_copy_button=True ) # --- Interactivity --- extract_button.click( fn=gradio_extract_data, inputs=[ extract_stego_image_upload, extract_private_key_input ], outputs=[ extract_output_data, extract_output_status ] ) with gr.TabItem(f"{ICON_KEY} Key Management (Generate Keys)"): with gr.Row(): with gr.Column(): gr.Markdown("### Generate RSA Key Pair") gr.Markdown( "Use this tab to generate a new pair of RSA Public and Private Keys.
" "🔑 **The Public Key** is used by others to encrypt data *for you*. You can share this freely.
" "🔐 **The Private Key** is used *only by you* to decrypt data sent to you. Keep this **absolutely secret and secure**! If you lose it, you cannot decrypt data encrypted with the corresponding Public Key.
" "Keys are generated in standard PEM format." ) key_size_dropdown = gr.Dropdown( label="Key Size (bits)", # Corrected the constant name here as well choices=[2048, 3072, 4096], value=core.RSA_KEY_SIZE_DEFAULT, info="Larger keys are more secure but result in larger encrypted data payloads (affecting image capacity)." ) generate_key_button = gr.Button("Generate New Key Pair "+ICON_KEY, variant="secondary") key_gen_status = gr.Textbox( label="Status", interactive=False, placeholder="Key generation status..." ) with gr.Column(): gr.Markdown("### Your Generated Keys") generated_private_key = gr.Textbox( label="Generated Private Key (Keep Secret!)", lines=10, interactive=False, show_copy_button=True, elem_id="key_pem_textbox" # Apply custom CSS ) generated_public_key = gr.Textbox( label="Generated Public Key (Share This)", lines=8, interactive=False, show_copy_button=True, elem_id="key_pem_textbox" # Apply custom CSS ) # Optional: Download buttons for keys # download_private_key_file = gr.File(label="Download Private Key", interactive=False) # download_public_key_file = gr.File(label="Download Public Key", interactive=False) # --- Interactivity --- generate_key_button.click( fn=gradio_generate_keys, inputs=[key_size_dropdown], outputs=[generated_private_key, generated_public_key, key_gen_status] # Add outputs for download files if implemented ) # Footer gr.Markdown("
") gr.Markdown(f"
KeyLock-API-Wallet v{__version__}. Use responsibly. Read Important Notes & Best Practices.
") return keylock_app_interface def main(): app_logger.info(f"Starting KeyLock Gradio Application v{__version__} (RSA Encryption Mode)...") try: # Attempt to load a common font to check PIL/Pillow font handling # These checks are more for diagnostic logging ImageFont.truetype("DejaVuSans.ttf", 10) # Common on Linux app_logger.info("DejaVuSans font found, PIL font rendering should be good.") except IOError: try: ImageFont.truetype("arial.ttf", 10) # Common on Windows app_logger.info("Arial font found, PIL font rendering should be good.") except IOError: app_logger.warning("Common system fonts (DejaVuSans/Arial) not found. PIL might use basic bitmap font if other preferred fonts in core.py are also unavailable. Overlay text quality might be affected.") keylock_app_interface = build_interface() # Prepare launch arguments launch_args = {} # Start with empty dict # Add allowed_paths for temp directory for file downloads temp_dir = tempfile.gettempdir() launch_args["allowed_paths"] = [temp_dir] app_logger.info(f"Allowing file downloads from temp directory: {temp_dir}") # Check environment variables for server name/port server_name = os.environ.get('GRADIO_SERVER_NAME') server_port = os.environ.get('GRADIO_SERVER_PORT') if server_name: launch_args["server_name"] = server_name app_logger.info(f"Using server_name from environment: {server_name}") if server_port: try: launch_args["server_port"] = int(server_port) app_logger.info(f"Using server_port from environment: {server_port}") except ValueError: app_logger.warning(f"Invalid GRADIO_SERVER_PORT: {server_port}. Using default.") # Check for --share argument (optional, for public sharing) # if "--share" in sys.argv: # Need to import sys if using this # launch_args["share"] = True # app_logger.info("Launching with share=True...") keylock_app_interface.launch(**launch_args) # Entry point for the Gradio app if __name__ == "__main__": # Add basic file logging for production if needed, alongside stream handler # file_handler = logging.FileHandler('keylock_app.log') # file_handler.setFormatter(formatter) # Use formatter defined earlier # app_logger.addHandler(file_handler) # logger.addHandler(file_handler) # Add to core logger too main()