Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 106 additions & 42 deletions XRPLib/rangefinder.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import machine, time
from machine import Pin
import time
from machine import Pin, Timer

class Rangefinder:

_DEFAULT_RANGEFINDER_INSTANCE = None
_instances = []
_timer = None
_current_index = 0

@classmethod
def get_default_rangefinder(cls):
Expand All @@ -14,17 +17,43 @@ def get_default_rangefinder(cls):
cls._DEFAULT_RANGEFINDER_INSTANCE = cls()
return cls._DEFAULT_RANGEFINDER_INSTANCE

@classmethod
def _start_shared_timer(cls):
"""
Start the shared timer if it isn't already running.
One timer cycles through all rangefinder instances to prevent acoustic crosstalk.
"""
if cls._timer is None:
cls._timer = Timer(-1)
cls._timer.init(mode=Timer.PERIODIC, period=60, callback=cls._timer_callback)

@classmethod
def _timer_callback(cls, _t):
"""
Shared timer callback that triggers one rangefinder per tick,
cycling through all instances in round-robin order.
"""
if not cls._instances:
return
instance = cls._instances[cls._current_index]
instance._do_ping()
cls._current_index = (cls._current_index + 1) % len(cls._instances)

def __init__(self, trigger_pin: int|str = "RANGE_TRIGGER", echo_pin: int|str = "RANGE_ECHO", timeout_us:int=500*2*30):
"""
A basic class for using the HC-SR04 Ultrasonic Rangefinder.
A non-blocking class for using the HC-SR04 Ultrasonic Rangefinder.
The sensor range is between 2cm and 4m.
Measurements are taken continuously in the background using a shared timer
and pin IRQ, so distance() returns immediately with the most recent value.
When multiple rangefinders exist, they are pinged sequentially to prevent
acoustic crosstalk.
Timeouts will return a MAX_VALUE (65535) instead of raising an exception.

:param trigger_pin: The number of the pin on the microcontroller that's connected to the ``Trig`` pin on the HC-SR04.
:type trigger_pin: int
:param echo_pin: The number of the pin on the microcontroller that's connected to the ``Echo`` pin on the HC-SR04.
:type echo_pin: int
:param timeout_us: Max microseconds seconds to wait for a response from the sensor before assuming it isn't going to answer. By default set to 30,000 us (0.03 s)
:param timeout_us: Max microseconds to wait for a response from the sensor before assuming it isn't going to answer. By default set to 30,000 us (0.03 s)
:type timeout_us: int
"""
self.timeout_us = timeout_us
Expand All @@ -36,58 +65,93 @@ def __init__(self, trigger_pin: int|str = "RANGE_TRIGGER", echo_pin: int|str = "
# Init echo pin (in)
self.echo = Pin(echo_pin, mode=Pin.IN, pull=None)

self.cms = 0
self.last_echo_time = 0
self.cache_time_us = 3000
# Cache time functions as instance attributes for use in hard IRQ context,
# where global module lookups are not allowed
self._ticks_us = time.ticks_us
self._ticks_diff = time.ticks_diff

self.cms = self.MAX_VALUE
self._echo_start = 0
self._waiting_for_echo = False
self._trigger_time = 0
self._first_reading_done = False

# Pre-bind methods as instance attributes so the shared timer callback
# can call them without creating bound method objects (which would
# allocate memory in hard IRQ context)
self._do_ping = self._trigger_ping
self._do_echo = self._echo_handler

def _send_pulse_and_wait(self):
# Register echo pin IRQ for both rising and falling edges
self.echo.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self._do_echo)

# Register this instance and start the shared timer
Rangefinder._instances.append(self)
Rangefinder._start_shared_timer()

def _trigger_ping(self):
"""
Send the pulse to trigger and listen on echo pin.
We use the method `machine.time_pulse_us()` to get the microseconds until the echo is received.
Send a trigger pulse to the HC-SR04.
Called by the shared timer callback.
Only ~15us of work per call (negligible blocking).
Also detects timeouts from previous measurements.
"""
self._trigger.value(0) # Stabilize the sensor
if self._waiting_for_echo:
# Check if previous measurement timed out
if self._ticks_diff(self._ticks_us(), self._trigger_time) > self.timeout_us:
self.cms = self.MAX_VALUE
self._waiting_for_echo = False
self._first_reading_done = True
else:
# Still waiting for a valid echo, skip this trigger
return

# Send trigger pulse
self._trigger.value(0)
self._delay_us(5)
self._trigger.value(1)
# Send a 10us pulse.
self._delay_us(10)
self._trigger.value(0)
try:
pulse_time = machine.time_pulse_us(self.echo, 1, self.timeout_us)
return pulse_time
except OSError as exception:
raise exception
self._trigger_time = self._ticks_us()
self._waiting_for_echo = True

def _echo_handler(self, pin):
"""
Pin IRQ handler for the echo pin.
Rising edge: record start time.
Falling edge: compute distance from pulse width.
"""
if pin.value() == 1:
# Rising edge - echo pulse started
self._echo_start = self._ticks_us()
else:
# Falling edge - echo pulse ended
if self._waiting_for_echo:
pulse_time = self._ticks_diff(self._ticks_us(), self._echo_start)
if pulse_time > 0:
# Sound speed 343.2 m/s = 0.034320 cm/us = 1cm per 29.1us
# Divide by 2 because pulse travels to target and back
self.cms = (pulse_time / 2) / 29.1
else:
self.cms = self.MAX_VALUE
self._waiting_for_echo = False
self._first_reading_done = True

def distance(self) -> float:
"""
Get the distance in centimeters by measuring the echo pulse time
"""
if time.ticks_diff(time.ticks_us(), self.last_echo_time) < self.cache_time_us and not (self.cms == 65535 or self.cms == 0):
return self.cms

try:
pulse_time = self._send_pulse_and_wait()
if pulse_time <= 0:
return self.MAX_VALUE
except OSError as exception:
# We don't want programs to crash if the HC-SR04 doesn't see anything in range
# So we catch those errors and return 65535 instead
if exception.args[0] == 110: # 110 = ETIMEDOUT
return self.MAX_VALUE
raise exception

# To calculate the distance we get the pulse_time and divide it by 2
# (the pulse walk the distance twice) and by 29.1 becasue
# the sound speed on air (343.2 m/s), that It's equivalent to
# 0.034320 cm/us that is 1cm each 29.1us
self.cms = (pulse_time / 2) / 29.1
self.last_echo_time = time.ticks_us()
Get the most recent distance measurement in centimeters.
Blocks until the first measurement is available, then returns
immediately on all subsequent calls.
"""
while not self._first_reading_done:
pass
return self.cms

def _delay_us(self, delay:int):
"""
Custom implementation of time.sleep_us(), used to get around the bug in MicroPython where time.sleep_us()
Custom implementation of time.sleep_us(), used to get around the bug in MicroPython where time.sleep_us()
doesn't work properly and causes the IDE to hang when uploading the code
"""
start = time.ticks_us()
while time.ticks_diff(time.ticks_us(), start) < delay:
start = self._ticks_us()
while self._ticks_diff(self._ticks_us(), start) < delay:
pass