nas / main.py
Starchik1's picture
Rename app.py to main.py
0b49701 verified
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, session, jsonify
import os
import sqlite3
import hashlib
from werkzeug.utils import secure_filename
import datetime
import shutil
import re
import mimetypes
import json
def get_file_type(filename):
"""Определяет тип файла на основе его расширения"""
mime_type, _ = mimetypes.guess_type(filename)
if mime_type:
if mime_type.startswith('image/'):
return 'image'
elif mime_type.startswith('video/'):
return 'video'
elif mime_type.startswith('audio/'):
return 'audio'
elif mime_type.startswith('text/') or mime_type in ['application/json', 'application/xml', 'application/javascript']:
return 'text'
# Проверяем расширение файла для текстовых форматов
text_extensions = ['.txt', '.html', '.css', '.js', '.py', '.md', '.json', '.xml', '.csv', '.ini', '.cfg', '.conf']
for ext in text_extensions:
if filename.lower().endswith(ext):
return 'text'
return 'other'
def hex_to_rgb(value):
"""Преобразует hex-код цвета в формат RGB"""
value = value.lstrip('#')
return f"{int(value[0:2], 16)}, {int(value[2:4], 16)}, {int(value[4:6], 16)}"
# Хелперы для работы с базой данных
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def hash_password(password):
return hashlib.sha256(password.encode()).hexdigest()
def get_file_by_id(file_id):
"""Получает информацию о файле по его ID"""
if not file_id:
return None
conn = get_db_connection()
file = conn.execute('SELECT * FROM files WHERE id = ?', (file_id,)).fetchone()
conn.close()
return file
def format_size(size_bytes):
"""Форматирует размер файла в человекочитаемый вид"""
if size_bytes < 1024:
return f"{size_bytes} Б"
elif size_bytes < 1024 * 1024:
return f"{size_bytes/1024:.1f} КБ"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes/(1024*1024):.1f} МБ"
else:
return f"{size_bytes/(1024*1024*1024):.1f} ГБ"
app = Flask(__name__)
app.secret_key = 'supersecretkey'
# Регистрация пользовательского фильтра для преобразования цветовых кодов
app.jinja_env.filters['hex_to_rgb'] = hex_to_rgb
app.jinja_env.globals['get_file_by_id'] = get_file_by_id
app.jinja_env.globals['format_size'] = format_size
# Настройка путей для хранения файлов
BASEDIR = os.path.abspath(os.path.dirname(__file__))
UPLOAD_FOLDER = os.path.join(BASEDIR, 'uploads')
BACKGROUNDS_FOLDER = os.path.join(BASEDIR, 'static', 'backgrounds')
DB_PATH = os.path.join(BASEDIR, 'database.db')
# Создаем папки для загрузок и фонов, если они не существуют
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
if not os.path.exists(BACKGROUNDS_FOLDER):
os.makedirs(BACKGROUNDS_FOLDER)
# Инициализация базы данных
def init_db():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Создание таблицы пользователей
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
background_image TEXT DEFAULT 'default.jpg',
color_scheme TEXT DEFAULT '#0d6efd',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Создание таблицы файлов
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL,
original_filename TEXT NOT NULL,
path TEXT NOT NULL,
size INTEGER NOT NULL,
user_id INTEGER NOT NULL,
parent_folder TEXT DEFAULT '/',
is_folder BOOLEAN DEFAULT 0,
file_type TEXT DEFAULT 'other',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Создание таблицы общего доступа
cursor.execute('''
CREATE TABLE IF NOT EXISTS shares (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
shared_with INTEGER,
permission TEXT DEFAULT 'read',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (file_id) REFERENCES files (id),
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (shared_with) REFERENCES users (id)
)
''')
# Создание таблицы контактов
cursor.execute('''
CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
contact_id INTEGER NOT NULL,
status TEXT DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (contact_id) REFERENCES users (id),
UNIQUE(user_id, contact_id)
)
''')
# Создание таблицы сообщений
cursor.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_id INTEGER NOT NULL,
receiver_id INTEGER NOT NULL,
content TEXT NOT NULL,
attachment_id INTEGER,
is_read BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (sender_id) REFERENCES users (id),
FOREIGN KEY (receiver_id) REFERENCES users (id),
FOREIGN KEY (attachment_id) REFERENCES files (id)
)
''')
conn.commit()
conn.close()
# Инициализация базы данных при запуске
# Маршрут для получения содержимого файла для редактирования
@app.route('/get_file_content/<int:file_id>')
def get_file_content(file_id):
if 'user_id' not in session:
return redirect(url_for('login'))
user_id = session['user_id']
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Получаем информацию о файле
cursor.execute('SELECT filename, path, user_id FROM files WHERE id = ?', (file_id,))
file_data = cursor.fetchone()
if not file_data:
conn.close()
return 'Файл не найден', 404
# Проверяем, принадлежит ли файл пользователю
if file_data[2] != user_id:
# Проверяем, есть ли у пользователя доступ к файлу
cursor.execute('SELECT permission FROM shares WHERE file_id = ? AND shared_with = ?', (file_id, user_id))
share_data = cursor.fetchone()
conn.close()
if not share_data:
return 'Доступ запрещен', 403
# Для просмотра достаточно прав на чтение или запись
if share_data[0] not in ['read', 'write']:
return 'Доступ запрещен', 403
else:
conn.close()
# Получаем полный путь к файлу
file_path = os.path.join(BASEDIR, file_data[1])
try:
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return content
except Exception as e:
return str(e), 500
# Маршрут для сохранения отредактированного содержимого файла
@app.route('/save_file_content/<int:file_id>', methods=['POST'])
def save_file_content(file_id):
if 'user_id' not in session:
return jsonify({'success': False, 'error': 'Необходима авторизация'}), 401
user_id = session['user_id']
# Получаем данные из запроса
data = request.get_json()
if not data or 'content' not in data:
return jsonify({'success': False, 'error': 'Отсутствует содержимое файла'}), 400
content = data['content']
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Получаем информацию о файле
cursor.execute('SELECT filename, path, user_id FROM files WHERE id = ?', (file_id,))
file_data = cursor.fetchone()
conn.close()
if not file_data:
return jsonify({'success': False, 'error': 'Файл не найден'}), 404
# Проверяем, принадлежит ли файл пользователю
if file_data[2] != user_id:
# Проверяем, есть ли у пользователя доступ к файлу
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT permission FROM shares WHERE file_id = ? AND shared_with = ?', (file_id, user_id))
share_data = cursor.fetchone()
conn.close()
if not share_data or share_data[0] != 'write':
return jsonify({'success': False, 'error': 'Доступ запрещен'}), 403
# Получаем полный путь к файлу
file_path = os.path.join(BASEDIR, file_data[1])
try:
with open(file_path, 'w', encoding='utf-8') as file:
file.write(content)
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
init_db()
# Проверка авторизации
def login_required(f):
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Пожалуйста, войдите в систему для доступа к этой странице', 'error')
return redirect(url_for('login'))
return f(*args, **kwargs)
decorated_function.__name__ = f.__name__
return decorated_function
# Маршруты приложения
@app.route('/')
def index():
if 'user_id' in session:
return redirect(url_for('dashboard'))
return render_template('index.html', now=datetime.datetime.now(), background='default.jpg')
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
email = request.form['email']
background_image = 'default.jpg'
conn = get_db_connection()
# Проверка, существует ли пользователь
user = conn.execute('SELECT * FROM users WHERE username = ? OR email = ?',
(username, email)).fetchone()
if user:
flash('Пользователь с таким именем или email уже существует', 'error')
conn.close()
return redirect(url_for('register'))
# Создание нового пользователя
hashed_password = hash_password(password)
conn.execute('INSERT INTO users (username, password, email, background_image) VALUES (?, ?, ?, ?)',
(username, hashed_password, email, background_image))
conn.commit()
# Получение ID нового пользователя
user_id = conn.execute('SELECT id FROM users WHERE username = ?', (username,)).fetchone()[0]
# Создание персональной папки для пользователя
user_folder = os.path.join(UPLOAD_FOLDER, str(user_id))
if not os.path.exists(user_folder):
os.makedirs(user_folder)
conn.close()
flash('Регистрация успешна! Теперь вы можете войти в систему', 'success')
return redirect(url_for('login'))
return render_template('register.html', now=datetime.datetime.now(), background='default.jpg')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
conn = get_db_connection()
user = conn.execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone()
if user and user['password'] == hash_password(password):
session['user_id'] = user['id']
session['username'] = user['username']
session['background'] = user['background_image']
session['color_scheme'] = user['color_scheme']
conn.close()
flash('Вы успешно вошли в систему!', 'success')
return redirect(url_for('dashboard'))
else:
flash('Неверное имя пользователя или пароль', 'error')
conn.close()
return redirect(url_for('login'))
return render_template('login.html', now=datetime.datetime.now(), background='default.jpg')
@app.route('/logout')
def logout():
session.clear()
flash('Вы вышли из системы', 'info')
return redirect(url_for('index'))
@app.route('/dashboard')
@login_required
def dashboard():
user_id = session['user_id']
current_folder = request.args.get('folder', '/')
# Нормализация текущей папки
if current_folder != '/' and not current_folder.endswith('/'):
current_folder += '/'
conn = get_db_connection()
# Исправленный запрос с точным совпадением
files = conn.execute(
'SELECT * FROM files WHERE user_id = ? AND parent_folder = ? ORDER BY is_folder DESC, filename',
(user_id, current_folder)
).fetchall()
# Получение родительской папки
parent_folder = os.path.dirname(current_folder.rstrip('/'))
if parent_folder == '':
parent_folder = '/'
# Получение информации о текущей папке
current_folder_info = None
if current_folder != '/':
current_folder_info = conn.execute(
'SELECT * FROM files WHERE user_id = ? AND filename = ? AND is_folder = 1',
(user_id, current_folder)
).fetchone()
# Получение пути навигации
breadcrumbs = []
if current_folder != '/':
parts = current_folder.strip('/').split('/')
path = '/'
breadcrumbs.append(('/', 'Корень'))
for i, part in enumerate(parts):
if part:
path += part + '/'
breadcrumbs.append((path, part))
else:
breadcrumbs.append(('/', 'Корень'))
conn.close()
return render_template('dashboard.html',
files=files,
current_folder=current_folder,
breadcrumbs=breadcrumbs,
current_folder_info=current_folder_info,
now=datetime.datetime.now(),
background=session.get('background', 'default.jpg'))
@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
user_id = session['user_id']
current_folder = request.form.get('current_folder', '/')
# Проверка, была ли отправлена форма с файлом
if 'files[]' not in request.files:
flash('Файл не выбран', 'error')
return redirect(url_for('dashboard', folder=current_folder))
files = request.files.getlist('files[]')
# Если файлы не выбраны
if not files or all(file.filename == '' for file in files):
flash('Файлы не выбраны', 'error')
return redirect(url_for('dashboard', folder=current_folder))
# Обработка всех файлов
success_count = 0
for file in files:
if file.filename == '':
continue
try:
conn = get_db_connection()
filename = secure_filename(file.filename)
# Определение пути для сохранения файла
user_folder = os.path.join(UPLOAD_FOLDER, str(user_id))
# Создание пути к текущей папке
if current_folder == '/':
save_path = user_folder
else:
folder_path = current_folder.strip('/')
save_path = os.path.join(user_folder, folder_path)
# Проверка существования папки
if not os.path.exists(save_path):
os.makedirs(save_path)
# Полный путь к файлу
file_path = os.path.join(save_path, filename)
# Сохранение файла
file.save(file_path)
# Получение размера файла
file_size = os.path.getsize(file_path)
# Определение типа файла
file_type = get_file_type(filename)
conn.execute(
'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, file_type) VALUES (?, ?, ?, ?, ?, ?, ?)',
(filename, file.filename, file_path, file_size, user_id, current_folder, file_type)
)
conn.commit()
success_count += 1
except Exception as e:
print(f'Ошибка при загрузке файла {file.filename}: {str(e)}')
finally:
conn.close()
if success_count > 0:
flash(f'Успешно загружено файлов: {success_count}', 'success')
else:
flash('Не удалось загрузить ни одного файла', 'error')
return redirect(url_for('dashboard', folder=current_folder))
# Обновляем функцию создания папки
@app.route('/create_folder', methods=['POST'])
@login_required
def create_folder():
user_id = session['user_id']
folder_name = request.form['folder_name']
current_folder = request.form.get('current_folder', '/')
# Нормализация пути
if current_folder != '/' and not current_folder.endswith('/'):
current_folder += '/'
# Формирование полного пути
full_path = current_folder + folder_name + '/'
# Проверка существования
conn = get_db_connection()
existing = conn.execute('SELECT * FROM files WHERE filename = ? AND user_id = ?',
(full_path, user_id)).fetchone()
if existing:
flash('Папка уже существует', 'error')
return redirect(url_for('dashboard', folder=current_folder))
# Формирование корректного пути для БД
db_path = f"{current_folder}{folder_name}/"
# Обновленный путь для файловой системы
user_folder = os.path.join(UPLOAD_FOLDER, str(user_id))
new_folder_path = os.path.join(user_folder, current_folder.lstrip('/'), folder_name)
# Обновленный SQL-запрос
conn.execute(
'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, is_folder) VALUES (?, ?, ?, ?, ?, ?, ?)',
(db_path, folder_name, new_folder_path, 0, user_id, current_folder, 1)
)
conn.commit()
conn.close()
flash(f'Папка {folder_name} успешно создана', 'success')
return redirect(url_for('dashboard', folder=current_folder))
@app.route('/download/<int:file_id>')
@login_required
def download_file(file_id):
user_id = session['user_id']
conn = get_db_connection()
# Проверяем, есть ли у пользователя прямой доступ к файлу
file = conn.execute('SELECT * FROM files WHERE id = ? AND user_id = ?', (file_id, user_id)).fetchone()
# Если файл не принадлежит пользователю, проверяем общий доступ
if not file:
file = conn.execute('''
SELECT f.* FROM files f
JOIN shares s ON f.id = s.file_id
WHERE f.id = ? AND s.shared_with = ?
''', (file_id, user_id)).fetchone()
# Если файл не найден через общий доступ, проверяем доступ через сообщения
if not file:
# Проверяем, является ли пользователь получателем сообщения с этим вложением
file = conn.execute('''
SELECT f.*
FROM files f
JOIN messages m ON f.id = m.attachment_id
WHERE f.id = ? AND m.receiver_id = ?
''', (file_id, user_id)).fetchone()
# Также проверяем, является ли пользователь отправителем сообщения с этим вложением
if not file:
file = conn.execute('''
SELECT f.*
FROM files f
JOIN messages m ON f.id = m.attachment_id
WHERE f.id = ? AND m.sender_id = ?
''', (file_id, user_id)).fetchone()
conn.close()
if file and not file['is_folder']:
directory = os.path.dirname(file['path'])
return send_from_directory(directory, os.path.basename(file['path']), as_attachment=True, download_name=file['original_filename'])
else:
flash('Файл не найден или у вас нет прав для его скачивания', 'error')
return redirect(url_for('dashboard'))
@app.route('/delete/<int:file_id>', methods=['POST'])
@login_required
def delete_file(file_id):
user_id = session['user_id']
conn = get_db_connection()
file = conn.execute('SELECT * FROM files WHERE id = ? AND user_id = ?', (file_id, user_id)).fetchone()
if not file:
conn.close()
flash('Файл не найден или у вас нет прав для его удаления', 'error')
return redirect(url_for('dashboard'))
current_folder = file['parent_folder']
# Если это папка, удаляем все вложенные файлы и папки
if file['is_folder']:
# Удаление физической папки
if os.path.exists(file['path']):
shutil.rmtree(file['path'])
# Удаление всех записей о файлах и папках внутри этой папки
folder_path = file['filename']
conn.execute('DELETE FROM files WHERE user_id = ? AND (parent_folder = ? OR parent_folder LIKE ?)',
(user_id, folder_path, folder_path + '/%'))
else:
# Удаление физического файла
if os.path.exists(file['path']):
os.remove(file['path'])
# Удаление записи о файле/папке из базы данных
conn.execute('DELETE FROM files WHERE id = ?', (file_id,))
# Удаление всех записей о совместном доступе к этому файлу
conn.execute('DELETE FROM shares WHERE file_id = ?', (file_id,))
conn.commit()
conn.close()
flash('Файл успешно удален', 'success')
return redirect(url_for('dashboard', folder=current_folder))
@app.route('/share/<int:file_id>', methods=['GET', 'POST'])
@login_required
def share_file(file_id):
user_id = session['user_id']
conn = get_db_connection()
file = conn.execute('SELECT * FROM files WHERE id = ? AND user_id = ? AND is_folder = 0', (file_id, user_id)).fetchone()
if not file:
conn.close()
flash('Файл не найден или у вас нет прав для его совместного использования', 'error')
return redirect(url_for('dashboard'))
if request.method == 'POST':
shared_username = request.form['username']
permission = request.form['permission']
# Проверка существования пользователя
shared_user = conn.execute('SELECT id FROM users WHERE username = ?', (shared_username,)).fetchone()
if not shared_user:
conn.close()
flash(f'Пользователь {shared_username} не найден', 'error')
return redirect(url_for('share_file', file_id=file_id))
# Проверка, не пытается ли пользователь поделиться с самим собой
if shared_user['id'] == user_id:
conn.close()
flash('Вы не можете поделиться файлом с самим собой', 'error')
return redirect(url_for('share_file', file_id=file_id))
# Проверка, не поделился ли пользователь уже с этим пользователем
existing_share = conn.execute(
'SELECT * FROM shares WHERE file_id = ? AND user_id = ? AND shared_with = ?',
(file_id, user_id, shared_user['id'])
).fetchone()
if existing_share:
# Обновление прав доступа
conn.execute(
'UPDATE shares SET permission = ? WHERE id = ?',
(permission, existing_share['id'])
)
flash(f'Права доступа для пользователя {shared_username} обновлены', 'success')
else:
# Создание новой записи о совместном доступе
conn.execute(
'INSERT INTO shares (file_id, user_id, shared_with, permission) VALUES (?, ?, ?, ?)',
(file_id, user_id, shared_user['id'], permission)
)
flash(f'Файл успешно предоставлен пользователю {shared_username}', 'success')
conn.commit()
conn.close()
return redirect(url_for('dashboard'))
# Получение списка пользователей, с которыми уже поделились
shares = conn.execute(
'''SELECT s.*, u.username FROM shares s
JOIN users u ON s.shared_with = u.id
WHERE s.file_id = ? AND s.user_id = ?''',
(file_id, user_id)
).fetchall()
conn.close()
return render_template('share.html', file=file, shares=shares, now=datetime.datetime.now(), background=session.get('background', 'default.jpg'))
@app.route('/shared_with_me')
@login_required
def shared_with_me():
user_id = session['user_id']
conn = get_db_connection()
shared_files = conn.execute(
'''SELECT f.*, s.permission, u.username as owner_username
FROM files f
JOIN shares s ON f.id = s.file_id
JOIN users u ON f.user_id = u.id
WHERE s.shared_with = ?
ORDER BY f.is_folder DESC, f.filename''',
(user_id,)
).fetchall()
# Преобразуем результаты запроса для корректного отображения в шаблоне
files_with_access = []
for file in shared_files:
file_dict = dict(file)
file_dict['access_type'] = file_dict['permission'] # Копируем permission в access_type
file_dict['owner'] = file_dict['owner_username'] # Копируем owner_username в owner
files_with_access.append(file_dict)
conn.close()
return render_template('shared_with_me.html', files=files_with_access, now=datetime.datetime.now(), background=session.get('background', 'default.jpg'))
@app.route('/settings')
@login_required
def settings():
conn = get_db_connection()
user = conn.execute('SELECT color_scheme FROM users WHERE id = ?', (session['user_id'],)).fetchone()
conn.close()
return render_template('settings.html',
now=datetime.datetime.now(),
background=session.get('background', 'default.jpg'),
color_scheme=user['color_scheme'])
@app.route('/change_color_scheme', methods=['POST'])
@login_required
def change_color_scheme():
user_id = session['user_id']
color_scheme = request.form['color_scheme']
conn = get_db_connection()
conn.execute('UPDATE users SET color_scheme = ? WHERE id = ?', (color_scheme, user_id))
conn.commit()
conn.close()
session['color_scheme'] = color_scheme
flash('Цветовая схема успешно обновлена', 'success')
return redirect(url_for('settings'))
@app.route('/change_background', methods=['POST'])
@login_required
def change_background():
user_id = session['user_id']
if 'background_file' not in request.files:
flash('Файл не выбран', 'error')
return redirect(url_for('settings'))
file = request.files['background_file']
if file.filename == '':
flash('Файл не выбран', 'error')
return redirect(url_for('settings'))
# Проверка расширения файла
allowed_extensions = {'jpg', 'jpeg', 'png', 'gif'}
if not '.' in file.filename or file.filename.rsplit('.', 1)[1].lower() not in allowed_extensions:
flash('Недопустимый формат файла. Разрешены только изображения (jpg, jpeg, png, gif)', 'error')
return redirect(url_for('settings'))
# Создаем уникальное имя файла
filename = f"user_{user_id}_{secure_filename(file.filename)}"
file_path = os.path.join(BACKGROUNDS_FOLDER, filename)
# Сохраняем файл
file.save(file_path)
# Обновляем информацию в базе данных
conn = get_db_connection()
conn.execute('UPDATE users SET background_image = ? WHERE id = ?', (filename, user_id))
conn.commit()
conn.close()
# Обновляем сессию
session['background'] = filename
flash('Фоновое изображение успешно обновлено', 'success')
return redirect(url_for('settings'))
# Маршруты для работы с контактами
@app.route('/contacts')
@login_required
def contacts():
user_id = session['user_id']
conn = get_db_connection()
# Получаем список контактов пользователя
contacts = conn.execute('''
SELECT c.id, c.contact_id, c.status, u.username, u.email,
(SELECT COUNT(*) FROM messages
WHERE sender_id = c.contact_id AND receiver_id = ? AND is_read = 0) as unread_count
FROM contacts c
JOIN users u ON c.contact_id = u.id
WHERE c.user_id = ? AND c.status = 'accepted'
ORDER BY u.username
''', (user_id, user_id)).fetchall()
# Получаем входящие запросы на добавление в контакты
incoming_requests = conn.execute('''
SELECT c.id, c.user_id, u.username, u.email
FROM contacts c
JOIN users u ON c.user_id = u.id
WHERE c.contact_id = ? AND c.status = 'pending'
ORDER BY c.created_at DESC
''', (user_id,)).fetchall()
# Получаем исходящие запросы на добавление в контакты
outgoing_requests = conn.execute('''
SELECT c.id, c.contact_id, u.username, u.email
FROM contacts c
JOIN users u ON c.contact_id = u.id
WHERE c.user_id = ? AND c.status = 'pending'
ORDER BY c.created_at DESC
''', (user_id,)).fetchall()
conn.close()
return render_template('contacts.html',
contacts=contacts,
incoming_requests=incoming_requests,
outgoing_requests=outgoing_requests,
now=datetime.datetime.now(),
background=session.get('background', 'default.jpg'))
@app.route('/add_contact', methods=['POST'])
@login_required
def add_contact():
user_id = session['user_id']
username_or_email = request.form['username_or_email']
conn = get_db_connection()
# Ищем пользователя по имени или email
user = conn.execute('SELECT id FROM users WHERE username = ? OR email = ?',
(username_or_email, username_or_email)).fetchone()
if not user:
flash('Пользователь не найден', 'error')
conn.close()
return redirect(url_for('contacts'))
contact_id = user['id']
# Проверяем, не пытается ли пользователь добавить сам себя
if contact_id == user_id:
flash('Вы не можете добавить себя в контакты', 'error')
conn.close()
return redirect(url_for('contacts'))
# Проверяем, существует ли уже такой контакт
existing = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?',
(user_id, contact_id)).fetchone()
if existing:
flash('Этот пользователь уже в вашем списке контактов или запросов', 'error')
conn.close()
return redirect(url_for('contacts'))
# Проверяем, есть ли входящий запрос от этого пользователя
incoming = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?',
(contact_id, user_id)).fetchone()
if incoming:
# Если есть входящий запрос, принимаем его
conn.execute('UPDATE contacts SET status = "accepted" WHERE user_id = ? AND contact_id = ?',
(contact_id, user_id))
# И создаем обратную связь
conn.execute('INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, "accepted")',
(user_id, contact_id))
flash('Контакт успешно добавлен', 'success')
else:
# Иначе создаем новый запрос
conn.execute('INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, "pending")',
(user_id, contact_id))
flash('Запрос на добавление в контакты отправлен', 'success')
conn.commit()
conn.close()
return redirect(url_for('contacts'))
@app.route('/accept_contact/<int:contact_id>', methods=['POST'])
@login_required
def accept_contact(contact_id):
user_id = session['user_id']
conn = get_db_connection()
# Получаем информацию о запросе
contact_request = conn.execute('SELECT * FROM contacts WHERE id = ? AND contact_id = ?',
(contact_id, user_id)).fetchone()
if not contact_request:
flash('Запрос не найден', 'error')
conn.close()
return redirect(url_for('contacts'))
# Принимаем запрос
conn.execute('UPDATE contacts SET status = "accepted" WHERE id = ?', (contact_id,))
# Создаем обратную связь
requester_id = contact_request['user_id']
existing = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?',
(user_id, requester_id)).fetchone()
if not existing:
conn.execute('INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, "accepted")',
(user_id, requester_id))
else:
conn.execute('UPDATE contacts SET status = "accepted" WHERE user_id = ? AND contact_id = ?',
(user_id, requester_id))
conn.commit()
conn.close()
flash('Контакт успешно добавлен', 'success')
return redirect(url_for('contacts'))
@app.route('/reject_contact/<int:contact_id>', methods=['POST'])
@login_required
def reject_contact(contact_id):
user_id = session['user_id']
conn = get_db_connection()
# Удаляем запрос
conn.execute('DELETE FROM contacts WHERE id = ? AND contact_id = ?', (contact_id, user_id))
conn.commit()
conn.close()
flash('Запрос отклонен', 'info')
return redirect(url_for('contacts'))
@app.route('/remove_contact/<int:contact_id>', methods=['POST'])
@login_required
def remove_contact(contact_id):
user_id = session['user_id']
conn = get_db_connection()
# Получаем ID контакта
contact = conn.execute('SELECT contact_id FROM contacts WHERE id = ? AND user_id = ?',
(contact_id, user_id)).fetchone()
if not contact:
flash('Контакт не найден', 'error')
conn.close()
return redirect(url_for('contacts'))
other_user_id = contact['contact_id']
# Удаляем связь в обе стороны
conn.execute('DELETE FROM contacts WHERE user_id = ? AND contact_id = ?', (user_id, other_user_id))
conn.execute('DELETE FROM contacts WHERE user_id = ? AND contact_id = ?', (other_user_id, user_id))
conn.commit()
conn.close()
flash('Контакт удален', 'info')
return redirect(url_for('contacts'))
# Маршруты для работы с сообщениями
@app.route('/messages/<int:contact_id>')
@login_required
def messages(contact_id):
user_id = session['user_id']
conn = get_db_connection()
# Получаем список контактов для боковой панели
contacts = conn.execute('''
SELECT c.id, c.contact_id, u.username,
(SELECT COUNT(*) FROM messages
WHERE sender_id = c.contact_id AND receiver_id = ? AND is_read = 0) as unread_count
FROM contacts c
JOIN users u ON c.contact_id = u.id
WHERE c.user_id = ? AND c.status = 'accepted'
ORDER BY u.username
''', (user_id, user_id)).fetchall()
# Если contact_id = 0, это значит, что пользователь перешел из меню
# В этом случае перенаправляем на первый контакт или показываем пустую страницу
if contact_id == 0:
# Если есть контакты, перенаправляем на первый
if contacts:
conn.close()
return redirect(url_for('messages', contact_id=contacts[0]['contact_id']))
# Иначе показываем пустую страницу сообщений
conn.close()
return render_template('messages.html',
contact=None,
messages=[],
contacts=contacts,
user_id=user_id,
now=datetime.datetime.now(),
background=session.get('background', 'default.jpg'))
# Проверяем, есть ли этот пользователь в контактах
contact = conn.execute('''
SELECT c.id, c.contact_id, u.username, u.email
FROM contacts c
JOIN users u ON c.contact_id = u.id
WHERE c.user_id = ? AND c.contact_id = ? AND c.status = 'accepted'
''', (user_id, contact_id)).fetchone()
if not contact:
flash('Контакт не найден или не подтвержден', 'error')
conn.close()
return redirect(url_for('contacts'))
# Получаем историю сообщений
messages = conn.execute('''
SELECT * FROM messages
WHERE (sender_id = ? AND receiver_id = ?) OR (sender_id = ? AND receiver_id = ?)
ORDER BY created_at
''', (user_id, contact_id, contact_id, user_id)).fetchall()
# Отмечаем все сообщения как прочитанные
conn.execute('UPDATE messages SET is_read = 1 WHERE sender_id = ? AND receiver_id = ?',
(contact_id, user_id))
conn.commit()
conn.close()
return render_template('messages.html',
contact=contact,
messages=messages,
contacts=contacts,
user_id=user_id,
now=datetime.datetime.now(),
background=session.get('background', 'default.jpg'))
@app.route('/send_message/<int:contact_id>', methods=['POST'])
@login_required
def send_message(contact_id):
user_id = session['user_id']
content = request.form['content']
if not content or content.strip() == '':
flash('Сообщение не может быть пустым', 'error')
return redirect(url_for('messages', contact_id=contact_id))
conn = get_db_connection()
# Проверяем, есть ли этот пользователь в контактах
contact = conn.execute('''
SELECT * FROM contacts
WHERE user_id = ? AND contact_id = ? AND status = 'accepted'
''', (user_id, contact_id)).fetchone()
if not contact:
flash('Контакт не найден или не подтвержден', 'error')
conn.close()
return redirect(url_for('contacts'))
# Обработка прикрепленного файла
attachment_id = None
if 'attachment' in request.files and request.files['attachment'].filename != '':
file = request.files['attachment']
filename = secure_filename(file.filename)
# Создаем папку для вложений, если она не существует
attachments_folder = os.path.join(UPLOAD_FOLDER, str(user_id), 'attachments')
if not os.path.exists(attachments_folder):
os.makedirs(attachments_folder)
# Сохраняем файл
file_path = os.path.join(attachments_folder, filename)
file.save(file_path)
# Получаем размер файла
file_size = os.path.getsize(file_path)
# Определяем тип файла
file_type = get_file_type(filename)
# Сохраняем информацию о файле в базу данных
cursor = conn.cursor()
cursor.execute(
'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, file_type) VALUES (?, ?, ?, ?, ?, ?, ?)',
(filename, file.filename, file_path, file_size, user_id, '/attachments/', file_type)
)
conn.commit()
# Получаем ID созданного файла
attachment_id = cursor.lastrowid
# Сохраняем сообщение с прикрепленным файлом (если есть)
conn.execute('INSERT INTO messages (sender_id, receiver_id, content, attachment_id) VALUES (?, ?, ?, ?)',
(user_id, contact_id, content, attachment_id))
conn.commit()
conn.close()
return redirect(url_for('messages', contact_id=contact_id))
@app.route('/get_unread_count')
@login_required
def get_unread_count():
user_id = session['user_id']
conn = get_db_connection()
# Получаем общее количество непрочитанных сообщений
unread_count = conn.execute('''
SELECT COUNT(*) as count FROM messages
WHERE receiver_id = ? AND is_read = 0
''', (user_id,)).fetchone()['count']
conn.close()
return jsonify({'unread_count': unread_count})
@app.route('/get_new_messages/<int:contact_id>')
@login_required
def get_new_messages(contact_id):
user_id = session['user_id']
last_id = request.args.get('last_id', 0, type=int)
conn = get_db_connection()
# Проверяем, есть ли этот пользователь в контактах
contact = conn.execute('''
SELECT * FROM contacts
WHERE user_id = ? AND contact_id = ? AND status = 'accepted'
''', (user_id, contact_id)).fetchone()
if not contact:
conn.close()
return jsonify({'error': 'Контакт не найден или не подтвержден', 'messages': []})
# Получаем новые сообщения после указанного ID
messages = conn.execute('''
SELECT * FROM messages
WHERE ((sender_id = ? AND receiver_id = ?) OR (sender_id = ? AND receiver_id = ?)) AND id > ?
ORDER BY created_at
''', (user_id, contact_id, contact_id, user_id, last_id)).fetchall()
# Отмечаем все сообщения как прочитанные
conn.execute('UPDATE messages SET is_read = 1 WHERE sender_id = ? AND receiver_id = ?',
(contact_id, user_id))
conn.commit()
# Преобразуем сообщения в словари и добавляем информацию о вложениях
message_list = []
for message in messages:
message_dict = dict(message)
# Если у сообщения есть вложение, добавляем информацию о нем
if message['attachment_id']:
attachment = conn.execute('SELECT * FROM files WHERE id = ?', (message['attachment_id'],)).fetchone()
if attachment:
attachment_dict = dict(attachment)
attachment_dict['size_formatted'] = format_size(attachment['size'])
message_dict['attachment'] = attachment_dict
message_list.append(message_dict)
conn.close()
return jsonify({
'messages': message_list,
'user_id': user_id
})
@app.route('/search_users')
@login_required
def search_users():
user_id = session['user_id']
query = request.args.get('q', '')
if len(query) < 3:
return jsonify([])
conn = get_db_connection()
# Ищем пользователей по имени или email
users = conn.execute('''
SELECT id, username, email FROM users
WHERE (username LIKE ? OR email LIKE ?) AND id != ?
LIMIT 10
''', (f'%{query}%', f'%{query}%', user_id)).fetchall()
# Проверяем, есть ли уже контакты с найденными пользователями
result = []
for user in users:
contact = conn.execute('''
SELECT status FROM contacts
WHERE user_id = ? AND contact_id = ?
''', (user_id, user['id'])).fetchone()
status = contact['status'] if contact else None
result.append({
'id': user['id'],
'username': user['username'],
'email': user['email'],
'status': status
})
conn.close()
return jsonify(result)
@app.route('/view_shared/<int:file_id>')
@login_required
def view_shared_file(file_id):
user_id = session['user_id']
conn = get_db_connection()
# Проверка прав доступа
share = conn.execute(
'SELECT s.*, f.* FROM shares s JOIN files f ON s.file_id = f.id WHERE s.file_id = ? AND s.shared_with = ?',
(file_id, user_id)
).fetchone()
if not share:
conn.close()
flash('Файл не найден или у вас нет прав для его просмотра', 'error')
return redirect(url_for('shared_with_me'))
# Если это папка, показываем ее содержимое
if share['is_folder']:
# Получение файлов в общей папке
folder_path = share['filename']
shared_files = conn.execute(
'''SELECT f.*, s.permission
FROM files f
JOIN shares s ON f.id = s.file_id
WHERE f.user_id = ? AND f.parent_folder = ? AND s.shared_with = ?
ORDER BY f.is_folder DESC, f.filename''',
(share['user_id'], folder_path, user_id)
).fetchall()
conn.close()
return render_template('view_shared_folder.html', folder=share, files=shared_files, now=datetime.datetime.now(), background=session.get('background', 'default.jpg'))
else:
# Если это файл, скачиваем его
conn.close()
directory = os.path.dirname(share['path'])
return send_from_directory(directory, os.path.basename(share['path']), as_attachment=True, download_name=share['original_filename'])
@app.route('/delete_share/<int:share_id>', methods=['POST'])
@login_required
def delete_share(share_id):
user_id = session['user_id']
conn = get_db_connection()
# Получаем информацию о записи общего доступа
share = conn.execute('SELECT * FROM shares WHERE id = ? AND user_id = ?', (share_id, user_id)).fetchone()
if not share:
conn.close()
flash('Запись не найдена или у вас нет прав для ее удаления', 'error')
return redirect(url_for('dashboard'))
# Получаем информацию о файле для перенаправления
file_id = share['file_id']
# Удаляем запись об общем доступе
conn.execute('DELETE FROM shares WHERE id = ?', (share_id,))
conn.commit()
conn.close()
flash('Общий доступ успешно отменен', 'success')
return redirect(url_for('share_file', file_id=file_id))
@app.route('/search')
@login_required
def search():
user_id = session['user_id']
query = request.args.get('query', '')
if not query:
return redirect(url_for('dashboard'))
conn = get_db_connection()
# Поиск по имени файла
files = conn.execute(
'''SELECT * FROM files
WHERE user_id = ? AND (filename LIKE ? OR original_filename LIKE ?)
ORDER BY is_folder DESC, filename''',
(user_id, f'%{query}%', f'%{query}%')
).fetchall()
# Поиск в общих файлах
shared_files = conn.execute(
'''SELECT f.*, s.permission, u.username as owner_username
FROM files f
JOIN shares s ON f.id = s.file_id
JOIN users u ON f.user_id = u.id
WHERE s.shared_with = ? AND (f.filename LIKE ? OR f.original_filename LIKE ?)
ORDER BY f.is_folder DESC, f.filename''',
(user_id, f'%{query}%', f'%{query}%')
).fetchall()
conn.close()
return render_template('search_results.html',
files=files,
shared_files=shared_files,
query=query,
now=datetime.datetime.now(),
background=session.get('background', 'default.jpg'))
# Запуск приложения
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)