Spaces:
Sleeping
Sleeping
import os | |
import uuid | |
import json | |
from datetime import datetime | |
from flask import Flask, render_template, request, jsonify, session, redirect, url_for | |
from flask_cors import CORS | |
import logging | |
# Configure logging | |
logging.basicConfig(level=logging.DEBUG) | |
app = Flask(__name__) | |
app.secret_key = os.environ.get("SESSION_SECRET", "whatsapp-clone-secret-key") | |
CORS(app) | |
# In-memory storage | |
users = {} # user_id: {name, email, unique_id, online, last_seen} | |
conversations = {} # conversation_id: {type: 'private'/'group', participants: [], messages: []} | |
user_conversations = {} # user_id: [conversation_ids] | |
def generate_unique_id(): | |
"""Generate a unique 8-character ID for users""" | |
return str(uuid.uuid4())[:8].upper() | |
def get_current_user(): | |
"""Get current user from session""" | |
user_id = session.get('user_id') | |
return users.get(user_id) if user_id else None | |
def landing(): | |
"""Landing page""" | |
if 'user_id' in session and session['user_id'] in users: | |
return redirect(url_for('chat')) | |
return render_template('landing.html') | |
def register_page(): | |
"""Registration page""" | |
return render_template('register.html') | |
def register(): | |
"""Register a new user""" | |
try: | |
data = request.get_json() | |
name = data.get('name', '').strip() | |
email = data.get('email', '').strip() | |
if not name or not email: | |
return jsonify({'success': False, 'message': 'Name and email are required'}) | |
# Check if email already exists | |
for user_id, user_data in users.items(): | |
if user_data['email'] == email: | |
return jsonify({'success': False, 'message': 'Email already registered'}) | |
# Create new user | |
user_id = str(uuid.uuid4()) | |
unique_id = generate_unique_id() | |
users[user_id] = { | |
'name': name, | |
'email': email, | |
'unique_id': unique_id, | |
'online': True, | |
'last_seen': datetime.now().isoformat(), | |
'user_id': user_id | |
} | |
user_conversations[user_id] = [] | |
# Set session | |
session['user_id'] = user_id | |
return jsonify({ | |
'success': True, | |
'user': { | |
'user_id': user_id, | |
'name': name, | |
'email': email, | |
'unique_id': unique_id | |
} | |
}) | |
except Exception as e: | |
logging.error(f"Registration error: {e}") | |
return jsonify({'success': False, 'message': 'Registration failed'}) | |
def chat(): | |
"""Main chat interface""" | |
if 'user_id' not in session or session['user_id'] not in users: | |
return redirect(url_for('landing')) | |
user = users[session['user_id']] | |
return render_template('chat.html', user=user) | |
def settings(): | |
"""Settings page""" | |
if 'user_id' not in session or session['user_id'] not in users: | |
return redirect(url_for('landing')) | |
user = users[session['user_id']] | |
return render_template('settings.html', user=user) | |
def find_user(): | |
"""Find user by unique ID""" | |
try: | |
data = request.get_json() | |
unique_id = data.get('unique_id', '').strip().upper() | |
if not unique_id: | |
return jsonify({'success': False, 'message': 'Unique ID is required'}) | |
# Find user by unique_id | |
for user_id, user_data in users.items(): | |
if user_data['unique_id'] == unique_id: | |
return jsonify({ | |
'success': True, | |
'user': { | |
'user_id': user_id, | |
'name': user_data['name'], | |
'unique_id': user_data['unique_id'], | |
'online': user_data['online'] | |
} | |
}) | |
return jsonify({'success': False, 'message': 'User not found'}) | |
except Exception as e: | |
logging.error(f"Find user error: {e}") | |
return jsonify({'success': False, 'message': 'Search failed'}) | |
def start_conversation(): | |
"""Start a private conversation or create a group""" | |
try: | |
if 'user_id' not in session: | |
return jsonify({'success': False, 'message': 'Not authenticated'}) | |
current_user_id = session['user_id'] | |
data = request.get_json() | |
conversation_type = data.get('type') # 'private' or 'group' | |
if conversation_type == 'private': | |
target_user_id = data.get('target_user_id') | |
if not target_user_id or target_user_id not in users: | |
return jsonify({'success': False, 'message': 'Target user not found'}) | |
# Check if conversation already exists | |
for conv_id, conv_data in conversations.items(): | |
if (conv_data['type'] == 'private' and | |
set(conv_data['participants']) == {current_user_id, target_user_id}): | |
return jsonify({'success': True, 'conversation_id': conv_id}) | |
# Create new private conversation | |
conv_id = str(uuid.uuid4()) | |
conversations[conv_id] = { | |
'type': 'private', | |
'participants': [current_user_id, target_user_id], | |
'messages': [], | |
'created_at': datetime.now().isoformat() | |
} | |
# Add to user conversations | |
if current_user_id not in user_conversations: | |
user_conversations[current_user_id] = [] | |
if target_user_id not in user_conversations: | |
user_conversations[target_user_id] = [] | |
user_conversations[current_user_id].append(conv_id) | |
user_conversations[target_user_id].append(conv_id) | |
return jsonify({'success': True, 'conversation_id': conv_id}) | |
elif conversation_type == 'group': | |
group_name = data.get('group_name', '').strip() | |
participant_ids = data.get('participant_ids', []) | |
if not group_name: | |
return jsonify({'success': False, 'message': 'Group name is required'}) | |
if len(participant_ids) < 2 or len(participant_ids) > 9: | |
return jsonify({'success': False, 'message': 'Groups must have 3-10 members (including you)'}) | |
# Add current user to participants | |
all_participants = list(set([current_user_id] + participant_ids)) | |
# Validate all participants exist | |
for pid in all_participants: | |
if pid not in users: | |
return jsonify({'success': False, 'message': f'User {pid} not found'}) | |
# Create group conversation | |
conv_id = str(uuid.uuid4()) | |
conversations[conv_id] = { | |
'type': 'group', | |
'name': group_name, | |
'participants': all_participants, | |
'messages': [], | |
'created_at': datetime.now().isoformat(), | |
'created_by': current_user_id | |
} | |
# Add to all participants' conversations | |
for pid in all_participants: | |
if pid not in user_conversations: | |
user_conversations[pid] = [] | |
user_conversations[pid].append(conv_id) | |
return jsonify({'success': True, 'conversation_id': conv_id}) | |
return jsonify({'success': False, 'message': 'Invalid conversation type'}) | |
except Exception as e: | |
logging.error(f"Start conversation error: {e}") | |
return jsonify({'success': False, 'message': 'Failed to start conversation'}) | |
def get_conversations(): | |
"""Get user's conversations""" | |
try: | |
if 'user_id' not in session: | |
return jsonify({'success': False, 'message': 'Not authenticated'}) | |
current_user_id = session['user_id'] | |
user_convs = user_conversations.get(current_user_id, []) | |
result = [] | |
for conv_id in user_convs: | |
if conv_id in conversations: | |
conv = conversations[conv_id] | |
# Get conversation info | |
conv_info = { | |
'id': conv_id, | |
'type': conv['type'], | |
'participants': [] | |
} | |
if conv['type'] == 'private': | |
# For private chat, get the other user's info | |
other_user_id = next(pid for pid in conv['participants'] if pid != current_user_id) | |
other_user = users.get(other_user_id, {}) | |
conv_info['name'] = other_user.get('name', 'Unknown User') | |
conv_info['online'] = other_user.get('online', False) | |
else: | |
# For group chat | |
conv_info['name'] = conv.get('name', 'Group Chat') | |
conv_info['online'] = True # Groups are always "online" | |
# Get participant info | |
for pid in conv['participants']: | |
if pid in users: | |
user_data = users[pid] | |
conv_info['participants'].append({ | |
'user_id': pid, | |
'name': user_data['name'], | |
'unique_id': user_data['unique_id'], | |
'online': user_data['online'] | |
}) | |
# Get last message | |
if conv['messages']: | |
last_msg = conv['messages'][-1] | |
conv_info['last_message'] = { | |
'content': last_msg['content'], | |
'timestamp': last_msg['timestamp'], | |
'sender_name': users.get(last_msg['sender_id'], {}).get('name', 'Unknown') | |
} | |
else: | |
conv_info['last_message'] = None | |
result.append(conv_info) | |
return jsonify({'success': True, 'conversations': result}) | |
except Exception as e: | |
logging.error(f"Get conversations error: {e}") | |
return jsonify({'success': False, 'message': 'Failed to load conversations'}) | |
def get_messages(conversation_id): | |
"""Get messages for a conversation""" | |
try: | |
if 'user_id' not in session: | |
return jsonify({'success': False, 'message': 'Not authenticated'}) | |
current_user_id = session['user_id'] | |
if conversation_id not in conversations: | |
return jsonify({'success': False, 'message': 'Conversation not found'}) | |
conv = conversations[conversation_id] | |
# Check if user is participant | |
if current_user_id not in conv['participants']: | |
return jsonify({'success': False, 'message': 'Access denied'}) | |
# Format messages | |
messages = [] | |
for msg in conv['messages']: | |
sender = users.get(msg['sender_id'], {}) | |
messages.append({ | |
'id': msg['id'], | |
'content': msg['content'], | |
'sender_id': msg['sender_id'], | |
'sender_name': sender.get('name', 'Unknown'), | |
'timestamp': msg['timestamp'], | |
'status': msg.get('status', 'sent'), | |
'seen_by': msg.get('seen_by', []) | |
}) | |
return jsonify({'success': True, 'messages': messages}) | |
except Exception as e: | |
logging.error(f"Get messages error: {e}") | |
return jsonify({'success': False, 'message': 'Failed to load messages'}) | |
def send_message(): | |
"""Send a message""" | |
try: | |
if 'user_id' not in session: | |
return jsonify({'success': False, 'message': 'Not authenticated'}) | |
current_user_id = session['user_id'] | |
data = request.get_json() | |
conversation_id = data.get('conversation_id') | |
content = data.get('content', '').strip() | |
if not conversation_id or not content: | |
return jsonify({'success': False, 'message': 'Conversation ID and content required'}) | |
if conversation_id not in conversations: | |
return jsonify({'success': False, 'message': 'Conversation not found'}) | |
conv = conversations[conversation_id] | |
# Check if user is participant | |
if current_user_id not in conv['participants']: | |
return jsonify({'success': False, 'message': 'Access denied'}) | |
# Create message | |
message = { | |
'id': str(uuid.uuid4()), | |
'content': content, | |
'sender_id': current_user_id, | |
'timestamp': datetime.now().isoformat(), | |
'status': 'sent', | |
'seen_by': [current_user_id] # Sender has seen the message | |
} | |
conversations[conversation_id]['messages'].append(message) | |
return jsonify({'success': True, 'message': message}) | |
except Exception as e: | |
logging.error(f"Send message error: {e}") | |
return jsonify({'success': False, 'message': 'Failed to send message'}) | |
def mark_seen(): | |
"""Mark messages as seen""" | |
try: | |
if 'user_id' not in session: | |
return jsonify({'success': False, 'message': 'Not authenticated'}) | |
current_user_id = session['user_id'] | |
data = request.get_json() | |
conversation_id = data.get('conversation_id') | |
if not conversation_id or conversation_id not in conversations: | |
return jsonify({'success': False, 'message': 'Invalid conversation'}) | |
conv = conversations[conversation_id] | |
# Mark all messages as seen by current user | |
for message in conv['messages']: | |
if current_user_id not in message.get('seen_by', []): | |
message['seen_by'].append(current_user_id) | |
return jsonify({'success': True}) | |
except Exception as e: | |
logging.error(f"Mark seen error: {e}") | |
return jsonify({'success': False, 'message': 'Failed to mark as seen'}) | |
def update_status(): | |
"""Update user online status""" | |
try: | |
if 'user_id' not in session: | |
return jsonify({'success': False, 'message': 'Not authenticated'}) | |
current_user_id = session['user_id'] | |
data = request.get_json() | |
online = data.get('online', True) | |
if current_user_id in users: | |
users[current_user_id]['online'] = online | |
users[current_user_id]['last_seen'] = datetime.now().isoformat() | |
return jsonify({'success': True}) | |
except Exception as e: | |
logging.error(f"Update status error: {e}") | |
return jsonify({'success': False, 'message': 'Failed to update status'}) | |
def logout(): | |
"""Logout user""" | |
if 'user_id' in session: | |
user_id = session['user_id'] | |
if user_id in users: | |
users[user_id]['online'] = False | |
users[user_id]['last_seen'] = datetime.now().isoformat() | |
session.pop('user_id', None) | |
return redirect(url_for('landing')) | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=5000, debug=True) | |