import sys
import traceback
from pathlib import Path

import numpy as np
from joblib import Parallel, delayed

from neus_v.automaton.video_automaton import VideoAutomaton
from neus_v.model_checking.stormpy import StormModelChecker
from neus_v.veval.parse import parse_tl_formula, parse_until_to_next_frame
from neus_v.video.frame import VideoFrame
from neus_v.video.read_video import read_video


def create_frame_windows(frames: list, window_size: int) -> list[list]:
    """Create non-overlapping windows of frames, with remainder in last window.

    Args:
        frames: List of frames
        window_size: Size of each window

    Returns:
        List of frame windows. The last window holds fewer than
        ``window_size`` frames when ``len(frames)`` is not a multiple of it;
        an empty ``frames`` yields an empty list.

    Raises:
        ValueError: If ``window_size`` is 0 (raised by ``range``).
    """
    return [frames[start : start + window_size] for start in range(0, len(frames), window_size)]


def _new_output_log() -> dict:
    """Return the result skeleton shared by both evaluation entry points."""
    return {
        "specification": None,
        "propositions": None,
        "probability": None,
        "min_probability": None,
        "max_probability": None,
        "propositions_avg_probability": {},
    }


def _set_up_evaluation(video_path: Path, proposition_set: list, tl_spec: str) -> tuple:
    """Read the video, parse the specification and build the automaton and model checker."""
    video = read_video(video_path=video_path)

    # TODO: if there's F in the tl_spec
    ltl_formula = parse_tl_formula(tl_spec)
    ltl_formula = parse_until_to_next_frame(ltl_formula)

    video_automaton = VideoAutomaton(include_initial_state=True)
    video_automaton.set_up(proposition_set=proposition_set)

    model_checker = StormModelChecker(
        proposition_set=proposition_set,
        ltl_formula=ltl_formula,
    )
    return video, ltl_formula, video_automaton, model_checker


def _run_inference(process_item, items: list, parallel_inference: bool) -> list:
    """Apply ``process_item(item, index)`` to every item, optionally through joblib."""
    # joblib rejects n_jobs=0, so only go through Parallel when there is work.
    if parallel_inference and len(items) > 0:
        return Parallel(n_jobs=len(items))(delayed(process_item)(item, idx) for idx, item in enumerate(items))
    return [process_item(item, idx) for idx, item in enumerate(items)]


def _record_results(video_automaton, results: list, proposition_probability_record: dict) -> None:
    """Add every processed frame to the automaton and log its per-proposition probabilities."""
    for video_frame, object_of_interest in results:
        video_automaton.add_frame(frame=video_frame)
        for proposition, detected_object in object_of_interest.items():
            proposition_probability_record[proposition].append(detected_object.probability)


def _average_probabilities(proposition_probability_record: dict) -> dict:
    """Return the mean probability per proposition, rounded to 3 decimals.

    Propositions with no recorded probability (no frame was processed) are
    omitted instead of triggering a division by zero.
    """
    return {
        proposition: round(sum(probabilities) / len(probabilities), 3)
        for proposition, probabilities in proposition_probability_record.items()
        if probabilities
    }


def evaluate_video(
    vision_language_model,
    confidence_as_token_probability: bool,
    video_path: Path | str,
    proposition_set: list,
    tl_spec: str,
    parallel_inference: bool = False,
    threshold: float = 0.1,
    num_of_frame_in_sequence: int = 1,
) -> dict:
    """Evaluate a video frame by frame using the given vision language model.

    Each sampled frame is passed to ``vision_language_model.detect`` once per
    proposition; the detections are added to a ``VideoAutomaton`` which is then
    checked against the parsed temporal logic specification.

    Args:
        vision_language_model: Object exposing
            ``detect(frame_img=..., scene_description=...,
            confidence_as_token_probability=..., threshold=...)`` whose return
            value has a ``probability`` attribute.
        confidence_as_token_probability: Forwarded unchanged to ``detect``.
        video_path: Location of the video; a ``str`` is converted to ``Path``.
        proposition_set: Propositions; each one is used as the
            ``scene_description`` of a ``detect`` call.
        tl_spec: Temporal logic specification string.
        parallel_inference: When True, frames are dispatched through
            ``joblib.Parallel`` with one job per frame.
        threshold: Forwarded unchanged to ``detect``.
        num_of_frame_in_sequence: Not used by this function (frames are
            processed one at a time); kept for backward compatibility.

    Returns:
        Dict with keys ``specification``, ``propositions``, ``probability``,
        ``min_probability``, ``max_probability`` and
        ``propositions_avg_probability``. Entries keep their initial values
        (``None`` / ``{}``) when the specification fails validation or when an
        error is caught before they are filled in.
    """
    output_log = _new_output_log()

    if isinstance(video_path, str):
        video_path = Path(video_path)

    video, ltl_formula, video_automaton, model_checker = _set_up_evaluation(
        video_path=video_path,
        proposition_set=proposition_set,
        tl_spec=tl_spec,
    )
    proposition_probability_record = {proposition: [] for proposition in proposition_set}

    if not model_checker.validate_tl_specification(ltl_formula):
        return output_log

    all_frames: list[np.ndarray] = video.get_all_frames_of_video(
        return_format="ndarray",
        desired_interval_in_sec=1,
    )

    try:
        # Capture only the count so the closure does not carry the whole frame
        # list along when it is shipped to joblib workers.
        total_frames = len(all_frames)

        def process_frame(frame_img: np.ndarray, frame_count: int):
            sys.stdout.write(f"\rProcessing frame: {frame_count+1}/{total_frames} ")
            sys.stdout.flush()

            object_of_interest = {}
            for proposition in proposition_set:
                object_of_interest[proposition] = vision_language_model.detect(
                    frame_img=frame_img,
                    scene_description=proposition,
                    confidence_as_token_probability=confidence_as_token_probability,
                    threshold=threshold,
                )

            video_frame = VideoFrame(
                frame_idx=frame_count,
                timestamp=None,
                frame_image=frame_img,
                object_of_interest=object_of_interest,
            )
            return video_frame, object_of_interest

        results = _run_inference(process_frame, all_frames, parallel_inference)
        _record_results(video_automaton, results, proposition_probability_record)

        video_automaton.add_terminal_state(add_with_terminal_label=True)
        sys.stdout.write("\n")  # Move to the next line after processing all frames

        result = model_checker.check_automaton(
            states=video_automaton.states,
            transitions=video_automaton.transitions,
            model_type="dtmc",
            use_filter=True,
        )

        output_log["specification"] = tl_spec
        output_log["propositions"] = proposition_set
        output_log["probability"] = round(float(str(result)), 6)
        output_log["min_probability"] = round(float(str(result.min)), 6)
        output_log["max_probability"] = round(float(str(result.max)), 6)
        output_log["propositions_avg_probability"].update(_average_probabilities(proposition_probability_record))

    except Exception as e:  # noqa: BLE001
        # Best effort: report the failure and return whatever was filled in.
        # The frame index is not known here, so the message names the video.
        print(f"\nError while evaluating video {video_path}: {e}")
        traceback.print_exc()

    return output_log


def evaluate_video_with_sequence_of_images(
    vision_language_model,
    confidence_as_token_probability: bool,
    video_path: Path | str,
    proposition_set: list,
    tl_spec: str,
    parallel_inference: bool = False,
    num_of_frame_in_sequence: int = 3,
    threshold: float = 0.1,
) -> dict:
    """Evaluate a video window by window using the given vision language model.

    Sampled frames are grouped into non-overlapping windows of
    ``num_of_frame_in_sequence`` frames. Each window is passed to
    ``vision_language_model.detect`` once per proposition; the detections are
    added to a ``VideoAutomaton`` which is then checked against the parsed
    temporal logic specification.

    Args:
        vision_language_model: Object exposing
            ``detect(seq_of_frames=..., scene_description=..., threshold=...)``
            whose return value has a ``probability`` attribute.
        confidence_as_token_probability: Accepted for signature parity with
            ``evaluate_video``; it is not forwarded to ``detect`` here.
        video_path: Location of the video; a ``str`` is converted to ``Path``.
        proposition_set: Propositions; each one is used as the
            ``scene_description`` of a ``detect`` call.
        tl_spec: Temporal logic specification string.
        parallel_inference: When True, windows are dispatched through
            ``joblib.Parallel`` with one job per window.
        num_of_frame_in_sequence: Number of frames per window; the last window
            may be shorter.
        threshold: Forwarded unchanged to ``detect``.

    Returns:
        Dict with keys ``specification``, ``propositions``, ``probability``,
        ``min_probability``, ``max_probability`` and
        ``propositions_avg_probability``. Entries keep their initial values
        (``None`` / ``{}``) when the specification fails validation or when an
        error is caught before they are filled in.
    """
    output_log = _new_output_log()

    if isinstance(video_path, str):
        video_path = Path(video_path)

    video, ltl_formula, video_automaton, model_checker = _set_up_evaluation(
        video_path=video_path,
        proposition_set=proposition_set,
        tl_spec=tl_spec,
    )
    proposition_probability_record = {proposition: [] for proposition in proposition_set}

    if not model_checker.validate_tl_specification(ltl_formula):
        return output_log

    all_frames: list[np.ndarray] = video.get_all_frames_of_video(
        return_format="ndarray",
        desired_interval_in_sec=0.5,
    )

    try:
        frame_windows = create_frame_windows(frames=all_frames, window_size=num_of_frame_in_sequence)
        # Capture only the count so the closure does not carry every window
        # along when it is shipped to joblib workers.
        total_windows = len(frame_windows)

        def process_frame(sequence_of_frames: list[np.ndarray], frame_count: int):
            sys.stdout.write(f"\rProcessing frame window: {frame_count+1}/{total_windows} ")
            sys.stdout.flush()

            object_of_interest = {}
            for proposition in proposition_set:
                detected_object = vision_language_model.detect(
                    seq_of_frames=sequence_of_frames,
                    scene_description=proposition,
                    threshold=threshold,
                )
                object_of_interest[proposition] = detected_object
                print(f"{proposition}: {detected_object.probability}")

            video_frame = VideoFrame(
                frame_idx=frame_count,
                timestamp=None,
                frame_image=sequence_of_frames,
                object_of_interest=object_of_interest,
            )
            return video_frame, object_of_interest

        results = _run_inference(process_frame, frame_windows, parallel_inference)
        _record_results(video_automaton, results, proposition_probability_record)

        video_automaton.add_terminal_state(add_with_terminal_label=False)
        sys.stdout.write("\n")  # Move to the next line after processing all frames

        result = model_checker.check_automaton(
            states=video_automaton.states,
            transitions=video_automaton.transitions,
            model_type="dtmc",
            use_filter=False,
        )

        output_log["specification"] = tl_spec
        output_log["propositions"] = proposition_set
        # Without the filter, the probability is read at state 0.
        output_log["probability"] = round(float(str(result.at(0))), 6)
        output_log["min_probability"] = round(float(str(result.min)), 6)
        output_log["max_probability"] = round(float(str(result.max)), 6)
        output_log["propositions_avg_probability"].update(_average_probabilities(proposition_probability_record))

    except Exception as e:  # noqa: BLE001
        # Best effort: report the failure and return whatever was filled in.
        # The window index is not known here, so the message names the video.
        print(f"\nError while evaluating video {video_path}: {e}")
        traceback.print_exc()

    return output_log