n8n-dan / app.py
danilonovais's picture
Upload 7 files
a9f0f7c verified
raw
history blame
9.96 kB
#!/usr/bin/env python3
"""
πŸš€ N8N Workflow Documentation Space for Hugging Face
Optimized entry point for Hugging Face Spaces deployment.
"""
import os
import sys
import asyncio
import logging
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent
if str(BASE_DIR) not in sys.path:
sys.path.insert(0, str(BASE_DIR))
# Load environment configuration for Hugging Face Spaces
def load_environment():
"""Load environment variables from .env.hf if it exists."""
env_file = Path(".env.hf")
if env_file.exists():
with open(env_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
if key not in os.environ:
os.environ[key] = value
# Load environment first
load_environment()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Set environment variables for HF Spaces
os.environ.setdefault("HOST", "0.0.0.0")
os.environ.setdefault("PORT", "7860") # HF Spaces standard port
# Database configuration - Auto-detect production environment and use PostgreSQL
# In Hugging Face Spaces, prioritize PostgreSQL if available
db_type = os.environ.get("DB_TYPE")
# Auto-detect if we're in production (HF Spaces) and should use PostgreSQL
if not db_type:
# Check for production environment indicators
is_production = (
os.environ.get("SPACE_ID") or
os.environ.get("SPACES_BUILDKIT_VERSION") or
os.path.exists("/.dockerenv") or
os.environ.get("RAILWAY_PROJECT_ID") or
os.environ.get("RENDER_SERVICE_ID")
)
if is_production:
# Check if filesystem is writable
try:
test_path = Path("./test_write.tmp")
test_path.touch()
test_path.unlink()
filesystem_writable = True
except (PermissionError, OSError):
filesystem_writable = False
if not filesystem_writable:
logger.info("πŸ”’ Detected read-only filesystem - will use temporary storage")
# Prefer PostgreSQL in production if credentials might be available
db_type = "postgresdb"
logger.info("🐘 Auto-detected production environment - attempting PostgreSQL")
else:
db_type = "sqlite"
logger.info("πŸ“ Auto-detected local environment - using SQLite")
if db_type == "postgresdb":
# PostgreSQL configuration for Supabase
os.environ.setdefault("DB_POSTGRESDB_HOST", "aws-1-sa-east-1.pooler.supabase.com")
os.environ.setdefault("DB_POSTGRESDB_PORT", "6543")
os.environ.setdefault("DB_POSTGRESDB_DATABASE", "postgres")
os.environ.setdefault("DB_POSTGRESDB_SSL", "true")
logger.info("🐘 Using PostgreSQL database configuration")
else:
# SQLite fallback
os.environ.setdefault("WORKFLOW_DB_PATH", "database/workflows.db")
logger.info("πŸ“ Using SQLite database configuration")
logger.info("πŸ“ Using SQLite database configuration")
def setup_huggingface_environment():
"""Setup directories and environment for HF Spaces."""
logger.info("πŸ”§ Setting up Hugging Face Spaces environment...")
# Create necessary directories with error handling
directory_paths = {
"database": Path("database"),
"static": Path("static"),
"workflows": Path("workflows")
}
for name, path in directory_paths.items():
try:
path.mkdir(exist_ok=True, parents=True, mode=0o755)
logger.info(f"βœ… Directory created/verified: {name} ({path.absolute()})")
except (PermissionError, OSError) as e:
logger.warning(f"⚠️ Could not create {name} directory ({e}) - using fallback")
if name == "database":
# Force temp database location
os.environ["WORKFLOW_DB_PATH"] = "/tmp/workflows.db"
logger.info(f"πŸ” Database will use temp location: /tmp/workflows.db")
elif name == "static":
# Static files will be served from memory
logger.info("πŸ“ Static files will be served from memory")
elif name == "workflows":
# Workflows directory fallback
logger.info("πŸ“ Workflows will be loaded from embedded sources if available")
os.environ.setdefault("WORKFLOW_SOURCE_DIR", str(directory_paths["workflows"]))
os.environ.setdefault("STATIC_DIR", str(directory_paths["static"]))
# Initialize database
try:
from workflow_db import WorkflowDatabase
# Determine database configuration
db_type = os.environ.get("DB_TYPE", "sqlite")
if db_type == "postgresdb":
# Note: Current WorkflowDatabase only supports SQLite
# PostgreSQL support would need to be implemented in WorkflowDatabase class
logger.warning("⚠️ PostgreSQL requested but WorkflowDatabase only supports SQLite")
logger.info("πŸ” Falling back to SQLite (PostgreSQL support not implemented)...")
db_type = "sqlite"
os.environ["DB_TYPE"] = "sqlite"
if db_type == "sqlite":
# Try local database first, fall back to temp if read-only filesystem
try:
db_path = BASE_DIR / "database" / "workflows.db"
logger.info(f"πŸ“ Attempting to use SQLite database: {db_path}")
# Test if we can write to the database directory
test_file = db_path.parent / "test_write.tmp"
test_file.touch()
test_file.unlink()
# If we get here, we can write to the directory
if not Path(db_path).exists() or Path(db_path).stat().st_size == 0:
logger.info("πŸ“š Initializing SQLite workflows database...")
db = WorkflowDatabase(str(db_path))
else:
db = WorkflowDatabase(str(db_path))
except (PermissionError, OSError) as e:
logger.warning(f"⚠️ Cannot write to local database directory: {e}")
logger.info("πŸ” Using temporary database location...")
temp_db_path = "/tmp/workflows.db"
os.environ["WORKFLOW_DB_PATH"] = temp_db_path
logger.info(f"πŸ“ Using temporary SQLite database: {temp_db_path}")
db = WorkflowDatabase(temp_db_path)
# Database will be checked and indexed in the background by the API server
logger.info("βœ… Database setup complete. Indexing will be handled by the API server.")
except Exception as e:
logger.error(f"❌ Database setup error: {e}")
# Final fallback - try temp database
try:
logger.info("πŸ” Attempting final fallback to temporary database...")
temp_db_path = "/tmp/fallback_workflows.db"
os.environ["DB_TYPE"] = "sqlite"
os.environ["WORKFLOW_DB_PATH"] = temp_db_path
from workflow_db import WorkflowDatabase
db = WorkflowDatabase(temp_db_path)
logger.info(f"βœ… Emergency fallback database created: {temp_db_path}")
except Exception as fallback_error:
logger.warning(f"⚠️ All database options failed: {fallback_error}")
logger.info("πŸ“ API will start without pre-initialized database")
logger.info("πŸ”„ Database initialization will be attempted on first API request")
# Create a minimal environment setup so the API can still start
os.environ["DB_TYPE"] = "sqlite"
os.environ["WORKFLOW_DB_PATH"] = "/tmp/delayed_init_workflows.db"
async def startup_tasks():
"""Perform startup tasks asynchronously."""
try:
setup_huggingface_environment()
logger.info("πŸŽ‰ Hugging Face Spaces setup completed successfully!")
except Exception as e:
logger.error(f"❌ Startup error: {e}")
# Don't fail completely, let the app start anyway
def main():
"""Main entry point for Hugging Face Spaces."""
logger.info("πŸš€ Starting N8N Workflow Documentation API on Hugging Face Spaces...")
# Run startup tasks
asyncio.run(startup_tasks())
# Import and start the FastAPI app
try:
from api_server import app
import uvicorn
# Get configuration from environment
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "7860"))
logger.info(f"🌐 Server starting on {host}:{port}")
logger.info("πŸ“Š API Documentation will be available at /docs")
# Start the server
uvicorn.run(
app,
host=host,
port=port,
log_level="info",
access_log=True
)
except ImportError as e:
logger.error(f"❌ Failed to import required modules: {e}")
sys.exit(1)
except Exception as e:
logger.error(f"❌ Failed to start server: {e}")
sys.exit(1)
# For Hugging Face Spaces compatibility
if __name__ == "__main__":
main()
# Also expose the app directly for gunicorn/uvicorn
try:
# Ensure environment is set up
asyncio.run(startup_tasks())
from api_server import app
except Exception as e:
logger.warning(f"⚠️ Could not set up environment during import: {e}")
# Create a minimal fallback app
from fastapi import FastAPI
app = FastAPI(title="N8N Workflows API - Setup Required")
@app.get("/")
async def root():
return {"message": "N8N Workflows API - Setup in progress", "status": "initializing"}
@app.get("/health")
async def health():
return {"status": "starting"}