-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathros2_pcl_filters.py
93 lines (76 loc) · 2.96 KB
/
ros2_pcl_filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2024, Ouster, Inc.
# All rights reserved.
"""
A module that shows how to subscribe to ouster point cloud and utililze pcl filters.
Author: Ussama Naal
"""
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, LivelinessPolicy
from sensor_msgs.msg import PointCloud2, PointField
import point_cloud2 as pc2
import pcl
class PointCloudSubscriber(Node):
def __init__(self):
super().__init__('point_cloud_subscriber')
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
durability=DurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE,
liveliness=LivelinessPolicy.RMW_QOS_POLICY_LIVELINESS_AUTOMATIC,
depth=5
)
self.subscription = self.create_subscription(
PointCloud2,
'/ouster/points',
self.point_cloud_callback,
qos_profile=qos_profile)
self.publisher = self.create_publisher(
PointCloud2,
'/ouster/points_filtered',
qos_profile=qos_profile)
def convert_pointcloud2_to_pcl(self, msg):
data = pc2.read_points(msg, field_names="xyz")
points = np.array(list(data), dtype=np.float32)
cloud = pcl.PointCloud()
cloud.from_array(points)
return cloud
def convert_pcl_to_pointcloud2(self, pcl_cloud, header):
points = pcl_cloud.to_array()
fields = [
PointField(name='x', offset=0,
datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4,
datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8,
datatype=PointField.FLOAT32, count=1)
]
return pc2.create_cloud(header, fields, points)
def remove_outliers(self, point_cloud, mean_k=10, std_dev_mul_thresh=1.0):
fil = point_cloud.make_statistical_outlier_filter()
fil.set_negative(True)
fil.set_mean_k(mean_k)
fil.set_std_dev_mul_thresh(std_dev_mul_thresh)
return fil.filter()
def pass_through_filter(self, point_cloud, axis="x", dist=1.0):
fil = point_cloud.make_passthrough_filter()
fil.set_filter_field_name(axis)
fil.set_filter_limits(-dist, dist)
return fil.filter()
def point_cloud_callback(self, msg):
points = self.convert_pointcloud2_to_pcl(msg)
# pcl_cloud = self.pass_through_filter(points)
pcl_cloud = self.remove_outliers(points)
mm = self.convert_pcl_to_pointcloud2(pcl_cloud, msg.header)
self.publisher.publish(mm)
def main(args=None):
rclpy.init(args=args)
point_cloud_subscriber = PointCloudSubscriber()
rclpy.spin(point_cloud_subscriber)
point_cloud_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()