import os
import atexit
import random
+import struct
import sysv_ipc
DEBUG = False
"Message queue creation failed"
) from sysv_ipc.ExistentialError
+ # Check if OS is 64-bit
+ if struct.calcsize("P") * 8 == 64:
+ libgpiod_filename = "libgpiod_pulsein64"
+ else:
+ libgpiod_filename = "libgpiod_pulsein"
+
dir_path = os.path.dirname(os.path.realpath(__file__))
cmd = [
- dir_path + "/libgpiod_pulsein",
+ dir_path + "/" + libgpiod_filename,
"--pulses",
str(maxlen),
"--queue",
# wait for it to start up
if DEBUG:
print("Waiting for startup success message from subprocess")
- message = self._wait_receive_msg()
+ message = self._wait_receive_msg(timeout=0.25)
if message[0] != b"!":
raise RuntimeError("Could not establish message queue with subprocess")
self._paused = False
# pylint: disable=redefined-builtin
- def _wait_receive_msg(self, timeout=0.25, type=2):
+ def _wait_receive_msg(self, timeout=0, type=2):
"""Internal helper that will wait for new messages of a given type,
and throw an exception on timeout"""
- stamp = time.monotonic()
- while (time.monotonic() - stamp) < timeout:
- try:
- message = self._mq.receive(block=False, type=type)
- return message
- except sysv_ipc.BusyError:
- time.sleep(0.001) # wait a bit then retry!
- # uh-oh timed out
- raise RuntimeError(
- "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
- )
+ if timeout > 0:
+ stamp = time.monotonic()
+ while (time.monotonic() - stamp) < timeout:
+ try:
+ message = self._mq.receive(block=False, type=type)
+ return message
+ except sysv_ipc.BusyError:
+ time.sleep(0.001) # wait a bit then retry!
+ # uh-oh timed out
+ raise RuntimeError(
+ "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
+ )
+ message = self._mq.receive(block=True, type=type)
+ return message
# pylint: enable=redefined-builtin