X-Git-Url: https://git.ayoreis.com/hackapet/Adafruit_Blinka.git/blobdiff_plain/6586729b6b4c230875ff9ed18f4dd01aa6caff87..fa5afb5e90e6fa672ce2b9af3c7b8cf7a58b34c6:/src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py diff --git a/src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py b/src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py index b918c86..19dc98f 100644 --- a/src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py +++ b/src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py @@ -1,12 +1,10 @@ -import array +"""Custom PulseIn Class to read PWM signals""" import time import subprocess -import os, signal -import traceback -import signal -import sysv_ipc +import os import atexit import random +import sysv_ipc DEBUG = False queues = [] @@ -23,9 +21,14 @@ def final(): q.remove() for proc in procs: proc.terminate() + + atexit.register(final) +# pylint: disable=c-extension-no-member class PulseIn: + """PulseIn Class to read PWM signals""" + def __init__(self, pin, maxlen=2, idle_state=False): """Create a PulseIn object associated with the given pin. The object acts as a read-only sequence of pulse lengths with @@ -42,42 +45,56 @@ class PulseIn: print("Message Queue Key: ", self._mq.key) queues.append(self._mq) except sysv_ipc.ExistentialError: - raise RuntimeError("Message queue creation failed") + raise RuntimeError( + "Message queue creation failed" + ) from sysv_ipc.ExistentialError dir_path = os.path.dirname(os.path.realpath(__file__)) - cmd = [dir_path+"/libgpiod_pulsein", - "--pulses", str(maxlen), - "--queue", str(self._mq.key)] - if not idle_state: + cmd = [ + dir_path + "/libgpiod_pulsein", + "--pulses", + str(maxlen), + "--queue", + str(self._mq.key), + ] + if idle_state: cmd.append("-i") cmd.append("gpiochip0") cmd.append(str(pin)) if DEBUG: print(cmd) - + self._process = subprocess.Popen(cmd) procs.append(self._process) # wait for it to start up if DEBUG: print("Waiting for startup success message from subprocess") - message = self._wait_receive_msg() - if message[0] != b'!': + message = self._wait_receive_msg(timeout=0.25) + if message[0] != b"!": raise RuntimeError("Could not establish message queue with subprocess") self._paused = False - def _wait_receive_msg(self, timeout=0.25, type=2): + # pylint: disable=redefined-builtin + def _wait_receive_msg(self, timeout=0, type=2): """Internal helper that will wait for new messages of a given type, and throw an exception on timeout""" - stamp = time.monotonic() - while (time.monotonic() - stamp) < timeout: - try: - message = self._mq.receive(block=False, type=2) - return message - except sysv_ipc.BusyError: - time.sleep(0.001) # wait a bit then retry! - # uh-oh timed out - raise RuntimeError("Timed out waiting for PulseIn message") + if timeout > 0: + stamp = time.monotonic() + while (time.monotonic() - stamp) < timeout: + try: + message = self._mq.receive(block=False, type=type) + return message + except sysv_ipc.BusyError: + time.sleep(0.001) # wait a bit then retry! + # uh-oh timed out + raise RuntimeError( + "Timed out waiting for PulseIn message. Make sure libgpiod is installed." + ) + message = self._mq.receive(block=True, type=type) + return message + + # pylint: enable=redefined-builtin def deinit(self): """Deinitialises the PulseIn and releases any hardware and software @@ -129,8 +146,8 @@ class PulseIn: """Removes and returns the oldest read pulse.""" self._mq.send("^", True, type=1) message = self._wait_receive_msg() - reply = int(message[0].decode('utf-8')) - #print(reply) + reply = int(message[0].decode("utf-8")) + # print(reply) if reply == -1: raise IndexError("pop from empty list") return reply @@ -139,13 +156,16 @@ class PulseIn: """Returns the current pulse length""" self._mq.send("l", True, type=1) message = self._wait_receive_msg() - return int(message[0].decode('utf-8')) + return int(message[0].decode("utf-8")) + # pylint: disable=redefined-builtin def __getitem__(self, index, type=None): """Returns the value at the given index or values in slice.""" self._mq.send("i%d" % index, True, type=1) message = self._wait_receive_msg() - ret = int(message[0].decode('utf-8')) + ret = int(message[0].decode("utf-8")) if ret == -1: raise IndexError("list index out of range") return ret + + # pylint: enable=redefined-builtin