broadfield-dev's picture
Update server/server.py
5b92625 verified
import os
import io
import json
import base64
import struct
import logging
import gradio as gr
from PIL import Image
import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.exceptions import InvalidTag
HEADER_BITS = 32
AES_GCM_NONCE_SIZE = 12
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
KEYLOCK_PRIV_KEY_PEM = os.environ.get('KEYLOCK_PRIV_KEY')
PRIVATE_KEY_OBJECT = None
PUBLIC_KEY_PEM_STRING = ""
KEYLOCK_STATUS_MESSAGE = ""
if not KEYLOCK_PRIV_KEY_PEM:
dev_key_path = os.path.join(os.path.dirname(__file__), '..', 'keys', 'DEMO_ONLY_THIS _IS_SECRET_keylock_priv_key.pem')
try:
with open(dev_key_path, "r") as f:
KEYLOCK_PRIV_KEY_PEM = f.read()
logger.warning(f"Loaded private key from dev path. This is for local testing only.")
KEYLOCK_STATUS_MESSAGE = f"⚠️ Loaded from development key file. This is for local testing but insecure for production."
except FileNotFoundError:
logger.error(f"FATAL: Private key not found at '{dev_key_path}' and 'KEYLOCK_PRIV_KEY' secret is not set.")
KEYLOCK_STATUS_MESSAGE = "❌ NOT FOUND. The API is non-functional. Set the `KEYLOCK_PRIV_KEY` secret or provide the demo key file."
else:
logger.info("Successfully loaded private key from environment variable 'KEYLOCK_PRIV_KEY'.")
KEYLOCK_STATUS_MESSAGE = "✅ Loaded successfully from secrets/environment variable. Recommended secure configuration."
if KEYLOCK_PRIV_KEY_PEM:
try:
PRIVATE_KEY_OBJECT = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY_PEM.encode(), password=None)
public_key = PRIVATE_KEY_OBJECT.public_key()
PUBLIC_KEY_PEM_STRING = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
KEYLOCK_STATUS_MESSAGE += "\n✅ Public key derived successfully."
except Exception as e:
logger.error(f"Failed to load private key or derive public key: {e}", exc_info=True)
PRIVATE_KEY_OBJECT = None
PUBLIC_KEY_PEM_STRING = f"Error: Could not process the configured private key. Details: {e}"
KEYLOCK_STATUS_MESSAGE += f"\n❌ Failed to parse key: {e}"
MOCK_USER_DATABASE = {
"sk-12345-abcde": {"user": "demo-user", "permissions": "read"},
"sk-67890-fghij": {"user": "admin-user", "permissions": "read,write,delete"}
}
def example_authenticate(api_key: str, user_id: str) -> bool:
if not api_key or not user_id:
return False
db_entry = MOCK_USER_DATABASE.get(api_key)
if db_entry and db_entry.get("user") == user_id:
logger.info(f"Authentication successful for user '{user_id}'.")
return True
else:
logger.warning(f"Authentication failed for user '{user_id}' with key '{api_key[:8]}...'.")
return False
def get_public_key():
if not PUBLIC_KEY_PEM_STRING or "Error" in PUBLIC_KEY_PEM_STRING:
raise gr.Error("Server key is not configured correctly.")
return PUBLIC_KEY_PEM_STRING
def get_server_info():
return {
"name": "KeyLock Auth Server",
"version": "1.4",
"documentation": "This server decrypts data hidden in KeyLock images and performs a mock authentication based on the decrypted payload.",
"required_payload_keys": [
{"key_name": "API_KEY", "description": "Your unique API Key for this service.", "example": "sk-12345-abcde"},
{"key_name": "USER", "description": "The user ID associated with the key.", "example": "demo-user"}
],
"endpoints": {
"/keylock-pub": "GET - Returns the server's public key.",
"/keylock-info": "GET - Returns this information object.",
"/keylock-server": "POST - Decrypts a KeyLock image and attempts authentication."
}
}
def decode_data(image_base64_string: str) -> dict:
if not PRIVATE_KEY_OBJECT:
error_msg = "Server Error: The API is not configured with a private key."
logger.error(error_msg)
raise gr.Error(error_msg)
try:
image_buffer = base64.b64decode(image_base64_string)
img = Image.open(io.BytesIO(image_buffer)).convert("RGB")
pixel_data = np.array(img).ravel()
header_binary_string = "".join(str(pixel & 1) for pixel in pixel_data[:HEADER_BITS])
data_length = int(header_binary_string, 2)
if data_length == 0: raise ValueError("No data found in image.")
data_bits_count = data_length * 8
end_offset = HEADER_BITS + data_bits_count
if pixel_data.size < end_offset: raise ValueError("Image data corrupt or truncated.")
data_binary_string = "".join(str(pixel & 1) for pixel in pixel_data[HEADER_BITS:end_offset])
crypto_payload = int(data_binary_string, 2).to_bytes(data_length, byteorder='big')
offset = 4
encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
encrypted_aes_key = crypto_payload[offset : offset + encrypted_aes_key_len]
offset += encrypted_aes_key_len
nonce = crypto_payload[offset : offset + AES_GCM_NONCE_SIZE]
offset += AES_GCM_NONCE_SIZE
ciphertext_with_tag = crypto_payload[offset:]
recovered_aes_key = PRIVATE_KEY_OBJECT.decrypt(encrypted_aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
decrypted_bytes = AESGCM(recovered_aes_key).decrypt(nonce, ciphertext_with_tag, None)
# Symmetrical decoding: json.loads correctly parses the JSON string into a Python dict.
decrypted_payload = json.loads(decrypted_bytes.decode('utf-8'))
logger.info(f"Successfully decoded payload: {decrypted_payload}")
# The values retrieved are now clean Python strings, ready for direct use.
api_key = decrypted_payload.get('API_KEY')
user_id = decrypted_payload.get('USER')
is_authenticated = example_authenticate(api_key=api_key, user_id=user_id)
if is_authenticated:
return {
"authentication_status": "Success",
"message": f"User '{user_id}' successfully authenticated.",
"granted_permissions": MOCK_USER_DATABASE[api_key]['permissions'],
"decoded_payload": decrypted_payload
}
else:
return {
"authentication_status": "Failed",
"message": "Authentication failed. Invalid credentials provided in the image.",
"decoded_payload": decrypted_payload
}
except (ValueError, InvalidTag, TypeError, struct.error) as e:
logger.warning(f"Decryption failed: {e}")
raise gr.Error(f"Decryption failed. Image may be corrupt or used the wrong public key. Details: {e}")
except Exception as e:
logger.error(f"An unexpected server error occurred during decryption: {e}", exc_info=True)
raise gr.Error(f"An unexpected server error occurred. Details: {e}")
def generate_rsa_keys():
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('utf-8')
public_pem = private_key.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8')
return private_pem, public_pem