from ultralytics import YOLO
import numpy as np
import mediapipe as mp
import cv2
from flask import (Flask, render_template, request, Response, session,
                   redirect, url_for, make_response, send_file)
from flask_socketio import SocketIO, emit
from flask_cors import CORS
import yt_dlp as youtube_dl
import base64
import matplotlib.pyplot as plt
from io import BytesIO
from PIL import Image


def plot_base64_image(image_base64):
    # Decode the base64 string
    image_data = base64.b64decode(image_base64)
    # Convert the bytes to a PIL Image
    image = Image.open(BytesIO(image_data))
    # Convert the PIL Image to a numpy array
    image_array = np.array(image)
    # Plot the image
    plt.imshow(image_array)
    plt.axis('off')
    plt.show()

# Example usage:
# base64_image = "..."  # Your base64 encoded image string
# plot_base64_image(base64_image)


model_object_detection = YOLO("bisindov2.pt")

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
CORS(app)
app.secret_key = 'flask-sockets-builds'

# Global flag used to stop the preview loop (set by /stop_process, cleared by /index).
stop_flag = False

######################################################
# Detected class names mapped to their Arabic translations.
classes_translation = {
    "all": "الكل",
    "A": "أ",
    "B": "ب",
    "C": "ج",
    "D": "د",
    "F": "ف",
    "H": "هـ",
    "I": "أنا",
    "J": "جيم",
    "L": "إل",
    "M": "إم",
    "O": "أو",
    "R": "ر",
    "T": "ت",
    "U": "يو",
    "V": "في",
    "W": "دبليو",
    "Z": "زد",
    "additional": "إضافي",
    "alcohol": "مدرسة",
    "allergy": "حساسية",
    "bacon": "لحم المقدد",
    "bag": "حقيبة",
    "barbecue": "شواء",
    "bill": "فاتورة",
    "biscuit": "بسكويت",
    "bitter": "مر",
    "bread": "خبز",
    "burger": "برغر",
    "bye": "وداعاً",
    "cheese": "جبن",
    "chicken": "دجاج",
    "coke": "كوكاكولا",
    "cold": "بارد",
    "cost": "تكلفة",
    "coupon": "كوبون",
    "cup": "كوب",
    "dessert": "حلوى",
    "drink": "شراب",
    "drive": "قيادة",
    "eat": "تناول الطعام",
    "eggs": "بيض",
    "enjoy": "استمتع",
    "fork": "شوكة",
    "french fries": "بطاطس مقلية",
    "fresh": "طازج",
    "hello": "مرحبا",
    "hot": "ساخن",
    "icecream": "آيس كريم",
    "ingredients": "مكونات",
    "juicy": "عصيري",
    "ketchup": "كاتشب",
    "lactose": "لاكتوز",
    "lettuce": "خس",
    "lid": "غطاء",
    "manager": "مدير",
    "menu": "قائمة الطعام",
    "milk": "حليب",
    "mustard": "خردل",
    "napkin": "منديل",
    "no": "لا",
    "order": "طلب",
    "pepper": "فلفل",
    "pickle": "مخلل",
    "pizza": "بيتزا",
    "please": "من فضلك",
    "ready": "جاهز",
    "refill": "إعادة ملء",
    "repeat": "كرر",
    "safe": "آمن",
    "salt": "ملح",
    "sandwich": "ساندويتش",
    "sauce": "صلصة",
    "small": "صغير",
    "soda": "صودا",
    "sorry": "آسف",
    "spicy": "حار",
    "spoon": "ملعقة",
    "straw": "قش",
    "sugar": "سكر",
    "sweet": "حلو",
    "tissues": "مناديل",
    "total": "مجموع",
    "urgent": "عاجل",
    "vegetables": "خضروات",
    "warm": "دافئ",
    "water": "ماء",
    "what": "ماذا",
    "yoghurt": "زبادي",
    "your": "لك",
    "ILoveYou": "أحبك",
    "Halo": "مرحبًا",
}
######################################################


class VideoStreaming(object):
    def __init__(self):
        super(VideoStreaming, self).__init__()
        print("===== Video Streaming =====")
        self._preview = False
        self._flipH = False
        self._detect = False
        self._model = False
        self._mediaPipe = False
        self._confidence = 75.0
        self.mp_hands = mp.solutions.hands
        self.hands = self.mp_hands.Hands()

    @property
    def confidence(self):
        return self._confidence

    @confidence.setter
    def confidence(self, value):
        self._confidence = int(value)

    @property
    def preview(self):
        return self._preview

    @preview.setter
    def preview(self, value):
        self._preview = bool(value)

    @property
    def flipH(self):
        return self._flipH

    @flipH.setter
    def flipH(self, value):
        self._flipH = bool(value)
    @property
    def detect(self):
        return self._detect

    @detect.setter
    def detect(self, value):
        self._detect = bool(value)

    @property
    def mediaPipe(self):
        return self._mediaPipe

    @mediaPipe.setter
    def mediaPipe(self, value):
        self._mediaPipe = bool(value)

    def show(self, url):
        print(url)
        self._preview = False
        self._flipH = False
        self._detect = False
        self._mediaPipe = False
        self._confidence = 75.0

        ydl_opts = {
            "quiet": True,
            "no_warnings": True,
            "format": "best",
            "forceurl": True,
        }

        if url == '4':
            # '4' selects the local webcam
            print("am here with 0 to start cam")
            cap = cv2.VideoCapture(0)
        else:
            # NOTE: the incoming url is ignored here; a fixed YouTube link is resolved via yt-dlp
            ydl = youtube_dl.YoutubeDL(ydl_opts)
            info = ydl.extract_info("https://www.youtube.com/watch?v=j4YZBRwVFFo", download=False)
            url = info["url"]
            cap = cv2.VideoCapture(url)

        while True:
            if self._preview:
                if stop_flag:
                    print("Process Stopped")
                    return

                grabbed, frame = cap.read()
                if not grabbed:
                    break

                if self.flipH:
                    print("flip part :")
                    frame = cv2.flip(frame, 1)

                if self.detect:
                    frame_yolo = frame.copy()
                    results_yolo = model_object_detection.predict(frame_yolo, conf=self._confidence / 100)

                    # NOTE: this assumes a customized Ultralytics build whose plot() also
                    # returns the label strings; the stock plot() returns only the annotated image.
                    frame_yolo, labels = results_yolo[0].plot()
                    list_labels = []
                    # labels_confidences
                    for label in labels:
                        confidence = label.split(" ")[-1]
                        label_name = " ".join(label.split(" ")[:-1])
                        # Translate the label if it exists in the translation dictionary
                        translated_label = classes_translation.get(label_name, label_name)
                        list_labels.append(translated_label)
                        list_labels.append(confidence)
                        socketio.emit('label', list_labels)

                if self.mediaPipe:
                    # Convert the image to RGB for processing with MediaPipe
                    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = self.hands.process(image)

                    if results.multi_hand_landmarks:
                        for hand_landmarks in results.multi_hand_landmarks:
                            mp.solutions.drawing_utils.draw_landmarks(
                                frame,
                                hand_landmarks,
                                self.mp_hands.HAND_CONNECTIONS,
                                landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
                                    color=(255, 0, 0), thickness=4, circle_radius=3),
                                connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
                                    color=(255, 255, 255), thickness=2, circle_radius=2),
                            )

                print("frame information in here : ")
                frame = cv2.imencode(".jpg", frame)[1].tobytes()
                yield (
                    b'--frame\r\n'
                    b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
                )
            else:
                # Preview is off: stream a placeholder image instead
                snap = np.zeros((1000, 1000), np.uint8)
                label = "Streaming Off"
                H, W = snap.shape
                font = cv2.FONT_HERSHEY_PLAIN
                color = (255, 255, 255)
                cv2.putText(snap, label, (W // 2 - 100, H // 2), font, 2, color, 2)

                frame = cv2.imencode(".jpg", snap)[1].tobytes()
                yield (
                    b'--frame\r\n'
                    b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
                )

    def show1(self, url):
        print(url)
        self._preview = False
        self._flipH = False
        self._detect = False
        self._mediaPipe = False
        self._confidence = 75.0

        while True:
            # Decode the base64 string to get the frame data
            frame_bytes = base64.b64decode(url)
            # Convert the frame data to an OpenCV image
            frame_np = np.frombuffer(frame_bytes, np.uint8)
            frame = cv2.imdecode(frame_np, cv2.IMREAD_COLOR)

            # Encode the frame data back to JPEG bytes
            _, frame_encoded = cv2.imencode(".jpg", frame)
            frame_bytes = frame_encoded.tobytes()

            yield (
                b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n'
            )


# check_settings()
VIDEO = VideoStreaming()


@app.route('/', methods=['GET', 'POST'])
def homepage():
    return render_template('hompage.html')


@app.route('/index', methods=['GET', 'POST'])
def index():
    print("index")
    global stop_flag
    stop_flag = False
    if request.method == 'POST':
        print("Index post request")
        url = request.form['url']
        print("index: ", url)
        # Create a response object
        resp = make_response(redirect(url_for('index')))
        # Set a cookie containing the URL
        resp.set_cookie('url', url)
        return resp
    return render_template('index.html')


@app.route('/video_feed')
def video_feed():
    # url = session.get('url', None)
    # print("video feed: ", url)
    # Retrieve the URL from the cookie
    url = request.cookies.get('url')
    # NOTE: the cookie value is overridden here with a hard-coded '0'
    url = '0'
    print("video feed: ", url)

    if url is None:
        return redirect(url_for('homepage'))

    print("video feed: ", url)
    return Response(VIDEO.show(url), mimetype='multipart/x-mixed-replace; boundary=frame')


# * Button requests
@app.route("/request_preview_switch")
def request_preview_switch():
    VIDEO.preview = not VIDEO.preview
    print("*" * 10, VIDEO.preview)
    return "nothing"


@app.route("/request_flipH_switch")
def request_flipH_switch():
    VIDEO.flipH = not VIDEO.flipH
    print("*" * 10, VIDEO.flipH)
    return "nothing"


@app.route("/request_run_model_switch")
def request_run_model_switch():
    VIDEO.detect = not VIDEO.detect
    print("*" * 10, VIDEO.detect)
    return "nothing"


@app.route("/request_mediapipe_switch")
def request_mediapipe_switch():
    VIDEO.mediaPipe = not VIDEO.mediaPipe
    print("*" * 10, VIDEO.mediaPipe)
    return "nothing"


@app.route('/update_slider_value', methods=['POST'])
def update_slider_value():
    slider_value = request.form['sliderValue']
    VIDEO.confidence = slider_value
    return 'OK'


@app.route('/stop_process')
def stop_process():
    print("Process stop Request")
    global stop_flag
    stop_flag = True
    return 'Process Stop Request'


@socketio.on('connect')
def test_connect():
    print('Connected')
    # emit('message', data, broadcast=True)


######################
def preprocess_frame(frame_data):
    if frame_data is None:
        return None  # Nothing to process
    try:
        # frame_data is expected to already be the raw base64 payload (no data-URL prefix)
        base64_data = frame_data
        # Convert the base64 image string to a numpy array
        imgdata = base64.b64decode(base64_data)
        imgarray = np.frombuffer(imgdata, np.uint8)
        # Decode the image using cv2.imdecode
        frame = cv2.imdecode(imgarray, cv2.IMREAD_COLOR)
        print("Frame decoded successfully")
        # Apply image processing here (example: grayscale conversion)
        processed_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Convert the processed frame back to a base64 image string
        _, buffer = cv2.imencode('.jpg', processed_frame)
        processed_frame_data = base64.b64encode(buffer).decode('utf-8')
        # plot_base64_image(processed_frame_data)
        return processed_frame_data
    except Exception as e:
        print("Error processing frame:", e)
        return None


# @socketio.on('stream_frame')
# def handle_stream_frame(frame_data):
#     processed_frame_data = preprocess_frame(frame_data)
#     # emit('receive_frame', processed_frame_data, broadcast=True)
#     return Response(VIDEO.show1(frame_data), mimetype='multipart/x-mixed-replace; boundary=frame')


# Route to receive a frame for processing
@app.route('/process_frame', methods=['POST'])
def process_frame():
    if 'frame' in request.files:
        frame = request.files['frame']
        # Process the frame here
        # For example, you can save the frame to a file
        frame_path = 'result_frame.jpg'
        frame.save(frame_path)
        # Return the processed frame
        return send_file(frame_path, mimetype='image/jpeg')
    else:
        return 'No frame data received'


if __name__ == '__main__':
    socketio.run(app, host="0.0.0.0", allow_unsafe_werkzeug=True, port=7860)