Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import JSONResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydantic import BaseModel | |
from typing import List, Optional | |
from datetime import datetime | |
import uuid | |
import json | |
import os | |
app = FastAPI() | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# In-memory database (for demo purposes) | |
users_db = {} | |
chats_db = {} | |
messages_db = {} | |
# Helper functions | |
def save_data(): | |
with open('users.json', 'w') as f: | |
json.dump(users_db, f) | |
with open('chats.json', 'w') as f: | |
json.dump(chats_db, f) | |
with open('messages.json', 'w') as f: | |
json.dump(messages_db, f) | |
def load_data(): | |
global users_db, chats_db, messages_db | |
if os.path.exists('users.json'): | |
with open('users.json', 'r') as f: | |
users_db = json.load(f) | |
if os.path.exists('chats.json'): | |
with open('chats.json', 'r') as f: | |
chats_db = json.load(f) | |
if os.path.exists('messages.json'): | |
with open('messages.json', 'r') as f: | |
messages_db = json.load(f) | |
load_data() | |
# Request Models | |
class CreateUserRequest(BaseModel): | |
username: str | |
email: str | |
class SendMessageRequest(BaseModel): | |
sender_id: str | |
receiver_id: Optional[str] = None | |
chat_id: Optional[str] = None | |
content: str | |
class CreateGroupRequest(BaseModel): | |
creator_id: str | |
group_name: str | |
participants: List[str] | |
class UpdatePresenceRequest(BaseModel): | |
user_id: str | |
online: bool | |
# API Endpoints | |
async def create_user(data: CreateUserRequest): | |
user_id = str(uuid.uuid4()) | |
users_db[user_id] = { | |
'username': data.username, | |
'email': data.email, | |
'online': False, | |
'last_seen': None, | |
'created_at': datetime.now().isoformat() | |
} | |
save_data() | |
return {"user_id": user_id, "status": "success"} | |
async def get_user(user_id: str): | |
if user_id in users_db: | |
return {"user": users_db[user_id], "status": "success"} | |
raise HTTPException(status_code=404, detail="User not found") | |
async def send_message(data: SendMessageRequest): | |
message_id = str(uuid.uuid4()) | |
timestamp = datetime.now().isoformat() | |
chat_id = data.chat_id | |
if not chat_id: | |
participants = sorted([data.sender_id, data.receiver_id]) | |
chat_id = 'private_' + '_'.join(participants) | |
if chat_id not in chats_db: | |
chats_db[chat_id] = { | |
'type': 'private', | |
'participants': participants, | |
'created_at': timestamp | |
} | |
elif chat_id not in chats_db: | |
raise HTTPException(status_code=404, detail="Chat not found") | |
messages_db[message_id] = { | |
'chat_id': chat_id, | |
'sender_id': data.sender_id, | |
'content': data.content, | |
'timestamp': timestamp, | |
'status': 'sent' | |
} | |
save_data() | |
return {"message_id": message_id, "status": "success"} | |
async def get_messages(chat_id: str, user_id: str): | |
if chat_id not in chats_db: | |
raise HTTPException(status_code=404, detail="Chat not found") | |
if user_id not in chats_db[chat_id]['participants']: | |
raise HTTPException(status_code=403, detail="Unauthorized") | |
chat_messages = [msg for msg in messages_db.values() if msg['chat_id'] == chat_id] | |
chat_messages.sort(key=lambda x: x['timestamp']) | |
for msg in chat_messages: | |
if msg['sender_id'] != user_id and msg['status'] == 'sent': | |
msg['status'] = 'delivered' | |
save_data() | |
return {"messages": chat_messages} | |
async def get_user_chats(user_id: str): | |
if user_id not in users_db: | |
raise HTTPException(status_code=404, detail="User not found") | |
user_chats = [] | |
for chat_id, chat in chats_db.items(): | |
if user_id in chat['participants']: | |
chat_messages = [msg for msg in messages_db.values() if msg['chat_id'] == chat_id] | |
chat_messages.sort(key=lambda x: x['timestamp']) | |
last_message = chat_messages[-1] if chat_messages else None | |
if chat['type'] == 'private': | |
other_user_id = [uid for uid in chat['participants'] if uid != user_id][0] | |
chat_name = users_db.get(other_user_id, {}).get('username', f'User {other_user_id[:4]}') | |
else: | |
chat_name = chat.get('group_name', 'Group Chat') | |
user_chats.append({ | |
'chat_id': chat_id, | |
'type': chat['type'], | |
'name': chat_name, | |
'participants': chat['participants'], | |
'last_message': last_message, | |
'created_at': chat['created_at'] | |
}) | |
user_chats.sort(key=lambda x: x['last_message']['timestamp'] if x['last_message'] else x['created_at'], reverse=True) | |
return {"chats": user_chats} | |
async def create_group(data: CreateGroupRequest): | |
user_id = data.creator_id | |
group_name = data.group_name | |
participants = data.participants | |
if user_id not in participants: | |
participants.append(user_id) | |
if len(participants) < 2: | |
raise HTTPException(status_code=400, detail="Group must have at least 2 participants") | |
chat_id = 'group_' + str(uuid.uuid4()) | |
timestamp = datetime.now().isoformat() | |
chats_db[chat_id] = { | |
'type': 'group', | |
'group_name': group_name, | |
'creator_id': user_id, | |
'participants': participants, | |
'created_at': timestamp | |
} | |
save_data() | |
return {"chat_id": chat_id, "status": "success"} | |
async def update_presence(data: UpdatePresenceRequest): | |
user_id = data.user_id | |
online = data.online | |
if user_id in users_db: | |
users_db[user_id]['online'] = online | |
users_db[user_id]['last_seen'] = None if online else datetime.now().isoformat() | |
save_data() | |
return {"status": "success"} | |
raise HTTPException(status_code=404, detail="User not found") | |
async def search_users(query: str = ""): | |
query = query.lower() | |
matching_users = [] | |
for user_id, user in users_db.items(): | |
if (query in user['username'].lower() or | |
query in user['email'].lower() or | |
query in user_id.lower()): | |
matching_users.append({ | |
'user_id': user_id, | |
'username': user['username'], | |
'email': user['email'] | |
}) | |
return {"users": matching_users} | |