hotspot / App /Messages /MessagesRoute.py
Mbonea's picture
schema errors
2046386
raw
history blame
2.82 kB
# App/Messages/Routes.py
from fastapi import APIRouter, HTTPException, Request
from .Model import Message
from .Schema import MessageCreate, MessageResponse
from uuid import UUID
from typing import List, Optional
import re
from datetime import datetime
from decimal import Decimal
message_router = APIRouter(tags=["Messages"], prefix="/messages")
@message_router.post("/sms_received", response_model=MessageResponse)
async def receive_message(request: Request):
message_raw = await request.json()
print(message_raw)
message_data = MessageCreate(**message_raw)
try:
# Extract data from the message content using regex
text = message_data.payload.message
parsed_data = parse_message_content(text)
# Create a new message record with parsed_data
new_message = await Message.create(
device_id=message_data.deviceId,
event=message_data.event,
message_id=message_data.id,
webhook_id=message_data.webhookId,
message_content=text,
phone_number=message_data.payload.phoneNumber,
received_at=message_data.payload.receivedAt,
sim_number=message_data.payload.simNumber,
parsed_data=parsed_data,
)
return MessageResponse.from_orm(new_message)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
def parse_message_content(text: str) -> Optional[dict]:
# Regular expression to capture the data from the message
pattern = r"(\w+)\sConfirmed\.You have received Tsh([\d,]+\.\d{2}) from (\d{12}) - ([A-Z ]+) on (\d{1,2}/\d{1,2}/\d{2}) at ([\d:]+ [APM]+).*?balance is Tsh([\d,]+\.\d{2})"
matches = re.search(pattern, text)
if matches:
data = {
"transaction_id": matches.group(1),
"amount_received": parse_decimal(matches.group(2)),
"phone_number": matches.group(3),
"name": matches.group(4).strip(),
"date": parse_date(matches.group(5), matches.group(6)),
"new_balance": parse_decimal(matches.group(7)),
}
return data
else:
# Return None if the message doesn't match the expected format
return None
def parse_decimal(amount_str: str) -> float:
# Remove commas and convert to Decimal
amount_str = amount_str.replace(",", "")
return float(Decimal(amount_str))
def parse_date(date_str: str, time_str: str) -> str:
# Combine date and time strings and parse into ISO format
datetime_str = f"{date_str} {time_str}"
# Adjust the format as per the actual format in the message
try:
dt = datetime.strptime(datetime_str, "%d/%m/%y %I:%M %p")
return dt.isoformat()
except ValueError:
return datetime_str # Return as-is if parsing fails