# Database / routes.py
# Lavlu118557's picture
# Create routes.py
# 5da618c verified
from flask import render_template, request, redirect, url_for, session, flash, jsonify
from flask_babel import _, ngettext
from app import app, db
from models import DataTable, DataPipeline, DataRecord
from utils import parse_yaml, fuzzy_search, render_markdown, create_dynamic_table
import json
import yaml
import logging
@app.route('/')
def index():
    """Render the dashboard with every active table and pipeline."""
    context = {
        'tables': DataTable.query.filter_by(is_active=True).all(),
        'pipelines': DataPipeline.query.filter_by(is_active=True).all(),
    }
    return render_template('index.html', **context)
@app.route('/language/<language>')
def set_language(language=None):
    """Store the requested UI language in the session.

    The language is accepted only when it appears in
    ``app.config['LANGUAGES']``. Either way a flash message reports the
    outcome and the visitor is redirected to the referring page, or to
    the dashboard when no referrer is available.
    """
    supported = app.config['LANGUAGES']
    if language not in supported:
        flash(_('Invalid language selection'), 'error')
    else:
        session['language'] = language
        flash(_('Language changed successfully'), 'success')

    # Fall back to the dashboard when the request carries no referrer.
    destination = request.referrer
    if not destination:
        destination = url_for('index')
    return redirect(destination)
@app.route('/create_table', methods=['GET', 'POST'])
def create_table():
    """Show the table-creation form (GET) or create a new table (POST).

    The POST form supplies ``name``, ``description`` and ``schema``. The
    schema text is parsed as JSON when it starts with ``{`` and as YAML
    otherwise, and it must decode to a mapping. Every failure flashes an
    error and redirects back to the form; success redirects to the new
    table's view.
    """
    if request.method != 'POST':
        return render_template('create_table.html')

    try:
        name = request.form.get('name')
        description = request.form.get('description')
        schema_text = request.form.get('schema')

        if not name or not schema_text:
            flash(_('Table name and schema are required'), 'error')
            return redirect(url_for('create_table'))

        # Parse schema (can be JSON or YAML)
        try:
            if schema_text.strip().startswith('{'):
                schema = json.loads(schema_text)
            else:
                schema = yaml.safe_load(schema_text)
        except (json.JSONDecodeError, yaml.YAMLError) as e:
            flash(_('Invalid schema format: {}').format(str(e)), 'error')
            return redirect(url_for('create_table'))

        # YAML also decodes scalars, lists and empty documents without
        # error. add_record() calls schema.get('fields'), so anything
        # other than a mapping would be stored but unusable.
        if not isinstance(schema, dict):
            flash(
                _('Invalid schema format: {}').format(
                    'schema must be a mapping, got {}'.format(
                        type(schema).__name__
                    )
                ),
                'error',
            )
            return redirect(url_for('create_table'))

        # Check if table already exists (soft-deleted tables count too,
        # because the lookup does not filter on is_active).
        existing_table = DataTable.query.filter_by(name=name).first()
        if existing_table:
            flash(_('Table with this name already exists'), 'error')
            return redirect(url_for('create_table'))

        # Create table record
        new_table = DataTable()
        new_table.name = name
        new_table.description = description
        new_table.schema = schema

        db.session.add(new_table)
        db.session.commit()

        flash(_('Table created successfully! 🎉'), 'success')
        return redirect(url_for('table_view', table_name=name))
    except Exception as e:
        # Discard the failed transaction so the session is usable again,
        # and keep the traceback in the log for diagnosis.
        db.session.rollback()
        logging.exception("Error creating table: %s", e)
        flash(_('Error creating table: {}').format(str(e)), 'error')
        return redirect(url_for('create_table'))
@app.route('/table/<table_name>')
def table_view(table_name):
    """Display one active table together with its records, newest first.

    An unknown or inactive table flashes an error and redirects to the
    dashboard. The template receives the record objects as ``records``
    and a list of plain dicts with preformatted timestamps as
    ``records_json``.
    """
    def _stamp(moment):
        # Timestamps may be unset; keep None in that case.
        return moment.strftime('%Y-%m-%d %H:%M:%S') if moment else None

    table = DataTable.query.filter_by(name=table_name, is_active=True).first()
    if not table:
        flash(_('Table not found'), 'error')
        return redirect(url_for('index'))

    newest_first = DataRecord.created_at.desc()
    records = (
        DataRecord.query
        .filter_by(table_name=table_name)
        .order_by(newest_first)
        .all()
    )

    # Plain-dict copy of each record for the template.
    records_data = [
        {
            'id': row.id,
            'data': row.data,
            'created_at': _stamp(row.created_at),
            'updated_at': _stamp(row.updated_at),
        }
        for row in records
    ]

    return render_template(
        'table_view.html',
        table=table,
        records=records,
        records_json=records_data,
    )
@app.route('/table/<table_name>/add', methods=['POST'])
def add_record(table_name):
    """Add a new record to an active table from the submitted form.

    Only fields named in the table schema's ``fields`` list are read from
    the form, and blank values are left out of the stored record. The
    response always redirects: to the dashboard when the table is unknown
    or inactive, otherwise back to the table view with a flash message
    describing the outcome.
    """
    table = DataTable.query.filter_by(name=table_name, is_active=True).first()
    if not table:
        flash(_('Table not found'), 'error')
        return redirect(url_for('index'))

    try:
        # ``fields: null`` in a schema yields None rather than a missing
        # key; treat both the same way instead of failing on iteration.
        fields = table.schema.get('fields') or []

        # Get form data
        data = {}
        for field in fields:
            field_name = field.get('name')
            field_value = request.form.get(field_name)
            if field_value:
                data[field_name] = field_value

        # Create new record
        new_record = DataRecord()
        new_record.table_name = table_name
        new_record.data = data

        db.session.add(new_record)
        db.session.commit()

        flash(_('Record added successfully! ✅'), 'success')
    except Exception as e:
        # Discard the failed transaction so the session is usable again,
        # and keep the traceback in the log for diagnosis.
        db.session.rollback()
        logging.exception("Error adding record: %s", e)
        flash(_('Error adding record: {}').format(str(e)), 'error')

    return redirect(url_for('table_view', table_name=table_name))
@app.route('/data_pipeline', methods=['GET', 'POST'])
def data_pipeline():
    """Show the pipeline form (GET) or create a new data pipeline (POST).

    The POST form supplies ``name``, ``description``, ``source_table``,
    ``target_table`` and ``yaml_config``. Name, source table and a
    non-empty YAML configuration are required. Failures flash an error and
    redirect back to the form; success redirects to the dashboard.
    """
    if request.method != 'POST':
        # Get available tables for dropdown
        tables = DataTable.query.filter_by(is_active=True).all()
        return render_template('data_pipeline.html', tables=tables)

    required_msg = _(
        'Pipeline name, source table, and YAML configuration are required'
    )

    try:
        name = request.form.get('name')
        description = request.form.get('description')
        source_table = request.form.get('source_table')
        target_table = request.form.get('target_table')
        yaml_config = request.form.get('yaml_config')

        if not name or not source_table or not yaml_config:
            flash(required_msg, 'error')
            return redirect(url_for('data_pipeline'))

        # Parse YAML configuration
        try:
            pipeline_config = yaml.safe_load(yaml_config)
        except yaml.YAMLError as e:
            flash(_('Invalid YAML configuration: {}').format(str(e)), 'error')
            return redirect(url_for('data_pipeline'))

        # Whitespace-only or comment-only YAML parses to None without an
        # error; that is an empty configuration, so treat it as missing.
        if pipeline_config is None:
            flash(required_msg, 'error')
            return redirect(url_for('data_pipeline'))

        # Check if pipeline already exists
        existing_pipeline = DataPipeline.query.filter_by(name=name).first()
        if existing_pipeline:
            flash(_('Pipeline with this name already exists'), 'error')
            return redirect(url_for('data_pipeline'))

        # Create pipeline record; both the parsed and the raw YAML are kept.
        new_pipeline = DataPipeline()
        new_pipeline.name = name
        new_pipeline.description = description
        new_pipeline.source_table = source_table
        new_pipeline.target_table = target_table
        new_pipeline.pipeline_config = pipeline_config
        new_pipeline.yaml_config = yaml_config

        db.session.add(new_pipeline)
        db.session.commit()

        flash(_('Data pipeline created successfully! 🚀'), 'success')
        return redirect(url_for('index'))
    except Exception as e:
        # Discard the failed transaction so the session is usable again,
        # and keep the traceback in the log for diagnosis.
        db.session.rollback()
        logging.exception("Error creating pipeline: %s", e)
        flash(_('Error creating pipeline: {}').format(str(e)), 'error')
        return redirect(url_for('data_pipeline'))
@app.route('/search')
def search():
    """Fuzzy-match the ``q`` query parameter against every stored record.

    Responds with a JSON array holding one entry per record for which
    ``fuzzy_search`` returned a truthy result. A missing or empty ``q``
    yields an empty array.
    """
    term = request.args.get('q', '')
    if term:
        # Pair each record with its match result so fuzzy_search runs
        # exactly once per record.
        scored = (
            (row, fuzzy_search(term, row.data))
            for row in DataRecord.query.all()
        )
        hits = [
            {
                'table_name': row.table_name,
                'record_id': row.id,
                'matches': found,
                'data': row.data,
            }
            for row, found in scored
            if found
        ]
    else:
        hits = []
    return jsonify(hits)
@app.route('/api/markdown', methods=['POST'])
def api_markdown():
    """Render the ``text`` field of a JSON request body as HTML.

    Responds with ``{"html": ...}``. A body that is not a JSON object, or
    whose ``text`` is missing or falsy (e.g. null), is rendered as the
    empty string.
    """
    json_data = request.get_json()
    # Valid JSON need not be an object: a list or scalar body has no
    # .get(), so treat anything but a dict as an empty payload.
    if not isinstance(json_data, dict):
        json_data = {}
    # ``or ''`` also covers an explicit null, which .get()'s default
    # would not replace.
    text = json_data.get('text') or ''
    html = render_markdown(text)
    return jsonify({'html': html})
@app.route('/delete_table/<table_name>', methods=['POST'])
def delete_table(table_name):
    """Mark the named table inactive, then return to the dashboard."""
    target = DataTable.query.filter_by(name=table_name).first()
    if not target:
        flash(_('Table not found'), 'error')
        return redirect(url_for('index'))

    # Soft delete: the row stays, only its active flag is cleared.
    target.is_active = False
    db.session.commit()
    flash(_('Table deleted successfully'), 'success')
    return redirect(url_for('index'))
@app.route('/delete_pipeline/<int:pipeline_id>', methods=['POST'])
def delete_pipeline(pipeline_id):
    """Mark the pipeline with the given id inactive, then go to the dashboard."""
    pipeline = DataPipeline.query.get(pipeline_id)
    if not pipeline:
        outcome = (_('Pipeline not found'), 'error')
    else:
        # Soft delete: the row stays, only its active flag is cleared.
        pipeline.is_active = False
        db.session.commit()
        outcome = (_('Pipeline deleted successfully'), 'success')

    flash(*outcome)
    return redirect(url_for('index'))
@app.route('/delete_record/<int:record_id>', methods=['POST'])
def delete_record(record_id):
    """Permanently remove one data record.

    Redirects to the view of the table the record belonged to, or to the
    dashboard when no record has the given id.
    """
    doomed = DataRecord.query.get(record_id)
    if not doomed:
        flash(_('Record not found'), 'error')
        return redirect(url_for('index'))

    # Remember the owning table before the row is deleted.
    owner = doomed.table_name
    db.session.delete(doomed)
    db.session.commit()
    flash(_('Record deleted successfully'), 'success')
    return redirect(url_for('table_view', table_name=owner))