Repositories - hackapet/Adafruit_Blinka.git/blobdiff - src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py
Fix pulsein for the Pi 5 and make lockable silent
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / bcm283x / pulseio / PulseIn.py
index b918c86a02e369c051eb8d069f9373f73daa7eff..78d5761afcfb6df5cc0191f28d8e240c7b945eb9 100644 (file)
@@ -1,17 +1,20 @@
-import array
+# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
+#
+# SPDX-License-Identifier: MIT
+"""Custom PulseIn Class to read PWM signals"""
 import time
 import subprocess
 import time
 import subprocess
-import os, signal
-import traceback
-import signal
-import sysv_ipc
+import os
 import atexit
 import random
 import atexit
 import random
+import struct
+import sysv_ipc
 
 DEBUG = False
 queues = []
 procs = []
 
 
 DEBUG = False
 queues = []
 procs = []
 
+
 # The message queues live outside of python space, and must be formally cleaned!
 def final():
     """In case the program is cancelled or quit, we need to clean up the PulseIn
 # The message queues live outside of python space, and must be formally cleaned!
 def final():
     """In case the program is cancelled or quit, we need to clean up the PulseIn
@@ -23,9 +26,15 @@ def final():
         q.remove()
     for proc in procs:
         proc.terminate()
         q.remove()
     for proc in procs:
         proc.terminate()
+
+
 atexit.register(final)
 
 atexit.register(final)
 
+
+# pylint: disable=c-extension-no-member
 class PulseIn:
 class PulseIn:
+    """PulseIn Class to read PWM signals"""
+
     def __init__(self, pin, maxlen=2, idle_state=False):
         """Create a PulseIn object associated with the given pin.
         The object acts as a read-only sequence of pulse lengths with
     def __init__(self, pin, maxlen=2, idle_state=False):
         """Create a PulseIn object associated with the given pin.
         The object acts as a read-only sequence of pulse lengths with
@@ -42,42 +51,66 @@ class PulseIn:
                 print("Message Queue Key: ", self._mq.key)
             queues.append(self._mq)
         except sysv_ipc.ExistentialError:
                 print("Message Queue Key: ", self._mq.key)
             queues.append(self._mq)
         except sysv_ipc.ExistentialError:
-            raise RuntimeError("Message queue creation failed")
+            raise RuntimeError(
+                "Message queue creation failed"
+            ) from sysv_ipc.ExistentialError
+
+        # Check if OS is 64-bit
+        if struct.calcsize("P") * 8 == 64:  # pylint: disable=no-member
+            libgpiod_filename = "libgpiod_pulsein64"
+        else:
+            libgpiod_filename = "libgpiod_pulsein"
 
         dir_path = os.path.dirname(os.path.realpath(__file__))
 
         dir_path = os.path.dirname(os.path.realpath(__file__))
-        cmd = [dir_path+"/libgpiod_pulsein",
-               "--pulses", str(maxlen),
-               "--queue", str(self._mq.key)]
-        if not idle_state:
+        cmd = [
+            dir_path + "/" + libgpiod_filename,
+            "--pulses",
+            str(maxlen),
+            "--queue",
+            str(self._mq.key),
+        ]
+        if idle_state:
             cmd.append("-i")
             cmd.append("-i")
-        cmd.append("gpiochip0")
-        cmd.append(str(pin))
+        if isinstance(pin.id, tuple):
+            cmd.append(f"gpiochip{pin.id[0]}")
+            cmd.append(str(pin.id[1]))
+        else:
+            cmd.append("gpiochip0")
+            cmd.append(str(pin))
         if DEBUG:
             print(cmd)
         if DEBUG:
             print(cmd)
-        
-        self._process = subprocess.Popen(cmd)
+
+        self._process = subprocess.Popen(cmd)  # pylint: disable=consider-using-with
         procs.append(self._process)
 
         # wait for it to start up
         if DEBUG:
             print("Waiting for startup success message from subprocess")
         procs.append(self._process)
 
         # wait for it to start up
         if DEBUG:
             print("Waiting for startup success message from subprocess")
-        message = self._wait_receive_msg()
-        if message[0] != b'!':
+        message = self._wait_receive_msg(timeout=0.25)
+        if message[0] != b"!":
             raise RuntimeError("Could not establish message queue with subprocess")
         self._paused = False
 
             raise RuntimeError("Could not establish message queue with subprocess")
         self._paused = False
 
-    def _wait_receive_msg(self, timeout=0.25, type=2):
+    # pylint: disable=redefined-builtin
+    def _wait_receive_msg(self, timeout=0, type=2):
         """Internal helper that will wait for new messages of a given type,
         and throw an exception on timeout"""
         """Internal helper that will wait for new messages of a given type,
         and throw an exception on timeout"""
-        stamp = time.monotonic()
-        while (time.monotonic() - stamp) < timeout:
-            try:
-                message = self._mq.receive(block=False, type=2)
-                return message
-            except sysv_ipc.BusyError:
-                time.sleep(0.001) # wait a bit then retry!
-        # uh-oh timed out
-        raise RuntimeError("Timed out waiting for PulseIn message")
+        if timeout > 0:
+            stamp = time.monotonic()
+            while (time.monotonic() - stamp) < timeout:
+                try:
+                    message = self._mq.receive(block=False, type=type)
+                    return message
+                except sysv_ipc.BusyError:
+                    time.sleep(0.001)  # wait a bit then retry!
+            # uh-oh timed out
+            raise RuntimeError(
+                "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
+            )
+        message = self._mq.receive(block=True, type=type)
+        return message
+
+    # pylint: enable=redefined-builtin
 
     def deinit(self):
         """Deinitialises the PulseIn and releases any hardware and software
 
     def deinit(self):
         """Deinitialises the PulseIn and releases any hardware and software
@@ -129,8 +162,8 @@ class PulseIn:
         """Removes and returns the oldest read pulse."""
         self._mq.send("^", True, type=1)
         message = self._wait_receive_msg()
         """Removes and returns the oldest read pulse."""
         self._mq.send("^", True, type=1)
         message = self._wait_receive_msg()
-        reply = int(message[0].decode('utf-8'))
-        #print(reply)
+        reply = int(message[0].decode("utf-8"))
+        # print(reply)
         if reply == -1:
             raise IndexError("pop from empty list")
         return reply
         if reply == -1:
             raise IndexError("pop from empty list")
         return reply
@@ -139,13 +172,16 @@ class PulseIn:
         """Returns the current pulse length"""
         self._mq.send("l", True, type=1)
         message = self._wait_receive_msg()
         """Returns the current pulse length"""
         self._mq.send("l", True, type=1)
         message = self._wait_receive_msg()
-        return int(message[0].decode('utf-8'))
+        return int(message[0].decode("utf-8"))
 
 
+    # pylint: disable=redefined-builtin
     def __getitem__(self, index, type=None):
         """Returns the value at the given index or values in slice."""
         self._mq.send("i%d" % index, True, type=1)
         message = self._wait_receive_msg()
     def __getitem__(self, index, type=None):
         """Returns the value at the given index or values in slice."""
         self._mq.send("i%d" % index, True, type=1)
         message = self._wait_receive_msg()
-        ret = int(message[0].decode('utf-8'))
+        ret = int(message[0].decode("utf-8"))
         if ret == -1:
             raise IndexError("list index out of range")
         return ret
         if ret == -1:
             raise IndexError("list index out of range")
         return ret
+
+    # pylint: enable=redefined-builtin