# keylock/app.py
import gradio as gr
from PIL import Image, ImageFont
import tempfile
import os
import json
import logging
import traceback
import base64
import io
# Import specific exceptions from cryptography
from cryptography.exceptions import InvalidTag, InvalidKey
import core  # Sibling module imported by its top-level name (absolute, not package-relative)
# Version string interpolated into the UI title and the startup log message.
__version__ = "v1"
# Dedicated logger for the Gradio front-end.
app_logger = logging.getLogger("keylock_app")
# Attach a stream handler only when none is reachable from this logger
# (hasHandlers() also consults ancestor loggers), to avoid duplicate output.
if not app_logger.hasHandlers():
    handler = logging.StreamHandler()
    # Layout: timestamp - logger name - level - message.
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    app_logger.addHandler(handler)
    # NOTE(review): the file's indentation was lost; setLevel is assumed to
    # belong to this branch — confirm against the original layout.
    app_logger.setLevel(logging.INFO)
# Resolve the palette values used when theming the interface. When the
# installed Gradio exposes gr.themes.colors its palette objects are used;
# otherwise plain colour names stand in for them.
try:
    blue_color, sky_color, slate_color, cyan_color, neutral_color = (
        gr.themes.colors.blue,
        gr.themes.colors.sky,
        gr.themes.colors.slate,
        gr.themes.colors.cyan,
        gr.themes.colors.neutral,
    )
except AttributeError:
    # Raised when gr.themes, gr.themes.colors or one of the named palettes is missing.
    app_logger.warning("gr.themes.colors not found. Using placeholder colors for themes.")

    class FallbackColors:
        # Minimal stand-in palette made of plain colour-name strings.
        blue = "blue"
        sky = "skyblue"
        slate = "slategray"
        cyan = "cyan"
        neutral = "gray"

    blue_color, sky_color, slate_color, cyan_color, neutral_color = (
        FallbackColors.blue,
        FallbackColors.sky,
        FallbackColors.slate,
        FallbackColors.cyan,
        FallbackColors.neutral,
    )
# Emoji used in the tab titles and button labels of the interface.
ICON_EMBED = "🔒"  # Embed tab / button (encrypt and hide data)
ICON_EXTRACT = "🔓"  # Extract tab / button (recover and decrypt data)
ICON_KEY = "🔑"  # Key-management tab / button
def pil_to_base64_html(pil_image, max_width_px=None):
    """Return an HTML snippet that displays *pil_image* inline as a PNG data URI.

    Args:
        pil_image: Image-like object exposing ``mode``, ``convert`` and ``save``
            (a PIL image in this application).
        max_width_px: Optional maximum display width in pixels. When falsy, no
            width constraint is added to the inline style.

    Returns:
        A string containing a centred ``<img>`` element whose ``src`` is the
        base64-encoded PNG.
    """
    # An RGBA image is flattened to RGB before it is encoded for the preview.
    if pil_image.mode == 'RGBA':
        pil_image = pil_image.convert('RGB')

    buffered = io.BytesIO()
    pil_image.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode()

    base_style = "border:1px solid #ccc; display:block; margin-left:auto; margin-right:auto;"
    style = f"max-width:{max_width_px}px; height:auto; {base_style}" if max_width_px else base_style

    # NOTE(review): the original markup of this return value was lost (only an
    # empty, unterminated f-string remained). This is a minimal reconstruction
    # that actually uses img_str and style — confirm wrapper/alt text if needed.
    return (
        "<div style='text-align:center;'>"
        f"<img src='data:image/png;base64,{img_str}' alt='Stego Image' style='{style}' />"
        "</div>"
    )
def gradio_embed_data(kv_string: str, recipient_public_key_pem: str,
                      input_image_pil: Image.Image, generate_carrier_flag: bool,
                      show_keys_on_image_flag: bool, output_filename_base: str):
    """Encrypt key-value secrets for a recipient and embed them in a PNG carrier.

    Args:
        kv_string: Secret data as text, parsed by ``core.parse_kv_string_to_dict``.
        recipient_public_key_pem: Recipient's public key as PEM text.
        input_image_pil: Uploaded carrier image, or ``None``.
        generate_carrier_flag: When true (or when no image was uploaded) a new
            carrier is produced by ``core.generate_keylock_carrier_image``.
        show_keys_on_image_flag: When true the key names are passed to the
            overlay drawing routine.
        output_filename_base: Base name for the temporary output file; characters
            other than alphanumerics, ``_`` and ``-`` are replaced with ``_``.

    Returns:
        A ``(preview_html, status_message, file_path)`` tuple. On any failure the
        first and last items are ``None`` and the message describes the error.
    """
    # Validate inputs before doing any work.
    if not recipient_public_key_pem or not recipient_public_key_pem.strip():
        return None, "Error: Recipient's Public Key is required.", None
    if not kv_string or not kv_string.strip():
        return None, "Error: Key-Value data cannot be empty.", None

    try:
        data_dict = core.parse_kv_string_to_dict(kv_string)
        if not data_dict:
            return None, "Error: Parsed Key-Value data is empty or only contains comments/empty lines.", None
        # Compact JSON keeps the embedded payload as small as possible.
        serial_data = json.dumps(data_dict, indent=None).encode('utf-8')

        # Prepare the carrier image.
        original_format_note = ""
        if generate_carrier_flag or input_image_pil is None:
            app_logger.info("Generating new carrier image.")
            carrier_img = core.generate_keylock_carrier_image()
        else:
            app_logger.info(f"Using uploaded carrier image (format: {getattr(input_image_pil, 'format', 'Unknown')}).")
            carrier_img = input_image_pil.copy()
            if hasattr(input_image_pil, 'format') and input_image_pil.format and input_image_pil.format.upper() != 'PNG':
                # Tell the user what converting a non-PNG upload implies.
                original_format_note = (
                    f"Input carrier image was format '{input_image_pil.format}'. "
                    f"It will be converted to RGB for LSB embedding and saved as PNG. "
                    f"If original was lossy (e.g., JPEG), quality is preserved from upload; "
                    f"if it had transparency (e.g., GIF, WebP with alpha), transparency will be lost."
                )
                app_logger.warning(original_format_note)
        # The overlay and embedding steps are always fed an RGB image.
        carrier_img = carrier_img.convert("RGB")

        # Encrypt the serialized data; the crypto layer is given the key as bytes.
        app_logger.info(f"Encrypting {len(serial_data)} bytes of data with hybrid encryption.")
        encrypted_payload = core.encrypt_data_hybrid(serial_data, recipient_public_key_pem.encode('utf-8'))
        app_logger.info(f"Encrypted payload size (from crypto layer): {len(encrypted_payload)} bytes.")

        # Draw the visual overlay (optionally listing key names).
        keys_for_overlay = list(data_dict.keys()) if show_keys_on_image_flag else None
        overlay_title = "KeyLock: RSA Encrypted Data Embedded"
        final_carrier_with_overlay = core.draw_key_list_dropdown_overlay(
            carrier_img,
            keys=keys_for_overlay,
            title=overlay_title
        )
        app_logger.info("Visual overlay added.")

        # Embed the encrypted payload into the image pixels.
        app_logger.info(f"Embedding {len(encrypted_payload)} bytes (crypto payload) into image pixels.")
        stego_final_img = core.embed_data_in_image(final_carrier_with_overlay, encrypted_payload)
        total_embedded = len(encrypted_payload) + core.LENGTH_HEADER_SIZE
        app_logger.info(f"Data embedding complete. Total bytes embedded in LSB (including header): {total_embedded} bytes.")

        # Save the stego image to a temporary file that outlives this call
        # (delete=False) so it can be offered for download.
        fname_base = "".join(
            c if c.isalnum() or c in ('_', '-') else '_'
            for c in (output_filename_base or "").strip()
        ) or "keylock_img_rsa"
        temp_fp = None
        try:
            with tempfile.NamedTemporaryFile(prefix=fname_base + "_", suffix=".png", delete=False) as tmp:
                temp_fp = tmp.name
                stego_final_img.save(tmp, format="PNG")
            app_logger.info(f"Stego image saved to temp file: {temp_fp}")
        except Exception as e:
            app_logger.error(f"Error saving temp file: {e}", exc_info=True)
            # Because of delete=False a failed save would otherwise leave a
            # partial file behind in the temp directory.
            if temp_fp and os.path.exists(temp_fp):
                try:
                    os.remove(temp_fp)
                except OSError as cleanup_err:
                    app_logger.warning(f"Could not remove partial temp file {temp_fp}: {cleanup_err}")
            return None, f"Error saving output image: {e}", None

        # Prepare the outputs for Gradio.
        output_html_img_str = pil_to_base64_html(stego_final_img, max_width_px=480)
        # Keep the optional pieces from running into the surrounding text.
        note_line = f"{original_format_note}\n" if original_format_note else ""
        key_list_suffix = " (with key list)" if show_keys_on_image_flag and keys_for_overlay else ""
        status_msg = (f"Data embedded into '{os.path.basename(temp_fp)}'.\n"
                      f"{note_line}"
                      f"Image contains visual \"{overlay_title}\" overlay{key_list_suffix} "
                      f"and your LSB-encoded encrypted data.\n"
                      f"Secret data size: {len(serial_data)}B (raw JSON).\n"
                      f"Crypto payload size: {len(encrypted_payload)}B (Hybrid RSA/AES).\n"
                      f"Total bytes embedded in LSB (including header): {total_embedded}B.\n"
                      f"Recipient needs their Private Key to extract.")
        return output_html_img_str, status_msg, temp_fp

    except (ValueError, RuntimeError, TypeError, InvalidKey) as e:
        # Errors of these types are reported with their own message.
        app_logger.error(f"Embed process error: {e}", exc_info=True)
        return None, f"Error during embedding: {str(e)}", None
    except Exception as e:
        # UI boundary: report anything unexpected instead of crashing the handler.
        app_logger.error(f"Unexpected Embed Error: {e}", exc_info=True)
        return None, f"An unexpected error occurred: {str(e)}", None
def gradio_extract_data(stego_image_pil: Image.Image, recipient_private_key_pem: str):
    """Extract the hidden payload from an image and decrypt it with a private key.

    Args:
        stego_image_pil: Uploaded image expected to carry embedded data, or ``None``.
        recipient_private_key_pem: Private key as PEM text.

    Returns:
        A ``(text, status)`` tuple. ``text`` is pretty-printed JSON when the
        decrypted bytes are UTF-8 JSON, the plain text when they are UTF-8 but
        not JSON, or a hex dump when they are not valid UTF-8. On failure both
        items describe the error.
    """
    if stego_image_pil is None:
        return "Error: No image provided.", "Error: No image."
    if not recipient_private_key_pem or not recipient_private_key_pem.strip():
        return "Error: Your Private Key is required for decryption.", "Error: Private Key required."

    try:
        # Extraction is always run on an RGB image.
        stego_image_rgb = stego_image_pil.convert("RGB")
        if hasattr(stego_image_pil, 'format') and stego_image_pil.format and stego_image_pil.format.upper() != "PNG":
            app_logger.warning(f"Uploaded image for extraction is format '{stego_image_pil.format}', not PNG. LSB data may be compromised if not the original KeyLock file.")

        app_logger.info("Extracting data from image pixels using LSB.")
        extracted_crypto_payload = core.extract_data_from_image(stego_image_rgb)
        app_logger.info(f"Extracted crypto payload size (from LSB layer): {len(extracted_crypto_payload)} bytes.")

        # An empty payload is deliberately passed on so that the decryption
        # layer can report its own, more specific error.
        app_logger.info("Decrypting extracted crypto payload with hybrid encryption.")
        decrypted_bytes = core.decrypt_data_hybrid(extracted_crypto_payload, recipient_private_key_pem.encode('utf-8'))
        app_logger.info(f"Decrypted data size (raw): {len(decrypted_bytes)} bytes.")

        # Decode strictly first: decoding with errors='replace' never raises,
        # which would make the hex fallback unreachable and mislabel binary
        # data as UTF-8 text.
        try:
            decoded_text = decrypted_bytes.decode('utf-8')
        except UnicodeDecodeError:
            txt = "Decrypted (raw hex, not JSON/UTF-8):\n" + decrypted_bytes.hex()
            stat = "Warning: Decrypted bytes are not valid UTF-8 or JSON. Displaying as hex."
            app_logger.warning("Decrypted data is not valid UTF-8. Displaying as hex.")
            return txt, stat

        try:
            data = json.loads(decoded_text)
        except json.JSONDecodeError:
            txt = "Decrypted (UTF-8, not JSON):\n" + decoded_text
            stat = "Warning: Decrypted as UTF-8, but not valid JSON."
            app_logger.warning("Decrypted data is not valid JSON, but appears to be UTF-8.")
        else:
            # Pretty-print for display.
            txt, stat = json.dumps(data, indent=2), "Data extracted successfully (JSON)."
            app_logger.info("Successfully decrypted and parsed as JSON.")
        return txt, stat

    except (ValueError, RuntimeError, InvalidTag, InvalidKey, TypeError) as e:
        # Errors of these types are reported with their own message; the
        # exception class name is surfaced in the status field.
        app_logger.error(f"Extract process error: {e}", exc_info=True)
        return f"Error during extraction: {str(e)}", f"Extraction Failed: {type(e).__name__} Error."
    except Exception as e:
        # UI boundary: report anything unexpected instead of crashing the handler.
        app_logger.error(f"Unexpected Extract Error: {e}", exc_info=True)
        return f"An unexpected error occurred: {str(e)}", "Extraction Failed: Unexpected Error."
def gradio_generate_keys(key_size_bits: int = core.RSA_KEY_SIZE_DEFAULT):
    """Create an RSA key pair and hand it back as PEM text for the UI.

    Args:
        key_size_bits: Key size passed to ``core.generate_rsa_key_pair``;
            defaults to ``core.RSA_KEY_SIZE_DEFAULT``.

    Returns:
        ``(private_pem_text, public_pem_text, status)``. When anything goes
        wrong both key strings are empty and ``status`` carries the error.
    """
    try:
        private_pem, public_pem = core.generate_rsa_key_pair(key_size_bits)
        success_msg = f"RSA {key_size_bits}-bit key pair generated successfully."
        app_logger.info(success_msg)
        # The PEM values are bytes; the textboxes need str.
        return private_pem.decode('utf-8'), public_pem.decode('utf-8'), success_msg
    except Exception as exc:
        app_logger.error(f"Key generation error: {exc}", exc_info=True)
        return "", "", f"Error generating keys: {str(exc)}"
def build_interface():
    """Build the Gradio Blocks UI with embed, extract and key-management tabs.

    Returns:
        The configured ``gr.Blocks`` instance (not launched).
    """
    # __version__ already carries a leading "v"; normalise it so labels read
    # "v1" rather than "vv1".
    version_label = "v" + __version__.lstrip("vV")

    # NOTE(review): theme .set() is documented as taking CSS value strings; a
    # palette object is reduced to its c500 shade here, while the plain-string
    # fallback colour passes through unchanged — confirm the shade choice.
    hover_color = getattr(blue_color, "c500", blue_color)

    custom_theme = gr.themes.Base(
        primary_hue="indigo",
        secondary_hue="cyan",
        neutral_hue="zinc",
        text_size="md",
        spacing_size="md",
        radius_size="sm",
        font=["System UI", "sans-serif"]  # Use system fonts first
    ).set(
        button_secondary_background_fill_hover=hover_color,
        button_secondary_border_color_hover=hover_color,
        button_secondary_text_color_hover='white',
    )

    # Inline CSS for the custom backgrounds and colours. The PEM textboxes are
    # targeted by class (several components share the style, and element ids
    # must be unique in a page).
    custom_css = """
    body {
        background: linear-gradient(to right bottom, rgb(44, 62, 80), rgb(80 168 255)); /* Dark blue gradient */
        color: #ecf0f1; /* Light text color */
    }
    span {
        color: inherit; /* Inherit color from parent */
    }
    .gradio-container {
        background: transparent !important; /* Make main container transparent */
    }
    .gr-box, .gr-panel, .gr-pill {
        background-color: rgba(44, 62, 80, 0.8) !important; /* Semi-transparent dark boxes */
        border-color: rgba(189, 195, 199, 0.2) !important; /* Light border */
    }
    .gr-textbox, .gr-dropdown, .gr-button, .gr-code, .gr-chat-message, .gr-image {
        border-color: rgba(189, 195, 199, 0.3) !important; /* Lighter border for inputs */
        background-color: rgba(52, 73, 94, 0.9) !important; /* Slightly lighter dark background for inputs */
        color: #ecf0f1 !important; /* Light text color */
    }
    /* Specific button colors using theme/fallback */
    .gr-button {
        color: #c6c6fc !important; /* Default button text color */
    }
    .gr-button.gr-button-primary {
        background-color: #1abc9c !important; /* Teal */
        color: white !important;
        border-color: #16a085 !important;
    }
    .gr-button.gr-button-secondary {
        background-color: #9b59b6 !important; /* Purple */
        color: white !important;
        border-color: #8e44ad !important;
    }
    .gr-button.gr-button-secondary:hover {
        background-color: #8e44ad !important; /* Darker purple on hover */
    }
    .gr-button.gr-button-stop {
        background-color: #e74c3c !important; /* Red */
        color: white !important;
        border-color: #c0392b !important;
    }
    .gr-markdown {
        background-color: rgba(44, 62, 80, 0.7) !important; /* Semi-transparent dark background for markdown */
        padding: 10px;
        border-radius: 5px;
    }
    .gr-markdown h1, .gr-markdown h2, .gr-markdown h3, .gr-markdown h4, .gr-markdown h5, .gr-markdown h6 {
        color: #ecf0f1 !important; /* Light heading color */
        border-bottom-color: rgba(189, 195, 199, 0.3) !important; /* Light border below headings */
    }
    .gr-markdown p { /* Style for paragraph text in markdown */
        color: #bdc3c7; /* Lighter gray for body text */
    }
    .gr-markdown pre code {
        background-color: rgba(52, 73, 94, 0.95) !important; /* Dark code block background */
        border-color: rgba(189, 195, 199, 0.3) !important; /* Light border for code */
    }
    .gr-image div img { /* Style for image preview */
        border: 1px solid #ccc;
        background-color: rgba(52, 73, 94, 0.9) !important; /* Dark background behind image */
    }
    .gr-file div button { /* Style for file download button */
        background-color: #1abc9c !important; /* Teal */
        color: white !important;
        border: 1px solid #16a085 !important;
    }
    /* Style for the Key PEM textboxes to handle whitespace */
    .key_pem_textbox textarea {
        white-space: pre !important; /* Preserve line breaks and spacing */
        overflow-x: auto !important; /* Add scrollbar if lines are long */
    }
    """

    with gr.Blocks(theme=custom_theme, css=custom_css, title=f"KeyLock Steganography {version_label}") as keylock_app_interface:
        # NOTE(review): the markup of the header, separator and footer strings
        # was lost from this file (the literals were left unterminated). The
        # visible text is kept and wrapped in minimal markup — restore the
        # original styling if it is available.
        gr.Markdown(
            f"# 🔑 KeyLock {version_label}\n"
            "### Portable API Key Wallet in a PNG (RSA Encrypted)\n"
        )
        gr.HTML(
            "<p>Securely embed and extract API key-value pairs (or any text) within PNG images "
            "using LSB steganography and RSA Public-Key Encryption, allowing recipient decryption "
            "without a shared password.</p>"
        )
        gr.HTML("")  # NOTE(review): content of this element is not recoverable from the file.
        gr.HTML("<hr>")

        with gr.Tabs():
            with gr.TabItem(f"{ICON_EMBED} Embed Data (Encrypt for Recipient)"):
                with gr.Row():
                    with gr.Column(scale=2):
                        embed_kv_input = gr.Textbox(
                            label="Secret Data (Key:Value Pairs, one per line)",
                            placeholder="API_KEY_1: your_secret_value_1\nSERVICE_USER = 'user@example.com'\n# Lines starting with # are ignored",
                            lines=7,
                            info="Enter secrets as Key:Value or Key=Value. Each pair on a new line."
                        )
                        embed_public_key_input = gr.Textbox(
                            label="Recipient's Public Key (PEM Format)",
                            placeholder="Paste the recipient's RSA public key here, or load from file...",
                            lines=8,
                            info="Data will be encrypted for the holder of the corresponding Private Key.",
                            elem_classes=["key_pem_textbox"]  # Apply custom CSS
                        )
                        # TODO: optional gr.File upload that loads a public key .pem into the textbox.
                        embed_output_filename_base = gr.Textbox(
                            label="Base Name for Downloaded Stego Image",
                            value="keylock_wallet_rsa",
                            info="'.png' will be appended. e.g., 'project_secrets_for_john'"
                        )
                        # NOTE(review): nesting below was inferred (indentation lost);
                        # the three carrier/overlay options are placed in the accordion.
                        with gr.Accordion("Carrier Image Options", open=False):
                            embed_generate_carrier_checkbox = gr.Checkbox(
                                label="Generate new KeyLock Wallet image",
                                value=True,
                                info="Uncheck to upload your own PNG carrier image."
                            )
                            embed_input_image_upload = gr.Image(
                                label="Upload Your Own PNG Carrier (Optional)",
                                type="pil",
                                image_mode="RGB",  # Will convert to RGB for LSB regardless
                                sources=["upload", "clipboard"],
                                visible=False,  # Hidden until "generate" is unchecked
                                show_download_button=False,
                                interactive=True
                            )
                            embed_show_keys_checkbox = gr.Checkbox(
                                label="Show list of key names on image overlay",
                                value=True,
                                info="Displays embedded key names (not values) on the image."
                            )
                        embed_button = gr.Button("Embed Secrets " + ICON_EMBED, variant="primary")
                    with gr.Column(scale=3):
                        gr.Markdown("### Output Image & Status")
                        embed_output_status = gr.Textbox(
                            label="Embedding Status",
                            lines=4,
                            interactive=False,
                            placeholder="Status messages will appear here..."
                        )
                        embed_output_image_html = gr.HTML(
                            label="Preview of Stego Image (Max 480px width)",
                            value="<p>Image preview will appear here after embedding.</p>"
                        )
                        embed_download_file = gr.File(
                            label="Download Your KeyLock Image (PNG)",
                            interactive=False,
                            file_count="single"
                        )

                # --- Interactivity ---
                def toggle_carrier_upload(generate_flag):
                    """Show the upload widget only when carrier generation is switched off."""
                    return gr.update(visible=not generate_flag)

                embed_generate_carrier_checkbox.change(
                    fn=toggle_carrier_upload,
                    inputs=[embed_generate_carrier_checkbox],
                    outputs=[embed_input_image_upload]
                )
                embed_button.click(
                    fn=gradio_embed_data,
                    inputs=[
                        embed_kv_input,
                        embed_public_key_input,
                        embed_input_image_upload,
                        embed_generate_carrier_checkbox,
                        embed_show_keys_checkbox,
                        embed_output_filename_base
                    ],
                    outputs=[
                        embed_output_image_html,
                        embed_output_status,
                        embed_download_file
                    ]
                )

            with gr.TabItem(f"{ICON_EXTRACT} Extract Data (Decrypt with Private Key)"):
                with gr.Row():
                    with gr.Column(scale=1):
                        extract_stego_image_upload = gr.Image(
                            label="Upload KeyLock PNG Image",
                            type="pil",
                            image_mode="RGB",  # Will convert to RGB for LSB regardless
                            sources=["upload", "clipboard"],
                            show_download_button=False,
                            interactive=True,
                        )
                        extract_private_key_input = gr.Textbox(
                            label="Your Private Key (PEM Format)",
                            placeholder="Paste your RSA private key here, or load from file...",
                            lines=8,
                            info="Required to decrypt the data.",
                            elem_classes=["key_pem_textbox"]  # Apply custom CSS
                        )
                        # TODO: optional gr.File upload that loads a private key .pem into the textbox.
                        extract_button = gr.Button("Extract Secrets " + ICON_EXTRACT, variant="primary")
                    with gr.Column(scale=2):
                        gr.Markdown("### Extracted Data & Status")
                        extract_output_status = gr.Textbox(
                            label="Extraction Status",
                            lines=2,
                            interactive=False,
                            placeholder="Status messages will appear here..."
                        )
                        extract_output_data = gr.Textbox(
                            label="Extracted Secret Data",
                            lines=10,
                            interactive=False,
                            placeholder="Extracted data (usually JSON) will appear here...",
                            show_copy_button=True
                        )

                # --- Interactivity ---
                extract_button.click(
                    fn=gradio_extract_data,
                    inputs=[
                        extract_stego_image_upload,
                        extract_private_key_input
                    ],
                    outputs=[
                        extract_output_data,
                        extract_output_status
                    ]
                )

            with gr.TabItem(f"{ICON_KEY} Key Management (Generate Keys)"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Generate RSA Key Pair")
                        gr.Markdown(
                            "Use this tab to generate a new pair of RSA Public and Private Keys.<br>"
                            "🔑 **The Public Key** is used by others to encrypt data *for you*. You can share this freely.<br>"
                            "🔐 **The Private Key** is used *only by you* to decrypt data sent to you. Keep this **absolutely secret and secure**! If you lose it, you cannot decrypt data encrypted with the corresponding Public Key.<br>"
                            "Keys are generated in standard PEM format."
                        )
                        key_size_dropdown = gr.Dropdown(
                            label="Key Size (bits)",
                            choices=[2048, 3072, 4096],
                            value=core.RSA_KEY_SIZE_DEFAULT,
                            info="Larger keys are more secure but result in larger encrypted data payloads (affecting image capacity)."
                        )
                        generate_key_button = gr.Button("Generate New Key Pair " + ICON_KEY, variant="secondary")
                        key_gen_status = gr.Textbox(
                            label="Status",
                            interactive=False,
                            placeholder="Key generation status..."
                        )
                    with gr.Column():
                        gr.Markdown("### Your Generated Keys")
                        generated_private_key = gr.Textbox(
                            label="Generated Private Key (Keep Secret!)",
                            lines=10,
                            interactive=False,
                            show_copy_button=True,
                            elem_classes=["key_pem_textbox"]  # Apply custom CSS
                        )
                        generated_public_key = gr.Textbox(
                            label="Generated Public Key (Share This)",
                            lines=8,
                            interactive=False,
                            show_copy_button=True,
                            elem_classes=["key_pem_textbox"]  # Apply custom CSS
                        )
                        # TODO: optional gr.File outputs to download the generated keys.

                # --- Interactivity ---
                generate_key_button.click(
                    fn=gradio_generate_keys,
                    inputs=[key_size_dropdown],
                    outputs=[generated_private_key, generated_public_key, key_gen_status]
                )

        # Footer
        # NOTE(review): original footer markup/text is not recoverable from the
        # file; a rule plus the version label stands in for it.
        gr.Markdown("---")
        gr.Markdown(f"KeyLock {version_label}")

    return keylock_app_interface
def main():
    """Log startup diagnostics, build the interface and launch the Gradio app.

    The launch options are assembled from:
      * the system temp directory, added to ``allowed_paths`` so generated
        files can be downloaded;
      * ``GRADIO_SERVER_NAME`` / ``GRADIO_SERVER_PORT`` environment variables,
        when set (a non-integer port is ignored with a warning).
    """
    # __version__ already carries a leading "v"; avoid logging "vv1".
    version_label = "v" + __version__.lstrip("vV")
    app_logger.info(f"Starting KeyLock Gradio Application {version_label} (RSA Encryption Mode)...")

    # Diagnostic only: probe a couple of common fonts and log the first one
    # that loads; nothing from this probe is kept.
    for font_file, font_label in (("DejaVuSans.ttf", "DejaVuSans"), ("arial.ttf", "Arial")):
        try:
            ImageFont.truetype(font_file, 10)
        except IOError:
            continue
        app_logger.info(f"{font_label} font found, PIL font rendering should be good.")
        break
    else:
        app_logger.warning("Common system fonts (DejaVuSans/Arial) not found. PIL might use basic bitmap font if other preferred fonts in core.py are also unavailable. Overlay text quality might be affected.")

    keylock_app_interface = build_interface()

    # Output images are written to the temp directory, so it must be servable.
    temp_dir = tempfile.gettempdir()
    launch_args = {"allowed_paths": [temp_dir]}
    app_logger.info(f"Allowing file downloads from temp directory: {temp_dir}")

    # Optional server overrides from the environment.
    server_name = os.environ.get('GRADIO_SERVER_NAME')
    server_port = os.environ.get('GRADIO_SERVER_PORT')
    if server_name:
        launch_args["server_name"] = server_name
        app_logger.info(f"Using server_name from environment: {server_name}")
    if server_port:
        try:
            launch_args["server_port"] = int(server_port)
            app_logger.info(f"Using server_port from environment: {server_port}")
        except ValueError:
            app_logger.warning(f"Invalid GRADIO_SERVER_PORT: {server_port}. Using default.")

    keylock_app_interface.launch(**launch_args)
# Entry point when the module is executed as a script.
if __name__ == "__main__":
    # Optional: attach a logging.FileHandler (e.g. 'keylock_app.log') to
    # app_logger here when persistent logs are wanted in addition to the
    # stream handler.
    main()