]> Repositories - Adafruit_Blinka-hackapet.git/blobdiff - src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py
Added pre-commit support
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / bcm283x / pulseio / PulseIn.py
index aef942a6c93319a37578bfb503ec07ebb56bbc4a..caf0d62b147d6468a8b98fa829d064936767ca3d 100644 (file)
@@ -1,9 +1,13 @@
+# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
+#
+# SPDX-License-Identifier: MIT
 """Custom PulseIn Class to read PWM signals"""
 import time
 import subprocess
 import os
 import atexit
 import random
 """Custom PulseIn Class to read PWM signals"""
 import time
 import subprocess
 import os
 import atexit
 import random
+import struct
 import sysv_ipc
 
 DEBUG = False
 import sysv_ipc
 
 DEBUG = False
@@ -49,9 +53,15 @@ class PulseIn:
                 "Message queue creation failed"
             ) from sysv_ipc.ExistentialError
 
                 "Message queue creation failed"
             ) from sysv_ipc.ExistentialError
 
+        # Check if OS is 64-bit
+        if struct.calcsize("P") * 8 == 64:  # pylint: disable=no-member
+            libgpiod_filename = "libgpiod_pulsein64"
+        else:
+            libgpiod_filename = "libgpiod_pulsein"
+
         dir_path = os.path.dirname(os.path.realpath(__file__))
         cmd = [
         dir_path = os.path.dirname(os.path.realpath(__file__))
         cmd = [
-            dir_path + "/libgpiod_pulsein",
+            dir_path + "/" + libgpiod_filename,
             "--pulses",
             str(maxlen),
             "--queue",
             "--pulses",
             str(maxlen),
             "--queue",
@@ -64,32 +74,35 @@ class PulseIn:
         if DEBUG:
             print(cmd)
 
         if DEBUG:
             print(cmd)
 
-        self._process = subprocess.Popen(cmd)
+        self._process = subprocess.Popen(cmd)  # pylint: disable=consider-using-with
         procs.append(self._process)
 
         # wait for it to start up
         if DEBUG:
             print("Waiting for startup success message from subprocess")
         procs.append(self._process)
 
         # wait for it to start up
         if DEBUG:
             print("Waiting for startup success message from subprocess")
-        message = self._wait_receive_msg()
+        message = self._wait_receive_msg(timeout=0.25)
         if message[0] != b"!":
             raise RuntimeError("Could not establish message queue with subprocess")
         self._paused = False
 
     # pylint: disable=redefined-builtin
         if message[0] != b"!":
             raise RuntimeError("Could not establish message queue with subprocess")
         self._paused = False
 
     # pylint: disable=redefined-builtin
-    def _wait_receive_msg(self, timeout=0.25, type=2):
+    def _wait_receive_msg(self, timeout=0, type=2):
         """Internal helper that will wait for new messages of a given type,
         and throw an exception on timeout"""
         """Internal helper that will wait for new messages of a given type,
         and throw an exception on timeout"""
-        stamp = time.monotonic()
-        while (time.monotonic() - stamp) < timeout:
-            try:
-                message = self._mq.receive(block=False, type=type)
-                return message
-            except sysv_ipc.BusyError:
-                time.sleep(0.001)  # wait a bit then retry!
-        # uh-oh timed out
-        raise RuntimeError(
-            "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
-        )
+        if timeout > 0:
+            stamp = time.monotonic()
+            while (time.monotonic() - stamp) < timeout:
+                try:
+                    message = self._mq.receive(block=False, type=type)
+                    return message
+                except sysv_ipc.BusyError:
+                    time.sleep(0.001)  # wait a bit then retry!
+            # uh-oh timed out
+            raise RuntimeError(
+                "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
+            )
+        message = self._mq.receive(block=True, type=type)
+        return message
 
     # pylint: enable=redefined-builtin
 
 
     # pylint: enable=redefined-builtin