# athspi / app.py
# Athspi's picture
# Update app.py
# 3ec474d verified
# app.py - Flask Backend with URL Shortening System
from flask import Flask, request, jsonify, send_from_directory, make_response
import google.generativeai as genai
from dotenv import load_dotenv
import os
from flask_cors import CORS
import markdown2
import re
import requests
from gtts import gTTS
import uuid
# Load environment variables (python-dotenv) so that os.getenv() calls later
# in this module can see values such as GEMINI_API_KEY.
load_dotenv()
# Directory that generated MP3 files are written to; created at import time.
# NOTE(review): the path is relative, so os.makedirs() resolves it against the
# process's working directory - confirm that matches the app's root path.
AUDIO_FOLDER = os.path.join('static', 'audio')
os.makedirs(AUDIO_FOLDER, exist_ok=True)
# Flask application object; static assets live in the 'static' folder.
app = Flask(__name__, static_folder='static')
# Enable CORS for the whole app using flask_cors' default settings.
CORS(app)
# AI Configuration - Critical for URL detection and formatting
# The system prompt below defines the two inline markers that
# process_response() later parses out of model replies:
#   [AUDIO]...[/AUDIO]                           text to synthesize as speech
#   [SHORTEN]original_url|short_url[/SHORTEN]    a shortened-link pair
system_instruction = """
You are a helpful AI assistant named Athspi. When responding:
1. Never mention "audio" or technical terms
2. For responses that would benefit from audio (like stories, explanations, or content meant to be heard), include the audio version between these markers:
[AUDIO]content here[/AUDIO]
3. Keep responses natural and friendly
4. Decide automatically when to include audio based on the content type
5. For stories, always include audio version
6. CRITICAL: When you detect a URL in the user's message, respond with:
[SHORTEN]original_url|short_url[/SHORTEN]
Example format ONLY:
[SHORTEN]https://example.com|https://spoo.me/abc123[/SHORTEN]
7. DO NOT add any extra text inside the SHORTEN tags
8. ONLY use this format when you have a shortened URL to provide
9. If you cannot shorten the URL, do not include the SHORTEN tags
"""
# The API key is read from the environment; os.getenv() returns None when
# GEMINI_API_KEY is unset.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
# Single shared model object; every chat session is started from it.
model = genai.GenerativeModel('gemini-2.5-flash', system_instruction=system_instruction)
# In-memory session storage: maps a session_id cookie value to the chat object
# returned by model.start_chat(). It lives only in this process and entries
# are not expired automatically.
chat_sessions = {}
def convert_markdown_to_html(text):
    """Render markdown *text* to HTML and tag code blocks for styling.

    markdown2 is run with the "fenced-code-blocks" and "tables" extras.
    Every ``<pre><code ...>`` opening pair in the rendered output then gets
    ``class="code-block"`` added to its ``<pre>`` element; attributes already
    present on ``<code>`` are carried over unchanged.
    """
    rendered = markdown2.markdown(text, extras=["fenced-code-blocks", "tables"])
    code_block_open = re.compile(r'<pre><code(.*?)>')
    return code_block_open.sub(r'<pre class="code-block"><code\1>', rendered)
def process_response(full_response):
    """Split a raw model reply into display text and text-to-speech content.

    Two inline markers are handled:

    * ``[AUDIO]...[/AUDIO]`` - the content of the first such block is returned
      (stripped) as the audio text. The markers themselves are removed from
      the visible text, but the enclosed content stays visible.
    * ``[SHORTEN]original|short[/SHORTEN]`` - each well-formed tag is replaced
      by an HTML snippet linking to both URLs. A tag without a ``|`` separator
      is left in the text untouched.

    Args:
        full_response: The model's reply text.

    Returns:
        A ``(visible_text, audio_content)`` tuple; ``audio_content`` is None
        when the reply has no ``[AUDIO]`` block.
    """
    # Function-scope import: keeps this fix self-contained without touching
    # the module's import block.
    from html import escape

    # Extract audio content
    audio_match = re.search(r'\[AUDIO\](.*?)\[/AUDIO\]', full_response, re.DOTALL)
    audio_content = audio_match.group(1).strip() if audio_match else None
    visible_text = re.sub(r'\[/?AUDIO\]', '', full_response)

    def _render_short_link(match):
        """Build the link HTML for one SHORTEN tag (or keep a malformed tag)."""
        # Split on the last '|' so the short URL is the part with no pipes.
        original_url, separator, short_url = match.group(1).rpartition('|')
        if not separator:
            return match.group(0)
        # The URLs come from model output, so escape them before placing them
        # in attribute values and element text.
        original_url = escape(original_url.strip(), quote=True)
        short_url = escape(short_url.strip(), quote=True)
        # Emoji are written as escapes (link symbol, scissors) so the labels
        # survive any re-encoding of this source file.
        return (
            '<div class="link-container">'
            f'<p><strong>\U0001F517 Original:</strong> '
            f'<a href="{original_url}" target="_blank" rel="noopener noreferrer">{original_url}</a></p>'
            f'<p><strong>\u2702\ufe0f Shortened:</strong> '
            f'<a href="{short_url}" target="_blank" rel="noopener noreferrer">{short_url}</a></p>'
            '</div>'
        )

    # A callable replacement renders every tag with its own URLs and is not
    # subject to backslash/group-reference processing of the result; DOTALL
    # lets a tag that spans a line break be replaced as well.
    visible_text = re.sub(
        r'\[SHORTEN\](.*?)\[/SHORTEN\]',
        _render_short_link,
        visible_text,
        flags=re.DOTALL,
    )
    return visible_text, audio_content
def generate_audio(text):
    """Synthesize *text* to an MP3 under AUDIO_FOLDER and return its file name.

    Before synthesis, every character that is not a word character,
    whitespace, or one of ``. , ! ? -`` is removed. The file is named
    ``audio_<uuid4>.mp3``; only the bare file name (not the full path) is
    returned.
    """
    speakable = re.sub(r'[^\w\s.,!?\-]', '', text)
    filename = "audio_" + str(uuid.uuid4()) + ".mp3"
    target_path = os.path.join(AUDIO_FOLDER, filename)
    gTTS(text=speakable, lang='en', slow=False).save(target_path)
    return filename
def shorten_url_with_spoo_me(url):
    """Shorten *url* through spoo.me, returning None on any failure.

    The URL is stripped, given an ``https://`` prefix when it has no http(s)
    scheme, and POSTed form-encoded to ``https://spoo.me/`` with a 10 second
    timeout. On a 200 response the body is read as JSON and its ``short_url``
    field is used. If the body is not JSON at all, a plain-text body is
    accepted only when it is a single http(s) URL or a bare short code.

    Args:
        url: The URL to shorten.

    Returns:
        The shortened URL string, or None if the request failed, the service
        answered with a non-200 status, or no usable short URL was found.
        This function is best-effort and does not raise.
    """
    try:
        # Clean the input URL
        clean_url = url.strip()
        if not clean_url.startswith(('http://', 'https://')):
            clean_url = 'https://' + clean_url
        response = requests.post(
            "https://spoo.me/",
            data={"url": clean_url},
            headers={
                "Accept": "application/json",
                "Content-Type": "application/x-www-form-urlencoded",
            },
            timeout=10,
        )
        if response.status_code == 200:
            try:
                result = response.json()
            except ValueError:
                # Only a genuine "body is not JSON" failure reaches the
                # plain-text fallback. The text is validated so that an HTML
                # page or other arbitrary body is never turned into a link.
                text_response = response.text.strip()
                if re.fullmatch(r'https?://\S+', text_response):
                    return text_response
                if re.fullmatch(r'[A-Za-z0-9_-]{4,}', text_response):
                    return f"https://spoo.me/{text_response}"
            else:
                # Valid JSON: accept only a string short_url with an http(s)
                # scheme. A null/missing field or a non-object payload falls
                # through to the error path instead of the text fallback.
                short_url = result.get("short_url") if isinstance(result, dict) else None
                if isinstance(short_url, str):
                    short_url = short_url.strip()
                    if short_url.startswith(('http://', 'https://')):
                        return short_url
        print(f"spoo.me error: {response.status_code} - {response.text}")
        return None
    except Exception as e:
        # Deliberate catch-all: callers rely on None rather than an exception
        # when shortening is unavailable (network error, timeout, bad input).
        print("Request failed:", str(e))
        return None
@app.route('/chat', methods=['POST'])
def chat():
    """Handle one chat turn.

    Expects a JSON object body with a non-empty string ``message``. The
    conversation is keyed by the ``session_id`` cookie (a new id is issued
    when the cookie is absent). If the message contains an http(s) URL and it
    can be shortened, the model is additionally asked to echo a
    ``[SHORTEN]original|short[/SHORTEN]`` tag; otherwise the message is sent
    as-is.

    Returns:
        200 with ``{"response_html", "has_audio"[, "audio_filename"]}`` and a
        refreshed ``session_id`` cookie; 400 ``{"error": "Message required"}``
        for a missing/invalid body or empty message; 500 on any other failure.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body instead
        # of raising, so bad requests get a 400 rather than the generic 500.
        data = request.get_json(silent=True)
        raw_message = data.get('message', '') if isinstance(data, dict) else ''
        user_message = raw_message.strip() if isinstance(raw_message, str) else ''
        session_id = request.cookies.get('session_id') or str(uuid.uuid4())
        if not user_message:
            return jsonify({"error": "Message required"}), 400
        # Get or create chat session
        if session_id not in chat_sessions:
            chat_sessions[session_id] = model.start_chat(history=[])
        chat_session = chat_sessions[session_id]

        # By default the model sees the message unchanged; it is only wrapped
        # with shortening instructions when a short URL was actually obtained.
        prompt = user_message
        url_match = re.search(r'https?://[^\s<>"{}|\\^`\[\]]+', user_message)
        if url_match:
            # Drop sentence punctuation that directly follows the URL
            # ("see https://example.com.") so it is not shortened as part of it.
            original_url = url_match.group(0).rstrip('.,;:!?')
            short_url = shorten_url_with_spoo_me(original_url)
            if short_url:
                # Format the SHORTEN tag exactly as required
                short_tag = f"[SHORTEN]{original_url}|{short_url}[/SHORTEN]"
                prompt = (
                    f"{user_message}\n\n"
                    f"I detected a URL in your message: {original_url}\n"
                    f"Shortened version: {short_url}\n\n"
                    f"Please respond naturally and include this exact format:\n{short_tag}"
                )
        full_text = chat_session.send_message(prompt).text

        # Process final response
        visible_text, audio_content = process_response(full_text)
        result = {
            "response_html": convert_markdown_to_html(visible_text),
            "has_audio": False
        }
        # Generate audio if needed. Audio is an optional extra: if synthesis
        # fails, the text reply is still returned, just without audio.
        if audio_content:
            try:
                result["audio_filename"] = generate_audio(audio_content)
                result["has_audio"] = True
            except Exception as audio_error:
                print("Audio generation failed:", str(audio_error))
        # Return response + set session cookie
        resp = make_response(jsonify(result))
        resp.set_cookie('session_id', session_id, max_age=3600, httponly=True, samesite='Lax')
        return resp
    except Exception as e:
        print("Error:", str(e))
        return jsonify({"error": "Something went wrong."}), 500
@app.route('/new-chat', methods=['POST'])
def new_chat():
    """Reset chat memory.

    Drops the conversation stored for the caller's current ``session_id``
    cookie (if any) and issues a fresh session id, so the next /chat request
    starts a new conversation.

    Returns:
        200 with ``{"status": "new chat started"}`` and the new cookie.
    """
    # Release the old conversation; otherwise it would stay in chat_sessions
    # even though no cookie refers to it any more.
    previous_id = request.cookies.get('session_id')
    if previous_id:
        chat_sessions.pop(previous_id, None)
    session_id = str(uuid.uuid4())
    resp = make_response(jsonify({"status": "new chat started"}))
    resp.set_cookie('session_id', session_id, max_age=3600, httponly=True, samesite='Lax')
    return resp
@app.route('/download/<filename>')
def download_audio(filename):
    """Send a generated audio file as an attachment.

    Args:
        filename: File name inside AUDIO_FOLDER (a single path segment).

    Returns:
        The file as a download, or 404 ``{"error": "Audio file not found"}``
        when no such file exists.
    """
    # send_from_directory reports a missing file by raising an HTTP 404
    # exception, not FileNotFoundError, so the JSON error has to be produced
    # by an explicit check. The path is built the way send_from_directory
    # resolves it: relative to the application's root path.
    audio_path = os.path.join(app.root_path, AUDIO_FOLDER, filename)
    if not os.path.isfile(audio_path):
        return jsonify({"error": "Audio file not found"}), 404
    return send_from_directory(AUDIO_FOLDER, filename, as_attachment=True)
@app.route('/')
def serve_index():
    """Serve the front-end entry page, ``index.html`` from the static folder."""
    static_dir, index_page = 'static', 'index.html'
    return send_from_directory(static_dir, index_page)
@app.route('/<path:path>')
def serve_static(path):
    """Serve any other requested path as a file from the static folder."""
    requested_file = path
    return send_from_directory('static', requested_file)
if __name__ == '__main__':
    # When run directly as a script, listen on all interfaces at port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    app.run(**server_options)