-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
177 changed files
with
528,879 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from .curve_basis import bezier_basis, bspline_basis | ||
from .curve_fitting import curve_fitting |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import numpy as np | ||
import torch | ||
|
||
|
||
def torch_binom(n, k): | ||
mask = n.detach() >= k.detach() | ||
n = mask * n | ||
k = mask * k | ||
a = torch.lgamma(n + 1) - torch.lgamma((n - k) + 1) - torch.lgamma(k + 1) | ||
return torch.exp(a) * mask | ||
|
||
|
||
def torch_pactorial(n): | ||
return torch.lgamma(n + 1).exp() | ||
|
||
|
||
def irwin_hall_pdf(n, x): | ||
# https://en.wikipedia.org/wiki/Irwin%E2%80%93Hall_distribution | ||
k = torch.arange(0, n+1, 1, dtype=torch.float) | ||
n_ = torch.ones(n+1) * n | ||
comb = torch_binom(n_, k) | ||
sgn = (x - k) | ||
sgn_ = torch.zeros(n+1) | ||
eps = 1e-4 | ||
sgn_[eps <= sgn] = 1 | ||
sgn_[sgn <= -eps] = -1 | ||
sigma = (torch.FloatTensor([-1]) ** k) * comb * ((x - k) ** (n-1)) * sgn_ | ||
return sigma.sum() / (2 * torch_pactorial(torch.FloatTensor([n])-1)) | ||
|
||
|
||
def bezier_basis(degree=3, step=13): | ||
"""Basis function for Bézier curve""" | ||
index = torch.linspace(0, 1, steps=step, dtype=torch.float).repeat(degree + 1, 1) | ||
i = torch.arange(0, degree + 1, 1, dtype=torch.float) | ||
binomial_coefficient = torch_binom(torch.ones(degree + 1) * degree, i) | ||
bernstein_basis_polynomial = binomial_coefficient * (index.T ** i) * ((1 - index.T) ** i.flip(0)) | ||
return bernstein_basis_polynomial.detach() | ||
|
||
|
||
def bspline_basis(cpoint=7, degree=2, step=13): | ||
"""Piecewise polynomial function for basis-spline""" | ||
from scipy.interpolate import BSpline | ||
cpoint += 1 | ||
steps = np.linspace(0., 1., step) | ||
knot = cpoint - degree + 1 | ||
knots_qu = np.concatenate([np.zeros(degree), np.linspace(0, 1, knot), np.ones(degree)]) | ||
bs = np.zeros([step, cpoint]) | ||
for i in range(cpoint): | ||
bs[:, i] = BSpline(knots_qu, (np.arange(cpoint) == i).astype(float), degree, extrapolate=False)(steps) | ||
return torch.FloatTensor(bs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import sys | ||
import torch | ||
|
||
|
||
def curve_fitting(traj, basis): | ||
n_ped, t_traj, dim = traj.shape | ||
n_cp = basis.size(1) | ||
|
||
class Model(torch.nn.Module): | ||
def __init__(self): | ||
super(Model, self).__init__() | ||
cp_init = torch.FloatTensor(n_ped, n_cp, dim) | ||
cp_init[:, 0], cp_init[:, -1] = traj[:, 0], traj[:, -1] | ||
for i in range(1, n_cp): | ||
cp_init[:, i] = cp_init[:, i - 1] + (traj[:, -1] - traj[:, 0]) / (n_cp - 1) | ||
self.cp = torch.nn.Parameter(cp_init) # Fast convergence | ||
# self.cp = torch.nn.Parameter(torch.FloatTensor(n_ped, n_cp, dim)) | ||
# torch.nn.init.xavier_uniform_(self.cp, gain=1.0) | ||
|
||
def forward(self): | ||
recon = (self.cp.transpose(1, 2) @ basis.T).transpose(1, 2) | ||
loss = (recon - traj).norm(p=2, dim=-1).mean() | ||
return recon, loss | ||
|
||
model = Model() | ||
model.train() | ||
if torch.cuda.is_available(): | ||
model = model.cuda() | ||
traj, basis = traj.cuda(), basis.cuda() | ||
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001) | ||
|
||
recon_best, loss_best = None, 1e8 | ||
for _ in range(100000): | ||
recon, loss = model() | ||
loss.backward() | ||
optimizer.step() | ||
sys.stdout.write('\r\033Curve Fitting... loss={:.4f}'.format(loss.item())) | ||
if loss.item() < loss_best: | ||
recon_best = recon.detach().cpu() | ||
loss_best = loss.item() | ||
|
||
sys.stdout.write('\r\033Curve Fitting... Done.\n') | ||
return recon_best |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from .model import EigenTrajectory | ||
from .normalizer import TrajNorm |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import torch | ||
import torch.nn as nn | ||
|
||
|
||
class ETAnchor(nn.Module): | ||
r"""EigenTrajectory anchor model | ||
Args: | ||
hyper_params (DotDict): The hyper-parameters | ||
""" | ||
|
||
def __init__(self, hyper_params): | ||
super().__init__() | ||
|
||
self.hyper_params = hyper_params | ||
self.k = hyper_params.k | ||
self.s = hyper_params.num_samples | ||
self.dim = hyper_params.traj_dim | ||
|
||
self.C_anchor = nn.Parameter(torch.zeros((self.k, self.s))) | ||
|
||
def to_ET_space(self, traj, evec): | ||
r"""Transform Euclidean trajectories to EigenTrajectory coefficients | ||
Args: | ||
traj (torch.Tensor): The trajectory to be transformed | ||
evec (torch.Tensor): The ET descriptors (eigenvectors) | ||
Returns: | ||
C (torch.Tensor): The ET descriptor coefficients""" | ||
|
||
# Euclidean -> ET | ||
tdim = evec.size(0) | ||
M = traj.reshape(-1, tdim).T | ||
C = evec.T.detach() @ M | ||
return C | ||
|
||
def to_Euclidean_space(self, C, evec): | ||
r"""Transform EigenTrajectory coefficients to Euclidean trajectories | ||
Args: | ||
C (torch.Tensor): The ET descriptor coefficients | ||
evec (torch.Tensor): The ET descriptors (eigenvectors) | ||
Returns: | ||
traj (torch.Tensor): The Euclidean trajectory""" | ||
|
||
# ET -> Euclidean | ||
t = evec.size(0) // self.dim | ||
M = evec.detach() @ C | ||
traj = M.T.reshape(-1, t, self.dim) | ||
return traj | ||
|
||
def anchor_generation(self, pred_traj_norm, U_pred_trunc): | ||
r"""Anchor generation on EigenTrajectory space | ||
Args: | ||
pred_traj_norm (torch.Tensor): The normalized predicted trajectory | ||
U_pred_trunc (torch.Tensor): The truncated ET descriptors (eigenvectors) of the predicted trajectory | ||
Note: | ||
This function should be called once before training the model. | ||
""" | ||
|
||
from sklearn.cluster import KMeans | ||
# Trajectory projection | ||
C_pred = self.to_ET_space(pred_traj_norm, evec=U_pred_trunc).T.detach().numpy() | ||
|
||
# Anchor generation on EigenTrajectory space | ||
C_anchor = torch.FloatTensor( | ||
KMeans(n_clusters=self.s, random_state=0, init='k-means++', n_init=10).fit(C_pred).cluster_centers_.T) | ||
|
||
# Register anchors as model parameters | ||
self.C_anchor = nn.Parameter(C_anchor.to(self.C_anchor.device)) | ||
|
||
def forward(self, C_pred): | ||
r"""Anchor refinement on EigenTrajectory space | ||
Args: | ||
C_pred (torch.Tensor): The predicted ET descriptor coefficients | ||
Returns: | ||
C_pred_refine (torch.Tensor): The refined ET descriptor coefficients | ||
""" | ||
|
||
# Anchor Refinement | ||
C_pred_refine = self.C_anchor.unsqueeze(dim=1).detach() + C_pred | ||
return C_pred_refine |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
import torch | ||
import torch.nn as nn | ||
from .normalizer import TrajNorm | ||
|
||
|
||
class ETDescriptor(nn.Module): | ||
r"""EigenTrajectory descriptor model | ||
Args: | ||
hyper_params (DotDict): The hyper-parameters | ||
norm_ori (bool): Whether to normalize the trajectory with the origin | ||
norm_rot (bool): Whether to normalize the trajectory with the rotation | ||
norm_sca (bool): Whether to normalize the trajectory with the scale""" | ||
|
||
def __init__(self, hyper_params, norm_ori=True, norm_rot=True, norm_sca=True): | ||
super().__init__() | ||
|
||
self.hyper_params = hyper_params | ||
self.t_obs, self.t_pred = hyper_params.obs_len, hyper_params.pred_len | ||
self.obs_svd, self.pred_svd = hyper_params.obs_svd, hyper_params.pred_svd | ||
self.k = hyper_params.k | ||
self.s = hyper_params.num_samples | ||
self.dim = hyper_params.traj_dim | ||
self.traj_normalizer = TrajNorm(ori=norm_ori, rot=norm_rot, sca=norm_sca) | ||
|
||
self.U_obs_trunc = nn.Parameter(torch.zeros((self.t_obs * self.dim, self.k))) | ||
self.U_pred_trunc = nn.Parameter(torch.zeros((self.t_pred * self.dim, self.k))) | ||
|
||
def normalize_trajectory(self, obs_traj, pred_traj=None): | ||
r"""Trajectory normalization | ||
Args: | ||
obs_traj (torch.Tensor): The observed trajectory | ||
pred_traj (torch.Tensor): The predicted trajectory (Optional, for training only) | ||
Returns: | ||
obs_traj_norm (torch.Tensor): The normalized observed trajectory | ||
pred_traj_norm (torch.Tensor): The normalized predicted trajectory | ||
""" | ||
|
||
self.traj_normalizer.calculate_params(obs_traj) | ||
obs_traj_norm = self.traj_normalizer.normalize(obs_traj) | ||
pred_traj_norm = self.traj_normalizer.normalize(pred_traj) if pred_traj is not None else None | ||
return obs_traj_norm, pred_traj_norm | ||
|
||
def denormalize_trajectory(self, traj_norm): | ||
r"""Trajectory denormalization | ||
Args: | ||
traj_norm (torch.Tensor): The trajectory to be denormalized | ||
Returns: | ||
traj (torch.Tensor): The denormalized trajectory | ||
""" | ||
|
||
traj = self.traj_normalizer.denormalize(traj_norm) | ||
return traj | ||
|
||
def to_ET_space(self, traj, evec): | ||
r"""Transform Euclidean trajectories to EigenTrajectory coefficients | ||
Args: | ||
traj (torch.Tensor): The trajectory to be transformed | ||
evec (torch.Tensor): The ET descriptors (eigenvectors) | ||
Returns: | ||
C (torch.Tensor): The ET descriptor coefficients""" | ||
|
||
# Euclidean -> ET | ||
tdim = evec.size(0) | ||
M = traj.reshape(-1, tdim).T | ||
C = evec.T.detach() @ M | ||
return C | ||
|
||
def to_Euclidean_space(self, C, evec): | ||
r"""Transform EigenTrajectory coefficients to Euclidean trajectories | ||
Args: | ||
C (torch.Tensor): The ET descriptor coefficients | ||
evec (torch.Tensor): The ET descriptors (eigenvectors) | ||
Returns: | ||
traj (torch.Tensor): The Euclidean trajectory""" | ||
|
||
# ET -> Euclidean | ||
t = evec.size(0) // self.dim | ||
M = evec.detach() @ C | ||
traj = M.T.reshape(-1, t, self.dim) | ||
return traj | ||
|
||
def truncated_SVD(self, traj, k=None, full_matrices=False): | ||
r"""Truncated Singular Value Decomposition | ||
Args: | ||
traj (torch.Tensor): The trajectory to be decomposed | ||
k (int): The number of singular values and vectors to be computed | ||
full_matrices (bool): Whether to compute full-sized matrices | ||
Returns: | ||
U_trunc (torch.Tensor): The truncated left singular vectors | ||
S_trunc (torch.Tensor): The truncated singular values | ||
Vt_trunc (torch.Tensor): The truncated right singular vectors | ||
""" | ||
|
||
assert traj.size(2) == self.dim # NTC | ||
k = self.k if k is None else k | ||
|
||
# Singular Value Decomposition | ||
M = traj.reshape(-1, traj.size(1) * self.dim).T | ||
U, S, Vt = torch.linalg.svd(M, full_matrices=full_matrices) | ||
|
||
# Truncated SVD | ||
U_trunc, S_trunc, Vt_trunc = U[:, :k], S[:k], Vt[:k, :] | ||
return U_trunc, S_trunc, Vt_trunc.T | ||
|
||
def parameter_initialization(self, obs_traj, pred_traj): | ||
r"""Initialize the ET descriptor parameters (for training only) | ||
Args: | ||
obs_traj (torch.Tensor): The observed trajectory | ||
pred_traj (torch.Tensor): The predicted trajectory | ||
Returns: | ||
pred_traj_norm (torch.Tensor): The normalized predicted trajectory | ||
U_pred_trunc (torch.Tensor): The truncated eigenvectors of the predicted trajectory | ||
Note: | ||
This function should be called once before training the model.""" | ||
|
||
# Normalize trajectory | ||
obs_traj_norm, pred_traj_norm = self.normalize_trajectory(obs_traj, pred_traj) | ||
|
||
# Singular Value Decomposition with truncation | ||
U_obs_trunc, _, _ = self.truncated_SVD(obs_traj_norm) | ||
U_pred_trunc, _, _ = self.truncated_SVD(pred_traj_norm) | ||
|
||
# Register eigenvectors as model parameters | ||
self.U_obs_trunc = nn.Parameter(U_obs_trunc.to(self.U_obs_trunc.device)) | ||
self.U_pred_trunc = nn.Parameter(U_pred_trunc.to(self.U_pred_trunc.device)) | ||
|
||
# Reuse values for anchor generation | ||
return pred_traj_norm, U_pred_trunc | ||
|
||
def projection(self, obs_traj, pred_traj=None): | ||
r"""Trajectory projection to the ET space | ||
Args: | ||
obs_traj (torch.Tensor): The observed trajectory | ||
pred_traj (torch.Tensor): The predicted trajectory (optional, for training only) | ||
Returns: | ||
C_obs (torch.Tensor): The observed trajectory in the ET space | ||
C_pred (torch.Tensor): The predicted trajectory in the ET space (optional, for training only) | ||
""" | ||
|
||
# Trajectory Projection | ||
obs_traj_norm, pred_traj_norm = self.normalize_trajectory(obs_traj, pred_traj) | ||
C_obs = self.to_ET_space(obs_traj_norm, evec=self.U_obs_trunc).detach() | ||
C_pred = self.to_ET_space(pred_traj_norm, evec=self.U_pred_trunc).detach() if pred_traj is not None else None | ||
return C_obs, C_pred | ||
|
||
def reconstruction(self, C_pred): | ||
r"""Trajectory reconstruction from the ET space | ||
Args: | ||
C_pred (torch.Tensor): The predicted trajectory in the ET space | ||
Returns: | ||
pred_traj (torch.Tensor): The predicted trajectory in the Euclidean space | ||
""" | ||
|
||
# Trajectory Reconstruction | ||
pred_traj_norm = [self.to_Euclidean_space(C_pred[:, :, s], evec=self.U_pred_trunc) for s in range(self.s)] | ||
pred_traj = [self.denormalize_trajectory(pred_traj_norm[s]) for s in range(self.s)] | ||
pred_traj = torch.stack(pred_traj, dim=0) # SNTC | ||
return pred_traj | ||
|
||
def forward(self, C_pred): | ||
r"""Alias for reconstruction""" | ||
|
||
return self.reconstruction(C_pred) |
Oops, something went wrong.