# # app.py # import os # import json # import traceback # import torch # import gradio as gr # import numpy as np # from PIL import Image # import cv2 # import math # # --- استيراد من الملفات المنظمة في مشروعك --- # from model import build_interfuser_model # from logic import ( # transform, lidar_transform, InterfuserController, ControllerConfig, # Tracker, DisplayInterface, render, render_waypoints, render_self_car, # ensure_rgb, WAYPOINT_SCALE_FACTOR, T1_FUTURE_TIME, T2_FUTURE_TIME # ) # # ============================================================================== # # 1. إعدادات ومسارات النماذج # # ============================================================================== # WEIGHTS_DIR = "model" # EXAMPLES_DIR = "examples" # device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # MODELS_SPECIFIC_CONFIGS = { # "interfuser_baseline": { "rgb_backbone_name": "r50", "embed_dim": 256, "direct_concat": True }, # "interfuser_lightweight": { "rgb_backbone_name": "r26", "embed_dim": 128, "enc_depth": 4, "dec_depth": 4, "direct_concat": True } # } # def find_available_models(): # if not os.path.isdir(WEIGHTS_DIR): return [] # return [f.replace(".pth", "") for f in os.listdir(WEIGHTS_DIR) if f.endswith(".pth")] # # ============================================================================== # # 2. الدوال الأساسية # # ============================================================================== # def load_model(model_name: str): # if not model_name or "لم يتم" in model_name: # return None, "الرجاء اختيار نموذج صالح." # weights_path = os.path.join(WEIGHTS_DIR, f"{model_name}.pth") # print(f"Building model: '{model_name}'") # model_config = MODELS_SPECIFIC_CONFIGS.get(model_name, {}) # model = build_interfuser_model(model_config) # if not os.path.exists(weights_path): # gr.Warning(f"ملف الأوزان '{weights_path}' غير موجود.") # else: # try: # state_dic = torch.load(weights_path, map_location=device, weights_only=True) # model.load_state_dict(state_dic) # print(f"تم تحميل أوزان النموذج '{model_name}' بنجاح.") # except Exception as e: # gr.Warning(f"فشل تحميل الأوزان للنموذج '{model_name}': {e}.") # model.to(device) # model.eval() # return model, f"تم تحميل نموذج: {model_name}" # def run_single_frame( # model_from_state, rgb_image_path, rgb_left_image_path, rgb_right_image_path, # rgb_center_image_path, lidar_image_path, measurements_path, target_point_list # ): # """ # (نسخة أكثر قوة مع معالجة أخطاء مفصلة) # """ # if model_from_state is None: # print("API session detected or model not loaded. Loading default model...") # available_models = find_available_models() # if not available_models: raise gr.Error("لا توجد نماذج متاحة للتحميل.") # model_to_use, _ = load_model(available_models[0]) # else: # model_to_use = model_from_state # if model_to_use is None: # raise gr.Error("فشل تحميل النموذج. تحقق من السجلات (Logs).") # try: # # --- 1. التحقق من المدخلات المطلوبة --- # if not (rgb_image_path and measurements_path): # raise gr.Error("الرجاء توفير الصورة الأمامية وملف القياسات على الأقل.") # # --- 2. قراءة ومعالجة المدخلات مع معالجة أخطاء مفصلة --- # try: # rgb_image_pil = Image.open(rgb_image_path).convert("RGB") # except Exception as e: # raise gr.Error(f"فشل تحميل صورة الكاميرا الأمامية. تأكد من أن الملف صحيح. الخطأ: {e}") # def load_optional_image(path, default_image): # if path: # try: # return Image.open(path).convert("RGB") # except Exception as e: # raise gr.Error(f"فشل تحميل الصورة الاختيارية '{os.path.basename(path)}'. الخطأ: {e}") # return default_image # rgb_left_pil = load_optional_image(rgb_left_image_path, rgb_image_pil) # rgb_right_pil = load_optional_image(rgb_right_image_path, rgb_image_pil) # rgb_center_pil = load_optional_image(rgb_center_image_path, rgb_image_pil) # if lidar_image_path: # try: # lidar_array = np.load(lidar_image_path) # if lidar_array.max() > 0: lidar_array = (lidar_array / lidar_array.max()) * 255.0 # lidar_pil = Image.fromarray(lidar_array.astype(np.uint8)).convert('RGB') # except Exception as e: # raise gr.Error(f"فشل تحميل ملف الليدار (.npy). تأكد من أن الملف صحيح. الخطأ: {e}") # else: # lidar_pil = Image.fromarray(np.zeros((112, 112, 3), dtype=np.uint8)) # try: # with open(measurements_path, 'r') as f: m_dict = json.load(f) # except Exception as e: # raise gr.Error(f"فشل تحميل أو قراءة ملف القياسات (.json). تأكد من أنه بصيغة صحيحة. الخطأ: {e}") # # --- 3. تحويل البيانات إلى تنسورات --- # front_tensor = transform(rgb_image_pil).unsqueeze(0).to(device) # left_tensor = transform(rgb_left_pil).unsqueeze(0).to(device) # right_tensor = transform(rgb_right_pil).unsqueeze(0).to(device) # center_tensor = transform(rgb_center_pil).unsqueeze(0).to(device) # lidar_tensor = lidar_transform(lidar_pil).unsqueeze(0).to(device) # measurements_tensor = torch.tensor([[ # m_dict.get('x',0.0), m_dict.get('y',0.0), m_dict.get('theta',0.0), m_dict.get('speed',5.0), # m_dict.get('steer',0.0), m_dict.get('throttle',0.0), float(m_dict.get('brake',0.0)), # m_dict.get('command',2.0), float(m_dict.get('is_junction',0.0)), float(m_dict.get('should_brake',0.0)) # ]], dtype=torch.float32).to(device) # target_point_tensor = torch.tensor([target_point_list], dtype=torch.float32).to(device) # inputs = {'rgb': front_tensor, 'rgb_left': left_tensor, 'rgb_right': right_tensor, 'rgb_center': center_tensor, 'lidar': lidar_tensor, 'measurements': measurements_tensor, 'target_point': target_point_tensor} # # --- 4. تشغيل النموذج --- # with torch.no_grad(): # outputs = model_to_use(inputs) # traffic, waypoints, is_junction, traffic_light, stop_sign, _ = outputs # # --- 5. المعالجة اللاحقة والتصوّر --- # speed, pos, theta = m_dict.get('speed',5.0), [m_dict.get('x',0.0), m_dict.get('y',0.0)], m_dict.get('theta',0.0) # traffic_np, waypoints_np = traffic[0].detach().cpu().numpy().reshape(20,20,-1), waypoints[0].detach().cpu().numpy() * WAYPOINT_SCALE_FACTOR # tracker, controller = Tracker(), InterfuserController(ControllerConfig()) # updated_traffic = tracker.update_and_predict(traffic_np.copy(), pos, theta, 0) # steer, throttle, brake, metadata = controller.run_step(speed, waypoints_np, is_junction.sigmoid()[0,1].item(), traffic_light.sigmoid()[0,0].item(), stop_sign.sigmoid()[0,1].item(), updated_traffic) # # ... (كود الرسم) # map_t0, counts_t0 = render(updated_traffic, t=0) # map_t1, counts_t1 = render(updated_traffic, t=T1_FUTURE_TIME) # map_t2, counts_t2 = render(updated_traffic, t=T2_FUTURE_TIME) # wp_map = render_waypoints(waypoints_np) # self_car_map = render_self_car(np.array([0,0]), [math.cos(0), math.sin(0)], [4.0, 2.0]) # map_t0 = cv2.add(cv2.add(map_t0, wp_map), self_car_map) # map_t0 = cv2.resize(map_t0, (400, 400)) # map_t1 = cv2.add(ensure_rgb(map_t1), ensure_rgb(self_car_map)); map_t1 = cv2.resize(map_t1, (200, 200)) # map_t2 = cv2.add(ensure_rgb(map_t2), ensure_rgb(self_car_map)); map_t2 = cv2.resize(map_t2, (200, 200)) # display = DisplayInterface() # light_state, stop_sign_state = "Red" if traffic_light.sigmoid()[0,0].item() > 0.5 else "Green", "Yes" if stop_sign.sigmoid()[0,1].item() > 0.5 else "No" # interface_data = {'camera_view': np.array(rgb_image_pil),'map_t0': map_t0,'map_t1': map_t1,'map_t2': map_t2, # 'text_info': {'Control': f"S:{steer:.2f} T:{throttle:.2f} B:{int(brake)}",'Light': f"L: {light_state}",'Stop': f"St: {stop_sign_state}"}, # 'object_counts': {'t0': counts_t0,'t1': counts_t1,'t2': counts_t2}} # dashboard_image = display.run_interface(interface_data) # # --- 6. تجهيز المخرجات --- # control_commands_dict = {"steer": steer, "throttle": throttle, "brake": bool(brake)} # return Image.fromarray(dashboard_image), control_commands_dict # except gr.Error as e: # raise e # أعد إظهار أخطاء Gradio كما هي # except Exception as e: # print(traceback.format_exc()) # raise gr.Error(f"حدث خطأ غير متوقع أثناء معالجة الإطار: {e}") # # ============================================================================== # # 5. تعريف واجهة Gradio (لا تغيير هنا) # # ============================================================================== # # ... (كود الواجهة بالكامل يبقى كما هو من النسخة السابقة) ... # available_models = find_available_models() # with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), css=".gradio-container {max-width: 95% !important;}") as demo: # model_state = gr.State(value=None) # gr.Markdown("# 🚗 محاكاة القيادة الذاتية باستخدام Interfuser") # gr.Markdown("مرحباً بك في واجهة اختبار نموذج Interfuser. اتبع الخطوات أدناه لتشغيل المحاكاة على إطار واحد.") # with gr.Row(): # # -- العمود الأيسر: الإعدادات والمدخلات -- # with gr.Column(scale=1): # with gr.Group(): # gr.Markdown("## ⚙️ الخطوة 1: اختر النموذج") # with gr.Row(): # model_selector = gr.Dropdown( # label="النماذج المتاحة", # choices=available_models, # value=available_models[0] if available_models else "لم يتم العثور على نماذج" # ) # status_textbox = gr.Textbox(label="حالة النموذج", interactive=False) # with gr.Group(): # gr.Markdown("## 🗂️ الخطوة 2: ارفع ملفات السيناريو") # with gr.Group(): # gr.Markdown("**(مطلوب)**") # api_rgb_image_path = gr.File(label="صورة الكاميرا الأمامية (RGB)", type="filepath") # api_measurements_path = gr.File(label="ملف القياسات (JSON)", type="filepath") # with gr.Accordion("📷 مدخلات اختيارية (كاميرات ومستشعرات إضافية)", open=False): # api_rgb_left_image_path = gr.File(label="كاميرا اليسار (RGB)", type="filepath") # api_rgb_right_image_path = gr.File(label="كاميرا اليمين (RGB)", type="filepath") # api_rgb_center_image_path = gr.File(label="كاميرا الوسط (RGB)", type="filepath") # api_lidar_image_path = gr.File(label="بيانات الليدار (NPY)", type="filepath") # api_target_point_list = gr.JSON(label="📍 النقطة المستهدفة (x, y)", value=[0.0, 100.0]) # api_run_button = gr.Button("🚀 شغل المحاكاة", variant="primary", scale=2) # with gr.Group(): # gr.Markdown("### ✨ أمثلة جاهزة") # gr.Markdown("انقر على مثال لتعبئة الحقول تلقائياً (يتطلب وجود مجلد `examples`).") # gr.Examples( # examples=[ # [os.path.join(EXAMPLES_DIR, "sample1", "rgb.jpg"), os.path.join(EXAMPLES_DIR, "sample1", "measurements.json")], # [os.path.join(EXAMPLES_DIR, "sample2", "rgb.jpg"), os.path.join(EXAMPLES_DIR, "sample2", "measurements.json")] # ], # inputs=[api_rgb_image_path, api_measurements_path], # label="اختر سيناريو اختبار" # ) # # -- العمود الأيمن: المخرجات -- # with gr.Column(scale=2): # with gr.Group(): # gr.Markdown("## 📊 الخطوة 3: شاهد النتائج") # api_output_image = gr.Image(label="لوحة التحكم المرئية (Dashboard)", type="pil", interactive=False) # api_control_json = gr.JSON(label="أوامر التحكم (JSON)") # # --- ربط منطق الواجهة --- # if available_models: # demo.load(fn=load_model, inputs=model_selector, outputs=[model_state, status_textbox]) # model_selector.change(fn=load_model, inputs=model_selector, outputs=[model_state, status_textbox]) # api_run_button.click( # fn=run_single_frame, # inputs=[model_state, api_rgb_image_path, api_rgb_left_image_path, api_rgb_right_image_path, # api_rgb_center_image_path, api_lidar_image_path, api_measurements_path, api_target_point_list], # outputs=[api_output_image, api_control_json], # api_name="run_single_frame" # ) # # ============================================================================== # # 6. تشغيل التطبيق # # ============================================================================== # if __name__ == "__main__": # if not available_models: # print("تحذير: لم يتم العثور على أي ملفات نماذج (.pth) في مجلد 'model/weights'.") # demo.queue().launch(debug=True, share=True, show_api=True) # # الحديد # # app.py (النسخة المدمجة مع FastAPI) # import os # import json # import traceback # import torch # import gradio as gr # import numpy as np # from PIL import Image # import io # import base64 # import cv2 # import math # from fastapi import FastAPI, UploadFile, File, Form, HTTPException # ✅ استيراد FastAPI # from typing import List # ✅ استيراد للـ Type Hinting # # --- استيراد من الملفات المنظمة في مشروعك --- # from model import build_interfuser_model # from logic import ( # transform, lidar_transform, InterfuserController, ControllerConfig, # Tracker, DisplayInterface, render, render_waypoints, render_self_car, # ensure_rgb, WAYPOINT_SCALE_FACTOR, T1_FUTURE_TIME, T2_FUTURE_TIME # ) # # ✅ ============================================================================== # # ✅ 0. إنشاء تطبيق FastAPI الرئيسي # # ✅ ============================================================================== # # هذا هو التطبيق الرئيسي الذي سيتم تشغيله. # # سيحتوي على كل من واجهة Gradio وواجهة API المخصصة. # app = FastAPI() # # ============================================================================== # # 1. إعدادات ومسارات النماذج (لا تغيير) # # ============================================================================== # WEIGHTS_DIR = "model" # EXAMPLES_DIR = "examples" # device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # MODELS_SPECIFIC_CONFIGS = { # "interfuser_baseline": { "rgb_backbone_name": "r50", "embed_dim": 256, "direct_concat": True }, # "interfuser_lightweight": { "rgb_backbone_name": "r26", "embed_dim": 128, "enc_depth": 4, "dec_depth": 4, "direct_concat": True } # } # def find_available_models(): # if not os.path.isdir(WEIGHTS_DIR): return [] # return [f.replace(".pth", "") for f in os.listdir(WEIGHTS_DIR) if f.endswith(".pth")] # # ============================================================================== # # 2. الدوال الأساسية (لا تغيير) # # ============================================================================== # # ... (دالة load_model تبقى كما هي تمامًا) ... # def load_model(model_name: str): # # ... نفس الكود ... # if not model_name or "لم يتم" in model_name: # return None, "الرجاء اختيار نموذج صالح." # weights_path = os.path.join(WEIGHTS_DIR, f"{model_name}.pth") # print(f"Building model: '{model_name}'") # model_config = MODELS_SPECIFIC_CONFIGS.get(model_name, {}) # model = build_interfuser_model(model_config) # if not os.path.exists(weights_path): # gr.Warning(f"ملف الأوزان '{weights_path}' غير موجود.") # else: # try: # state_dic = torch.load(weights_path, map_location=device, weights_only=True) # model.load_state_dict(state_dic) # print(f"تم تحميل أوزان النموذج '{model_name}' بنجاح.") # except Exception as e: # gr.Warning(f"فشل تحميل الأوزان للنموذج '{model_name}': {e}.") # model.to(device) # model.eval() # return model, f"تم تحميل نموذج: {model_name}" # # ... (دالة run_single_frame تبقى كما هي تمامًا) ... # def run_single_frame( # model_from_state, rgb_image_path, rgb_left_image_path, rgb_right_image_path, # rgb_center_image_path, lidar_image_path, measurements_path, target_point_list # ): # # ... نفس الكود ... # if model_from_state is None: # print("API session detected or model not loaded. Loading default model...") # available_models = find_available_models() # if not available_models: raise gr.Error("لا توجد نماذج متاحة للتحميل.") # model_to_use, _ = load_model(available_models[0]) # else: # model_to_use = model_from_state # if model_to_use is None: # raise gr.Error("فشل تحميل النموذج. تحقق من السجلات (Logs).") # try: # # ... (بقية الكود داخل الدالة لا يتغير) ... # if not (rgb_image_path and measurements_path): # raise gr.Error("الرجاء توفير الصورة الأمامية وملف القياسات على الأقل.") # try: # rgb_image_pil = Image.open(rgb_image_path).convert("RGB") # except Exception as e: # raise gr.Error(f"فشل تحميل صورة الكاميرا الأمامية. تأكد من أن الملف صحيح. الخطأ: {e}") # def load_optional_image(path, default_image): # if path: # try: return Image.open(path).convert("RGB") # except Exception as e: raise gr.Error(f"فشل تحميل الصورة الاختيارية '{os.path.basename(path)}'. الخطأ: {e}") # return default_image # rgb_left_pil = load_optional_image(rgb_left_image_path, rgb_image_pil) # rgb_right_pil = load_optional_image(rgb_right_image_path, rgb_image_pil) # rgb_center_pil = load_optional_image(rgb_center_image_path, rgb_image_pil) # if lidar_image_path: # try: # lidar_array = np.load(lidar_image_path) # if lidar_array.max() > 0: lidar_array = (lidar_array / lidar_array.max()) * 255.0 # lidar_pil = Image.fromarray(lidar_array.astype(np.uint8)).convert('RGB') # except Exception as e: raise gr.Error(f"فشل تحميل ملف الليدار (.npy). تأكد من أن الملف صحيح. الخطأ: {e}") # else: # lidar_pil = Image.fromarray(np.zeros((112, 112, 3), dtype=np.uint8)) # try: # with open(measurements_path, 'r') as f: m_dict = json.load(f) # except Exception as e: raise gr.Error(f"فشل تحميل أو قراءة ملف القياسات (.json). تأكد من أنه بصيغة صحيحة. الخطأ: {e}") # front_tensor = transform(rgb_image_pil).unsqueeze(0).to(device) # left_tensor = transform(rgb_left_pil).unsqueeze(0).to(device) # right_tensor = transform(rgb_right_pil).unsqueeze(0).to(device) # center_tensor = transform(rgb_center_pil).unsqueeze(0).to(device) # lidar_tensor = lidar_transform(lidar_pil).unsqueeze(0).to(device) # measurements_tensor = torch.tensor([[m_dict.get('x',0.0), m_dict.get('y',0.0), m_dict.get('theta',0.0), m_dict.get('speed',5.0), m_dict.get('steer',0.0), m_dict.get('throttle',0.0), float(m_dict.get('brake',0.0)), m_dict.get('command',2.0), float(m_dict.get('is_junction',0.0)), float(m_dict.get('should_brake',0.0))]], dtype=torch.float32).to(device) # target_point_tensor = torch.tensor([target_point_list], dtype=torch.float32).to(device) # inputs = {'rgb': front_tensor, 'rgb_left': left_tensor, 'rgb_right': right_tensor, 'rgb_center': center_tensor, 'lidar': lidar_tensor, 'measurements': measurements_tensor, 'target_point': target_point_tensor} # with torch.no_grad(): # outputs = model_to_use(inputs) # traffic, waypoints, is_junction, traffic_light, stop_sign, _ = outputs # speed, pos, theta = m_dict.get('speed',5.0), [m_dict.get('x',0.0), m_dict.get('y',0.0)], m_dict.get('theta',0.0) # traffic_np, waypoints_np = traffic[0].detach().cpu().numpy().reshape(20,20,-1), waypoints[0].detach().cpu().numpy() * WAYPOINT_SCALE_FACTOR # tracker, controller = Tracker(), InterfuserController(ControllerConfig()) # updated_traffic = tracker.update_and_predict(traffic_np.copy(), pos, theta, 0) # steer, throttle, brake, metadata = controller.run_step(speed, waypoints_np, is_junction.sigmoid()[0,1].item(), traffic_light.sigmoid()[0,0].item(), stop_sign.sigmoid()[0,1].item(), updated_traffic) # map_t0, counts_t0 = render(updated_traffic, t=0) # map_t1, counts_t1 = render(updated_traffic, t=T1_FUTURE_TIME) # map_t2, counts_t2 = render(updated_traffic, t=T2_FUTURE_TIME) # wp_map = render_waypoints(waypoints_np) # self_car_map = render_self_car(np.array([0,0]), [math.cos(0), math.sin(0)], [4.0, 2.0]) # map_t0 = cv2.add(cv2.add(map_t0, wp_map), self_car_map); map_t0 = cv2.resize(map_t0, (400, 400)) # map_t1 = cv2.add(ensure_rgb(map_t1), ensure_rgb(self_car_map)); map_t1 = cv2.resize(map_t1, (200, 200)) # map_t2 = cv2.add(ensure_rgb(map_t2), ensure_rgb(self_car_map)); map_t2 = cv2.resize(map_t2, (200, 200)) # display = DisplayInterface() # light_state, stop_sign_state = "Red" if traffic_light.sigmoid()[0,0].item() > 0.5 else "Green", "Yes" if stop_sign.sigmoid()[0,1].item() > 0.5 else "No" # interface_data = {'camera_view': np.array(rgb_image_pil),'map_t0': map_t0,'map_t1': map_t1,'map_t2': map_t2, 'text_info': {'Control': f"S:{steer:.2f} T:{throttle:.2f} B:{int(brake)}",'Light': f"L: {light_state}",'Stop': f"St: {stop_sign_state}"}, 'object_counts': {'t0': counts_t0,'t1': counts_t1,'t2': counts_t2}} # dashboard_image = display.run_interface(interface_data) # control_commands_dict = {"steer": steer, "throttle": throttle, "brake": bool(brake)} # return Image.fromarray(dashboard_image), control_commands_dict # except gr.Error as e: raise e # except Exception as e: # print(traceback.format_exc()) # raise gr.Error(f"حدث خطأ غير متوقع أثناء معالجة الإطار: {e}") # # ✅ ============================================================================== # # ✅ 3. تعريف نقطة النهاية المخصصة (Custom API) باستخدام FastAPI # # ✅ ============================================================================== # @app.post("/api/predict_flutter", tags=["Flutter API"]) # async def flutter_predict_endpoint( # rgb_image: UploadFile = File(..., description="صورة الكاميرا الأمامية المطلوبة"), # measurements_json: UploadFile = File(..., description="ملف القياسات المطلوب بصيغة JSON"), # target_point: str = Form(default='[0.0, 100.0]', description="النقطة المستهدفة كـ JSON string"), # # المدخلات الاختيارية # rgb_left_image: UploadFile = File(None), # rgb_right_image: UploadFile = File(None), # rgb_center_image: UploadFile = File(None), # lidar_data: UploadFile = File(None), # ): # """ # نقطة نهاية بسيطة ومخصصة لتطبيق فلاتر. # تستقبل الملفات مباشرة وتستدعي دالة النموذج. # """ # print("✅ Custom API endpoint /api/predict_flutter called!") # # دالة داخلية لحفظ الملفات المرفوعة مؤقتاً # async def save_upload_file(upload_file: UploadFile, destination: str): # if not upload_file: return None # try: # with open(destination, "wb") as f: # f.write(await upload_file.read()) # return destination # except Exception as e: # raise HTTPException(status_code=500, detail=f"Could not save file: {e}") # # حفظ الملفات المطلوبة والاختيارية في مسارات مؤقتة # temp_rgb_path = await save_upload_file(rgb_image, "temp_rgb.png") # temp_measurements_path = await save_upload_file(measurements_json, "temp_measurements.json") # temp_left_path = await save_upload_file(rgb_left_image, "temp_left.png") # temp_right_path = await save_upload_file(rgb_right_image, "temp_right.png") # temp_center_path = await save_upload_file(rgb_center_image, "temp_center.png") # temp_lidar_path = await save_upload_file(lidar_data, "temp_lidar.npy") # try: # target_point_list = json.loads(target_point) # except json.JSONDecodeError: # raise HTTPException(status_code=400, detail="Invalid JSON format for target_point.") # try: # # استدعاء دالة النموذج مباشرة بالمسارات المؤقتة # # لا نحتاج لـ model_from_state لأننا سنقوم بتحميل النموذج مباشرة # dashboard_pil, commands_dict = run_single_frame( # model_from_state=None, # سيتم تحميل النموذج الافتراضي داخل الدالة # rgb_image_path=temp_rgb_path, # rgb_left_image_path=temp_left_path, # rgb_right_image_path=temp_right_path, # rgb_center_image_path=temp_center_path, # lidar_image_path=temp_lidar_path, # measurements_path=temp_measurements_path, # target_point_list=target_point_list # ) # # --- ✅ التعديل هنا --- # # تحويل صورة PIL إلى بيانات ثنائية في الذاكرة # buffered = io.BytesIO() # dashboard_pil.save(buffered, format="PNG") # # تشفير البيانات الثنائية إلى نص Base64 # img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") # print("✅ Model execution successful. Returning commands and Base64 image.") # # إرجاع كائن JSON يحتوي على كل من الأوامر والصورة المشفرة # return { # "control_commands": commands_dict, # "dashboard_image_base64": img_str # } # # # FastAPI لا يمكنه إرجاع كائن PIL مباشرة، يجب تحويله # # # يمكننا إعادته كـ Base64 أو حفظه وإرجاع مساره # # # للتبسيط، سنرجع فقط أوامر التحكم # # print("✅ Model execution successful. Returning control commands.") # # return commands_dict # except gr.Error as e: # # تحويل أخطاء Gradio إلى أخطاء HTTP # raise HTTPException(status_code=400, detail=str(e)) # except Exception as e: # print(traceback.format_exc()) # raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}") # finally: # # ✅ تنظيف الملفات المؤقتة بعد الاستخدام # for path in [temp_rgb_path, temp_measurements_path, temp_left_path, temp_right_path, temp_center_path, temp_lidar_path]: # if path and os.path.exists(path): # os.remove(path) # # ============================================================================== # # 4. تعريف واجهة Gradio (لا تغيير) # # ============================================================================== # available_models = find_available_models() # with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), css=".gradio-container {max-width: 95% !important;}") as demo: # # ... (كل كود واجهة Gradio يبقى كما هو تمامًا) ... # model_state = gr.State(value=None) # gr.Markdown("# 🚗 محاكاة القيادة الذاتية باستخدام Interfuser") # gr.Markdown("مرحباً بك في واجهة اختبار نموذج Interfuser. اتبع الخطوات أدناه لتشغيل المحاكاة على إطار واحد.") # with gr.Row(): # with gr.Column(scale=1): # with gr.Group(): # gr.Markdown("## ⚙️ الخطوة 1: اختر النموذج") # with gr.Row(): # model_selector = gr.Dropdown(label="النماذج المتاحة", choices=available_models, value=available_models[0] if available_models else "لم يتم العثور على نماذج") # status_textbox = gr.Textbox(label="حالة النموذج", interactive=False) # with gr.Group(): # gr.Markdown("## 🗂️ الخطوة 2: ارفع ملفات السيناريو") # with gr.Group(): # gr.Markdown("**(مطلوب)**") # api_rgb_image_path = gr.File(label="صورة الكاميرا الأمامية (RGB)", type="filepath") # api_measurements_path = gr.File(label="ملف القياسات (JSON)", type="filepath") # with gr.Accordion("📷 مدخلات اختيارية (كاميرات ومستشعرات إضافية)", open=False): # api_rgb_left_image_path = gr.File(label="كاميرا اليسار (RGB)", type="filepath") # api_rgb_right_image_path = gr.File(label="كاميرا اليمين (RGB)", type="filepath") # api_rgb_center_image_path = gr.File(label="كاميرا الوسط (RGB)", type="filepath") # api_lidar_image_path = gr.File(label="بيانات الليدار (NPY)", type="filepath") # api_target_point_list = gr.JSON(label="📍 النقطة المستهدفة (x, y)", value=[0.0, 100.0]) # api_run_button = gr.Button("🚀 شغل المحاكاة", variant="primary", scale=2) # with gr.Group(): # gr.Markdown("### ✨ أمثلة جاهزة") # gr.Markdown("انقر على مثال لتعبئة الحقول تلقائياً (يتطلب وجود مجلد `examples`).") # gr.Examples(examples=[[os.path.join(EXAMPLES_DIR, "sample1", "rgb.jpg"), os.path.join(EXAMPLES_DIR, "sample1", "measurements.json")], [os.path.join(EXAMPLES_DIR, "sample2", "rgb.jpg"), os.path.join(EXAMPLES_DIR, "sample2", "measurements.json")]], inputs=[api_rgb_image_path, api_measurements_path], label="اختر سيناريو اختبار") # with gr.Column(scale=2): # with gr.Group(): # gr.Markdown("## 📊 الخطوة 3: شاهد النتائج") # api_output_image = gr.Image(label="لوحة التحكم المرئية (Dashboard)", type="pil", interactive=False) # api_control_json = gr.JSON(label="أوامر التحكم (JSON)") # if available_models: # demo.load(fn=load_model, inputs=model_selector, outputs=[model_state, status_textbox]) # model_selector.change(fn=load_model, inputs=model_selector, outputs=[model_state, status_textbox]) # api_run_button.click(fn=run_single_frame, inputs=[model_state, api_rgb_image_path, api_rgb_left_image_path, api_rgb_right_image_path, api_rgb_center_image_path, api_lidar_image_path, api_measurements_path, api_target_point_list], outputs=[api_output_image, api_control_json], api_name="run_single_frame") # # ✅ ============================================================================== # # ✅ 5. تركيب واجهة Gradio على تطبيق FastAPI # # ✅ ============================================================================== # # هذه هي الخطوة السحرية التي تدمج العالمين معًا. # # app = gr.mount_ публіk(app, demo, path="/") # app = gr.mount_gradio_app(app, demo, path="/") # # ✅ ============================================================================== # # ✅ 6. تشغيل الخادم المدمج (نقطة الدخول) # # ✅ ============================================================================== # # هذا الجزء يخبر السكربت أنه عند تشغيله مباشرة، # # يجب أن يقوم بتشغيل تطبيق FastAPI باستخدام خادم uvicorn. # if __name__ == "__main__": # import uvicorn # # Hugging Face Spaces يتوقع أن يعمل التطبيق على المنفذ 7860 # # و host="0.0.0.0" يجعله متاحًا للوصول من خارج الحاوية (container) # uvicorn.run(app, host="0.0.0.0", port=7860) # app.py (النسخة النهائية المدمجة مع توثيق FastAPI) # ------------------------------------------------- ##-- 1. إضافة الاستيرادات اللازمة للتوثيق # ------------------------------------------------- import os import json import traceback import torch import gradio as gr import numpy as np from PIL import Image import io import base64 import cv2 import math from fastapi import FastAPI, UploadFile, File, Form, HTTPException from pydantic import BaseModel, Field from typing import List, Dict # --- استيراد من الملفات المنظمة في مشروعك --- from model import build_interfuser_model from logic import ( transform, lidar_transform, InterfuserController, ControllerConfig, Tracker, DisplayInterface, render, render_waypoints, render_self_car, ensure_rgb, WAYPOINT_SCALE_FACTOR, T1_FUTURE_TIME, T2_FUTURE_TIME ) # ------------------------------------------------- ##-- 2. تعريف تطبيق FastAPI مع وصف عام # ------------------------------------------------- app = FastAPI( title="API لمحاكاة القيادة الذاتية (Interfuser)", description=""" واجهة برمجة تطبيقات مخصصة للتحكم في نموذج Interfuser. يحتوي هذا التطبيق على: - **واجهة رسومية (UI)** على المسار الرئيسي (`/`) للتفاعل البصري. - **واجهة برمجية (API)** على المسار (`/api/predict_flutter`) مخصصة للتطبيقات مثل فلاتر. - **توثيق تفاعلي** على المسار (`/docs`). """, version="1.1.0" ) # ------------------------------------------------- ##-- 3. تعريف هياكل البيانات (Schemas) للمدخلات والمخرجات # ------------------------------------------------- class ControlCommands(BaseModel): steer: float = Field(..., example=-0.61, description="قيمة التوجيه (Steering). تتراوح بين -1 (يسار) و 1 (يمين).") throttle: float = Field(..., example=0.75, description="قيمة التسارع (Throttle). تتراوح بين 0 و 1.") brake: bool = Field(..., example=False, description="هل يجب الضغط على المكابح (Brake)؟") class PredictionResponse(BaseModel): control_commands: ControlCommands = Field(..., description="كائن يحتوي على أوامر التحكم المتوقعة.") dashboard_image_base64: str = Field(..., description="صورة لوحة التحكم كـ نص مشفر بصيغة Base64.") # ============================================================================== # 1. إعدادات ومسارات النماذج (لا تغيير) # ============================================================================== # ... (هذا الجزء يبقى كما هو تمامًا) ... WEIGHTS_DIR = "model" EXAMPLES_DIR = "examples" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") MODELS_SPECIFIC_CONFIGS = { "interfuser_baseline": { "rgb_backbone_name": "r50", "embed_dim": 256, "direct_concat": True }, "interfuser_lightweight": { "rgb_backbone_name": "r26", "embed_dim": 128, "enc_depth": 4, "dec_depth": 4, "direct_concat": True } } def find_available_models(): if not os.path.isdir(WEIGHTS_DIR): return [] return [f.replace(".pth", "") for f in os.listdir(WEIGHTS_DIR) if f.endswith(".pth")] # ============================================================================== # 2. الدوال الأساسية (لا تغيير) # ============================================================================== # ... (دالة load_model ودالة run_single_frame تبقيان كما هما تمامًا) ... def load_model(model_name: str): if not model_name or "لم يتم" in model_name: return None, "الرجاء اختيار نموذج صالح." weights_path = os.path.join(WEIGHTS_DIR, f"{model_name}.pth") print(f"Building model: '{model_name}'") model_config = MODELS_SPECIFIC_CONFIGS.get(model_name, {}) model = build_interfuser_model(model_config) if not os.path.exists(weights_path): gr.Warning(f"ملف الأوزان '{weights_path}' غير موجود.") else: try: state_dic = torch.load(weights_path, map_location=device, weights_only=True) model.load_state_dict(state_dic) print(f"تم تحميل أوزان النموذج '{model_name}' بنجاح.") except Exception as e: gr.Warning(f"فشل تحميل الأوزان للنموذج '{model_name}': {e}.") model.to(device) model.eval() return model, f"تم تحميل نموذج: {model_name}" def run_single_frame(model_from_state, rgb_image_path, rgb_left_image_path, rgb_right_image_path, rgb_center_image_path, lidar_image_path, measurements_path, target_point_list): if model_from_state is None: print("API session detected or model not loaded. Loading default model...") available_models = find_available_models() if not available_models: raise gr.Error("لا توجد نماذج متاحة للتحميل.") model_to_use, _ = load_model(available_models[0]) else: model_to_use = model_from_state if model_to_use is None: raise gr.Error("فشل تحميل النموذج. تحقق من السجلات (Logs).") try: if not (rgb_image_path and measurements_path): raise gr.Error("الرجاء توفير الصورة الأمامية وملف القياسات على الأقل.") try: rgb_image_pil = Image.open(rgb_image_path).convert("RGB") except Exception as e: raise gr.Error(f"فشل تحميل صورة الكاميرا الأمامية. تأكد من أن الملف صحيح. الخطأ: {e}") def load_optional_image(path, default_image): if path: try: return Image.open(path).convert("RGB") except Exception as e: raise gr.Error(f"فشل تحميل الصورة الاختيارية '{os.path.basename(path)}'. الخطأ: {e}") return default_image rgb_left_pil = load_optional_image(rgb_left_image_path, rgb_image_pil) rgb_right_pil = load_optional_image(rgb_right_image_path, rgb_image_pil) rgb_center_pil = load_optional_image(rgb_center_image_path, rgb_image_pil) if lidar_image_path: try: lidar_array = np.load(lidar_image_path) if lidar_array.max() > 0: lidar_array = (lidar_array / lidar_array.max()) * 255.0 lidar_pil = Image.fromarray(lidar_array.astype(np.uint8)).convert('RGB') except Exception as e: raise gr.Error(f"فشل تحميل ملف الليدار (.npy). تأكد من أن الملف صحيح. الخطأ: {e}") else: lidar_pil = Image.fromarray(np.zeros((112, 112, 3), dtype=np.uint8)) try: with open(measurements_path, 'r') as f: m_dict = json.load(f) except Exception as e: raise gr.Error(f"فشل تحميل أو قراءة ملف القياسات (.json). تأكد من أنه بصيغة صحيحة. الخطأ: {e}") front_tensor = transform(rgb_image_pil).unsqueeze(0).to(device) left_tensor = transform(rgb_left_pil).unsqueeze(0).to(device) right_tensor = transform(rgb_right_pil).unsqueeze(0).to(device) center_tensor = transform(rgb_center_pil).unsqueeze(0).to(device) lidar_tensor = lidar_transform(lidar_pil).unsqueeze(0).to(device) measurements_tensor = torch.tensor([[m_dict.get('x',0.0), m_dict.get('y',0.0), m_dict.get('theta',0.0), m_dict.get('speed',5.0), m_dict.get('steer',0.0), m_dict.get('throttle',0.0), float(m_dict.get('brake',0.0)), m_dict.get('command',2.0), float(m_dict.get('is_junction',0.0)), float(m_dict.get('should_brake',0.0))]], dtype=torch.float32).to(device) target_point_tensor = torch.tensor([target_point_list], dtype=torch.float32).to(device) inputs = {'rgb': front_tensor, 'rgb_left': left_tensor, 'rgb_right': right_tensor, 'rgb_center': center_tensor, 'lidar': lidar_tensor, 'measurements': measurements_tensor, 'target_point': target_point_tensor} with torch.no_grad(): outputs = model_to_use(inputs) traffic, waypoints, is_junction, traffic_light, stop_sign, _ = outputs speed, pos, theta = m_dict.get('speed',5.0), [m_dict.get('x',0.0), m_dict.get('y',0.0)], m_dict.get('theta',0.0) traffic_np, waypoints_np = traffic[0].detach().cpu().numpy().reshape(20,20,-1), waypoints[0].detach().cpu().numpy() * WAYPOINT_SCALE_FACTOR tracker, controller = Tracker(), InterfuserController(ControllerConfig()) updated_traffic = tracker.update_and_predict(traffic_np.copy(), pos, theta, 0) steer, throttle, brake, metadata = controller.run_step(speed, waypoints_np, is_junction.sigmoid()[0,1].item(), traffic_light.sigmoid()[0,0].item(), stop_sign.sigmoid()[0,1].item(), updated_traffic) map_t0, counts_t0 = render(updated_traffic, t=0) map_t1, counts_t1 = render(updated_traffic, t=T1_FUTURE_TIME) map_t2, counts_t2 = render(updated_traffic, t=T2_FUTURE_TIME) wp_map = render_waypoints(waypoints_np) self_car_map = render_self_car(np.array([0,0]), [math.cos(0), math.sin(0)], [4.0, 2.0]) map_t0 = cv2.add(cv2.add(map_t0, wp_map), self_car_map); map_t0 = cv2.resize(map_t0, (400, 400)) map_t1 = cv2.add(ensure_rgb(map_t1), ensure_rgb(self_car_map)); map_t1 = cv2.resize(map_t1, (200, 200)) map_t2 = cv2.add(ensure_rgb(map_t2), ensure_rgb(self_car_map)); map_t2 = cv2.resize(map_t2, (200, 200)) display = DisplayInterface() light_state, stop_sign_state = "Red" if traffic_light.sigmoid()[0,0].item() > 0.5 else "Green", "Yes" if stop_sign.sigmoid()[0,1].item() > 0.5 else "No" interface_data = {'camera_view': np.array(rgb_image_pil),'map_t0': map_t0,'map_t1': map_t1,'map_t2': map_t2, 'text_info': {'Control': f"S:{steer:.2f} T:{throttle:.2f} B:{int(brake)}",'Light': f"L: {light_state}",'Stop': f"St: {stop_sign_state}"}, 'object_counts': {'t0': counts_t0,'t1': counts_t1,'t2': counts_t2}} dashboard_image = display.run_interface(interface_data) control_commands_dict = {"steer": steer, "throttle": throttle, "brake": bool(brake)} return Image.fromarray(dashboard_image), control_commands_dict except gr.Error as e: raise e except Exception as e: print(traceback.format_exc()) raise gr.Error(f"حدث خطأ غير متوقع أثناء معالجة الإطار: {e}") # ------------------------------------------------- ##-- 4. تعديل نقطة النهاية المخصصة (API Endpoint) بالتوثيق # ------------------------------------------------- @app.post( "/api/predict_flutter", tags=["Flutter API"], summary="التنبؤ بأوامر القيادة لإطار واحد", description=""" يقوم هذا الـ Endpoint بمعالجة بيانات إطار واحد من مستشعرات السيارة (صور، قياسات) ويتنبأ بأوامر التحكم اللازمة (التوجيه، التسارع، المكابح)، بالإضافة إلى إرجاع صورة لوحة التحكم البصرية (Dashboard). """, response_model=PredictionResponse, # استخدام نموذج المخرجات المحدد responses={ 400: {"description": "خطأ في مدخلات العميل (مثل JSON غير صالح)"}, 422: {"description": "خطأ في التحقق من صحة البيانات (مثل ملف مطلوب مفقود)"}, 500: {"description": "خطأ داخلي في الخادم أثناء معالجة النموذج"}, } ) async def flutter_predict_endpoint( rgb_image: UploadFile = File(..., description="صورة الكاميرا الأمامية بصيغة PNG أو JPG."), measurements_json: UploadFile = File(..., description="ملف القياسات الحالي بصيغة JSON."), target_point: str = Form( default='[0.0, 100.0]', description="النقطة المستهدفة كـ JSON string. مثال: '[50.0, 20.0]'" ), rgb_left_image: UploadFile = File(None, description="صورة اختيارية من كاميرا اليسار."), rgb_right_image: UploadFile = File(None, description="صورة اختيارية من كاميرا اليمين."), rgb_center_image: UploadFile = File(None, description="صورة اختيارية من كاميرا الوسط."), lidar_data: UploadFile = File(None, description="ملف بيانات الليدار الاختياري بصيغة .npy."), ): print("✅ Custom API endpoint /api/predict_flutter called!") async def save_upload_file(upload_file: UploadFile, destination: str): if not upload_file: return None try: with open(destination, "wb") as f: f.write(await upload_file.read()) return destination except Exception as e: raise HTTPException(status_code=500, detail=f"Could not save file: {e}") temp_rgb_path = await save_upload_file(rgb_image, "temp_rgb.png") temp_measurements_path = await save_upload_file(measurements_json, "temp_measurements.json") temp_left_path = await save_upload_file(rgb_left_image, "temp_left.png") temp_right_path = await save_upload_file(rgb_right_image, "temp_right.png") temp_center_path = await save_upload_file(rgb_center_image, "temp_center.png") temp_lidar_path = await save_upload_file(lidar_data, "temp_lidar.npy") try: target_point_list = json.loads(target_point) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for target_point.") try: dashboard_pil, commands_dict = run_single_frame( model_from_state=None, rgb_image_path=temp_rgb_path, rgb_left_image_path=temp_left_path, rgb_right_image_path=temp_right_path, rgb_center_image_path=temp_center_path, lidar_image_path=temp_lidar_path, measurements_path=temp_measurements_path, target_point_list=target_point_list ) buffered = io.BytesIO() dashboard_pil.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") print("✅ Model execution successful. Returning commands and Base64 image.") # التأكد من أن الرد يتبع هيكل Pydantic المحدد return PredictionResponse( control_commands=ControlCommands(**commands_dict), dashboard_image_base64=img_str ) except gr.Error as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: print(traceback.format_exc()) raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}") finally: for path in [temp_rgb_path, temp_measurements_path, temp_left_path, temp_right_path, temp_center_path, temp_lidar_path]: if path and os.path.exists(path): os.remove(path) # ============================================================================== # 5. تعريف واجهة Gradio (لا تغيير) # ============================================================================== # ... (هذا الجزء يبقى كما هو تمامًا) ... available_models = find_available_models() with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), css=".gradio-container {max-width: 95% !important;}") as demo: model_state = gr.State(value=None) gr.Markdown("# 🚗 محاكاة القيادة الذاتية باستخدام Interfuser") gr.Markdown("مرحباً بك في واجهة اختبار نموذج Interfuser. اتبع الخطوات أدناه لتشغيل المحاكاة على إطار واحد.") with gr.Row(): with gr.Column(scale=1): with gr.Group(): gr.Markdown("## ⚙️ الخطوة 1: اختر النموذج") with gr.Row(): model_selector = gr.Dropdown(label="النماذج المتاحة", choices=available_models, value=available_models[0] if available_models else "لم يتم العثور على نماذج") status_textbox = gr.Textbox(label="حالة النموذج", interactive=False) with gr.Group(): gr.Markdown("## 🗂️ الخطوة 2: ارفع ملفات السيناريو") with gr.Group(): gr.Markdown("**(مطلوب)**") api_rgb_image_path = gr.File(label="صورة الكاميرا الأمامية (RGB)", type="filepath") api_measurements_path = gr.File(label="ملف القياسات (JSON)", type="filepath") with gr.Accordion("📷 مدخلات اختيارية (كاميرات ومستشعرات إضافية)", open=False): api_rgb_left_image_path = gr.File(label="كاميرا اليسار (RGB)", type="filepath") api_rgb_right_image_path = gr.File(label="كاميرا اليمين (RGB)", type="filepath") api_rgb_center_image_path = gr.File(label="كاميرا الوسط (RGB)", type="filepath") api_lidar_image_path = gr.File(label="بيانات الليدار (NPY)", type="filepath") api_target_point_list = gr.JSON(label="📍 النقطة المستهدفة (x, y)", value=[0.0, 100.0]) api_run_button = gr.Button("🚀 شغل المحاكاة", variant="primary", scale=2) with gr.Group(): gr.Markdown("### ✨ أمثلة جاهزة") gr.Markdown("انقر على مثال لتعبئة الحقول تلقائياً (يتطلب وجود مجلد `examples`).") gr.Examples(examples=[[os.path.join(EXAMPLES_DIR, "sample1", "rgb.jpg"), os.path.join(EXAMPLES_DIR, "sample1", "measurements.json")], [os.path.join(EXAMPLES_DIR, "sample2", "rgb.jpg"), os.path.join(EXAMPLES_DIR, "sample2", "measurements.json")]], inputs=[api_rgb_image_path, api_measurements_path], label="اختر سيناريو اختبار") with gr.Column(scale=2): with gr.Group(): gr.Markdown("## 📊 الخطوة 3: شاهد النتائج") api_output_image = gr.Image(label="لوحة التحكم المرئية (Dashboard)", type="pil", interactive=False) api_control_json = gr.JSON(label="أوامر التحكم (JSON)") if available_models: demo.load(fn=load_model, inputs=model_selector, outputs=[model_state, status_textbox]) model_selector.change(fn=load_model, inputs=model_selector, outputs=[model_state, status_textbox]) api_run_button.click(fn=run_single_frame, inputs=[model_state, api_rgb_image_path, api_rgb_left_image_path, api_rgb_right_image_path, api_rgb_center_image_path, api_lidar_image_path, api_measurements_path, api_target_point_list], outputs=[api_output_image, api_control_json], api_name="run_single_frame") # ============================================================================== # 6. تركيب واجهة Gradio على تطبيق FastAPI # ============================================================================== app = gr.mount_gradio_app(app, demo, path="/") # ============================================================================== # 7. تشغيل الخادم المدمج (نقطة الدخول) # ============================================================================== if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)