|
|
|
from fastapi import APIRouter, HTTPException, Request |
|
from .Model import Message |
|
from .Schema import MessageCreate, MessageResponse |
|
from uuid import UUID |
|
from typing import List, Optional |
|
import re |
|
from datetime import datetime |
|
from decimal import Decimal |
|
|
|
message_router = APIRouter(tags=["Messages"], prefix="/messages") |
|
|
|
|
|
@message_router.post("/sms_received", response_model=MessageResponse) |
|
async def receive_message(request: Request): |
|
|
|
message_raw = await request.json() |
|
print(message_raw) |
|
message_data = MessageCreate(**message_raw) |
|
try: |
|
|
|
text = message_data.payload.message |
|
parsed_data = parse_message_content(text) |
|
|
|
|
|
new_message = await Message.create( |
|
device_id=message_data.deviceId, |
|
event=message_data.event, |
|
message_id=message_data.id, |
|
webhook_id=message_data.webhookId, |
|
message_content=text, |
|
phone_number=message_data.payload.phoneNumber, |
|
received_at=message_data.payload.receivedAt, |
|
sim_number=message_data.payload.simNumber, |
|
parsed_data=parsed_data, |
|
) |
|
return MessageResponse.from_orm(new_message) |
|
except Exception as e: |
|
raise HTTPException(status_code=400, detail=str(e)) |
|
|
|
|
|
def parse_message_content(text: str) -> Optional[dict]: |
|
|
|
pattern = r"(\w+)\sConfirmed\.You have received Tsh([\d,]+\.\d{2}) from (\d{12}) - ([A-Z ]+) on (\d{1,2}/\d{1,2}/\d{2}) at ([\d:]+ [APM]+).*?balance is Tsh([\d,]+\.\d{2})" |
|
|
|
matches = re.search(pattern, text) |
|
if matches: |
|
data = { |
|
"transaction_id": matches.group(1), |
|
"amount_received": parse_decimal(matches.group(2)), |
|
"phone_number": matches.group(3), |
|
"name": matches.group(4).strip(), |
|
"date": parse_date(matches.group(5), matches.group(6)), |
|
"new_balance": parse_decimal(matches.group(7)), |
|
} |
|
return data |
|
else: |
|
|
|
return None |
|
|
|
|
|
def parse_decimal(amount_str: str) -> float: |
|
|
|
amount_str = amount_str.replace(",", "") |
|
return float(Decimal(amount_str)) |
|
|
|
|
|
def parse_date(date_str: str, time_str: str) -> str: |
|
|
|
datetime_str = f"{date_str} {time_str}" |
|
|
|
try: |
|
dt = datetime.strptime(datetime_str, "%d/%m/%y %I:%M %p") |
|
return dt.isoformat() |
|
except ValueError: |
|
return datetime_str |
|
|