[Documentation] [TitleIndex] [WordIndex

Writing your Parallax PING))) driver (Python)

This tutorial will guide you through combining distance measurement and relative velocity measurement of the Parallax PING))) sensor into a driver, which will be included in a ROS wrapper.

It is assumed that you have already read Distance measurement with ultrasonic sensor Parallax PING))) (Python) and Speed measurement with ultrasonic sensor Parallax PING))) (Python).

Besides the basics in ROS from the Tutorials, it is also already assumed that you have read the chapters What is a ROS Wrapper?, Write a ROS Wrapper (Python) and Package and Test Your Driver.

Programming our Driver code

Why do we need a driver? Of course, we can simplistically say that we need a driver so that we can operate the sensor. That is also correct. We could simplified yes access the sensor also within our ROS code and save and the concept of a driver. One advantage of programming our driver is that we create a reusability through it. This driver is a Parallax PING))) Python driver for the Raspberry Pi. That means we can use it on every Raspberry Pi, also independent of ROS. Another important point of reusability is that I can connect multiple Parallax PING))) sensors to one Raspberry Pi. To achieve this, we need to rewrite our previous code in an object-oriented way:

import sys
import RPi.GPIO as GPIO
import time

class UltrasonicParallaxPING(object):

    def __init__(self, signal):
        self.signal = signal

        self.timeout = 0.05

        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)

    def distance(self):
        arrivalTime = 0
        startTime = 0

        GPIO.setup(self.signal, GPIO.OUT) # Set to low
        GPIO.output(self.signal, False)

        time.sleep(0.01)

        GPIO.output(self.signal, True) # Set high

        time.sleep(0.00001)

        GPIO.output(self.signal, False) # Set low
        GPIO.setup(self.signal, GPIO.IN) # Set to input

        timeout_start = time.time()

        # Count microseconds that SIG was high
        while GPIO.input(self.signal) == 0:
            startTime = time.time()

            if startTime - timeout_start > self.timeout:
                return -1

        while GPIO.input(self.signal) == 1:
            arrivalTime = time.time()
        
            if startTime - timeout_start > self.timeout:
                return -1

        if startTime != 0 and arrivalTime != 0:
            pulse_duration = arrivalTime - startTime
            # The speed of sound is 340 m/s or 29 microseconds per centimeter.
            # The ping travels out and back, so to find the distance of the
            # object we take half of the distance travelled.
            # distance = duration / 29 / 2
            #distance = pulse_duration * 100 * 343.0 / 2
            distance = (pulse_duration * 34300) / 2

            #print('start = %s'%startTime,)
            #print('end = %s'%arrivalTime)
            if distance >= 0:
                return distance
            else:
                return -1
        else :
            #print('start = %s'%startTime,)
            #print('end = %s'%arrivalTime)
            return -1

    def speed(self):
        start_time = time.time()

        start_distance = self.distance() * 0.01     # to m conversion
        end_distance = self.distance() * 0.01       # to m conversion

        end_time = time.time()

        speed = (end_distance - start_distance) / 1.0   # m/s

        return speed

As we can see, two things have changed:

Programming our Wrapper code

The next step is to build our ROS wrapper for this driver. This looks like the following for example:

import rospy
from sensor_msgs.msg import Range
# import custom message type for Relative Velocity
from ultrasonic_parallax_ping import UltrasonicParallaxPING # import our driver module

class UltrasonicParallaxPINGWrapper(object):

    def __init__(self):
        self.min_range = 3
        self.max_range = 300
        self.fov = 0.34906585039887 # 20 degrees

        self.ultrasonicPub = rospy.Publisher('/ultrasonic/distance', Range, queue_size=10)
        self.ultrasonicVelocityPub = rospy.Publisher('/ultrasonic/relative_velocity', Range, queue_size=10)

        # loading the driver
        self.ultrasonic_sensor = UltrasonicParallaxPING(17)

        self.rate = rospy.Rate(10) # 10hz
        rospy.Timer(self.rate, self.publish_current_distance)
        rospy.Timer(self.rate, self.publish_current_velocity)

    def publish_current_distance(self):
        distance = self.ultrasonic_sensor.distance()

        message_str = "Distance: %s cm" % distance
        rospy.loginfo(message_str)
        
        #for distance in ranges:
        r = Range()

        r.header.stamp = rospy.Time.now()
        r.header.frame_id = "/base_link"
        r.radiation_type = Range.ULTRASOUND
        r.field_of_view = self.fov # 15 degrees
        r.min_range = self.min_range
        r.max_range = self.max_range

        r.range = distance
            
        self.ultrasonicPub.publish(r)

    def publish_current_velocity(self):
        relative_velocity = self.ultrasonic_sensor.speed()

        message_str = " Speed: %s m/s" % relative_velocity
        rospy.loginfo(message_str)
        
        rv = Range() # using range instead of a custom message type

        rv.header.stamp = rospy.Time.now()
        rv.header.frame_id = "/base_link"
        rv.radiation_type = Range.ULTRASOUND
        rv.field_of_view = self.fov
        rv.min_range = self.min_range
        rv.max_range = self.max_range

        rv.rage = relative_velocity # using range instead of a custom message type with a field relative velocity
            
        self.ultrasonicVelocityPub.publish(rv)

    def stop(self):
        self.ultrasonicPub.unregister()
        self.ultrasonicVelocityPub.unregister()

# Main function.
if __name__ == '__main__':
    # Initialize the node and name it.
    rospy.init_node("ultrasonic_driver", anonymous=False)

    ultrasonic_wrapper = UltrasonicParallaxPINGWrapper()

    rospy.on_shutdown(ultrasonic_wrapper.stop)
    rospy.loginfo("Ultrasonic driver is now started.")

    rospy.spin()

Optimizations

What else do you need to optimize in this example? What do you need to adjust?

The sensor_msgs have information about the sensor, that is for example the Field of View. These should possibly be loaded via parameters. Also, depending on the sensor, the minimum and maximum range from which a sensor can measure can be adjusted to your sensor. Also here it is recommended to load parameters. And last but not least, you can of course select the pins for the trigger and for the echo differently. Here, too, it would be conceivable that they load parameters.

This can look like the following:

import rospy
from sensor_msgs.msg import Range
# import custom message type for Relative Velocity
from ultrasonic_parallax_ping import UltrasonicParallaxPING # import our driver module

class UltrasonicParallaxPINGWrapper(object):

    def __init__(self):
        self.min_range = rospy.get_param("~minimum_range", 3)
        self.max_range = rospy.get_param("~maximum_range", 300)
        self.fov = rospy.get_param("~field_of_view", 20) * 0.0174532925199433 # degree to radian conversion (0.34906585039887 # 20 degrees)

        signal_pin = rospy.get_param("~signal_pin", 17)

        self.ultrasonicPub = rospy.Publisher('/ultrasonic/distance', Range, queue_size=10)
        self.ultrasonicVelocityPub = rospy.Publisher('/ultrasonic/relative_velocity', Range, queue_size=10)

        # loading the driver
        self.ultrasonic_sensor = UltrasonicPARALLAXPING(signal_pin)

        self.rate = rospy.Rate(10) # 10hz
        rospy.Timer(self.rate, self.publish_current_distance)
        rospy.Timer(self.rate, self.publish_current_velocity)

    def publish_current_distance(self):
        distance = self.ultrasonic_sensor.distance()

        message_str = "Distance: %s cm" % distance
        rospy.loginfo(message_str)
        
        #for distance in ranges:
        r = Range()

        r.header.stamp = rospy.Time.now()
        r.header.frame_id = "/base_link"
        r.radiation_type = Range.ULTRASOUND
        r.field_of_view = self.fov # 15 degrees
        r.min_range = self.min_range
        r.max_range = self.max_range

        r.range = distance
            
        self.ultrasonicPub.publish(r)

    def publish_current_velocity(self):
        relative_velocity = self.ultrasonic_sensor.speed()

        message_str = " Speed: %s m/s" % relative_velocity
        rospy.loginfo(message_str)
        
        rv = Range() # using range instead of a custom message type

        rv.header.stamp = rospy.Time.now()
        rv.header.frame_id = "/base_link"
        rv.radiation_type = Range.ULTRASOUND
        rv.field_of_view = self.fov
        rv.min_range = self.min_range
        rv.max_range = self.max_range

        rv.rage = relative_velocity # using range instead of a custom message type with a field relative velocity
            
        self.ultrasonicVelocityPub.publish(rv)

    def stop(self):
        self.ultrasonicPub.unregister()
        self.ultrasonicVelocityPub.unregister()

# Main function.
if __name__ == '__main__':
    # Initialize the node and name it.
    rospy.init_node("ultrasonic_driver", anonymous=False)

    ultrasonic_wrapper = UltrasonicParallaxPINGWrapper()

    rospy.on_shutdown(ultrasonic_wrapper.stop)
    rospy.loginfo("Ultrasonic driver is now started.")

    rospy.spin()

In the next tutorial series, we'll look at how to implement the code using C++ instead of Python. Please read Distance measurement with ultrasonic sensor Parallax PING))) (C++).


2024-12-28 12:14