from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, session, jsonify import os import sqlite3 import hashlib from werkzeug.utils import secure_filename import datetime import shutil import re import mimetypes import json def get_file_type(filename): """Определяет тип файла на основе его расширения""" mime_type, _ = mimetypes.guess_type(filename) if mime_type: if mime_type.startswith('image/'): return 'image' elif mime_type.startswith('video/'): return 'video' elif mime_type.startswith('audio/'): return 'audio' elif mime_type.startswith('text/') or mime_type in ['application/json', 'application/xml', 'application/javascript']: return 'text' # Проверяем расширение файла для текстовых форматов text_extensions = ['.txt', '.html', '.css', '.js', '.py', '.md', '.json', '.xml', '.csv', '.ini', '.cfg', '.conf'] for ext in text_extensions: if filename.lower().endswith(ext): return 'text' return 'other' def hex_to_rgb(value): """Преобразует hex-код цвета в формат RGB""" value = value.lstrip('#') return f"{int(value[0:2], 16)}, {int(value[2:4], 16)}, {int(value[4:6], 16)}" # Хелперы для работы с базой данных def get_db_connection(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def hash_password(password): return hashlib.sha256(password.encode()).hexdigest() def get_file_by_id(file_id): """Получает информацию о файле по его ID""" if not file_id: return None conn = get_db_connection() file = conn.execute('SELECT * FROM files WHERE id = ?', (file_id,)).fetchone() conn.close() return file def format_size(size_bytes): """Форматирует размер файла в человекочитаемый вид""" if size_bytes < 1024: return f"{size_bytes} Б" elif size_bytes < 1024 * 1024: return f"{size_bytes/1024:.1f} КБ" elif size_bytes < 1024 * 1024 * 1024: return f"{size_bytes/(1024*1024):.1f} МБ" else: return f"{size_bytes/(1024*1024*1024):.1f} ГБ" app = Flask(__name__) app.secret_key = 'supersecretkey' # Регистрация пользовательского фильтра для преобразования цветовых кодов app.jinja_env.filters['hex_to_rgb'] = hex_to_rgb app.jinja_env.globals['get_file_by_id'] = get_file_by_id app.jinja_env.globals['format_size'] = format_size # Настройка путей для хранения файлов BASEDIR = os.path.abspath(os.path.dirname(__file__)) UPLOAD_FOLDER = os.path.join(BASEDIR, 'uploads') BACKGROUNDS_FOLDER = os.path.join(BASEDIR, 'static', 'backgrounds') DB_PATH = os.path.join(BASEDIR, 'database.db') # Создаем папки для загрузок и фонов, если они не существуют if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) if not os.path.exists(BACKGROUNDS_FOLDER): os.makedirs(BACKGROUNDS_FOLDER) # Инициализация базы данных def init_db(): conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Создание таблицы пользователей cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL, email TEXT UNIQUE NOT NULL, background_image TEXT DEFAULT 'default.jpg', color_scheme TEXT DEFAULT '#0d6efd', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Создание таблицы файлов cursor.execute(''' CREATE TABLE IF NOT EXISTS files ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT NOT NULL, original_filename TEXT NOT NULL, path TEXT NOT NULL, size INTEGER NOT NULL, user_id INTEGER NOT NULL, parent_folder TEXT DEFAULT '/', is_folder BOOLEAN DEFAULT 0, file_type TEXT DEFAULT 'other', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') # Создание таблицы общего доступа cursor.execute(''' CREATE TABLE IF NOT EXISTS shares ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_id INTEGER NOT NULL, user_id INTEGER NOT NULL, shared_with INTEGER, permission TEXT DEFAULT 'read', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (file_id) REFERENCES files (id), FOREIGN KEY (user_id) REFERENCES users (id), FOREIGN KEY (shared_with) REFERENCES users (id) ) ''') # Создание таблицы контактов cursor.execute(''' CREATE TABLE IF NOT EXISTS contacts ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, contact_id INTEGER NOT NULL, status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id), FOREIGN KEY (contact_id) REFERENCES users (id), UNIQUE(user_id, contact_id) ) ''') # Создание таблицы сообщений cursor.execute(''' CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, sender_id INTEGER NOT NULL, receiver_id INTEGER NOT NULL, content TEXT NOT NULL, attachment_id INTEGER, is_read BOOLEAN DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (sender_id) REFERENCES users (id), FOREIGN KEY (receiver_id) REFERENCES users (id), FOREIGN KEY (attachment_id) REFERENCES files (id) ) ''') conn.commit() conn.close() # Инициализация базы данных при запуске # Маршрут для получения содержимого файла для редактирования @app.route('/get_file_content/') def get_file_content(file_id): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Получаем информацию о файле cursor.execute('SELECT filename, path, user_id FROM files WHERE id = ?', (file_id,)) file_data = cursor.fetchone() if not file_data: conn.close() return 'Файл не найден', 404 # Проверяем, принадлежит ли файл пользователю if file_data[2] != user_id: # Проверяем, есть ли у пользователя доступ к файлу cursor.execute('SELECT permission FROM shares WHERE file_id = ? AND shared_with = ?', (file_id, user_id)) share_data = cursor.fetchone() conn.close() if not share_data: return 'Доступ запрещен', 403 # Для просмотра достаточно прав на чтение или запись if share_data[0] not in ['read', 'write']: return 'Доступ запрещен', 403 else: conn.close() # Получаем полный путь к файлу file_path = os.path.join(BASEDIR, file_data[1]) try: with open(file_path, 'r', encoding='utf-8') as file: content = file.read() return content except Exception as e: return str(e), 500 # Маршрут для сохранения отредактированного содержимого файла @app.route('/save_file_content/', methods=['POST']) def save_file_content(file_id): if 'user_id' not in session: return jsonify({'success': False, 'error': 'Необходима авторизация'}), 401 user_id = session['user_id'] # Получаем данные из запроса data = request.get_json() if not data or 'content' not in data: return jsonify({'success': False, 'error': 'Отсутствует содержимое файла'}), 400 content = data['content'] conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Получаем информацию о файле cursor.execute('SELECT filename, path, user_id FROM files WHERE id = ?', (file_id,)) file_data = cursor.fetchone() conn.close() if not file_data: return jsonify({'success': False, 'error': 'Файл не найден'}), 404 # Проверяем, принадлежит ли файл пользователю if file_data[2] != user_id: # Проверяем, есть ли у пользователя доступ к файлу conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute('SELECT permission FROM shares WHERE file_id = ? AND shared_with = ?', (file_id, user_id)) share_data = cursor.fetchone() conn.close() if not share_data or share_data[0] != 'write': return jsonify({'success': False, 'error': 'Доступ запрещен'}), 403 # Получаем полный путь к файлу file_path = os.path.join(BASEDIR, file_data[1]) try: with open(file_path, 'w', encoding='utf-8') as file: file.write(content) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 init_db() # Проверка авторизации def login_required(f): def decorated_function(*args, **kwargs): if 'user_id' not in session: flash('Пожалуйста, войдите в систему для доступа к этой странице', 'error') return redirect(url_for('login')) return f(*args, **kwargs) decorated_function.__name__ = f.__name__ return decorated_function # Маршруты приложения @app.route('/') def index(): if 'user_id' in session: return redirect(url_for('dashboard')) return render_template('index.html', now=datetime.datetime.now(), background='default.jpg') @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] email = request.form['email'] background_image = 'default.jpg' conn = get_db_connection() # Проверка, существует ли пользователь user = conn.execute('SELECT * FROM users WHERE username = ? OR email = ?', (username, email)).fetchone() if user: flash('Пользователь с таким именем или email уже существует', 'error') conn.close() return redirect(url_for('register')) # Создание нового пользователя hashed_password = hash_password(password) conn.execute('INSERT INTO users (username, password, email, background_image) VALUES (?, ?, ?, ?)', (username, hashed_password, email, background_image)) conn.commit() # Получение ID нового пользователя user_id = conn.execute('SELECT id FROM users WHERE username = ?', (username,)).fetchone()[0] # Создание персональной папки для пользователя user_folder = os.path.join(UPLOAD_FOLDER, str(user_id)) if not os.path.exists(user_folder): os.makedirs(user_folder) conn.close() flash('Регистрация успешна! Теперь вы можете войти в систему', 'success') return redirect(url_for('login')) return render_template('register.html', now=datetime.datetime.now(), background='default.jpg') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] conn = get_db_connection() user = conn.execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone() if user and user['password'] == hash_password(password): session['user_id'] = user['id'] session['username'] = user['username'] session['background'] = user['background_image'] session['color_scheme'] = user['color_scheme'] conn.close() flash('Вы успешно вошли в систему!', 'success') return redirect(url_for('dashboard')) else: flash('Неверное имя пользователя или пароль', 'error') conn.close() return redirect(url_for('login')) return render_template('login.html', now=datetime.datetime.now(), background='default.jpg') @app.route('/logout') def logout(): session.clear() flash('Вы вышли из системы', 'info') return redirect(url_for('index')) @app.route('/dashboard') @login_required def dashboard(): user_id = session['user_id'] current_folder = request.args.get('folder', '/') # Нормализация текущей папки if current_folder != '/' and not current_folder.endswith('/'): current_folder += '/' conn = get_db_connection() # Исправленный запрос с точным совпадением files = conn.execute( 'SELECT * FROM files WHERE user_id = ? AND parent_folder = ? ORDER BY is_folder DESC, filename', (user_id, current_folder) ).fetchall() # Получение родительской папки parent_folder = os.path.dirname(current_folder.rstrip('/')) if parent_folder == '': parent_folder = '/' # Получение информации о текущей папке current_folder_info = None if current_folder != '/': current_folder_info = conn.execute( 'SELECT * FROM files WHERE user_id = ? AND filename = ? AND is_folder = 1', (user_id, current_folder) ).fetchone() # Получение пути навигации breadcrumbs = [] if current_folder != '/': parts = current_folder.strip('/').split('/') path = '/' breadcrumbs.append(('/', 'Корень')) for i, part in enumerate(parts): if part: path += part + '/' breadcrumbs.append((path, part)) else: breadcrumbs.append(('/', 'Корень')) conn.close() return render_template('dashboard.html', files=files, current_folder=current_folder, breadcrumbs=breadcrumbs, current_folder_info=current_folder_info, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) @app.route('/upload', methods=['POST']) @login_required def upload_file(): user_id = session['user_id'] current_folder = request.form.get('current_folder', '/') # Проверка, была ли отправлена форма с файлом if 'files[]' not in request.files: flash('Файл не выбран', 'error') return redirect(url_for('dashboard', folder=current_folder)) files = request.files.getlist('files[]') # Если файлы не выбраны if not files or all(file.filename == '' for file in files): flash('Файлы не выбраны', 'error') return redirect(url_for('dashboard', folder=current_folder)) # Обработка всех файлов success_count = 0 for file in files: if file.filename == '': continue try: conn = get_db_connection() filename = secure_filename(file.filename) # Определение пути для сохранения файла user_folder = os.path.join(UPLOAD_FOLDER, str(user_id)) # Создание пути к текущей папке if current_folder == '/': save_path = user_folder else: folder_path = current_folder.strip('/') save_path = os.path.join(user_folder, folder_path) # Проверка существования папки if not os.path.exists(save_path): os.makedirs(save_path) # Полный путь к файлу file_path = os.path.join(save_path, filename) # Сохранение файла file.save(file_path) # Получение размера файла file_size = os.path.getsize(file_path) # Определение типа файла file_type = get_file_type(filename) conn.execute( 'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, file_type) VALUES (?, ?, ?, ?, ?, ?, ?)', (filename, file.filename, file_path, file_size, user_id, current_folder, file_type) ) conn.commit() success_count += 1 except Exception as e: print(f'Ошибка при загрузке файла {file.filename}: {str(e)}') finally: conn.close() if success_count > 0: flash(f'Успешно загружено файлов: {success_count}', 'success') else: flash('Не удалось загрузить ни одного файла', 'error') return redirect(url_for('dashboard', folder=current_folder)) # Обновляем функцию создания папки @app.route('/create_folder', methods=['POST']) @login_required def create_folder(): user_id = session['user_id'] folder_name = request.form['folder_name'] current_folder = request.form.get('current_folder', '/') # Нормализация пути if current_folder != '/' and not current_folder.endswith('/'): current_folder += '/' # Формирование полного пути full_path = current_folder + folder_name + '/' # Проверка существования conn = get_db_connection() existing = conn.execute('SELECT * FROM files WHERE filename = ? AND user_id = ?', (full_path, user_id)).fetchone() if existing: flash('Папка уже существует', 'error') return redirect(url_for('dashboard', folder=current_folder)) # Формирование корректного пути для БД db_path = f"{current_folder}{folder_name}/" # Обновленный путь для файловой системы user_folder = os.path.join(UPLOAD_FOLDER, str(user_id)) new_folder_path = os.path.join(user_folder, current_folder.lstrip('/'), folder_name) # Обновленный SQL-запрос conn.execute( 'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, is_folder) VALUES (?, ?, ?, ?, ?, ?, ?)', (db_path, folder_name, new_folder_path, 0, user_id, current_folder, 1) ) conn.commit() conn.close() flash(f'Папка {folder_name} успешно создана', 'success') return redirect(url_for('dashboard', folder=current_folder)) @app.route('/download/') @login_required def download_file(file_id): user_id = session['user_id'] conn = get_db_connection() # Проверяем, есть ли у пользователя прямой доступ к файлу file = conn.execute('SELECT * FROM files WHERE id = ? AND user_id = ?', (file_id, user_id)).fetchone() # Если файл не принадлежит пользователю, проверяем общий доступ if not file: file = conn.execute(''' SELECT f.* FROM files f JOIN shares s ON f.id = s.file_id WHERE f.id = ? AND s.shared_with = ? ''', (file_id, user_id)).fetchone() # Если файл не найден через общий доступ, проверяем доступ через сообщения if not file: # Проверяем, является ли пользователь получателем сообщения с этим вложением file = conn.execute(''' SELECT f.* FROM files f JOIN messages m ON f.id = m.attachment_id WHERE f.id = ? AND m.receiver_id = ? ''', (file_id, user_id)).fetchone() # Также проверяем, является ли пользователь отправителем сообщения с этим вложением if not file: file = conn.execute(''' SELECT f.* FROM files f JOIN messages m ON f.id = m.attachment_id WHERE f.id = ? AND m.sender_id = ? ''', (file_id, user_id)).fetchone() conn.close() if file and not file['is_folder']: directory = os.path.dirname(file['path']) return send_from_directory(directory, os.path.basename(file['path']), as_attachment=True, download_name=file['original_filename']) else: flash('Файл не найден или у вас нет прав для его скачивания', 'error') return redirect(url_for('dashboard')) @app.route('/delete/', methods=['POST']) @login_required def delete_file(file_id): user_id = session['user_id'] conn = get_db_connection() file = conn.execute('SELECT * FROM files WHERE id = ? AND user_id = ?', (file_id, user_id)).fetchone() if not file: conn.close() flash('Файл не найден или у вас нет прав для его удаления', 'error') return redirect(url_for('dashboard')) current_folder = file['parent_folder'] # Если это папка, удаляем все вложенные файлы и папки if file['is_folder']: # Удаление физической папки if os.path.exists(file['path']): shutil.rmtree(file['path']) # Удаление всех записей о файлах и папках внутри этой папки folder_path = file['filename'] conn.execute('DELETE FROM files WHERE user_id = ? AND (parent_folder = ? OR parent_folder LIKE ?)', (user_id, folder_path, folder_path + '/%')) else: # Удаление физического файла if os.path.exists(file['path']): os.remove(file['path']) # Удаление записи о файле/папке из базы данных conn.execute('DELETE FROM files WHERE id = ?', (file_id,)) # Удаление всех записей о совместном доступе к этому файлу conn.execute('DELETE FROM shares WHERE file_id = ?', (file_id,)) conn.commit() conn.close() flash('Файл успешно удален', 'success') return redirect(url_for('dashboard', folder=current_folder)) @app.route('/share/', methods=['GET', 'POST']) @login_required def share_file(file_id): user_id = session['user_id'] conn = get_db_connection() file = conn.execute('SELECT * FROM files WHERE id = ? AND user_id = ? AND is_folder = 0', (file_id, user_id)).fetchone() if not file: conn.close() flash('Файл не найден или у вас нет прав для его совместного использования', 'error') return redirect(url_for('dashboard')) if request.method == 'POST': shared_username = request.form['username'] permission = request.form['permission'] # Проверка существования пользователя shared_user = conn.execute('SELECT id FROM users WHERE username = ?', (shared_username,)).fetchone() if not shared_user: conn.close() flash(f'Пользователь {shared_username} не найден', 'error') return redirect(url_for('share_file', file_id=file_id)) # Проверка, не пытается ли пользователь поделиться с самим собой if shared_user['id'] == user_id: conn.close() flash('Вы не можете поделиться файлом с самим собой', 'error') return redirect(url_for('share_file', file_id=file_id)) # Проверка, не поделился ли пользователь уже с этим пользователем existing_share = conn.execute( 'SELECT * FROM shares WHERE file_id = ? AND user_id = ? AND shared_with = ?', (file_id, user_id, shared_user['id']) ).fetchone() if existing_share: # Обновление прав доступа conn.execute( 'UPDATE shares SET permission = ? WHERE id = ?', (permission, existing_share['id']) ) flash(f'Права доступа для пользователя {shared_username} обновлены', 'success') else: # Создание новой записи о совместном доступе conn.execute( 'INSERT INTO shares (file_id, user_id, shared_with, permission) VALUES (?, ?, ?, ?)', (file_id, user_id, shared_user['id'], permission) ) flash(f'Файл успешно предоставлен пользователю {shared_username}', 'success') conn.commit() conn.close() return redirect(url_for('dashboard')) # Получение списка пользователей, с которыми уже поделились shares = conn.execute( '''SELECT s.*, u.username FROM shares s JOIN users u ON s.shared_with = u.id WHERE s.file_id = ? AND s.user_id = ?''', (file_id, user_id) ).fetchall() conn.close() return render_template('share.html', file=file, shares=shares, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) @app.route('/shared_with_me') @login_required def shared_with_me(): user_id = session['user_id'] conn = get_db_connection() shared_files = conn.execute( '''SELECT f.*, s.permission, u.username as owner_username FROM files f JOIN shares s ON f.id = s.file_id JOIN users u ON f.user_id = u.id WHERE s.shared_with = ? ORDER BY f.is_folder DESC, f.filename''', (user_id,) ).fetchall() # Преобразуем результаты запроса для корректного отображения в шаблоне files_with_access = [] for file in shared_files: file_dict = dict(file) file_dict['access_type'] = file_dict['permission'] # Копируем permission в access_type file_dict['owner'] = file_dict['owner_username'] # Копируем owner_username в owner files_with_access.append(file_dict) conn.close() return render_template('shared_with_me.html', files=files_with_access, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) @app.route('/settings') @login_required def settings(): conn = get_db_connection() user = conn.execute('SELECT color_scheme FROM users WHERE id = ?', (session['user_id'],)).fetchone() conn.close() return render_template('settings.html', now=datetime.datetime.now(), background=session.get('background', 'default.jpg'), color_scheme=user['color_scheme']) @app.route('/change_color_scheme', methods=['POST']) @login_required def change_color_scheme(): user_id = session['user_id'] color_scheme = request.form['color_scheme'] conn = get_db_connection() conn.execute('UPDATE users SET color_scheme = ? WHERE id = ?', (color_scheme, user_id)) conn.commit() conn.close() session['color_scheme'] = color_scheme flash('Цветовая схема успешно обновлена', 'success') return redirect(url_for('settings')) @app.route('/change_background', methods=['POST']) @login_required def change_background(): user_id = session['user_id'] if 'background_file' not in request.files: flash('Файл не выбран', 'error') return redirect(url_for('settings')) file = request.files['background_file'] if file.filename == '': flash('Файл не выбран', 'error') return redirect(url_for('settings')) # Проверка расширения файла allowed_extensions = {'jpg', 'jpeg', 'png', 'gif'} if not '.' in file.filename or file.filename.rsplit('.', 1)[1].lower() not in allowed_extensions: flash('Недопустимый формат файла. Разрешены только изображения (jpg, jpeg, png, gif)', 'error') return redirect(url_for('settings')) # Создаем уникальное имя файла filename = f"user_{user_id}_{secure_filename(file.filename)}" file_path = os.path.join(BACKGROUNDS_FOLDER, filename) # Сохраняем файл file.save(file_path) # Обновляем информацию в базе данных conn = get_db_connection() conn.execute('UPDATE users SET background_image = ? WHERE id = ?', (filename, user_id)) conn.commit() conn.close() # Обновляем сессию session['background'] = filename flash('Фоновое изображение успешно обновлено', 'success') return redirect(url_for('settings')) # Маршруты для работы с контактами @app.route('/contacts') @login_required def contacts(): user_id = session['user_id'] conn = get_db_connection() # Получаем список контактов пользователя contacts = conn.execute(''' SELECT c.id, c.contact_id, c.status, u.username, u.email, (SELECT COUNT(*) FROM messages WHERE sender_id = c.contact_id AND receiver_id = ? AND is_read = 0) as unread_count FROM contacts c JOIN users u ON c.contact_id = u.id WHERE c.user_id = ? AND c.status = 'accepted' ORDER BY u.username ''', (user_id, user_id)).fetchall() # Получаем входящие запросы на добавление в контакты incoming_requests = conn.execute(''' SELECT c.id, c.user_id, u.username, u.email FROM contacts c JOIN users u ON c.user_id = u.id WHERE c.contact_id = ? AND c.status = 'pending' ORDER BY c.created_at DESC ''', (user_id,)).fetchall() # Получаем исходящие запросы на добавление в контакты outgoing_requests = conn.execute(''' SELECT c.id, c.contact_id, u.username, u.email FROM contacts c JOIN users u ON c.contact_id = u.id WHERE c.user_id = ? AND c.status = 'pending' ORDER BY c.created_at DESC ''', (user_id,)).fetchall() conn.close() return render_template('contacts.html', contacts=contacts, incoming_requests=incoming_requests, outgoing_requests=outgoing_requests, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) @app.route('/add_contact', methods=['POST']) @login_required def add_contact(): user_id = session['user_id'] username_or_email = request.form['username_or_email'] conn = get_db_connection() # Ищем пользователя по имени или email user = conn.execute('SELECT id FROM users WHERE username = ? OR email = ?', (username_or_email, username_or_email)).fetchone() if not user: flash('Пользователь не найден', 'error') conn.close() return redirect(url_for('contacts')) contact_id = user['id'] # Проверяем, не пытается ли пользователь добавить сам себя if contact_id == user_id: flash('Вы не можете добавить себя в контакты', 'error') conn.close() return redirect(url_for('contacts')) # Проверяем, существует ли уже такой контакт existing = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?', (user_id, contact_id)).fetchone() if existing: flash('Этот пользователь уже в вашем списке контактов или запросов', 'error') conn.close() return redirect(url_for('contacts')) # Проверяем, есть ли входящий запрос от этого пользователя incoming = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?', (contact_id, user_id)).fetchone() if incoming: # Если есть входящий запрос, принимаем его conn.execute('UPDATE contacts SET status = "accepted" WHERE user_id = ? AND contact_id = ?', (contact_id, user_id)) # И создаем обратную связь conn.execute('INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, "accepted")', (user_id, contact_id)) flash('Контакт успешно добавлен', 'success') else: # Иначе создаем новый запрос conn.execute('INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, "pending")', (user_id, contact_id)) flash('Запрос на добавление в контакты отправлен', 'success') conn.commit() conn.close() return redirect(url_for('contacts')) @app.route('/accept_contact/', methods=['POST']) @login_required def accept_contact(contact_id): user_id = session['user_id'] conn = get_db_connection() # Получаем информацию о запросе contact_request = conn.execute('SELECT * FROM contacts WHERE id = ? AND contact_id = ?', (contact_id, user_id)).fetchone() if not contact_request: flash('Запрос не найден', 'error') conn.close() return redirect(url_for('contacts')) # Принимаем запрос conn.execute('UPDATE contacts SET status = "accepted" WHERE id = ?', (contact_id,)) # Создаем обратную связь requester_id = contact_request['user_id'] existing = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?', (user_id, requester_id)).fetchone() if not existing: conn.execute('INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, "accepted")', (user_id, requester_id)) else: conn.execute('UPDATE contacts SET status = "accepted" WHERE user_id = ? AND contact_id = ?', (user_id, requester_id)) conn.commit() conn.close() flash('Контакт успешно добавлен', 'success') return redirect(url_for('contacts')) @app.route('/reject_contact/', methods=['POST']) @login_required def reject_contact(contact_id): user_id = session['user_id'] conn = get_db_connection() # Удаляем запрос conn.execute('DELETE FROM contacts WHERE id = ? AND contact_id = ?', (contact_id, user_id)) conn.commit() conn.close() flash('Запрос отклонен', 'info') return redirect(url_for('contacts')) @app.route('/remove_contact/', methods=['POST']) @login_required def remove_contact(contact_id): user_id = session['user_id'] conn = get_db_connection() # Получаем ID контакта contact = conn.execute('SELECT contact_id FROM contacts WHERE id = ? AND user_id = ?', (contact_id, user_id)).fetchone() if not contact: flash('Контакт не найден', 'error') conn.close() return redirect(url_for('contacts')) other_user_id = contact['contact_id'] # Удаляем связь в обе стороны conn.execute('DELETE FROM contacts WHERE user_id = ? AND contact_id = ?', (user_id, other_user_id)) conn.execute('DELETE FROM contacts WHERE user_id = ? AND contact_id = ?', (other_user_id, user_id)) conn.commit() conn.close() flash('Контакт удален', 'info') return redirect(url_for('contacts')) # Маршруты для работы с сообщениями @app.route('/messages/') @login_required def messages(contact_id): user_id = session['user_id'] conn = get_db_connection() # Получаем список контактов для боковой панели contacts = conn.execute(''' SELECT c.id, c.contact_id, u.username, (SELECT COUNT(*) FROM messages WHERE sender_id = c.contact_id AND receiver_id = ? AND is_read = 0) as unread_count FROM contacts c JOIN users u ON c.contact_id = u.id WHERE c.user_id = ? AND c.status = 'accepted' ORDER BY u.username ''', (user_id, user_id)).fetchall() # Если contact_id = 0, это значит, что пользователь перешел из меню # В этом случае перенаправляем на первый контакт или показываем пустую страницу if contact_id == 0: # Если есть контакты, перенаправляем на первый if contacts: conn.close() return redirect(url_for('messages', contact_id=contacts[0]['contact_id'])) # Иначе показываем пустую страницу сообщений conn.close() return render_template('messages.html', contact=None, messages=[], contacts=contacts, user_id=user_id, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) # Проверяем, есть ли этот пользователь в контактах contact = conn.execute(''' SELECT c.id, c.contact_id, u.username, u.email FROM contacts c JOIN users u ON c.contact_id = u.id WHERE c.user_id = ? AND c.contact_id = ? AND c.status = 'accepted' ''', (user_id, contact_id)).fetchone() if not contact: flash('Контакт не найден или не подтвержден', 'error') conn.close() return redirect(url_for('contacts')) # Получаем историю сообщений messages = conn.execute(''' SELECT * FROM messages WHERE (sender_id = ? AND receiver_id = ?) OR (sender_id = ? AND receiver_id = ?) ORDER BY created_at ''', (user_id, contact_id, contact_id, user_id)).fetchall() # Отмечаем все сообщения как прочитанные conn.execute('UPDATE messages SET is_read = 1 WHERE sender_id = ? AND receiver_id = ?', (contact_id, user_id)) conn.commit() conn.close() return render_template('messages.html', contact=contact, messages=messages, contacts=contacts, user_id=user_id, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) @app.route('/send_message/', methods=['POST']) @login_required def send_message(contact_id): user_id = session['user_id'] content = request.form['content'] if not content or content.strip() == '': flash('Сообщение не может быть пустым', 'error') return redirect(url_for('messages', contact_id=contact_id)) conn = get_db_connection() # Проверяем, есть ли этот пользователь в контактах contact = conn.execute(''' SELECT * FROM contacts WHERE user_id = ? AND contact_id = ? AND status = 'accepted' ''', (user_id, contact_id)).fetchone() if not contact: flash('Контакт не найден или не подтвержден', 'error') conn.close() return redirect(url_for('contacts')) # Обработка прикрепленного файла attachment_id = None if 'attachment' in request.files and request.files['attachment'].filename != '': file = request.files['attachment'] filename = secure_filename(file.filename) # Создаем папку для вложений, если она не существует attachments_folder = os.path.join(UPLOAD_FOLDER, str(user_id), 'attachments') if not os.path.exists(attachments_folder): os.makedirs(attachments_folder) # Сохраняем файл file_path = os.path.join(attachments_folder, filename) file.save(file_path) # Получаем размер файла file_size = os.path.getsize(file_path) # Определяем тип файла file_type = get_file_type(filename) # Сохраняем информацию о файле в базу данных cursor = conn.cursor() cursor.execute( 'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, file_type) VALUES (?, ?, ?, ?, ?, ?, ?)', (filename, file.filename, file_path, file_size, user_id, '/attachments/', file_type) ) conn.commit() # Получаем ID созданного файла attachment_id = cursor.lastrowid # Сохраняем сообщение с прикрепленным файлом (если есть) conn.execute('INSERT INTO messages (sender_id, receiver_id, content, attachment_id) VALUES (?, ?, ?, ?)', (user_id, contact_id, content, attachment_id)) conn.commit() conn.close() return redirect(url_for('messages', contact_id=contact_id)) @app.route('/get_unread_count') @login_required def get_unread_count(): user_id = session['user_id'] conn = get_db_connection() # Получаем общее количество непрочитанных сообщений unread_count = conn.execute(''' SELECT COUNT(*) as count FROM messages WHERE receiver_id = ? AND is_read = 0 ''', (user_id,)).fetchone()['count'] conn.close() return jsonify({'unread_count': unread_count}) @app.route('/get_new_messages/') @login_required def get_new_messages(contact_id): user_id = session['user_id'] last_id = request.args.get('last_id', 0, type=int) conn = get_db_connection() # Проверяем, есть ли этот пользователь в контактах contact = conn.execute(''' SELECT * FROM contacts WHERE user_id = ? AND contact_id = ? AND status = 'accepted' ''', (user_id, contact_id)).fetchone() if not contact: conn.close() return jsonify({'error': 'Контакт не найден или не подтвержден', 'messages': []}) # Получаем новые сообщения после указанного ID messages = conn.execute(''' SELECT * FROM messages WHERE ((sender_id = ? AND receiver_id = ?) OR (sender_id = ? AND receiver_id = ?)) AND id > ? ORDER BY created_at ''', (user_id, contact_id, contact_id, user_id, last_id)).fetchall() # Отмечаем все сообщения как прочитанные conn.execute('UPDATE messages SET is_read = 1 WHERE sender_id = ? AND receiver_id = ?', (contact_id, user_id)) conn.commit() # Преобразуем сообщения в словари и добавляем информацию о вложениях message_list = [] for message in messages: message_dict = dict(message) # Если у сообщения есть вложение, добавляем информацию о нем if message['attachment_id']: attachment = conn.execute('SELECT * FROM files WHERE id = ?', (message['attachment_id'],)).fetchone() if attachment: attachment_dict = dict(attachment) attachment_dict['size_formatted'] = format_size(attachment['size']) message_dict['attachment'] = attachment_dict message_list.append(message_dict) conn.close() return jsonify({ 'messages': message_list, 'user_id': user_id }) @app.route('/search_users') @login_required def search_users(): user_id = session['user_id'] query = request.args.get('q', '') if len(query) < 3: return jsonify([]) conn = get_db_connection() # Ищем пользователей по имени или email users = conn.execute(''' SELECT id, username, email FROM users WHERE (username LIKE ? OR email LIKE ?) AND id != ? LIMIT 10 ''', (f'%{query}%', f'%{query}%', user_id)).fetchall() # Проверяем, есть ли уже контакты с найденными пользователями result = [] for user in users: contact = conn.execute(''' SELECT status FROM contacts WHERE user_id = ? AND contact_id = ? ''', (user_id, user['id'])).fetchone() status = contact['status'] if contact else None result.append({ 'id': user['id'], 'username': user['username'], 'email': user['email'], 'status': status }) conn.close() return jsonify(result) @app.route('/view_shared/') @login_required def view_shared_file(file_id): user_id = session['user_id'] conn = get_db_connection() # Проверка прав доступа share = conn.execute( 'SELECT s.*, f.* FROM shares s JOIN files f ON s.file_id = f.id WHERE s.file_id = ? AND s.shared_with = ?', (file_id, user_id) ).fetchone() if not share: conn.close() flash('Файл не найден или у вас нет прав для его просмотра', 'error') return redirect(url_for('shared_with_me')) # Если это папка, показываем ее содержимое if share['is_folder']: # Получение файлов в общей папке folder_path = share['filename'] shared_files = conn.execute( '''SELECT f.*, s.permission FROM files f JOIN shares s ON f.id = s.file_id WHERE f.user_id = ? AND f.parent_folder = ? AND s.shared_with = ? ORDER BY f.is_folder DESC, f.filename''', (share['user_id'], folder_path, user_id) ).fetchall() conn.close() return render_template('view_shared_folder.html', folder=share, files=shared_files, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) else: # Если это файл, скачиваем его conn.close() directory = os.path.dirname(share['path']) return send_from_directory(directory, os.path.basename(share['path']), as_attachment=True, download_name=share['original_filename']) @app.route('/delete_share/', methods=['POST']) @login_required def delete_share(share_id): user_id = session['user_id'] conn = get_db_connection() # Получаем информацию о записи общего доступа share = conn.execute('SELECT * FROM shares WHERE id = ? AND user_id = ?', (share_id, user_id)).fetchone() if not share: conn.close() flash('Запись не найдена или у вас нет прав для ее удаления', 'error') return redirect(url_for('dashboard')) # Получаем информацию о файле для перенаправления file_id = share['file_id'] # Удаляем запись об общем доступе conn.execute('DELETE FROM shares WHERE id = ?', (share_id,)) conn.commit() conn.close() flash('Общий доступ успешно отменен', 'success') return redirect(url_for('share_file', file_id=file_id)) @app.route('/search') @login_required def search(): user_id = session['user_id'] query = request.args.get('query', '') if not query: return redirect(url_for('dashboard')) conn = get_db_connection() # Поиск по имени файла files = conn.execute( '''SELECT * FROM files WHERE user_id = ? AND (filename LIKE ? OR original_filename LIKE ?) ORDER BY is_folder DESC, filename''', (user_id, f'%{query}%', f'%{query}%') ).fetchall() # Поиск в общих файлах shared_files = conn.execute( '''SELECT f.*, s.permission, u.username as owner_username FROM files f JOIN shares s ON f.id = s.file_id JOIN users u ON f.user_id = u.id WHERE s.shared_with = ? AND (f.filename LIKE ? OR f.original_filename LIKE ?) ORDER BY f.is_folder DESC, f.filename''', (user_id, f'%{query}%', f'%{query}%') ).fetchall() conn.close() return render_template('search_results.html', files=files, shared_files=shared_files, query=query, now=datetime.datetime.now(), background=session.get('background', 'default.jpg')) # Запуск приложения if __name__ == '__main__': app.run(host='0.0.0.0', debug=True)