| """ |
| MazeProcessor - Maze generation, solving, rendering, and video frame generation. |
| |
| Mirrors the SudokuProcessor pattern: a single class encapsulating all maze logic |
| including DFS generation, BFS solving, image/video rendering, path verification, |
| and text serialization. |
| """ |
| import random |
| from collections import deque |
| from typing import List, Tuple, Optional, Dict |
|
|
| import numpy as np |
| from PIL import Image, ImageDraw |
|
|
| |
| WALL_MASKS = {"N": 1, "S": 2, "W": 4, "E": 8} |
| OPPOSITE = {"N": "S", "S": "N", "E": "W", "W": "E"} |
| MOVES = { |
| "U": (-1, 0, "N"), |
| "D": (1, 0, "S"), |
| "L": (0, -1, "W"), |
| "R": (0, 1, "E"), |
| } |
| NEIGHBORS = { |
| "N": (-1, 0), |
| "S": (1, 0), |
| "E": (0, 1), |
| "W": (0, -1), |
| } |
|
|
|
|
| |
| |
| |
|
|
| Grid = List[List[Dict[str, bool]]] |
|
|
|
|
| class MazeProcessor: |
| """Handles maze generation, solving, image rendering, and video frame creation.""" |
|
|
| def __init__(self, img_size: int = 512): |
| self.img_size = img_size |
|
|
| |
| self.bg_color = "black" |
| self.cell_color = "white" |
| self.wall_color = "black" |
| self.grid_color = (224, 224, 224) |
| self.start_color = "yellow" |
| self.end_color = "blue" |
| self.path_color = "red" |
|
|
| |
|
|
| @staticmethod |
| def _empty_grid(n: int) -> Grid: |
| """Create an n×n grid with all walls present.""" |
| return [ |
| [{"N": True, "E": True, "S": True, "W": True} for _ in range(n)] |
| for _ in range(n) |
| ] |
|
|
| @staticmethod |
| def _remove_wall(grid: Grid, r: int, c: int, direction: str) -> None: |
| """Remove wall between (r,c) and its neighbour in *direction*.""" |
| dr, dc = NEIGHBORS[direction] |
| grid[r][c][direction] = False |
| grid[r + dr][c + dc][OPPOSITE[direction]] = False |
|
|
| def generate( |
| self, size: int, min_path_len: int = 1, max_attempts: int = 200 |
| ) -> Tuple[Grid, Tuple[int, int], Tuple[int, int], np.ndarray]: |
| """ |
| Generate a perfect maze and a start/end pair whose shortest path |
| length >= *min_path_len*. |
| |
| Returns: |
| (grid, start, end, path) where path is an (L, 2) int array. |
| """ |
| for _ in range(max_attempts): |
| grid = self._gen_dfs(size) |
| nodes = [(r, c) for r in range(size) for c in range(size)] |
| start, end = random.sample(nodes, 2) |
| path = self.solve_bfs(grid, start, end) |
| if path is not None and len(path) >= min_path_len: |
| return grid, tuple(start), tuple(end), path |
| raise RuntimeError( |
| f"Failed to generate maze (size={size}, min_path_len={min_path_len}) " |
| f"after {max_attempts} attempts." |
| ) |
|
|
| def _gen_dfs(self, n: int) -> Grid: |
| """Randomised DFS (iterative) to carve a perfect maze.""" |
| grid = self._empty_grid(n) |
| visited = [[False] * n for _ in range(n)] |
| sr, sc = random.randrange(n), random.randrange(n) |
| visited[sr][sc] = True |
| stack = [(sr, sc)] |
|
|
| while stack: |
| r, c = stack[-1] |
| nbrs = [] |
| for d, (dr, dc) in NEIGHBORS.items(): |
| nr, nc = r + dr, c + dc |
| if 0 <= nr < n and 0 <= nc < n and not visited[nr][nc]: |
| nbrs.append((d, nr, nc)) |
| if nbrs: |
| d, nr, nc = random.choice(nbrs) |
| self._remove_wall(grid, r, c, d) |
| visited[nr][nc] = True |
| stack.append((nr, nc)) |
| else: |
| stack.pop() |
| return grid |
|
|
| |
|
|
| def solve_bfs( |
| self, grid: Grid, start: Tuple[int, int], end: Tuple[int, int] |
| ) -> Optional[np.ndarray]: |
| """BFS shortest path. Returns (L,2) int ndarray or None.""" |
| n = len(grid) |
| q: deque = deque([(start, [start])]) |
| visited = {start} |
|
|
| while q: |
| (r, c), path = q.popleft() |
| if (r, c) == end: |
| return np.array(path, dtype=int) |
| cell = grid[r][c] |
| for d, (dr, dc) in NEIGHBORS.items(): |
| nr, nc = r + dr, c + dc |
| if ( |
| 0 <= nr < n |
| and 0 <= nc < n |
| and not cell[d] |
| and (nr, nc) not in visited |
| ): |
| visited.add((nr, nc)) |
| q.append(((nr, nc), path + [(nr, nc)])) |
| return None |
|
|
| |
|
|
| @staticmethod |
| def path_to_udrl(path) -> str: |
| """Convert coordinate path to UDRL string.""" |
| moves = [] |
| for i in range(len(path) - 1): |
| r1, c1 = path[i] |
| r2, c2 = path[i + 1] |
| if r2 < r1: |
| moves.append("U") |
| elif r2 > r1: |
| moves.append("D") |
| elif c2 < c1: |
| moves.append("L") |
| else: |
| moves.append("R") |
| return "".join(moves) |
|
|
| |
|
|
| def verify_path(self, grid: Grid, start: Tuple, end: Tuple, udrl: str, strict: bool = True) -> bool: |
| """Verify that *udrl* is a wall-respecting walk from *start* to *end*.""" |
| n = len(grid) |
| r, c = start |
| for ch in udrl.replace(",", "").replace(" ", "").strip(): |
| if ch not in MOVES: |
| continue |
| dr, dc, wall = MOVES[ch] |
| if grid[r][c][wall]: |
| return False |
| nr, nc = r + dr, c + dc |
| if not (0 <= nr < n and 0 <= nc < n): |
| return False |
| r, c = nr, nc |
| if not strict and (r, c) == end: |
| break |
|
|
| return (r, c) == end |
|
|
| |
|
|
| def encode_grid(self, grid: Grid) -> str: |
| """Encode grid to compact bitmask string (one int per cell, row-major).""" |
| rows = [] |
| for row in grid: |
| vals = [] |
| for cell in row: |
| v = 0 |
| for d, mask in WALL_MASKS.items(): |
| if cell[d]: |
| v |= mask |
| vals.append(str(v)) |
| rows.append(" ".join(vals)) |
| return "\n".join(rows) |
|
|
| def decode_grid(self, text_lines: List[str]) -> Grid: |
| """Decode bitmask text lines back to grid dicts.""" |
| grid = [] |
| for line in text_lines: |
| row = [] |
| for val_s in line.split(): |
| val = int(val_s) |
| row.append({d: bool(val & mask) for d, mask in WALL_MASKS.items()}) |
| grid.append(row) |
| return grid |
|
|
| def save_text(self, filepath, grid: Grid, start: Tuple, end: Tuple) -> None: |
| """Save maze to compact text file.""" |
| n = len(grid) |
| with open(filepath, "w") as f: |
| f.write(f"{n}\n{start[0]} {start[1]}\n{end[0]} {end[1]}\n") |
| f.write(self.encode_grid(grid) + "\n") |
|
|
| def load_text(self, filepath) -> Optional[Dict]: |
| """ |
| Load maze from text file. |
| |
| Returns dict with keys: size, start, end, grid (dict-based), |
| grid_raw (list[list[int]] bitmask). None on failure. |
| """ |
| try: |
| with open(filepath) as f: |
| lines = [l.strip() for l in f if l.strip()] |
| n = int(lines[0]) |
| sr, sc = map(int, lines[1].split()) |
| er, ec = map(int, lines[2].split()) |
| grid = self.decode_grid(lines[3 : 3 + n]) |
| grid_raw: List[List[int]] = [] |
| for r in range(n): |
| grid_raw.append(list(map(int, lines[3 + r].split()))) |
| return { |
| "size": n, |
| "start": (sr, sc), |
| "end": (er, ec), |
| "grid": grid, |
| "grid_raw": grid_raw, |
| } |
| except Exception: |
| return None |
|
|
| def fingerprint(self, grid: Grid, start: Tuple, end: Tuple) -> str: |
| """Content fingerprint for deduplication.""" |
| n = len(grid) |
| parts = [f"{n},{start[0]},{start[1]},{end[0]},{end[1]}"] |
| for row in grid: |
| for cell in row: |
| v = sum(WALL_MASKS[d] for d in WALL_MASKS if cell[d]) |
| parts.append(str(v)) |
| return "|".join(parts) |
|
|
| |
|
|
| def _layout(self, n: int): |
| """Compute rendering layout parameters.""" |
| cell_f = float(self.img_size) / n |
| wall_f = cell_f / 4.0 |
| half_f = wall_f / 2.0 |
| grid_w = max(1, int(cell_f / 16.0)) |
| return cell_f, wall_f, half_f, grid_w |
|
|
| def render( |
| self, |
| grid: Grid, |
| start: Tuple[int, int], |
| end: Tuple[int, int], |
| path: Optional[np.ndarray] = None, |
| path_steps: Optional[int] = None, |
| ) -> Image.Image: |
| """ |
| Render maze as a PIL image. |
| |
| Args: |
| grid: The maze grid. |
| start, end: Coordinates of start/end cells. |
| path: Full solution path (L, 2). |
| path_steps: Draw only the first *path_steps* segments (for video). |
| |
| Returns: |
| PIL.Image (RGB, img_size × img_size). |
| """ |
| n = len(grid) |
| cell_f, wall_f, half_f, grid_w = self._layout(n) |
|
|
| img = Image.new("RGB", (self.img_size, self.img_size), self.bg_color) |
| draw = ImageDraw.Draw(img) |
|
|
| |
| for r in range(n): |
| for c in range(n): |
| x1 = c * cell_f + half_f |
| y1 = r * cell_f + half_f |
| x2 = (c + 1) * cell_f - half_f |
| y2 = (r + 1) * cell_f - half_f |
| draw.rectangle([(x1, y1), (x2, y2)], fill=self.cell_color) |
| cell = grid[r][c] |
| if not cell["S"] and r < n - 1: |
| draw.rectangle( |
| [(x1, y2), (x2, y2 + wall_f)], fill=self.cell_color |
| ) |
| if not cell["E"] and c < n - 1: |
| draw.rectangle( |
| [(x2, y1), (x2 + wall_f, y2)], fill=self.cell_color |
| ) |
|
|
| |
| for r in range(n): |
| for c in range(n): |
| if r < n - 1 and not grid[r][c]["S"]: |
| y = (r + 1) * cell_f |
| draw.line( |
| [(c * cell_f + half_f, y), ((c + 1) * cell_f - half_f, y)], |
| fill=self.grid_color, width=grid_w, |
| ) |
| if c < n - 1 and not grid[r][c]["E"]: |
| x = (c + 1) * cell_f |
| draw.line( |
| [(x, r * cell_f + half_f), (x, (r + 1) * cell_f - half_f)], |
| fill=self.grid_color, width=grid_w, |
| ) |
|
|
| |
| def _dot(rc, color): |
| rr, cc = rc |
| cx = cc * cell_f + cell_f / 2 |
| cy = rr * cell_f + cell_f / 2 |
| rad = max(2, int((cell_f - wall_f) * 0.25)) |
| draw.ellipse([cx - rad, cy - rad, cx + rad, cy + rad], fill=color) |
|
|
| _dot(start, self.start_color) |
| _dot(end, self.end_color) |
|
|
| |
| if path is not None and len(path) >= 2: |
| end_idx = ( |
| len(path) if path_steps is None |
| else min(path_steps + 1, len(path)) |
| ) |
| if end_idx >= 2: |
| pts = [ |
| (c * cell_f + cell_f / 2, r * cell_f + cell_f / 2) |
| for r, c in path[:end_idx] |
| ] |
| draw.line( |
| pts, fill=self.path_color, |
| width=max(1, int(wall_f)), joint="curve", |
| ) |
|
|
| return img |
|
|
| |
|
|
| def generate_video_frames( |
| self, |
| grid: Grid, |
| start: Tuple[int, int], |
| end: Tuple[int, int], |
| path: np.ndarray, |
| n_start: int = 5, |
| m_end: int = 5, |
| frames: Optional[int] = None, |
| ) -> List[Image.Image]: |
| """ |
| Generate progressive video frames showing the red line growing. |
| |
| *frames* controls the number of **content frames** between holds: |
| - None → 1 per path step |
| - frames > steps → slow-motion |
| - frames < steps → fast-forward |
| |
| Total length = n_start + content_frames + m_end. |
| """ |
| n_steps = len(path) - 1 |
| if n_steps <= 0: |
| blank = self.render(grid, start, end) |
| return [blank] * (n_start + m_end + 1) |
|
|
| content_frames = frames if frames is not None else n_steps |
| content_frames = max(1, content_frames) |
|
|
| result: List[Image.Image] = [] |
|
|
| |
| blank = self.render(grid, start, end) |
| result.extend([blank.copy() for _ in range(n_start)]) |
|
|
| |
| if content_frames == n_steps: |
| for step in range(1, n_steps + 1): |
| result.append( |
| self.render(grid, start, end, path=path, path_steps=step) |
| ) |
| elif content_frames > n_steps: |
| for step in range(1, n_steps + 1): |
| f_lo = (step - 1) * content_frames // n_steps |
| f_hi = step * content_frames // n_steps |
| count = f_hi - f_lo |
| frame_img = self.render( |
| grid, start, end, path=path, path_steps=step |
| ) |
| result.append(frame_img) |
| if count > 1: |
| result.extend([frame_img.copy() for _ in range(count - 1)]) |
| else: |
| for f in range(content_frames): |
| step = (f + 1) * n_steps // content_frames |
| result.append( |
| self.render(grid, start, end, path=path, path_steps=step) |
| ) |
|
|
| |
| final = self.render(grid, start, end, path=path) |
| result.extend([final.copy() for _ in range(m_end)]) |
|
|
| return result |
|
|
| |
|
|
| def _detect_red_grid( |
| self, |
| pixels: np.ndarray, |
| size: int, |
| pixel_threshold: float = 0.01, |
| ) -> np.ndarray: |
| """ |
| Detect which cells contain red pixels and return a boolean grid. |
| |
| Args: |
| pixels: (H, W, 3) uint8 RGB array. |
| size: Maze dimension n. |
| pixel_threshold: Min red-pixel fraction to mark a cell. |
| |
| Returns: |
| (size, size) bool ndarray — True where red path detected. |
| """ |
| img = Image.fromarray(pixels) |
| w, h = img.size |
| px = np.array(img, dtype=float) |
|
|
| r_ch, g_ch, b_ch = px[:, :, 0], px[:, :, 1], px[:, :, 2] |
| red_mask = (r_ch > 100) & (r_ch > g_ch * 1.2) & (r_ch > b_ch * 1.2) |
|
|
| cell_h_f = h / size |
| cell_w_f = w / size |
|
|
| path_grid = np.zeros((size, size), dtype=bool) |
| for r in range(size): |
| y0 = int(round(r * cell_h_f)) |
| y1 = int(round((r + 1) * cell_h_f)) |
| for c in range(size): |
| x0 = int(round(c * cell_w_f)) |
| x1 = int(round((c + 1) * cell_w_f)) |
| ch_ = y1 - y0 |
| cw_ = x1 - x0 |
| margin_y = max(1, int(ch_ * 0.15)) |
| margin_x = max(1, int(cw_ * 0.15)) |
| sub = red_mask[y0 + margin_y : y1 - margin_y, |
| x0 + margin_x : x1 - margin_x] |
| if sub.size > 0 and np.mean(sub) > pixel_threshold: |
| path_grid[r, c] = True |
| return path_grid |
|
|
| def _greedy_walk( |
| self, |
| path_grid: np.ndarray, |
| grid_raw: List[List[int]], |
| size: int, |
| start: Tuple[int, int], |
| ) -> str: |
| """Greedy walk from start following red cells (original strategy).""" |
| directions = [ |
| ("R", MOVES["R"]), |
| ("D", MOVES["D"]), |
| ("L", MOVES["L"]), |
| ("U", MOVES["U"]), |
| ] |
| visited = {start} |
| cr, cc = start |
| actions: List[str] = [] |
| for _ in range(size * size * 2): |
| found = False |
| wval = grid_raw[cr][cc] |
| for act, (dr, dc, wall_ch) in directions: |
| nr, nc = cr + dr, cc + dc |
| if 0 <= nr < size and 0 <= nc < size: |
| if (wval & WALL_MASKS[wall_ch]) != 0: |
| continue |
| if path_grid[nr, nc] and (nr, nc) not in visited: |
| visited.add((nr, nc)) |
| actions.append(act) |
| cr, cc = nr, nc |
| found = True |
| break |
| if not found: |
| break |
| return "".join(actions) |
|
|
| def _backtrack_walk( |
| self, |
| path_grid: np.ndarray, |
| grid_raw: List[List[int]], |
| size: int, |
| start: Tuple[int, int], |
| end: Optional[Tuple[int, int]] = None, |
| strict: bool = True, |
| ) -> str: |
| """ |
| Backtracking DFS walk from start following red cells. |
| |
| When multiple neighbouring red cells are reachable, tries each branch |
| and backtracks on dead-ends — guaranteeing a complete path if one exists. |
| |
| Args: |
| path_grid: (size, size) bool grid of detected red cells. |
| grid_raw: Bitmask wall grid. |
| size: Maze dimension n. |
| start: Start coordinate (r, c). |
| end: End coordinate; required for strict, optional otherwise. |
| strict: If True, path must visit ALL red cells and end at *end*. |
| If False, reaching *end* is sufficient. |
| |
| Returns: |
| UDRL action string (empty string if no valid path found). |
| """ |
| directions = [ |
| ("R", MOVES["R"]), |
| ("D", MOVES["D"]), |
| ("L", MOVES["L"]), |
| ("U", MOVES["U"]), |
| ] |
| total_red = int(path_grid.sum()) |
|
|
| def _dfs(r: int, c: int, visited: set, actions: List[str]) -> Optional[str]: |
| |
| if not strict and end and (r, c) == end: |
| return "".join(actions) |
| |
| if strict and len(visited) == total_red: |
| if end is None or (r, c) == end: |
| return "".join(actions) |
| return None |
|
|
| wval = grid_raw[r][c] |
| for act, (dr, dc, wall_ch) in directions: |
| nr, nc = r + dr, c + dc |
| if not (0 <= nr < size and 0 <= nc < size): |
| continue |
| if (wval & WALL_MASKS[wall_ch]) != 0: |
| continue |
| if not path_grid[nr, nc] or (nr, nc) in visited: |
| continue |
| visited.add((nr, nc)) |
| actions.append(act) |
| result = _dfs(nr, nc, visited, actions) |
| if result is not None: |
| return result |
| actions.pop() |
| visited.remove((nr, nc)) |
| return None |
|
|
| result = _dfs(start[0], start[1], {start}, []) |
| return result if result is not None else "" |
|
|
| def extract_path_from_pixels( |
| self, |
| pixels: np.ndarray, |
| grid_raw: List[List[int]], |
| size: int, |
| start: Tuple[int, int], |
| pixel_threshold: float = 0.01, |
| recursive: bool = False, |
| end: Optional[Tuple[int, int]] = None, |
| strict: bool = True, |
| ) -> str: |
| """ |
| Detect red path in an RGB pixel array and return UDRL. |
| |
| Uses floating-point cell boundaries matching the renderer to avoid |
| misalignment on sizes that don't evenly divide the image. |
| |
| Args: |
| pixels: (H, W, 3) uint8 RGB array. |
| grid_raw: Bitmask grid as list[list[int]]. |
| size: Maze dimension n. |
| start: Start coordinate (r, c). |
| pixel_threshold: Min red-pixel fraction to mark a cell. |
| recursive: Use backtracking DFS instead of greedy walk. |
| end: End coordinate (required for recursive strict mode). |
| strict: For recursive mode — if True, path must visit ALL |
| red cells and end at *end*; if False, reaching |
| *end* is sufficient. |
| |
| Returns: |
| UDRL action string. |
| """ |
| path_grid = self._detect_red_grid(pixels, size, pixel_threshold) |
|
|
| if recursive: |
| return self._backtrack_walk( |
| path_grid, grid_raw, size, start, end=end, strict=strict |
| ) |
| return self._greedy_walk(path_grid, grid_raw, size, start) |
|
|
| def extract_path_from_image( |
| self, img_path: str, grid_raw: List[List[int]], size: int, start: Tuple, |
| recursive: bool = False, end: Optional[Tuple] = None, strict: bool = True, |
| ) -> str: |
| """Extract UDRL from an image file (convenience wrapper).""" |
| try: |
| pixels = np.array(Image.open(img_path).convert("RGB")) |
| return self.extract_path_from_pixels( |
| pixels, grid_raw, size, start, |
| recursive=recursive, end=end, strict=strict, |
| ) |
| except Exception: |
| return "" |
|
|
|
|
| if __name__ == "__main__": |
| proc = MazeProcessor(img_size=512) |
|
|
| |
| grid, start, end, path = proc.generate(size=8, min_path_len=10) |
| n_steps = len(path) - 1 |
| print(f"Maze 8×8 | path length {len(path)} | steps {n_steps}") |
| print(f"UDRL: {proc.path_to_udrl(path)}") |
| print(f"Verify: {proc.verify_path(grid, start, end, proc.path_to_udrl(path))}") |
|
|
| proc.render(grid, start, end).save("test_maze.png") |
| proc.render(grid, start, end, path=path).save("test_maze_solution.png") |
|
|
| |
| f1 = proc.generate_video_frames(grid, start, end, path, n_start=3, m_end=3) |
| assert len(f1) == 3 + n_steps + 3 |
|
|
| f2 = proc.generate_video_frames( |
| grid, start, end, path, n_start=3, m_end=3, frames=n_steps * 3 |
| ) |
| assert len(f2) == 3 + n_steps * 3 + 3 |
|
|
| half = max(1, n_steps // 2) |
| f3 = proc.generate_video_frames( |
| grid, start, end, path, n_start=3, m_end=3, frames=half |
| ) |
| assert len(f3) == 3 + half + 3 |
|
|
| print(f"frames=None → {len(f1)} total ({n_steps} content)") |
| print(f"frames={n_steps*3:<4d} → {len(f2)} total (slow-mo)") |
| print(f"frames={half:<4d} → {len(f3)} total (fast-fwd)") |
| print("All assertions passed ✓") |