Spaces:
Sleeping
Sleeping
# app.py - الكود النهائي المعدل ليتوافق مع بنية measurements.json الحقيقية | |
import os | |
import json | |
import gradio as gr | |
import base64 | |
from PIL import Image | |
import io | |
import requests | |
# ============================================================================== | |
# 1. إعدادات الـ API | |
# ============================================================================== | |
API_URL_BASE = "https://BaseerAI-baseer-server.hf.space" | |
API_START_SESSION_URL = f"{API_URL_BASE}/start_session" | |
API_RUN_STEP_URL = f"{API_URL_BASE}/run_step" | |
API_END_SESSION_URL = f"{API_URL_BASE}/end_session" | |
EXAMPLES_DIR = "examples" | |
# ============================================================================== | |
# 2. الدوال المساعدة والدوال التي تتصل بالـ API | |
# ============================================================================== | |
def image_to_base64(pil_image): | |
buffered = io.BytesIO() | |
pil_image.save(buffered, format="JPEG") | |
return base64.b64encode(buffered.getvalue()).decode('utf-8') | |
def base64_to_image(b64_string): | |
img_data = base64.b64decode(b64_string) | |
return Image.open(io.BytesIO(img_data)) | |
def start_new_session(): | |
"""يبدأ جلسة جديدة على الخادم ويعيد معرف الجلسة وأمر إظهار الواجهة.""" | |
print("Requesting to start a new session...") | |
try: | |
response = requests.post(API_START_SESSION_URL, timeout=30) | |
response.raise_for_status() | |
session_data = response.json() | |
session_id = session_data.get("session_id") | |
if not session_id: | |
raise gr.Error("لم يتمكن الخادم من بدء جلسة جديدة.") | |
status_message = f"✅ تم بدء جلسة جديدة بنجاح. أنت الآن متصل بالخادم." | |
print(f"Session started successfully: {session_id}") | |
return session_id, status_message, gr.update(visible=True) | |
except requests.exceptions.RequestException as e: | |
error_message = f"فشل الاتصال بخادم الـ API: {e}" | |
print(error_message) | |
raise gr.Error(error_message) | |
def handle_end_session(session_id): | |
"""ينهي الجلسة الحالية على الخادم.""" | |
if not session_id: | |
return "لا توجد جلسة نشطة لإنهائها.", None, gr.update(visible=False) | |
print(f"Requesting to end session: {session_id}") | |
try: | |
response = requests.post(f"{API_END_SESSION_URL}?session_id={session_id}", timeout=30) | |
response.raise_for_status() | |
status_message = f"🔌 تم إنهاء الجلسة بنجاح. انقطع الاتصال بالخادم." | |
print(status_message) | |
return status_message, None, gr.update(visible=False) | |
except requests.exceptions.RequestException as e: | |
error_message = f"حدث خطأ أثناء إنهاء الجلسة: {e}" | |
print(error_message) | |
raise gr.Error(error_message) | |
# --- [تم تعديل هذه الدالة] --- | |
def run_single_frame_via_api(session_id, rgb_image_path, measurements_path): | |
if not session_id: | |
raise gr.Error("لا توجد جلسة نشطة. يرجى بدء جلسة جديدة أولاً.") | |
if not (rgb_image_path and measurements_path): | |
raise gr.Error("الرجاء توفير الصورة الأمامية وملف القياسات على الأقل.") | |
try: | |
rgb_image_pil = Image.open(rgb_image_path).convert("RGB") | |
with open(measurements_path, 'r') as f: | |
measurements_dict = json.load(f) | |
image_b64 = image_to_base64(rgb_image_pil) | |
# --- [التعديل الرئيسي هنا] --- | |
# بناء الحمولة (Payload) بناءً على بنية الملف الحقيقية | |
payload = { | |
"session_id": session_id, | |
"image_b64": image_b64, | |
"measurements": { | |
"pos_global": measurements_dict.get('pos_global', [0.0, 0.0]), | |
"theta": measurements_dict.get('theta', 0.0), | |
"speed": measurements_dict.get('speed', 0.0), | |
"target_point": measurements_dict.get('target_point', [0.0, 100.0]) | |
} | |
} | |
print(f"Sending run_step request for session: {session_id}") | |
response = requests.post(API_RUN_STEP_URL, json=payload, timeout=60) | |
response.raise_for_status() | |
api_result = response.json() | |
dashboard_b64 = api_result.get("dashboard_b64") | |
control_commands = api_result.get("control_commands", {}) | |
if not dashboard_b64: | |
raise gr.Error("لم يتم إرجاع صورة لوحة التحكم من الـ API.") | |
output_image = base64_to_image(dashboard_b64) | |
return output_image, control_commands | |
except requests.exceptions.RequestException as e: | |
raise gr.Error(f"حدث خطأ أثناء الاتصال بالـ API: {e}") | |
except Exception as e: | |
raise gr.Error(f"حدث خطأ غير متوقع: {e}") | |
# ============================================================================== | |
# 4. تعريف واجهة Gradio | |
# ============================================================================== | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), css=".gradio-container {max-width: 100% !important;}") as demo: | |
session_id_state = gr.State(value=None) | |
gr.Markdown("# 🚗 واجهة التحكم لنظام Baseer للقيادة الذاتية") | |
gr.Markdown("هذه الواجهة تعمل كـ **عميل (Client)** يتصل بـ **[Baseer Self-Driving API](https://huggingface.co/spaces/BaseerAI/baseer-server)** لمعالجة البيانات.") | |
gr.Markdown("---") | |
with gr.Group(): | |
gr.Markdown("## ⚙️ التحكم بالجلسة") | |
with gr.Row(): | |
start_session_button = gr.Button("🟢 بدء جلسة جديدة", variant="primary") | |
end_session_button = gr.Button("🔴 إنهاء الجلسة الحالية") | |
status_textbox = gr.Textbox(label="حالة الجلسة", interactive=False, value="في انتظار بدء الجلسة...") | |
gr.Markdown("") | |
with gr.Group(visible=False) as main_controls_group: | |
with gr.Row(): | |
# -- العمود الأيسر: الإعدادات والمدخلات -- | |
with gr.Column(scale=2): | |
with gr.Group(): | |
gr.Markdown("### 🗂️ ارفع ملفات السيناريو") | |
api_rgb_image_path = gr.Image(type="filepath", label="صورة الكاميرا الأمامية (RGB)") | |
api_measurements_path = gr.File(label="ملف القياسات (JSON)", type="filepath") | |
# --- [تم حذف gr.JSON من هنا] --- | |
api_run_button = gr.Button("🚀 أرسل البيانات للمعالجة", variant="primary") | |
gr.Markdown("") | |
# ... (داخل with gr.Group(visible=False) as main_controls_group:) | |
with gr.Group(): | |
gr.Markdown("### ✨ أمثلة جاهزة") | |
# التحقق من وجود مجلد الأمثلة لتجنب الأخطاء | |
if os.path.isdir(EXAMPLES_DIR) and \ | |
os.path.exists(os.path.join(EXAMPLES_DIR, "sample1", "measurements.json")) and \ | |
os.path.exists(os.path.join(EXAMPLES_DIR, "sample2", "measurements.json")): | |
# قراءة بيانات المثال الأول | |
with open(os.path.join(EXAMPLES_DIR, "sample1", "measurements.json"), 'r') as f: | |
data1 = json.load(f) | |
# قراءة بيانات المثال الثاني | |
with open(os.path.join(EXAMPLES_DIR, "sample2", "measurements.json"), 'r') as f: | |
data2 = json.load(f) | |
# --- [تعديل] 1: إنشاء مكونات جديدة لكل البيانات المستخدمة --- | |
# سيتم استخدام عناوينها كعناوين للأعمدة في الجدول | |
with gr.Column(visible=False): | |
example_speed = gr.Textbox(label="السرعة (m/s)") | |
example_theta = gr.Textbox(label="الاتجاه (Theta)") | |
example_pos = gr.Textbox(label="الموقع العام") | |
example_target = gr.Textbox(label="النقطة المستهدفة") | |
gr.Examples( | |
examples=[ | |
# --- [تعديل] 2: بناء البيانات للأعمدة الجديدة --- | |
[ | |
# المدخلات الأساسية | |
os.path.join(EXAMPLES_DIR, "sample1", "rgb.jpg"), | |
os.path.join(EXAMPLES_DIR, "sample1", "measurements.json"), | |
# البيانات للعرض فقط | |
f"{data1.get('speed', 0.0):.2f}", | |
f"{data1.get('theta', 0.0):.2f}", | |
str(data1.get('pos_global', 'N/A')), | |
str(data1.get('target_point', 'N/A')) | |
], | |
[ | |
# المدخلات الأساسية | |
os.path.join(EXAMPLES_DIR, "sample2", "rgb.jpg"), | |
os.path.join(EXAMPLES_DIR, "sample2", "measurements.json"), | |
# البيانات للعرض فقط | |
f"{data2.get('speed', 0.0):.2f}", | |
f"{data2.get('theta', 0.0):.2f}", | |
str(data2.get('pos_global', 'N/A')), | |
str(data2.get('target_point', 'N/A')) | |
] | |
], | |
# --- [تعديل] 3: تحديث المدخلات لتشمل كل الأعمدة --- | |
inputs=[ | |
api_rgb_image_path, | |
api_measurements_path, | |
example_speed, | |
example_theta, | |
example_pos, | |
example_target | |
], | |
label="اختر سيناريو اختبار", | |
) | |
else: | |
gr.Markdown("لم يتم العثور على مجلد الأمثلة (`examples`) أو محتوياته.") | |
# -- العمود الأيمن: المخرجات -- | |
with gr.Column(scale=3): | |
with gr.Group(): | |
gr.Markdown("### 📊 النتائج من الخادم") | |
api_output_image = gr.Image(label="لوحة التحكم المرئية (من الـ API)", type="pil", interactive=False, height=600) | |
api_control_json = gr.JSON(label="أوامر التحكم (من الـ API)") | |
# --- ربط منطق الواجهة --- | |
start_session_button.click( | |
fn=start_new_session, | |
inputs=None, | |
outputs=[session_id_state, status_textbox, main_controls_group] | |
) | |
end_session_button.click( | |
fn=handle_end_session, | |
inputs=[session_id_state], | |
outputs=[status_textbox, session_id_state, main_controls_group] | |
) | |
# --- [تم تعديل هذا الجزء] --- | |
api_run_button.click( | |
fn=run_single_frame_via_api, | |
inputs=[session_id_state, api_rgb_image_path, api_measurements_path], # تم حذف target_point_list | |
outputs=[api_output_image, api_control_json], | |
) | |
if __name__ == "__main__": | |
demo.queue().launch(debug=True) |