Spaces:
Sleeping
Sleeping
import os | |
import mimetypes | |
from datetime import datetime | |
from flask import ( | |
render_template, request, jsonify, | |
session, redirect, url_for, send_file | |
) | |
from werkzeug.utils import secure_filename | |
from app import app, db | |
from models import User, Conversation, ConversationParticipant, Message, MessageSeen | |
# Utilities | |
def allowed_file(filename):
    """Return True when *filename* carries an extension the app accepts.

    The extension is the text after the last dot, lower-cased and looked up
    in ``app.config['ALLOWED_EXTENSIONS']``. Names without a dot are rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
def get_file_icon(file_type):
    """Return the Font Awesome icon class for a MIME type.

    Args:
        file_type: MIME type string such as ``'image/png'``. ``None`` or an
            empty string is treated as an unknown type.

    Returns:
        A Font Awesome class string; ``'fas fa-file'`` for unknown types.
    """
    mime = (file_type or '').lower()

    # Top-level media families are identified by their prefix.
    prefix_icons = (
        ('image/', 'fas fa-image'),
        ('audio/', 'fas fa-music'),
        ('video/', 'fas fa-video'),
    )
    for prefix, icon in prefix_icons:
        if mime.startswith(prefix):
            return icon

    # Order matters:
    #  * the APK MIME type (application/vnd.android.package-archive) contains
    #    "archive", so android must be tested before the archive keywords;
    #  * OOXML/ODF spreadsheet and presentation types contain "document"
    #    (e.g. "...officedocument.spreadsheetml.sheet"), so they must be
    #    tested before the word/document keywords.
    keyword_icons = (
        (('pdf',), 'fas fa-file-pdf'),
        (('apk', 'android'), 'fab fa-android'),
        (('excel', 'sheet'), 'fas fa-file-excel'),
        (('powerpoint', 'presentation'), 'fas fa-file-powerpoint'),
        (('word', 'document'), 'fas fa-file-word'),
        (('zip', 'archive'), 'fas fa-file-archive'),
    )
    for keywords, icon in keyword_icons:
        if any(keyword in mime for keyword in keywords):
            return icon

    return 'fas fa-file'
def format_file_size(size_bytes):
    """Convert a byte count to a short human-readable string.

    Args:
        size_bytes: Size in bytes. ``None`` and ``0`` both yield ``"0B"`` so
            that records without a stored size do not raise.

    Returns:
        The size with one decimal place and a unit from B to TB, for example
        ``"1.5KB"``. Values of 1024 TB or more stay expressed in TB.
    """
    if not size_bytes:
        return "0B"
    size_names = ["B", "KB", "MB", "GB", "TB"]
    value = float(size_bytes)  # work on a copy instead of rebinding the argument
    unit_index = 0
    while value >= 1024.0 and unit_index < len(size_names) - 1:
        value /= 1024.0
        unit_index += 1
    return f"{value:.1f}{size_names[unit_index]}"
# Web Routes | |
def landing():
    """Show the landing page, or send a logged-in visitor to the chat view."""
    logged_in = session.get('user_id')
    if not logged_in:
        return render_template('landing.html')
    return redirect(url_for('chat'))
def register():
    """Show the registration page, or send a logged-in visitor to the chat view."""
    logged_in = session.get('user_id')
    if not logged_in:
        return render_template('register.html')
    return redirect(url_for('chat'))
def chat():
    """Render the chat page for the logged-in user and mark them online.

    Visitors without a session are redirected to the landing page. A session
    that points at a user that no longer exists is cleared first.
    """
    current_id = session.get('user_id')
    if current_id:
        account = User.query.get(current_id)
        if account:
            # Opening the chat view counts as activity.
            account.online = True
            account.last_seen = datetime.utcnow()
            db.session.commit()
            return render_template('chat.html', user=account)
        # Stale session: the referenced user is gone.
        session.clear()
    return redirect(url_for('landing'))
def settings():
    """Render the settings page for the logged-in user.

    Visitors without a session are redirected to the landing page. A session
    that points at a user that no longer exists is cleared first.
    """
    current_id = session.get('user_id')
    if current_id:
        account = User.query.get(current_id)
        if account:
            return render_template('settings.html', user=account)
        # Stale session: the referenced user is gone.
        session.clear()
    return redirect(url_for('landing'))
def logout():
    """Mark the current user offline, drop the session and go to the landing page."""
    current_id = session.get('user_id')
    account = User.query.get(current_id) if current_id else None
    if account:
        account.online = False
        account.last_seen = datetime.utcnow()
        db.session.commit()
    # The session is cleared whether or not a user record was found.
    session.clear()
    return redirect(url_for('landing'))
# API Routes | |
def api_register():
    """Create a user account from a JSON body and log the new user in.

    Expects a JSON object with ``name`` and ``email``. The e-mail address is
    lower-cased before the uniqueness check.

    Returns:
        A JSON response with ``success`` and ``message``:
        200 on success, 400 when name/email are missing or not strings,
        409 when the e-mail is already registered, 500 on unexpected errors.
    """
    try:
        # silent=True turns a missing/invalid JSON body into None instead of
        # raising, so the client gets the validation message below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        name = data.get('name') or ''
        email = data.get('email') or ''
        if not isinstance(name, str) or not isinstance(email, str):
            return jsonify(success=False, message='Name and email are required'), 400
        name = name.strip()
        email = email.strip().lower()
        if not name or not email:
            return jsonify(success=False, message='Name and email are required'), 400

        if User.query.filter_by(email=email).first():
            return jsonify(success=False, message='Email already registered'), 409

        user = User(name=name, email=email)
        db.session.add(user)
        db.session.commit()

        session['user_id'] = user.user_id
        app.logger.debug(f"Session after register: {dict(session)}")
        return jsonify(success=True, message='Account created successfully')
    except Exception as e:
        # A failed flush/commit leaves the SQLAlchemy session unusable until
        # it is rolled back.
        db.session.rollback()
        app.logger.error(f"Registration error: {e}")
        return jsonify(success=False, message='Registration failed'), 500
def api_conversations():
    """Return the current user's conversations as JSON, newest activity first.

    Each entry carries the display name, type, an online flag, the list of
    participants and a preview of the latest message (or ``None``).
    Responds 401 without a session and 500 when loading fails.
    """
    if 'user_id' not in session:
        return jsonify(success=False, message='Not authenticated'), 401
    me = session['user_id']

    def _activity_key(entry):
        # Conversations without messages sort after all dated ones.
        preview = entry['last_message']
        return preview['timestamp'] if preview else '1970-01-01'

    try:
        my_conversations = (
            Conversation.query.join(ConversationParticipant)
            .filter(ConversationParticipant.user_id == me)
            .all()
        )
        payload = []
        for conversation in my_conversations:
            members = (
                User.query.join(ConversationParticipant)
                .filter(ConversationParticipant.conversation_id == conversation.id)
                .all()
            )
            newest = (
                Message.query.filter_by(conversation_id=conversation.id)
                .order_by(Message.timestamp.desc())
                .first()
            )

            preview = None
            if newest:
                author = User.query.get(newest.sender_id)
                preview = {
                    # File messages without text fall back to the file name.
                    'content': newest.content or f"📎 {newest.file_name}",
                    'timestamp': newest.timestamp.isoformat(),
                    'sender_name': author.name if author else 'Unknown',
                }

            others = [member for member in members if member.user_id != me]
            display_name = conversation.name
            if conversation.type == 'private':
                # A private chat is shown under the other person's name/status.
                is_online = False
                if others:
                    display_name = others[0].name
                    is_online = others[0].online
            else:
                is_online = any(member.online for member in others)

            payload.append({
                'id': conversation.id,
                'name': display_name,
                'type': conversation.type,
                'online': is_online,
                'participants': [
                    {'id': member.user_id, 'name': member.name, 'online': member.online}
                    for member in members
                ],
                'last_message': preview,
            })

        payload.sort(key=_activity_key, reverse=True)
        return jsonify(success=True, conversations=payload)
    except Exception as e:
        app.logger.error(f"Error loading conversations: {e}")
        return jsonify(success=False, message='Failed to load conversations'), 500
def api_messages(conversation_id):
    """Return all messages of a conversation, oldest first, as JSON.

    The caller must be a participant of the conversation. File, image and
    audio messages additionally carry their file metadata.
    Responds 401 without a session, 403 for non-participants and 500 when
    loading fails.
    """
    if 'user_id' not in session:
        return jsonify(success=False, message='Not authenticated'), 401
    viewer_id = session['user_id']
    try:
        membership = ConversationParticipant.query.filter_by(
            conversation_id=conversation_id, user_id=viewer_id
        ).first()
        if not membership:
            return jsonify(success=False, message='Access denied'), 403

        history = (
            Message.query.filter_by(conversation_id=conversation_id)
            .order_by(Message.timestamp)
            .all()
        )
        serialized = []
        for msg in history:
            receipts = MessageSeen.query.filter_by(message_id=msg.id).all()
            author = User.query.get(msg.sender_id)
            entry = {
                'id': msg.id,
                'sender_id': msg.sender_id,
                'sender_name': author.name if author else 'Unknown',
                'content': msg.content,
                'message_type': msg.message_type,
                'timestamp': msg.timestamp.isoformat(),
                'seen_by': [receipt.user_id for receipt in receipts],
            }
            if msg.message_type in ('file', 'image', 'audio'):
                # Attachment-style messages expose their file metadata too.
                entry['file_name'] = msg.file_name
                entry['file_size'] = msg.file_size
                entry['file_type'] = msg.file_type
                entry['file_size_formatted'] = format_file_size(msg.file_size)
                entry['file_icon'] = get_file_icon(msg.file_type or '')
                entry['audio_duration'] = msg.audio_duration
            serialized.append(entry)
        return jsonify(success=True, messages=serialized)
    except Exception as e:
        app.logger.error(f"Error loading messages: {e}")
        return jsonify(success=False, message='Failed to load messages'), 500
# NOTE: The handlers below (send_message, upload_file, upload_audio, download, ...)
# are expected to follow the same conventions as the handlers above:
# - Check the session at the start of the route.
# - Call session.clear() when the user is missing or invalid.
# - Return appropriate HTTP status codes.
# - Log session and debugging output.
def api_send_message():
    """Store a text message in a conversation and return it as JSON.

    Expects a JSON object with ``conversation_id`` and non-empty ``content``.
    The caller must be a participant of the conversation.

    Returns:
        ``{'success': True, 'message': {...}}`` with the stored message on
        success; otherwise ``success: False`` with 401 (no session), 400
        (missing fields), 403 (not a participant) or 500 (unexpected error).
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    user_id = session['user_id']
    try:
        # A missing or non-object body becomes {} so the client receives the
        # validation error rather than a generic failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        conversation_id = data.get('conversation_id')
        content = data.get('content')
        content = content.strip() if isinstance(content, str) else ''
        if not conversation_id or not content:
            return jsonify({'success': False, 'message': 'Conversation ID and content are required'}), 400

        # Verify user is participant
        participant = ConversationParticipant.query.filter_by(
            conversation_id=conversation_id,
            user_id=user_id
        ).first()
        if not participant:
            return jsonify({'success': False, 'message': 'Access denied'}), 403

        message = Message(
            conversation_id=conversation_id,
            sender_id=user_id,
            content=content,
            message_type='text'
        )
        db.session.add(message)
        db.session.commit()

        sender = User.query.get(user_id)
        # Return complete message data for instant display
        message_data = {
            'id': message.id,
            'sender_id': message.sender_id,
            'sender_name': sender.name if sender else 'Unknown',
            'content': message.content,
            'message_type': message.message_type,
            'timestamp': message.timestamp.isoformat(),
            'seen_by': []  # nobody has seen a brand-new message yet
        }
        return jsonify({'success': True, 'message': message_data})
    except Exception as e:
        # Reset the SQLAlchemy session after a failed commit.
        db.session.rollback()
        app.logger.error(f"Error sending message: {e}")
        return jsonify({'success': False, 'message': 'Failed to send message'}), 500
def api_upload_file():
    """Save an uploaded file and record it as a message in a conversation.

    Expects multipart form data with a ``file`` part and a
    ``conversation_id`` field. The caller must be a participant of the
    conversation and the file extension must pass ``allowed_file``.

    Returns:
        JSON with ``success`` and ``message``: 200 on success, 401 without a
        session, 400 for a missing/empty/disallowed file or missing
        conversation id, 403 for non-participants, 500 on unexpected errors.
    """
    import uuid  # local import: only needed for the collision-free file name

    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    user_id = session['user_id']
    saved_path = None  # set once the file is on disk, for cleanup on failure
    try:
        if 'file' not in request.files:
            return jsonify({'success': False, 'message': 'No file uploaded'}), 400
        file = request.files['file']
        conversation_id = request.form.get('conversation_id')
        if not conversation_id:
            return jsonify({'success': False, 'message': 'Conversation ID is required'}), 400
        if file.filename == '':
            return jsonify({'success': False, 'message': 'No file selected'}), 400

        # Verify user is participant
        participant = ConversationParticipant.query.filter_by(
            conversation_id=conversation_id,
            user_id=user_id
        ).first()
        if not participant:
            return jsonify({'success': False, 'message': 'Access denied'}), 403

        if not (file and file.filename and allowed_file(file.filename)):
            return jsonify({'success': False, 'message': 'File type not allowed'}), 400

        filename = secure_filename(file.filename) or 'unnamed_file'
        base_name, ext = os.path.splitext(filename)
        if not ext:
            # secure_filename() drops non-ASCII characters, which can swallow
            # the dot (e.g. "файл.png" -> "png"). Restore the extension that
            # allowed_file() validated so the type can still be detected.
            recovered = secure_filename(file.filename.rsplit('.', 1)[1])
            if recovered:
                if base_name.lower() == recovered.lower():
                    base_name = 'unnamed_file'
                ext = f".{recovered}"
                filename = f"{base_name}{ext}"

        # A timestamp alone only has one-second resolution; the random suffix
        # keeps same-named uploads in the same second from overwriting each other.
        stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        unique_filename = f"{base_name}_{stamp}_{uuid.uuid4().hex[:8]}{ext}"
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        file.save(file_path)
        saved_path = file_path

        file_size = os.path.getsize(file_path)
        file_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'

        # Determine message type based on file type
        message_type = 'file'
        if file_type.startswith('image/'):
            message_type = 'image'
        elif file_type.startswith('audio/'):
            message_type = 'audio'

        message = Message(
            conversation_id=conversation_id,
            sender_id=user_id,
            content=f"📎 {filename}",
            message_type=message_type,
            file_path=unique_filename,
            file_name=filename,
            file_size=file_size,
            file_type=file_type
        )
        db.session.add(message)
        db.session.commit()
        return jsonify({'success': True, 'message': 'File uploaded successfully'})
    except Exception as e:
        db.session.rollback()
        # Best effort: do not leave a file on disk that no message refers to.
        if saved_path and os.path.exists(saved_path):
            try:
                os.remove(saved_path)
            except OSError:
                pass
        app.logger.error(f"Error uploading file: {e}")
        return jsonify({'success': False, 'message': 'Failed to upload file'}), 500
def api_upload_audio():
    """Save an uploaded voice recording and record it as an audio message.

    Expects multipart form data with an ``audio`` part, a ``conversation_id``
    field and an optional ``duration`` field. The caller must be a
    participant of the conversation. A missing, malformed, non-finite or
    negative duration is stored as ``0.0`` instead of failing the upload.

    Returns:
        JSON with ``success`` and ``message``: 200 on success, 401 without a
        session, 400 for a missing audio part or conversation id, 403 for
        non-participants, 500 on unexpected errors.
    """
    import math  # local imports: only needed inside this handler
    import uuid

    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    user_id = session['user_id']
    saved_path = None  # set once the file is on disk, for cleanup on failure
    try:
        if 'audio' not in request.files:
            return jsonify({'success': False, 'message': 'No audio file uploaded'}), 400
        audio_file = request.files['audio']
        conversation_id = request.form.get('conversation_id')

        try:
            duration = float(request.form.get('duration', 0))
        except (TypeError, ValueError):
            duration = 0.0
        # NaN/inf would be stored verbatim and cannot be represented in JSON.
        if not math.isfinite(duration) or duration < 0:
            duration = 0.0

        if not conversation_id:
            return jsonify({'success': False, 'message': 'Conversation ID is required'}), 400

        # Verify user is participant
        participant = ConversationParticipant.query.filter_by(
            conversation_id=conversation_id,
            user_id=user_id
        ).first()
        if not participant:
            return jsonify({'success': False, 'message': 'Access denied'}), 403

        # The name has no user-specific part, so a timestamp alone would let
        # two recordings sent in the same second overwrite each other.
        stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        filename = f"voice_{stamp}_{uuid.uuid4().hex[:8]}.webm"
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        audio_file.save(file_path)
        saved_path = file_path

        file_size = os.path.getsize(file_path)

        message = Message(
            conversation_id=conversation_id,
            sender_id=user_id,
            content="🎵 Voice message",
            message_type='audio',
            file_path=filename,
            file_name="Voice message",
            file_size=file_size,
            file_type='audio/webm',
            audio_duration=duration
        )
        db.session.add(message)
        db.session.commit()
        return jsonify({'success': True, 'message': 'Voice message sent'})
    except Exception as e:
        db.session.rollback()
        # Best effort: do not leave a file on disk that no message refers to.
        if saved_path and os.path.exists(saved_path):
            try:
                os.remove(saved_path)
            except OSError:
                pass
        app.logger.error(f"Error uploading audio: {e}")
        return jsonify({'success': False, 'message': 'Failed to send voice message'}), 500
def api_download(message_id):
    """Send the file attached to a message as a download.

    The caller must be a participant of the message's conversation.
    Responds 401 without a session, 404 when the message has no file or the
    file is missing on disk, 403 for non-participants and 500 on errors.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    requester_id = session['user_id']
    try:
        attachment = Message.query.get(message_id)
        if not (attachment and attachment.file_path):
            return jsonify({'success': False, 'message': 'File not found'}), 404

        # Only members of the conversation may fetch its files.
        membership = ConversationParticipant.query.filter_by(
            conversation_id=attachment.conversation_id,
            user_id=requester_id,
        ).first()
        if not membership:
            return jsonify({'success': False, 'message': 'Access denied'}), 403

        stored_path = os.path.join(app.config['UPLOAD_FOLDER'], attachment.file_path)
        if os.path.exists(stored_path):
            return send_file(
                stored_path,
                as_attachment=True,
                download_name=attachment.file_name,
                mimetype=attachment.file_type,
            )
        return jsonify({'success': False, 'message': 'File not found on server'}), 404
    except Exception as e:
        app.logger.error(f"Error downloading file: {e}")
        return jsonify({'success': False, 'message': 'Download failed'}), 500
def api_find_user():
    """Look up another user by their ``unique_id`` and return basic details.

    Expects a JSON object with ``unique_id``; the value is stripped and
    upper-cased before the lookup.

    Returns:
        ``{'success': True, 'user': {...}}`` on a match; otherwise
        ``success: False`` with 401 (no session), 400 (missing id or the
        caller's own id), 404 (no such user) or 500 (unexpected error).
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    try:
        # A missing or non-object body becomes {} so the client receives the
        # validation error rather than a generic failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        unique_id = data.get('unique_id')
        unique_id = unique_id.strip().upper() if isinstance(unique_id, str) else ''
        if not unique_id:
            return jsonify({'success': False, 'message': 'User ID is required'}), 400

        user = User.query.filter_by(unique_id=unique_id).first()
        if not user:
            return jsonify({'success': False, 'message': 'User not found'}), 404
        if user.user_id == session['user_id']:
            return jsonify({'success': False, 'message': 'Cannot start chat with yourself'}), 400

        return jsonify({
            'success': True,
            'user': {
                'user_id': user.user_id,
                'name': user.name,
                'unique_id': user.unique_id,
                'online': user.online
            }
        })
    except Exception as e:
        app.logger.error(f"Error finding user: {e}")
        return jsonify({'success': False, 'message': 'Search failed'}), 500
def api_start_private_chat():
    """Return the private conversation with another user, creating it if needed.

    Expects a JSON object with ``user_id`` (the other user's id).

    Returns:
        ``{'success': True, 'conversation_id': ...}`` for the existing or new
        conversation; otherwise ``success: False`` with 401 (no session), 400
        (missing id or the caller's own id), 404 (unknown user) or 500.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    user_id = session['user_id']
    try:
        # A missing or non-object body becomes {} so the client receives the
        # validation error rather than a generic failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        other_user_id = data.get('user_id')
        if not other_user_id:
            return jsonify({'success': False, 'message': 'User ID is required'}), 400
        # Compare as text too, so a JSON string id cannot bypass the
        # self-chat check when the session stores a different type.
        if other_user_id == user_id or str(other_user_id) == str(user_id):
            return jsonify({'success': False, 'message': 'Cannot start chat with yourself'}), 400

        # Look for a private conversation in which exactly these two users
        # account for two participant rows.
        existing_conv = db.session.query(Conversation).join(
            ConversationParticipant, Conversation.id == ConversationParticipant.conversation_id
        ).filter(
            Conversation.type == 'private',
            ConversationParticipant.user_id.in_([user_id, other_user_id])
        ).group_by(Conversation.id).having(
            db.func.count(ConversationParticipant.user_id) == 2
        ).first()
        if existing_conv:
            # Double-check the full participant set before reusing it.
            participants = ConversationParticipant.query.filter_by(conversation_id=existing_conv.id).all()
            participant_ids = {p.user_id for p in participants}
            if participant_ids == {user_id, other_user_id}:
                return jsonify({'success': True, 'conversation_id': existing_conv.id})

        other_user = User.query.get(other_user_id)
        if not other_user:
            return jsonify({'success': False, 'message': 'User not found'}), 404

        conversation = Conversation(
            name=f"Private chat with {other_user.name}",
            type='private',
            created_by=user_id
        )
        db.session.add(conversation)
        db.session.flush()  # assigns conversation.id before the participants are added

        db.session.add(ConversationParticipant(conversation_id=conversation.id, user_id=user_id))
        db.session.add(ConversationParticipant(conversation_id=conversation.id, user_id=other_user_id))
        db.session.commit()
        return jsonify({'success': True, 'conversation_id': conversation.id})
    except Exception as e:
        # Discard the flushed-but-uncommitted conversation and reset the session.
        db.session.rollback()
        app.logger.error(f"Error starting private chat: {e}")
        return jsonify({'success': False, 'message': 'Failed to start chat'}), 500
def api_create_group():
    """Create a group conversation with the caller and the listed members.

    Expects a JSON object with ``name`` and ``members`` (a list of user
    ``unique_id`` values). Duplicate ids are ignored. After excluding the
    caller, between 1 and 9 other members are required.

    Returns:
        ``{'success': True, 'conversation_id': ...}`` on success; otherwise
        ``success: False`` with 401 (no session), 400 (missing name or bad
        member count), 404 (unknown members) or 500 (unexpected error).
    """
    size_error = 'Group must have 2-10 members (including you)'

    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    user_id = session['user_id']
    try:
        # A missing or non-object body becomes {} so the client receives the
        # validation error rather than a generic failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        group_name = data.get('name')
        group_name = group_name.strip() if isinstance(group_name, str) else ''
        if not group_name:
            return jsonify({'success': False, 'message': 'Group name is required'}), 400

        raw_members = data.get('members')
        if not isinstance(raw_members, list):
            raw_members = []
        try:
            # Drop duplicates (keeping order); otherwise the count comparison
            # below would wrongly report missing members.
            member_ids = list(dict.fromkeys(raw_members))
        except TypeError:
            # Unhashable entries (nested lists/objects) cannot be user ids.
            return jsonify({'success': False, 'message': 'Some members not found'}), 404
        if len(member_ids) < 1 or len(member_ids) > 9:
            return jsonify({'success': False, 'message': size_error}), 400

        # Verify all members exist
        members = User.query.filter(User.unique_id.in_(member_ids)).all()
        if len(members) != len(member_ids):
            return jsonify({'success': False, 'message': 'Some members not found'}), 404

        # The creator is added separately; a list naming only the creator
        # would otherwise yield a one-person group.
        other_members = [member for member in members if member.user_id != user_id]
        if not other_members:
            return jsonify({'success': False, 'message': size_error}), 400

        conversation = Conversation(
            name=group_name,
            type='group',
            created_by=user_id
        )
        db.session.add(conversation)
        db.session.flush()  # assigns conversation.id before the participants are added

        db.session.add(ConversationParticipant(conversation_id=conversation.id, user_id=user_id))
        for member in other_members:
            db.session.add(ConversationParticipant(conversation_id=conversation.id, user_id=member.user_id))
        db.session.commit()
        return jsonify({'success': True, 'conversation_id': conversation.id})
    except Exception as e:
        # Discard the flushed-but-uncommitted conversation and reset the session.
        db.session.rollback()
        app.logger.error(f"Error creating group: {e}")
        return jsonify({'success': False, 'message': 'Failed to create group'}), 500
def api_update_status():
    """Update the current user's online flag and last-seen time.

    Expects an optional JSON object with ``online``; it defaults to ``True``
    when absent. A session that refers to a user that no longer exists is
    cleared and answered with 401 instead of a false success.

    Returns:
        ``{'success': True}`` on success; otherwise ``success: False`` with
        401 (no session / unknown user) or 500 (unexpected error).
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    try:
        user = User.query.get(session['user_id'])
        if not user:
            # Stale session: nothing was updated, so do not claim success.
            session.clear()
            return jsonify({'success': False, 'message': 'Not authenticated'}), 401

        # A missing or non-object body is treated like an empty object.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        user.online = bool(data.get('online', True))
        user.last_seen = datetime.utcnow()
        db.session.commit()
        return jsonify({'success': True})
    except Exception as e:
        # Reset the SQLAlchemy session after a failed commit.
        db.session.rollback()
        app.logger.error(f"Error updating status: {e}")
        return jsonify({'success': False, 'message': 'Failed to update status'}), 500
# Double Blue Tick System - Mark messages as seen
def api_mark_seen():
    """Record that the current user has seen the given messages.

    Expects a JSON object with ``message_ids`` (a list). Only messages that
    exist and belong to conversations the caller participates in are marked;
    other ids are silently ignored. Messages already marked are left as is.

    Returns:
        ``{'success': True}`` on success; otherwise ``success: False`` with
        401 (no session), 400 (no message ids) or 500 (unexpected error).
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    user_id = session['user_id']
    try:
        # A missing or non-object body becomes {} so the client receives the
        # validation error rather than a generic failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_ids = data.get('message_ids')
        if not isinstance(raw_ids, list):
            raw_ids = []
        try:
            message_ids = list(dict.fromkeys(raw_ids))  # de-duplicate, keep order
        except TypeError:
            message_ids = []  # unhashable entries cannot be message ids
        if not message_ids:
            return jsonify({'success': False, 'message': 'Message IDs required'}), 400

        # Load the referenced messages in one query; unknown ids drop out here.
        messages = Message.query.filter(Message.id.in_(message_ids)).all()
        conversation_ids = list({m.conversation_id for m in messages})

        # Restrict to conversations the caller is actually part of, so a user
        # cannot create read receipts for other people's conversations.
        allowed_conversations = set()
        if conversation_ids:
            memberships = ConversationParticipant.query.filter(
                ConversationParticipant.user_id == user_id,
                ConversationParticipant.conversation_id.in_(conversation_ids)
            ).all()
            allowed_conversations = {p.conversation_id for p in memberships}
        visible_ids = [m.id for m in messages if m.conversation_id in allowed_conversations]

        if visible_ids:
            existing = MessageSeen.query.filter(
                MessageSeen.user_id == user_id,
                MessageSeen.message_id.in_(visible_ids)
            ).all()
            already_seen = {s.message_id for s in existing}
            for message_id in visible_ids:
                if message_id not in already_seen:
                    db.session.add(MessageSeen(message_id=message_id, user_id=user_id))
            db.session.commit()
        return jsonify({'success': True})
    except Exception as e:
        # Reset the SQLAlchemy session after a failed commit.
        db.session.rollback()
        app.logger.error(f"Error marking messages as seen: {e}")
        return jsonify({'success': False, 'message': 'Failed to mark messages as seen'}), 500
# Get message seen status for double blue tick display
def api_message_status(message_id):
    """Report how many recipients have seen a message.

    The caller must be a participant of the message's conversation.
    Recipients are all participants except the sender, and a read receipt
    created by the sender is not counted.

    Returns:
        JSON with ``status`` (``'delivered'``, ``'partially_seen'`` or
        ``'seen'``), ``seen_count`` and ``total_recipients`` on success;
        otherwise ``success: False`` with 401 (no session), 404 (unknown
        message), 403 (not a participant) or 500 (unexpected error).
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    try:
        message = Message.query.get(message_id)
        if not message:
            return jsonify({'success': False, 'message': 'Message not found'}), 404

        # Check if user has access to this conversation
        participant = ConversationParticipant.query.filter_by(
            conversation_id=message.conversation_id,
            user_id=session['user_id']
        ).first()
        if not participant:
            return jsonify({'success': False, 'message': 'Access denied'}), 403

        # Recipients are all participants in the conversation except the sender.
        total_recipients = ConversationParticipant.query.filter(
            ConversationParticipant.conversation_id == message.conversation_id,
            ConversationParticipant.user_id != message.sender_id
        ).count()

        # Exclude the sender's own receipt so it cannot inflate the count
        # relative to total_recipients.
        seen_count = MessageSeen.query.filter(
            MessageSeen.message_id == message_id,
            MessageSeen.user_id != message.sender_id
        ).count()

        if seen_count == 0:
            status = 'delivered'  # 2 gray ticks
        elif seen_count >= total_recipients:
            # >= rather than ==: receipts from users who later left the
            # conversation must not downgrade the status.
            status = 'seen'  # 2 blue ticks
        else:
            status = 'partially_seen'  # 2 blue ticks

        return jsonify({
            'success': True,
            'status': status,
            'seen_count': seen_count,
            'total_recipients': total_recipients
        })
    except Exception as e:
        app.logger.error(f"Error getting message status: {e}")
        return jsonify({'success': False, 'message': 'Failed to get message status'}), 500
# Image preview endpoint for WhatsApp-like image viewing
def api_view_image(message_id):
    """Send the image attached to a message for inline viewing.

    The caller must be a participant of the message's conversation.

    Returns:
        The image file on success; otherwise JSON with ``success: False`` and
        401 (no session), 404 (not an image message, no stored file, or file
        missing on disk), 403 (not a participant) or 500 (unexpected error).
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Not authenticated'}), 401
    user_id = session['user_id']
    try:
        message = Message.query.get(message_id)
        # An image message without a stored path would make os.path.join()
        # raise and surface as a 500; treat it as not found, as the download
        # handler does.
        if not message or message.message_type != 'image' or not message.file_path:
            return jsonify({'success': False, 'message': 'Image not found'}), 404

        # Verify user has access to this conversation
        participant = ConversationParticipant.query.filter_by(
            conversation_id=message.conversation_id,
            user_id=user_id
        ).first()
        if not participant:
            return jsonify({'success': False, 'message': 'Access denied'}), 403

        file_path = os.path.join(app.config['UPLOAD_FOLDER'], message.file_path)
        if not os.path.exists(file_path):
            return jsonify({'success': False, 'message': 'File not found on server'}), 404
        return send_file(file_path)
    except Exception as e:
        app.logger.error(f"Error viewing image: {e}")
        return jsonify({'success': False, 'message': 'Failed to load image'}), 500
def debug():
    """Return the string form of the current session (debugging aid)."""
    session_text = str(session)
    return session_text