import os
import shutil
import numpy as np
import torch
import torch.nn.functional as F
import open3d as o3d
from plyfile import PlyData, PlyElement
from scipy.spatial.transform import Rotation as R
from cosense3d.utils.misc import check_numpy_to_torch
from cosense3d.utils.pcdio import point_cloud_from_path
ply_fields = {'x': 'f4', 'y': 'f4', 'z': 'f4', 'ObjIdx': 'u4', 'ObjTag': 'u4', 'ring': 'u1', 'time': 'f4'}
np_types = {'f4': np.float32, 'u4': np.uint32, 'u1': np.uint8}
[docs]def read_ply(filename):
ply = PlyData.read(filename)
data = ply['vertex']
properties = [prop.name for prop in data.properties]
property_types = [prop.val_dtype for prop in data.properties]
return {name: np.array(data[name]) for name in properties}, property_types
[docs]def save_cosense_ply(data, output_file_name):
data = {
'x': data['x'].astype(np_types[ply_fields['x']]),
'y': data['y'].astype(np_types[ply_fields['y']]),
'z': data['z'].astype(np_types[ply_fields['z']]),
'ObjIdx': data['ObjIdx'].astype(np_types[ply_fields['ObjIdx']]),
'ObjTag': data['ObjTag'].astype(np_types[ply_fields['ObjTag']]),
'ring': data['ring'].astype(np_types[ply_fields['ring']]),
'time': data['time'].astype(np_types[ply_fields['time']])
}
vertex_data = list(zip(*[data[k] for k, v in ply_fields.items()]))
vertex_type = [(k, v) for k, v in ply_fields.items()]
vertex = np.array(vertex_data, dtype=vertex_type)
el = PlyElement.describe(vertex, 'vertex')
PlyData([el]).write(output_file_name)
[docs]def lidar_ply2bin(ply_file, bin_file,
fields=['x', 'y', 'z', 'intensity'],
replace=False):
"""
Read ply and save to the cosense3d binary format.
:param ply_file: str, input file name
:param bin_file: str, output file name
:param fields: list of str, names that indicates 'x', 'y', 'z' and 'intensity'
:param replace: replace the exisiting file if True
"""
if not replace and os.path.exists(bin_file):
return
pointcloud, property_types = read_ply(ply_file)
pcd_out = np.stack([pointcloud[k] for k in fields], axis=1)
pcd_out.tofile(bin_file)
[docs]def lidar_bin2pcd_o3d(bin_file, out_file, replace=False):
if not replace and os.path.exists(out_file):
return
bin_pcd = np.fromfile(bin_file, dtype=np.float32)
# reshape
points = bin_pcd.reshape(-1, 4)
# remove nan points
mask = np.logical_not(np.isnan(points[:, :3]).any(axis=1))
points = points[mask]
o3d_pcd = o3d.geometry.PointCloud()
o3d_pcd.points = o3d.utility.Vector3dVector(points[:, :-1])
point_intensity = np.zeros_like(points[:, :-1])
point_intensity[:, 0] = points[:, -1] / 255.
o3d_pcd.colors = o3d.utility.Vector3dVector(point_intensity)
# write to pcd file
o3d.io.write_point_cloud(out_file,
pointcloud=o3d_pcd,
write_ascii=True)
[docs]def lidar_bin2pcd(bin_file, out_file, replace=False):
if not replace and os.path.exists(out_file):
return
bin_pcd = np.fromfile(bin_file, dtype=np.float32)
# reshape
points = bin_pcd.reshape(-1, 4)
points[:, 3] /= 255
mask = np.logical_not(np.isnan(points[:, :3]).any(axis=1))
points = points[mask]
header_str = header(points)
with open(out_file, 'w') as fh:
# fh.write()
np.savetxt(fh, points, fmt='%f', header=header_str)
# shutil.copy(out_file.replace('pcd', 'txt'), out_file)
[docs]def lidar_bin2bin(bin_file, out_file):
shutil.copy(bin_file, out_file)
[docs]def load_pcd(pcd_file: str, return_o3d: bool=False):
"""
Read pcd and return numpy array.
:param pcd_file: The pcd file that contains the point cloud.
:param return_o3d: Default returns numpy array, set True to return pcd as o3d PointCloud object
:return: lidar_dict,
xyz: (pcd_np | pcd : np.ndarray | o3d.geometry.PointCloud) the lidar xyz coordinates in numpy format, shape:(n, 3);
intensity: (optional) np.ndarray, (n,).
label: (optional) np.ndarray, (n,).
time: (optional) np.ndarray, (n,).
ray: (optional) np.ndarray, (n,).
"""
lidar_dict = {}
ext = os.path.splitext(pcd_file)[-1]
if ext == '.pcd':
if return_o3d:
return o3d.io.read_point_cloud(pcd_file)
else:
pcd = point_cloud_from_path(pcd_file)
lidar_dict['xyz'] = np.stack([pcd.pc_data[x] for x in 'xyz'], axis=-1).astype(float)
# we save the intensity in the first channel
if 'intensity' in pcd.fields:
lidar_dict['intensity'] = pcd.pc_data['intensity']
if 'timestamp' in pcd.fields:
lidar_dict['time'] = pcd.pc_data['timestamp']
elif ext == '.bin':
pcd_np = np.fromfile(pcd_file, dtype=np.float32).reshape(-1, 4)
if return_o3d:
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(pcd_np)
return pcd
lidar_dict['xyz'] = pcd_np[:, :3]
# check attribute of last column,
# num of unique labels for the datasets in this projects is less than 50,
# unique intensities is normally larger then 50
if len(np.unique(pcd_np[:, -1])) < 50:
lidar_dict['label'] = pcd_np[:, -1]
elif pcd_np[:, -1].max() > 1:
lidar_dict['intensity'] = pcd_np[:, -1] / 255
else:
lidar_dict['intensity'] = pcd_np[:, -1]
elif ext == '.ply':
data = read_ply(pcd_file)[0]
xyz = np.stack([data.pop(x) for x in 'xyz'], axis=1)
lidar_dict['xyz'] = xyz
lidar_dict.update(data)
else:
raise NotImplementedError
return lidar_dict
[docs]def tf2pose(tf_matrix):
euler = R.from_matrix(tf_matrix[:3, :3]).as_euler('xyz')
translation = tf_matrix[:3, 3]
return translation.tolist() + euler.tolist()
[docs]def pose2tf(pose):
tf_matrix = np.eye(4)
tf_matrix[:3, :3] = rotation_matrix(pose[3:])
tf_matrix[:3, 3] = np.array(pose[:3])
return tf_matrix
[docs]def rotation_matrix(euler, degrees=True):
"""
Construct rotation matrix with the given pose.
:param euler: list or np.ndarray
[roll, pitch, yaw]
:return: rot: np.ndarray, 3x3
rotation matrix
"""
return R.from_euler('xyz', euler, degrees=degrees).as_matrix()
[docs]def rotate3d(points, euler):
"""
Rotate point cloud with the euler angles given in pose.
:param points: np.ndarray, N x (3 + C)
each point in the row has the format [x, y, z, ...]
:param euler: list or np.ndarray
[roll, pitch, yaw]
:return: points: np.ndarray
rotated point cloud
"""
assert len(euler) == 3
rot = rotation_matrix(euler)
points[:, :3] = (rot @ points[:, :3].T).T
return points
[docs]def cart2cyl(input_xyz):
rho = np.sqrt(input_xyz[..., 0] ** 2 + input_xyz[..., 1] ** 2)
phi = np.arctan2(input_xyz[..., 1], input_xyz[..., 0])
return np.concatenate((rho.reshape(-1, 1), phi.reshape(-1, 1), input_xyz[..., 2:]), axis=-1)
[docs]def cyl2cart(input_xyz_polar):
x = input_xyz_polar[..., 0] * np.cos(input_xyz_polar[..., 1])
y = input_xyz_polar[..., 0] * np.sin(input_xyz_polar[..., 1])
return np.concatenate((x.reshape(-1, 1), y.reshape(-1, 1), input_xyz_polar[..., 2:]), axis=-1)
[docs]def mat_yaw(cosa, sina, zeros=0, ones=1):
return [
cosa, -sina, zeros,
sina, cosa, zeros,
zeros, zeros, ones
]
[docs]def mat_pitch(cosa, sina, zeros=0, ones=1):
return [
cosa, zeros, sina,
zeros, ones, zeros,
-sina, zeros, cosa,
]
[docs]def mat_roll(cosa, sina, zeros=0, ones=1):
return [
ones, zeros, zeros,
zeros, cosa, -sina,
zeros, sina, cosa,
]
[docs]def rotate_points_along_z_np(points, angle):
"""
:param points: (N, 3 + C or 2 + C)
:param angle: float, angle along z-axis, angle increases x ==> y
"""
cosa = np.cos(angle)
sina = np.sin(angle)
rot_matrix = np.array([
[cosa, sina, 0],
[-sina, cosa, 0],
[0, 0, 1]
]).astype(np.float)
if points.shape[1]==2:
points_rot = np.matmul(points, rot_matrix[:2, :2])
elif points.shape[1]>2:
points_rot = np.matmul(points[:, 0:3], rot_matrix)
points_rot = np.concatenate((points_rot, points[:, 3:]), axis=-1)
else:
raise IOError('Input points should have the shape: (N, 3 + C or 2 + C).')
return points_rot
[docs]def rotate_points_batch(points, angles, order='xyz'):
"""
:param points: (B, N, 3 + C)
:param angles: (B, 1|3), radians
rotation = R(3)R(2)R(1) if angles shape in (B, 3)
:return: points_rot: (B, N, 3 + C)
"""
assert angles.shape[1] == len(order), \
"angles should has the shape (len(points), len(order))."
points, is_numpy = check_numpy_to_torch(points)
angles, _ = check_numpy_to_torch(angles)
cosas = torch.cos(angles)
sinas = torch.sin(angles)
zeros = angles[:, 0].new_zeros(points.shape[0])
ones = angles[:, 0].new_ones(points.shape[0])
rot_matrix = torch.eye(3, dtype=points.dtype, device=points.device)
rot_matrix = rot_matrix.reshape((1, 3, 3)).repeat(angles.shape[0], 1, 1)
for cosa, sina, ax in zip(cosas.T, sinas.T, order):
if ax == 'z':
rot = torch.stack(mat_yaw(
cosa, sina, zeros, ones
), dim=1).view(-1, 3, 3).float()
elif ax == 'y':
rot = torch.stack(mat_pitch(
cosa, sina, zeros, ones
), dim=1).view(-1, 3, 3).float()
elif ax == 'x':
rot = torch.stack(mat_roll(
cosa, sina, zeros, ones
), dim=1).view(-1, 3, 3).float()
else:
raise NotImplementedError
rot_matrix = torch.bmm(rot, rot_matrix)
points_rot = torch.bmm(rot_matrix, points[:, :, 0:3].float().
permute(0, 2, 1)).permute(0, 2, 1)
points_rot = torch.cat((points_rot, points[:, :, 3:]), dim=-1)
return points_rot.numpy() if is_numpy else points_rot
[docs]def rotate_points_along_z_torch(points, angle):
"""
:param points: (N, 2 + C) or (B, 2 + C)
:param angle: float or tensor of shape (B), angle along z-axis, angle increases x ==> y
"""
if len(points.shape) == 2:
points = points.unsqueeze(0)
if isinstance(angle, float):
angle = torch.tensor([angle], device=points.device)
else:
assert isinstance(angle, torch.Tensor)
assert points.shape[0] == 1 or angle.shape[0] == points.shape[0]
cosa = torch.cos(angle)
sina = torch.sin(angle)
rot_matrix = torch.stack([
torch.stack([cosa, sina], dim=-1),
torch.stack([-sina, cosa], dim=-1)
], dim=1).float().to(points.device)
if points.shape[0] == 1 and angle.shape[0] > 1:
points = torch.tile(points, (len(rot_matrix), 1, 1))
points_rot = torch.bmm(points[..., 0:2], rot_matrix)
points_rot = torch.cat((points_rot, points[..., 2:]), dim=-1)
return points_rot
[docs]def rotate_points_with_tf_np(points: np.ndarray, tf_np: np.ndarray) -> np.ndarray:
"""
Rotate points with transformation matrix.
:param points (np.ndarray): Nx3 points array
:param tf_np (np.ndarray): 4x4 transformation matrix
:return: points (np.ndarray): Nx3 points array
"""
points_homo = np.concatenate([points, np.ones_like(points[:, :1])], axis=-1).T
points = (tf_np @ points_homo)[:3].T
return points
[docs]def rotate_box_corners_with_tf_np(corners: np.ndarray, tf_np: np.ndarray) -> np.ndarray:
"""
Rotate points with transformation matrix
:param corners: Nx8X3 points array
:param tf_np: 4x4 transformation matrix
:return: corners, Nx8X3 points array
"""
points = rotate_points_with_tf_np(corners.reshape(-1, 3), tf_np)
corners = points.reshape(corners.shape)
return corners
[docs]def mask_values_in_range(values, min, max):
return np.logical_and(values>min, values<max)
[docs]def mask_points_in_box(points, pc_range):
n_ranges = len(pc_range) // 2
list_mask = [mask_values_in_range(points[:, i], pc_range[i],
pc_range[i+n_ranges]) for i in range(n_ranges)]
return np.array(list_mask).all(axis=0)
[docs]def mask_points_in_range(points: np.array, dist: float) -> np.array:
"""
:rtype: np.array
"""
return np.linalg.norm(points[:, :2], axis=1) < dist
[docs]def get_tf_matrix_torch(vectors, inv=False):
device = vectors.device
n, _ = vectors.shape
xs = vectors[:, 0]
ys = vectors[:, 1]
angles = vectors[:, 2]
cosa = torch.cos(angles)
sina = torch.sin(angles)
ones = torch.ones_like(angles)
zeros = torch.zeros_like(angles)
rot_matrix = torch.zeros((n, 3, 3), device=device, requires_grad=True)
rot_matrix[:, 0, 0] = cosa
rot_matrix[:, 0, 1] = -sina
rot_matrix[:, 1, 0] = sina
rot_matrix[:, 1, 1] = cosa
shift_matrix = torch.zeros_like(rot_matrix, requires_grad=True)
shift_matrix[:, 0, 1] = xs
shift_matrix[:, 1, 0] = ys
shift_matrix[:, [0, 1, 2], [0, 1, 2]] = 1.0
if inv:
mat = torch.einsum('...ij, ...jk', rot_matrix, shift_matrix)
else:
mat = torch.einsum('...ij, ...jk', shift_matrix, rot_matrix)
return mat, rot_matrix, shift_matrix
[docs]def rotation_mat2euler_torch(mat):
sy = torch.norm(mat[:, :2, 0], dim=1)
singular = sy < 1e-6
not_singular = torch.logical_not(singular)
euler = torch.zeros_like(mat[:, 0])
if not_singular.sum() > 0:
euler[not_singular, 0] = torch.atan2(mat[not_singular, 2, 1], mat[not_singular, 2, 2])
euler[not_singular, 1] = torch.atan2(-mat[not_singular, 2, 0], sy)
euler[not_singular, 2] = torch.atan2(mat[not_singular, 1, 0], mat[not_singular, 0, 0])
if singular.sum() > 0:
euler[singular, 0] = torch.atan2(-mat[singular, 1, 2], mat[singular, 1, 1])
euler[singular, 1] = torch.atan2(-mat[singular, 2, 0], sy)
return euler
[docs]def pose_err_global2relative_torch(poses, errs):
"""
Calculate relative pose transformation based on the errorneous global positioning
:param poses: Nx2 or Nx3, first row is ego pose, other rows are the coop poses
:param errs: Nx3, first row is ego pose error and other rows for coop pose errors
:return: (N-1)x3, relative localization errors between ego and coop vehicles
"""
if poses.shape[-1]==2:
poses = torch.cat([poses, torch.zeros_like(poses[:, 0:1])], dim=-1)
poses_err = poses + errs
R01, _, _ = get_tf_matrix_torch(-poses[:1], inv=True)
R10_hat, _, _ = get_tf_matrix_torch(poses_err[:1])
R20, _, _ = get_tf_matrix_torch(poses[1:])
R02_hat, _, _ = get_tf_matrix_torch(-poses_err[1:], inv=True)
delta_R21 = torch.einsum('...ij, ...jk', R01, R20)
delta_R21 = torch.einsum('...ij, ...jk', delta_R21, R02_hat)
delta_R21 = torch.einsum('...ij, ...jk', delta_R21, R10_hat)
x = delta_R21[0, 2]
y = delta_R21[1, 2]
theta = torch.atan2(delta_R21[1, 0], delta_R21[0, 0])
return torch.stack([x, y, theta], dim=-1)
[docs]def project_points_by_matrix_torch(points, transformation_matrix):
"""
Project the points to another coordinate system based on the
transformation matrix.
:param points: torch.Tensor, 3D points, (N, 3)
:param transformation_matrix: torch.Tensor, Transformation matrix, (4, 4)
:return: projected_points : torch.Tensor, The projected points, (N, 3)
"""
points, is_numpy = \
check_numpy_to_torch(points)
transformation_matrix, _ = \
check_numpy_to_torch(transformation_matrix)
# convert to homogeneous coordinates via padding 1 at the last dimension.
# (N, 4)
points_homogeneous = F.pad(points, (0, 1), mode="constant", value=1)
# (N, 4)
projected_points = torch.einsum("ik, jk->ij", points_homogeneous,
transformation_matrix)
return projected_points[:, :3] if not is_numpy \
else projected_points[:, :3].numpy()
if __name__=="__main__":
for i in range(0, 300):
frame = f"{i:06d}"
ply_file = f"/koko/LUMPI/train/measurement5/lidar/{frame}.ply"
bin_file = f"/media/hdd/projects/TAL/data/lumpi_m5/lidar0/{frame}.bin"
lidar_ply2bin(ply_file, bin_file)