Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import torch | |
| from gym import Env | |
| from typing import Tuple, List | |
def get_vi_sequence(env: "Env", observation: np.ndarray) -> Tuple[np.ndarray, List]:
    """
    Overview:
        Given an instance of the maze environment and the current observation, use the Breadth-First Search \
        (BFS) algorithm, expanding outwards from the target location, to plan an optimal path and record \
        the result.
    Arguments:
        - env (:obj:`Env`): The instance of the maze environment. This function reads ``target_location``, \
            ``nav_map``, ``size`` and ``n_action`` from it and calls ``process_states`` and ``get_maze_map``.
        - observation (:obj:`np.ndarray`): The current observation. The start cell is the first position whose \
            value in the last channel equals 1.
    Returns:
        - output (:obj:`Tuple[np.ndarray, List]`): The BFS result. ``output[0]`` stacks a snapshot of the BFS \
            map after each iteration (one ``env.size x env.size`` map per BFS level; cells not reached yet hold \
            ``env.n_action``). ``output[1]`` is a list of ``(state_tensor, action)`` pairs for every cell on the \
            path from the start up to, but not including, the target. It is empty when the BFS never reaches \
            the start cell.
    """
    xy = np.where(observation[Ellipsis, -1] == 1)
    start_x, start_y = xy[0][0], xy[1][0]

    # Normalise to a tuple so that list / ndarray targets are hashable and can be stored in ``visited``.
    target = tuple(env.target_location)
    nav_map = env.nav_map

    # The index of an offset is the action id written into the map. The order matters: it decides which
    # predecessor claims a cell when several reach it in the same BFS level.
    moves = ((-1, 0), (0, -1), (1, 0), (0, 1))

    visited = {target}
    frontier = [target]
    vi_map = np.full((env.size, env.size), fill_value=env.n_action, dtype=np.int32)
    vi_sequence = []
    found_start = False

    while frontier and not found_start:
        next_frontier = []
        for point_x, point_y in frontier:
            for action, (dx, dy) in enumerate(moves):
                next_x, next_y = point_x + dx, point_y + dy
                if (next_x, next_y) in visited:
                    continue
                if not (0 <= next_x < len(nav_map) and 0 <= next_y < len(nav_map[next_x])):
                    continue
                # Cells marked 'x' are never expanded (presumably walls -- confirm against the env).
                if nav_map[next_x][next_y] == 'x':
                    continue
                visited.add((next_x, next_y))
                next_frontier.append((next_x, next_y))
                vi_map[next_x, next_y] = action
                if next_x == start_x and next_y == start_y:
                    # Do not break: the rest of the current level is still expanded before stopping.
                    found_start = True
        # One snapshot per BFS level, taken after the whole level has been expanded.
        vi_sequence.append(vi_map.copy())
        frontier = next_frontier

    track_back = []
    if found_start:
        final_map = vi_sequence[-1]
        cur_x, cur_y = start_x, start_y
        while cur_x != target[0] or cur_y != target[1]:
            act = final_map[cur_x, cur_y]
            track_back.append((torch.FloatTensor(env.process_states([cur_x, cur_y], env.get_maze_map())), act))
            # The cell was reached from its predecessor via ``moves[act]``; undo that offset to step
            # one cell closer to the target.
            dx, dy = moves[act]
            cur_x, cur_y = cur_x - dx, cur_y - dy

    return np.array(vi_sequence), track_back