# Reconstructed from:
# src/adafruit_blinka/microcontroller/bcm2711/pulseio/PulseIn.py
"""Custom PulseIn Class to read PWM signals"""
import time
import subprocess
import os
import atexit
import random
import struct
import sysv_ipc

DEBUG = False
# Module-wide registries of every live message queue / helper process, so
# that final() can clean them up when the interpreter exits.
queues = []
procs = []


# The message queues live outside of python space, and must be formally cleaned!
def final():
    """In case the program is cancelled or quit, we need to clean up the PulseIn
    helper process and also the message queue, this is called at exit to do so"""
    if DEBUG:
        print("Cleaning up message queues", queues)
        print("Cleaning up processes", procs)
    for q in queues:
        try:
            q.remove()
        except sysv_ipc.ExistentialError:
            # Queue is already gone; keep going so that the remaining queues
            # and all helper processes still get cleaned up.
            pass
    for proc in procs:
        proc.terminate()


atexit.register(final)

# pylint: disable=c-extension-no-member
class PulseIn:
    """PulseIn Class to read PWM signals"""

    def __init__(self, pin, maxlen=2, idle_state=False):
        """Create a PulseIn object associated with the given pin.
        The object acts as a read-only sequence of pulse lengths with
        a given max length. When it is active, new pulse lengths are
        added to the end of the list. When there is no more room
        (len() == maxlen) the oldest pulse length is removed to make room.

        :param pin: pin to read; ``str(pin)`` is passed to the helper
            process as the line on ``gpiochip0``.
        :param maxlen: passed to the helper as its ``--pulses`` argument.
        :param idle_state: when true, the ``-i`` flag is passed to the helper.
        :raises RuntimeError: if the message queue cannot be created, or the
            helper does not send its startup message within 0.25 seconds.
        """
        self._pin = pin
        self._maxlen = maxlen
        self._idle_state = idle_state
        # Retained for backward compatibility; not used elsewhere in this
        # module (the helper is given self._mq.key instead).
        self._queue_key = random.randint(1, 9999)
        self._process = None
        self._paused = False
        try:
            self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
        except sysv_ipc.ExistentialError as err:
            # Chain from the caught instance so the real cause is preserved.
            raise RuntimeError("Message queue creation failed") from err
        if DEBUG:
            print("Message Queue Key: ", self._mq.key)
        queues.append(self._mq)

        # Check if OS is 64-bit
        if struct.calcsize("P") * 8 == 64:
            libgpiod_filename = "libgpiod_pulsein64"
        else:
            libgpiod_filename = "libgpiod_pulsein"

        # The helper binary is looked up next to this file.
        dir_path = os.path.dirname(os.path.realpath(__file__))
        cmd = [
            os.path.join(dir_path, libgpiod_filename),
            "--pulses",
            str(maxlen),
            "--queue",
            str(self._mq.key),
        ]
        if idle_state:
            cmd.append("-i")
        cmd.append("gpiochip0")
        cmd.append(str(pin))
        if DEBUG:
            print(cmd)

        try:
            self._process = subprocess.Popen(cmd)
            procs.append(self._process)

            # wait for it to start up
            if DEBUG:
                print("Waiting for startup success message from subprocess")
            message = self._wait_receive_msg(timeout=0.25)
            if message[0] != b"!":
                raise RuntimeError("Could not establish message queue with subprocess")
        except Exception:  # pylint: disable=broad-except
            # Do not leave the helper process or the queue behind when
            # start-up fails; release them now and re-raise unchanged.
            self.deinit()
            raise
        self._paused = False

    # pylint: disable=redefined-builtin
    def _wait_receive_msg(self, timeout=0, type=2):
        """Internal helper that will wait for new messages of a given type,
        and throw an exception on timeout

        :param timeout: seconds to poll for a message; when not greater than
            zero, block until a message arrives.
        :param type: message type forwarded to the queue's ``receive``.
        :return: whatever ``receive`` returns; callers read element ``[0]``
            as the payload bytes.
        :raises RuntimeError: if ``timeout`` elapses with no message.
        """
        if timeout > 0:
            stamp = time.monotonic()
            while (time.monotonic() - stamp) < timeout:
                try:
                    message = self._mq.receive(block=False, type=type)
                    return message
                except sysv_ipc.BusyError:
                    time.sleep(0.001)  # wait a bit then retry!
            # uh-oh timed out
            raise RuntimeError(
                "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
            )
        message = self._mq.receive(block=True, type=type)
        return message

    # pylint: enable=redefined-builtin

    def deinit(self):
        """Deinitialises the PulseIn and releases any hardware and software
        resources for reuse. Safe to call more than once."""
        # Membership in the module-level registries marks a resource as still
        # live, which makes repeated calls (e.g. an explicit deinit() inside a
        # ``with`` block) harmless.
        if self._process is not None and self._process in procs:
            self._process.terminate()
            procs.remove(self._process)
        if self._mq in queues:
            queues.remove(self._mq)
            try:
                self._mq.remove()
            except sysv_ipc.ExistentialError:
                # Queue was already removed; nothing left to release.
                pass

    def __enter__(self):
        """No-op used by Context Managers."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Automatically deinitializes the hardware when exiting a context."""
        self.deinit()

    def resume(self, trigger_duration=0):
        """Resumes pulse capture after an optional trigger pulse.

        :param trigger_duration: when non-zero, a ``t<duration>`` command is
            sent instead of the plain ``r`` (resume) command.
        """
        if trigger_duration != 0:
            self._mq.send("t%d" % trigger_duration, True, type=1)
        else:
            self._mq.send("r", True, type=1)
        self._paused = False

    def pause(self):
        """Pause pulse capture"""
        self._mq.send("p", True, type=1)
        self._paused = True

    @property
    def paused(self):
        """True when pulse capture was paused with pause() and has not been
        resumed since. The flag is tracked locally and is only updated by
        pause() and resume()."""
        return self._paused

    @property
    def maxlen(self):
        """The maximum length of the PulseIn. When len() is equal to maxlen,
        it is unclear which pulses are active and which are idle."""
        return self._maxlen

    def clear(self):
        """Clears all captured pulses"""
        self._mq.send("c", True, type=1)

    def popleft(self):
        """Removes and returns the oldest read pulse.

        :raises IndexError: if the helper replies ``-1`` (nothing to pop).
        """
        self._mq.send("^", True, type=1)
        message = self._wait_receive_msg()
        reply = int(message[0].decode("utf-8"))
        if reply == -1:
            raise IndexError("pop from empty list")
        return reply

    def __len__(self):
        """Returns the number of pulses currently held, as reported by the
        helper process."""
        self._mq.send("l", True, type=1)
        message = self._wait_receive_msg()
        return int(message[0].decode("utf-8"))

    # pylint: disable=redefined-builtin
    def __getitem__(self, index, type=None):
        """Returns the value at the given integer index.

        Slices are not supported: the index is formatted with ``%d``, which
        raises TypeError for a slice object. ``type`` is accepted but unused.

        :raises IndexError: if the helper replies ``-1`` for this index.
        """
        self._mq.send("i%d" % index, True, type=1)
        message = self._wait_receive_msg()
        ret = int(message[0].decode("utf-8"))
        if ret == -1:
            raise IndexError("list index out of range")
        return ret

    # pylint: enable=redefined-builtin