# ================================
# api-data.py
# ================================
"""Booking lookup helper for the chat assistant.

Finds a booking number in an incoming chat message, queries the bookings
API for it, and streams the result (or an error text) into a Chainlit
message.
"""
import asyncio
import json
import os
import re

import chainlit as cl
import requests
from dotenv import load_dotenv

# --------------------------------=== API URL ===-----------------------------------
API_URL = "https://firmahytte.daysoff.no/api/bookings"

# Default value handed to ``requests.post(timeout=...)``. Without a timeout
# ``requests`` waits indefinitely, which would pin the worker thread and make
# the ``requests.Timeout`` handler below unreachable.
REQUEST_TIMEOUT_SECONDS = 30

# Six uppercase ASCII letters followed by six digits, delimited by word
# boundaries. Compiled once at import time.
BOOKING_PATTERN = re.compile(r'\b[A-Z]{6}\d{6}\b')

# --------------------------------=== environment ===-------------------------------
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
token = os.environ.get("DAYSOFF_API_TOKEN")


# ----------------------------=== API error-handling ===--------------------------------
class APIConnectionError(Exception):
    """Raised when API connection fails"""


class APIResponseError(Exception):
    """Raised when API returns an invalid response"""


class BookingNotFoundError(Exception):
    """Raised when booking ID is not found"""


async def async_post_request(url, headers, data, timeout=REQUEST_TIMEOUT_SECONDS):
    """POST ``data`` as JSON to ``url`` without blocking the event loop.

    The blocking ``requests.post`` call runs in a worker thread via
    ``asyncio.to_thread``.

    Args:
        url: Target URL.
        headers: HTTP headers to send.
        data: JSON-serialisable request body.
        timeout: Forwarded to ``requests.post`` as its ``timeout`` argument.

    Returns:
        The ``requests.Response``; its status has already been checked with
        ``raise_for_status()``.

    Raises:
        APIConnectionError: On ``requests.ConnectionError`` or
            ``requests.Timeout``.
        APIResponseError: On any other ``requests.RequestException``
            (including an HTTP error status).
    """
    try:
        response = await asyncio.to_thread(
            requests.post, url, headers=headers, json=data, timeout=timeout
        )
        response.raise_for_status()
    except requests.ConnectionError as e:
        raise APIConnectionError("Failed to connect to booking service") from e
    except requests.Timeout as e:
        raise APIConnectionError("Request timed out") from e
    except requests.RequestException as e:
        raise APIResponseError(f"API request failed: {str(e)}") from e
    return response


# ----------------------------=== streaming / formatting helpers ===--------------------
async def _stream_words(msg, text, delay=0.0):
    """Stream ``text`` into ``msg`` one whitespace-separated word at a time.

    Each word is sent followed by a single space. When ``delay`` is non-zero
    the coroutine sleeps that many seconds after every word.
    """
    # The loop variable is deliberately NOT called ``token``: binding that
    # name inside a function hides the module-level API token from it.
    for word in text.split():
        await msg.stream_token(word + " ")
        if delay:
            await asyncio.sleep(delay)


def _format_booking_table(booking_data):
    """Build the markdown table summarising the booking fields."""
    return (
        "\n\n## 𝘽𝙤𝙤𝙠𝙞𝙣𝙜 𝙞𝙣𝙛𝙤𝙧𝙢𝙖𝙨𝙟𝙤𝙣\n\n"
        f"| 𝐵𝑜𝑜𝑘𝑖𝑛𝑔𝑛𝑢𝑚𝑚𝑒𝑟 | {booking_data.get('order_number', 'N/A')} |\n"
        f"|:-----|:------------------|\n"
        f"| 𝙉𝙖𝙢𝙚 | {booking_data.get('name', 'N/A')} |\n"
        f"| 𝙏𝙞𝙢𝙚 𝙁𝙧𝙤𝙢 | {booking_data.get('time_from', 'N/A')} |\n"
        f"| 𝘼𝙧𝙧𝙞𝙫𝙖𝙡 𝙏𝙞𝙢𝙚 | {booking_data.get('timearrival', 'N/A')} |\n"
        f"| 𝙏𝙞𝙢𝙚 𝙏𝙤 | {booking_data.get('time_to', 'N/A')} |\n"
        f"| 𝘿𝙚𝙥𝙖𝙧𝙩𝙪𝙧𝙚 𝙏𝙞𝙢𝙚| {booking_data.get('timedeparture', 'N/A')} |\n"
        f"| 𝙏𝙞𝙢𝙚𝙯𝙤𝙣𝙚 | {booking_data.get('timezone', 'N/A')} |\n"
        f"| 𝘼𝙙𝙧𝙚𝙨𝙨 | {booking_data.get('adress', 'N/A')} |\n"
        f"| 𝙕𝙞𝙥 𝘾𝙤𝙙𝙚 | {booking_data.get('zip', 'N/A')} |\n"
        f"| 𝙋𝙡𝙖𝙘𝙚 | {booking_data.get('place', 'N/A')} |\n"
        f"| 𝘾𝙤𝙪𝙣𝙩𝙧𝙮 | {booking_data.get('country', 'N/A')} |\n"
        f"| 𝙇𝙤𝙘𝙖𝙩𝙞𝙤𝙣 | Lat: {booking_data.get('lat', 'N/A')}, Long: {booking_data.get('long', 'N/A')} |\n"
        f"| 𝙂𝙪𝙚𝙨𝙩𝙨 | {booking_data.get('guests', 'N/A')} |\n"
        f"| 𝙂𝙪𝙚𝙨𝙩 𝙎𝙪𝙢 | {booking_data.get('guest_sum', 'N/A')} |\n"
        f"| 𝘾𝙤𝙢𝙥𝙖𝙣𝙮 𝘾𝙤𝙣𝙩𝙧.| {booking_data.get('company_contribution', 'N/A')} |\n"
        f"| 𝙋𝙖𝙞𝙙 | {booking_data.get('paid', 'N/A')} |\n"
        f"| 𝘼𝙘𝙘𝙚𝙥𝙩𝙚𝙙 | {booking_data.get('accepted', 'N/A')} |\n"
        f"| 𝘾𝙖𝙣𝙘𝙚𝙡𝙡𝙚𝙙 | {'Yes' if booking_data.get('cancelled', 0) else 'No'} |\n"
        f"| 𝘾𝙤𝙣𝙩𝙖𝙘𝙩 𝙉𝙖𝙢𝙚 | {booking_data.get('contactname', 'N/A')} |\n"
        f"| 𝘾𝙤𝙣𝙩𝙖𝙘𝙩 𝙀𝙢𝙖𝙞𝙡| {booking_data.get('contactemail', 'N/A')} |\n"
        f"| 𝘾𝙤𝙣𝙩𝙖𝙘𝙩 𝙋𝙝𝙤𝙣𝙚| {booking_data.get('contactphone', 'N/A')} |\n"
        f"| 𝘾𝙤𝙪𝙣𝙩𝙧𝙮 𝘾𝙤𝙙𝙚 | {booking_data.get('country_code', 'N/A')} |\n"
    )


def _format_stay_details(booking_data):
    """Build the description / arrival / departure / bedding markdown."""
    return (
        "\n\n# 🇩 🇪 🇸 🇨 🇷 🇮 🇵 🇹 🇮 🇴 🇳\n\n"
        f"{booking_data.get('description', 'N/A')}\n\n"
        "————————————————————————————————————————————"
        "\n\n# 🇦 🇷 🇷 🇮 🇻 🇦 🇱\n\n"
        f"{booking_data.get('arrival', 'N/A')}\n\n"
        "————————————————————————————————————————————"
        "\n\n# 🇩 🇪 🇵 🇦 🇷 🇹 🇺 🇷 🇪\n\n"
        f"{booking_data.get('departure', 'N/A')}\n\n"
        "————————————————————————————————————————————"
        "\n\n# 🇧 🇪 🇩 🇩 🇮 🇳 🇬\n\n"
        f"{booking_data.get('bedding', 'N/A')}"
    )


# ----------------------------=== booking_agent_system ===------------------------------
async def booking_agent_system(message: cl.Message, msg: cl.Message) -> bool:
    """Look up a booking number mentioned in ``message`` and stream the result.

    If ``message.content`` contains a booking number (``BOOKING_PATTERN``),
    an acknowledgement is streamed into ``msg``, the bookings API is queried,
    and the booking table plus stay details are streamed into ``msg``.
    Lookup failures are streamed into ``msg`` as an error text instead.

    Args:
        message: Incoming user message; only ``content`` is read.
        msg: Outgoing message that receives the streamed output.

    Returns:
        ``True`` after a lookup error has been reported to the user.
        NOTE(review): the success path and the no-booking-number path have no
        explicit ``return`` in the visible code, so callers receive ``None``
        there despite the ``-> bool`` annotation - confirm against the caller
        what those paths are expected to return.
    """
    match = BOOKING_PATTERN.search(message.content)

    if match:
        # Only the words matter here: the text is split on whitespace and
        # streamed word by word.
        preliminary = (
            "Takk for bookingnummer 👍🏻 "
            "Jeg skal sjekke dette for deg nå. "
            "Vennligst et øyeblikk bare..\n"
        )
        # The per-word pause imitates token-by-token streaming.
        await _stream_words(msg, preliminary, delay=0.05)
        await msg.send()

        bookingnumber = match.group()
        headers = {
            # ``token`` is the module-level DAYSOFF_API_TOKEN value. It used
            # to be shadowed by a loop variable of the same name, so the
            # header carried the last streamed word instead of the credential.
            'Authorization': f"Bearer {token}",
            "Content-Type": "application/json"
        }
        payload = {
            # NOTE(review): credential hard-coded in source - it should be
            # loaded from the environment (like DAYSOFF_API_TOKEN) and rotated.
            "token": "0v8mYvhh22REJidE2foWvlu2YQd",
            "bookingnumber": bookingnumber
        }

        try:
            # async_post_request already calls raise_for_status(), so the
            # status is not re-checked here.
            response = await async_post_request(API_URL, headers, payload)
            response_data = response.json()
            booking_data = response_data.get('data', {})
            # NOTE(review): debug output; this writes the full booking record
            # (including contact details) to stdout.
            print("API Response Data:", json.dumps(booking_data, indent=2))

            if not booking_data:
                raise BookingNotFoundError("Ingen booking funnet med dette bookingnummeret.")
            if "error" in booking_data:
                raise APIResponseError(booking_data["error"])

            await msg.stream_token(_format_booking_table(booking_data))
            await asyncio.sleep(0.07)
            await msg.send()

            header = (
                "\n\n# 𝗩𝗶𝗸𝘁𝗶𝗴 𝗜𝗻𝗳𝗼𝗿𝗺𝗮𝘀𝗷𝗼𝗻 𝗼𝗺 𝗢𝗽𝗽𝗵𝗼𝗹𝗱𝗲𝘁\n\n"
            )
            await msg.stream_token(header)
            await asyncio.sleep(0.07)
            await msg.send()

            # Free-text sections: description, arrival, departure, bedding.
            await msg.stream_token(_format_stay_details(booking_data))
            await asyncio.sleep(0.07)
            await msg.send()

        except (APIConnectionError, APIResponseError, BookingNotFoundError) as e:
            error_msg = (
                f"🙄 {str(e)} "
            )
            await _stream_words(msg, error_msg)
            await msg.send()
            return True

        except requests.exceptions.RequestException as e:
            # Reached for requests errors raised outside async_post_request
            # (it converts its own into the API* exceptions above).
            unexpected = (
                f"🤯 {str(e)} "
                "En uventet feil oppstod. Vennligst kontakt Daysoff tech support"
            )
            await _stream_words(msg, unexpected)
            await msg.send()
            return True