|
|
|
from flask import Flask, request, jsonify, send_from_directory, make_response |
|
import google.generativeai as genai |
|
from dotenv import load_dotenv |
|
import os |
|
from flask_cors import CORS |
|
import markdown2 |
|
import re |
|
import requests |
|
from gtts import gTTS |
|
import uuid |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
AUDIO_FOLDER = os.path.join('static', 'audio') |
|
os.makedirs(AUDIO_FOLDER, exist_ok=True) |
|
|
|
app = Flask(__name__, static_folder='static') |
|
CORS(app) |
|
|
|
|
|
system_instruction = """ |
|
You are a helpful AI assistant named Athspi. When responding: |
|
1. Never mention "audio" or technical terms |
|
2. For responses that would benefit from audio (like stories, explanations, or content meant to be heard), include the audio version between these markers: |
|
[AUDIO]content here[/AUDIO] |
|
3. Keep responses natural and friendly |
|
4. Decide automatically when to include audio based on the content type |
|
5. For stories, always include audio version |
|
6. CRITICAL: When you detect a URL in the user's message, respond with: |
|
[SHORTEN]original_url|short_url[/SHORTEN] |
|
Example format ONLY: |
|
[SHORTEN]https://example.com|https://spoo.me/abc123[/SHORTEN] |
|
7. DO NOT add any extra text inside the SHORTEN tags |
|
8. ONLY use this format when you have a shortened URL to provide |
|
9. If you cannot shorten the URL, do not include the SHORTEN tags |
|
""" |
|
|
|
genai.configure(api_key=os.getenv("GEMINI_API_KEY")) |
|
model = genai.GenerativeModel('gemini-2.5-flash', system_instruction=system_instruction) |
|
|
|
|
|
chat_sessions = {} |
|
|
|
def convert_markdown_to_html(text): |
|
"""Convert markdown to HTML with code formatting""" |
|
html = markdown2.markdown(text, extras=["fenced-code-blocks", "tables"]) |
|
html = re.sub(r'<pre><code(.*?)>', r'<pre class="code-block"><code\1>', html) |
|
return html |
|
|
|
def process_response(full_response): |
|
"""Extract visible text, audio content, and short links from AI response""" |
|
|
|
audio_match = re.search(r'\[AUDIO\](.*?)\[/AUDIO\]', full_response, re.DOTALL) |
|
audio_content = audio_match.group(1).strip() if audio_match else None |
|
visible_text = re.sub(r'\[/?AUDIO\]', '', full_response) |
|
|
|
|
|
short_link_match = re.search(r'\[SHORTEN\](.*?)\|([^|]*?)\[/SHORTEN\]', visible_text, re.DOTALL) |
|
if short_link_match: |
|
original_url = short_link_match.group(1).strip() |
|
short_url = short_link_match.group(2).strip() |
|
|
|
|
|
link_html = f'<div class="link-container">' \ |
|
f'<p><strong>π Original:</strong> <a href="{original_url}" target="_blank" rel="noopener noreferrer">{original_url}</a></p>' \ |
|
f'<p><strong>βοΈ Shortened:</strong> <a href="{short_url}" target="_blank" rel="noopener noreferrer">{short_url}</a></p>' \ |
|
f'</div>' |
|
|
|
|
|
visible_text = re.sub(r'\[SHORTEN\].*?\[/SHORTEN\]', link_html, visible_text) |
|
|
|
return visible_text, audio_content |
|
|
|
def generate_audio(text): |
|
"""Generate audio file from text, cleaning special characters""" |
|
text = re.sub(r'[^\w\s.,!?\-]', '', text) |
|
filename = f"audio_{uuid.uuid4()}.mp3" |
|
filepath = os.path.join(AUDIO_FOLDER, filename) |
|
tts = gTTS(text=text, lang='en', slow=False) |
|
tts.save(filepath) |
|
return filename |
|
|
|
def shorten_url_with_spoo_me(url): |
|
"""Shorten URL using spoo.me API with proper error handling""" |
|
try: |
|
|
|
clean_url = url.strip() |
|
if not clean_url.startswith(('http://', 'https://')): |
|
clean_url = 'https://' + clean_url |
|
|
|
payload = { |
|
"url": clean_url |
|
} |
|
headers = { |
|
"Accept": "application/json", |
|
"Content-Type": "application/x-www-form-urlencoded" |
|
} |
|
|
|
response = requests.post( |
|
"https://spoo.me/", |
|
data=payload, |
|
headers=headers, |
|
timeout=10 |
|
) |
|
|
|
if response.status_code == 200: |
|
try: |
|
|
|
result = response.json() |
|
|
|
short_url = result.get("short_url", "").strip() |
|
if short_url and short_url.startswith('http'): |
|
return short_url |
|
except Exception: |
|
|
|
text_response = response.text.strip() |
|
if text_response.startswith('http'): |
|
return text_response |
|
elif len(text_response) > 3: |
|
return f"https://spoo.me/{text_response}" |
|
|
|
print(f"spoo.me error: {response.status_code} - {response.text}") |
|
return None |
|
|
|
except Exception as e: |
|
print("Request failed:", str(e)) |
|
return None |
|
|
|
@app.route('/chat', methods=['POST']) |
|
def chat(): |
|
try: |
|
data = request.json |
|
user_message = data.get('message', '').strip() |
|
session_id = request.cookies.get('session_id') or str(uuid.uuid4()) |
|
|
|
if not user_message: |
|
return jsonify({"error": "Message required"}), 400 |
|
|
|
|
|
if session_id not in chat_sessions: |
|
chat_sessions[session_id] = model.start_chat(history=[]) |
|
chat_session = chat_sessions[session_id] |
|
|
|
|
|
url_match = re.search(r'https?://[^\s<>"{}|\\^`\[\]]+', user_message) |
|
full_text = "" |
|
|
|
if url_match: |
|
original_url = url_match.group(0).strip() |
|
short_url = shorten_url_with_spoo_me(original_url) |
|
|
|
if short_url: |
|
|
|
short_tag = f"[SHORTEN]{original_url}|{short_url}[/SHORTEN]" |
|
|
|
|
|
ai_response = chat_session.send_message( |
|
f"{user_message}\n\n" |
|
f"I detected a URL in your message: {original_url}\n" |
|
f"Shortened version: {short_url}\n\n" |
|
f"Please respond naturally and include this exact format:\n{short_tag}" |
|
) |
|
full_text = ai_response.text |
|
else: |
|
|
|
ai_response = chat_session.send_message(user_message) |
|
full_text = ai_response.text |
|
else: |
|
|
|
ai_response = chat_session.send_message(user_message) |
|
full_text = ai_response.text |
|
|
|
|
|
visible_text, audio_content = process_response(full_text) |
|
html_response = convert_markdown_to_html(visible_text) |
|
|
|
result = { |
|
"response_html": html_response, |
|
"has_audio": False |
|
} |
|
|
|
|
|
if audio_content: |
|
audio_filename = generate_audio(audio_content) |
|
result["audio_filename"] = audio_filename |
|
result["has_audio"] = True |
|
|
|
|
|
resp = make_response(jsonify(result)) |
|
resp.set_cookie('session_id', session_id, max_age=3600, httponly=True, samesite='Lax') |
|
return resp |
|
|
|
except Exception as e: |
|
print("Error:", str(e)) |
|
return jsonify({"error": "Something went wrong."}), 500 |
|
|
|
@app.route('/new-chat', methods=['POST']) |
|
def new_chat(): |
|
"""Reset chat memory""" |
|
session_id = str(uuid.uuid4()) |
|
resp = make_response(jsonify({"status": "new chat started"})) |
|
resp.set_cookie('session_id', session_id, max_age=3600, httponly=True, samesite='Lax') |
|
return resp |
|
|
|
@app.route('/download/<filename>') |
|
def download_audio(filename): |
|
try: |
|
return send_from_directory(AUDIO_FOLDER, filename, as_attachment=True) |
|
except FileNotFoundError: |
|
return jsonify({"error": "Audio file not found"}), 404 |
|
|
|
@app.route('/') |
|
def serve_index(): |
|
return send_from_directory('static', 'index.html') |
|
|
|
@app.route('/<path:path>') |
|
def serve_static(path): |
|
return send_from_directory('static', path) |
|
|
|
if __name__ == '__main__': |
|
app.run(host="0.0.0.0", port=7860) |