-import array
+"""Custom PulseIn Class to read PWM signals"""
import time
import subprocess
-import os, signal
-import traceback
-import signal
-import sysv_ipc
+import os
import atexit
import random
+import sysv_ipc
DEBUG = False
queues = []
q.remove()
for proc in procs:
proc.terminate()
+
+
atexit.register(final)
+# pylint: disable=c-extension-no-member
class PulseIn:
+ """PulseIn Class to read PWM signals"""
+
def __init__(self, pin, maxlen=2, idle_state=False):
"""Create a PulseIn object associated with the given pin.
The object acts as a read-only sequence of pulse lengths with
raise RuntimeError("Message queue creation failed")
dir_path = os.path.dirname(os.path.realpath(__file__))
- cmd = [dir_path+"/libgpiod_pulsein",
- "--pulses", str(maxlen),
- "--queue", str(self._mq.key)]
- if not idle_state:
+ cmd = [
+ dir_path + "/libgpiod_pulsein",
+ "--pulses",
+ str(maxlen),
+ "--queue",
+ str(self._mq.key),
+ ]
+ if idle_state:
cmd.append("-i")
cmd.append("gpiochip0")
cmd.append(str(pin))
if DEBUG:
print(cmd)
-
+
self._process = subprocess.Popen(cmd)
procs.append(self._process)
if DEBUG:
print("Waiting for startup success message from subprocess")
message = self._wait_receive_msg()
- if message[0] != b'!':
+ if message[0] != b"!":
raise RuntimeError("Could not establish message queue with subprocess")
self._paused = False
+ # pylint: disable=redefined-builtin
def _wait_receive_msg(self, timeout=0.25, type=2):
"""Internal helper that will wait for new messages of a given type,
and throw an exception on timeout"""
stamp = time.monotonic()
while (time.monotonic() - stamp) < timeout:
try:
- message = self._mq.receive(block=False, type=2)
+ message = self._mq.receive(block=False, type=type)
return message
except sysv_ipc.BusyError:
- time.sleep(0.001) # wait a bit then retry!
+ time.sleep(0.001) # wait a bit then retry!
# uh-oh timed out
raise RuntimeError("Timed out waiting for PulseIn message")
+ # pylint: enable=redefined-builtin
+
def deinit(self):
"""Deinitialises the PulseIn and releases any hardware and software
resources for reuse."""
"""Removes and returns the oldest read pulse."""
self._mq.send("^", True, type=1)
message = self._wait_receive_msg()
- reply = int(message[0].decode('utf-8'))
- #print(reply)
+ reply = int(message[0].decode("utf-8"))
+ # print(reply)
if reply == -1:
raise IndexError("pop from empty list")
return reply
"""Returns the current pulse length"""
self._mq.send("l", True, type=1)
message = self._wait_receive_msg()
- return int(message[0].decode('utf-8'))
+ return int(message[0].decode("utf-8"))
+ # pylint: disable=redefined-builtin
def __getitem__(self, index, type=None):
"""Returns the value at the given index or values in slice."""
self._mq.send("i%d" % index, True, type=1)
message = self._wait_receive_msg()
- ret = int(message[0].decode('utf-8'))
+ ret = int(message[0].decode("utf-8"))
if ret == -1:
raise IndexError("list index out of range")
return ret
+
+ # pylint: enable=redefined-builtin