|
|
|
|
|
import torch |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
|
|
|
|
|
|
class TrajectoryGenerator(object): |
|
|
def __init__(self, options, place_cells): |
|
|
self.options = options |
|
|
self.place_cells = place_cells |
|
|
|
|
|
def plot_trajectory(self, traj, box_width, box_height, idx=0, step=2): |
|
|
""" |
|
|
Visualize one trajectory from traj dict. |
|
|
|
|
|
Args: |
|
|
traj: dictionary containing trajectory info |
|
|
box_width, box_height: dimensions of the environment |
|
|
idx: which trajectory to plot from the batch (default: 0) |
|
|
step: plot an arrow every 'step' frames |
|
|
""" |
|
|
|
|
|
x = traj["target_x"][idx] |
|
|
y = traj["target_y"][idx] |
|
|
hd = traj["target_hd"][idx] |
|
|
|
|
|
|
|
|
x0 = traj["init_x"][idx, 0] |
|
|
y0 = traj["init_y"][idx, 0] |
|
|
hd0 = traj["init_hd"][idx, 0] |
|
|
|
|
|
x = np.concatenate([[x0], x]) |
|
|
y = np.concatenate([[y0], y]) |
|
|
hd = np.concatenate([[hd0], hd]) |
|
|
|
|
|
|
|
|
plt.figure(figsize=(6, 6)) |
|
|
plt.plot(x, y, "-o", markersize=2, label="trajectory") |
|
|
|
|
|
|
|
|
for t in range(0, len(x), step): |
|
|
dx = 0.1 * np.cos(hd[t]) |
|
|
dy = 0.1 * np.sin(hd[t]) |
|
|
plt.arrow( |
|
|
x[t], y[t], dx, dy, head_width=0.05, head_length=0.08, fc="r", ec="r" |
|
|
) |
|
|
|
|
|
|
|
|
plt.axhline(y=-box_height / 2, color="k") |
|
|
plt.axhline(y=box_height / 2, color="k") |
|
|
plt.axvline(x=-box_width / 2, color="k") |
|
|
plt.axvline(x=box_width / 2, color="k") |
|
|
|
|
|
plt.xlim([-box_width / 2 - 0.2, box_width / 2 + 0.2]) |
|
|
plt.ylim([-box_height / 2 - 0.2, box_height / 2 + 0.2]) |
|
|
plt.gca().set_aspect("equal", adjustable="box") |
|
|
plt.xlabel("x position (m)") |
|
|
plt.ylabel("y position (m)") |
|
|
plt.title(f"Trajectory {idx}") |
|
|
plt.legend() |
|
|
plt.show() |
|
|
|
|
|
def avoid_wall(self, position, hd, box_width, box_height): |
|
|
""" |
|
|
Compute distance and angle to nearest wall |
|
|
""" |
|
|
x = position[:, 0] |
|
|
y = position[:, 1] |
|
|
dists = [ |
|
|
box_width / 2 - x, |
|
|
box_height / 2 - y, |
|
|
box_width / 2 + x, |
|
|
box_height / 2 + y, |
|
|
] |
|
|
d_wall = np.min(dists, axis=0) |
|
|
angles = np.arange(4) * np.pi / 2 |
|
|
theta = angles[np.argmin(dists, axis=0)] |
|
|
hd = np.mod(hd, 2 * np.pi) |
|
|
a_wall = hd - theta |
|
|
a_wall = np.mod(a_wall + np.pi, 2 * np.pi) - np.pi |
|
|
|
|
|
is_near_wall = (d_wall < self.border_region) * (np.abs(a_wall) < np.pi / 2) |
|
|
turn_angle = np.zeros_like(hd) |
|
|
turn_angle[is_near_wall] = np.sign(a_wall[is_near_wall]) * ( |
|
|
np.pi / 2 - np.abs(a_wall[is_near_wall]) |
|
|
) |
|
|
|
|
|
return is_near_wall, turn_angle |
|
|
|
|
|
def generate_trajectory(self, box_width, box_height, batch_size): |
|
|
"""Generate a random walk in a rectangular box""" |
|
|
samples = self.options.sequence_length |
|
|
dt = 0.02 |
|
|
sigma = 5.76 * 2 |
|
|
b = 0.13 * 2 * np.pi |
|
|
mu = 0 |
|
|
self.border_region = 0.03 |
|
|
|
|
|
|
|
|
position = np.zeros([batch_size, samples + 2, 2]) |
|
|
head_dir = np.zeros([batch_size, samples + 2]) |
|
|
position[:, 0, 0] = np.random.uniform(-box_width / 2, box_width / 2, batch_size) |
|
|
position[:, 0, 1] = np.random.uniform( |
|
|
-box_height / 2, box_height / 2, batch_size |
|
|
) |
|
|
head_dir[:, 0] = np.random.uniform(0, 2 * np.pi, batch_size) |
|
|
velocity = np.zeros([batch_size, samples + 2]) |
|
|
|
|
|
|
|
|
random_turn = np.random.normal(mu, sigma, [batch_size, samples + 1]) |
|
|
random_vel = np.random.rayleigh(b, [batch_size, samples + 1]) |
|
|
v = np.abs(np.random.normal(0, b * np.pi / 2, batch_size)) |
|
|
|
|
|
for t in range(samples + 1): |
|
|
|
|
|
v = random_vel[:, t] |
|
|
turn_angle = np.zeros(batch_size) |
|
|
|
|
|
if not self.options.periodic: |
|
|
|
|
|
is_near_wall, turn_angle = self.avoid_wall( |
|
|
position[:, t], head_dir[:, t], box_width, box_height |
|
|
) |
|
|
v[is_near_wall] *= 0.25 |
|
|
|
|
|
|
|
|
turn_angle += dt * random_turn[:, t] |
|
|
|
|
|
|
|
|
velocity[:, t] = v * dt |
|
|
update = velocity[:, t, None] * np.stack( |
|
|
[np.cos(head_dir[:, t]), np.sin(head_dir[:, t])], axis=-1 |
|
|
) |
|
|
position[:, t + 1] = position[:, t] + update |
|
|
|
|
|
|
|
|
head_dir[:, t + 1] = head_dir[:, t] + turn_angle |
|
|
|
|
|
|
|
|
if self.options.periodic: |
|
|
position[:, :, 0] = ( |
|
|
np.mod(position[:, :, 0] + box_width / 2, box_width) - box_width / 2 |
|
|
) |
|
|
position[:, :, 1] = ( |
|
|
np.mod(position[:, :, 1] + box_height / 2, box_height) - box_height / 2 |
|
|
) |
|
|
|
|
|
head_dir = np.mod(head_dir + np.pi, 2 * np.pi) - np.pi |
|
|
|
|
|
traj = {} |
|
|
|
|
|
traj["init_hd"] = head_dir[:, 0, None] |
|
|
traj["init_x"] = position[:, 1, 0, None] |
|
|
traj["init_y"] = position[:, 1, 1, None] |
|
|
|
|
|
traj["ego_v"] = velocity[:, 1:-1] |
|
|
ang_v = np.diff(head_dir, axis=-1) |
|
|
traj["phi_x"], traj["phi_y"] = np.cos(ang_v)[:, :-1], np.sin(ang_v)[:, :-1] |
|
|
|
|
|
|
|
|
traj["target_hd"] = head_dir[:, 1:-1] |
|
|
traj["target_x"] = position[:, 2:, 0] |
|
|
traj["target_y"] = position[:, 2:, 1] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return traj |
|
|
|
|
|
def get_generator(self, batch_size=None, box_width=None, box_height=None): |
|
|
""" |
|
|
Returns a generator that yields batches of trajectories |
|
|
""" |
|
|
if not batch_size: |
|
|
batch_size = self.options.batch_size |
|
|
if not box_width: |
|
|
box_width = self.options.box_width |
|
|
if not box_height: |
|
|
box_height = self.options.box_height |
|
|
|
|
|
while True: |
|
|
traj = self.generate_trajectory(box_width, box_height, batch_size) |
|
|
|
|
|
v = np.stack( |
|
|
[ |
|
|
traj["ego_v"] * np.cos(traj["target_hd"]), |
|
|
traj["ego_v"] * np.sin(traj["target_hd"]), |
|
|
], |
|
|
axis=-1, |
|
|
) |
|
|
v = torch.tensor(v, dtype=torch.float32).transpose(0, 1) |
|
|
|
|
|
pos = np.stack([traj["target_x"], traj["target_y"]], axis=-1) |
|
|
pos = torch.tensor(pos, dtype=torch.float32).transpose(0, 1) |
|
|
|
|
|
pos = pos.to(self.options.device) |
|
|
place_outputs = self.place_cells.get_activation(pos) |
|
|
|
|
|
init_pos = np.stack([traj["init_x"], traj["init_y"]], axis=-1) |
|
|
init_pos = torch.tensor(init_pos, dtype=torch.float32) |
|
|
init_pos = init_pos.to(self.options.device) |
|
|
init_actv = self.place_cells.get_activation(init_pos).squeeze() |
|
|
|
|
|
v = v.to(self.options.device) |
|
|
inputs = (v, init_actv) |
|
|
|
|
|
yield (inputs, place_outputs, pos) |
|
|
|
|
|
def get_test_batch(self, batch_size=None, box_width=None, box_height=None): |
|
|
"""For testing performance, returns a batch of smample trajectories""" |
|
|
if not batch_size: |
|
|
batch_size = self.options.batch_size |
|
|
if not box_width: |
|
|
box_width = self.options.box_width |
|
|
if not box_height: |
|
|
box_height = self.options.box_height |
|
|
|
|
|
traj = self.generate_trajectory(box_width, box_height, batch_size) |
|
|
|
|
|
v = np.stack( |
|
|
[ |
|
|
traj["ego_v"] * np.cos(traj["target_hd"]), |
|
|
traj["ego_v"] * np.sin(traj["target_hd"]), |
|
|
], |
|
|
axis=-1, |
|
|
) |
|
|
v = torch.tensor(v, dtype=torch.float32).transpose(0, 1) |
|
|
|
|
|
pos = np.stack([traj["target_x"], traj["target_y"]], axis=-1) |
|
|
pos = torch.tensor(pos, dtype=torch.float32).transpose(0, 1) |
|
|
pos = pos.to(self.options.device) |
|
|
place_outputs = self.place_cells.get_activation(pos) |
|
|
|
|
|
init_pos = np.stack([traj["init_x"], traj["init_y"]], axis=-1) |
|
|
init_pos = torch.tensor(init_pos, dtype=torch.float32) |
|
|
init_pos = init_pos.to(self.options.device) |
|
|
init_actv = self.place_cells.get_activation(init_pos).squeeze() |
|
|
|
|
|
v = v.to(self.options.device) |
|
|
inputs = (v, init_actv) |
|
|
|
|
|
return (inputs, pos, place_outputs) |
|
|
|