-import array
+# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
+#
+# SPDX-License-Identifier: MIT
+"""Custom PulseIn Class to read PWM signals"""
import time
import subprocess
-import os, signal
-import traceback
-import signal
-import sysv_ipc
+import os
import atexit
import random
+import struct
+import sysv_ipc
# When True, PulseIn prints diagnostics such as the helper command line and
# a notice while it waits for the subprocess start-up message.
DEBUG = False
# Message queues created by PulseIn instances are appended here so they can
# be cleaned up when the program exits.
queues = []
# Helper subprocesses started by PulseIn instances are appended here; final()
# terminates each of them.
procs = []
+
# The message queues live outside of Python space and must be explicitly cleaned up!
def final():
"""In case the program is cancelled or quit, we need to clean up the PulseIn
q.remove()
for proc in procs:
proc.terminate()
+
+
# Run final() when the interpreter exits so the module-level cleanup happens
# even if the caller never cleans up explicitly.
atexit.register(final)
+
+# pylint: disable=c-extension-no-member
class PulseIn:
+ """PulseIn Class to read PWM signals"""
+
def __init__(self, pin, maxlen=2, idle_state=False):
"""Create a PulseIn object associated with the given pin.
The object acts as a read-only sequence of pulse lengths with
print("Message Queue Key: ", self._mq.key)
queues.append(self._mq)
except sysv_ipc.ExistentialError:
- raise RuntimeError("Message queue creation failed")
+ raise RuntimeError(
+ "Message queue creation failed"
+ ) from sysv_ipc.ExistentialError
+
+ # Check if OS is 64-bit
+ if struct.calcsize("P") * 8 == 64: # pylint: disable=no-member
+ libgpiod_filename = "libgpiod_pulsein64"
+ else:
+ libgpiod_filename = "libgpiod_pulsein"
dir_path = os.path.dirname(os.path.realpath(__file__))
- cmd = [dir_path+"/libgpiod_pulsein",
- "--pulses", str(maxlen),
- "--queue", str(self._mq.key)]
- if not idle_state:
+ cmd = [
+ dir_path + "/" + libgpiod_filename,
+ "--pulses",
+ str(maxlen),
+ "--queue",
+ str(self._mq.key),
+ ]
+ if idle_state:
cmd.append("-i")
cmd.append("gpiochip0")
cmd.append(str(pin))
if DEBUG:
print(cmd)
-
- self._process = subprocess.Popen(cmd)
+
+ self._process = subprocess.Popen(cmd) # pylint: disable=consider-using-with
procs.append(self._process)
# wait for it to start up
if DEBUG:
print("Waiting for startup success message from subprocess")
- message = self._wait_receive_msg()
- if message[0] != b'!':
+ message = self._wait_receive_msg(timeout=0.25)
+ if message[0] != b"!":
raise RuntimeError("Could not establish message queue with subprocess")
self._paused = False
# pylint: disable=redefined-builtin
def _wait_receive_msg(self, timeout=0, type=2):
    """Internal helper that waits for a new message of a given type.

    :param timeout: Number of seconds to keep polling the message queue.
        When positive, the queue is polled without blocking (roughly once
        per millisecond) until a message arrives or the time is used up.
        When zero (the default) or negative, a single blocking receive is
        issued and the call returns only once a message is available.
    :param type: Message type passed through to the queue's ``receive``.
    :return: The value returned by ``self._mq.receive``; callers in this
        class read element ``0`` of it as the message payload.
    :raises RuntimeError: If ``timeout`` is positive and no message of the
        requested type arrived in time.
    """
    if timeout > 0:
        deadline = time.monotonic() + timeout
        while True:
            # Always poll at least once, so that even a very small timeout
            # still picks up a message that is already waiting.
            try:
                return self._mq.receive(block=False, type=type)
            except sysv_ipc.BusyError:
                if time.monotonic() >= deadline:
                    break
                time.sleep(0.001)  # wait a bit then retry!
        # uh-oh timed out
        raise RuntimeError(
            "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
        )
    # No timeout requested: block until a message shows up.
    return self._mq.receive(block=True, type=type)


# pylint: enable=redefined-builtin
def deinit(self):
"""Deinitialises the PulseIn and releases any hardware and software
"""Removes and returns the oldest read pulse."""
self._mq.send("^", True, type=1)
message = self._wait_receive_msg()
- reply = int(message[0].decode('utf-8'))
- #print(reply)
+ reply = int(message[0].decode("utf-8"))
+ # print(reply)
if reply == -1:
raise IndexError("pop from empty list")
return reply
"""Returns the current pulse length"""
self._mq.send("l", True, type=1)
message = self._wait_receive_msg()
- return int(message[0].decode('utf-8'))
+ return int(message[0].decode("utf-8"))
# pylint: disable=redefined-builtin
def __getitem__(self, index, type=None):
    """Return the pulse value stored at the given integer index.

    An ``i<index>`` request is sent on the message queue and the reply
    obtained from ``_wait_receive_msg`` is parsed as an integer.

    :param index: Integer position of the pulse to read. Slices are not
        supported.
    :param type: Not used by this method; kept so the signature stays
        compatible with existing callers.
    :return: The integer carried by the reply message.
    :raises TypeError: If ``index`` is a slice.
    :raises IndexError: If the reply is ``-1``.
    """
    if isinstance(index, slice):
        # "%d" formatting would reject a slice anyway, but with a message
        # about string formatting; fail early and say what is wrong.
        raise TypeError("PulseIn indices must be integers, not slice")
    self._mq.send("i%d" % index, True, type=1)
    message = self._wait_receive_msg()
    ret = int(message[0].decode("utf-8"))
    if ret == -1:
        raise IndexError("list index out of range")
    return ret


# pylint: enable=redefined-builtin