+++ /dev/null
-"""Custom PulseIn Class to read PWM signals"""
-import time
-import subprocess
-import os
-import atexit
-import random
-import struct
-import sysv_ipc
-
-DEBUG = False
-queues = []
-procs = []
-
-# The message queues live outside of python space, and must be formally cleaned!
-def final():
- """In case the program is cancelled or quit, we need to clean up the PulseIn
- helper process and also the message queue, this is called at exit to do so"""
- if DEBUG:
- print("Cleaning up message queues", queues)
- print("Cleaning up processes", procs)
- for q in queues:
- q.remove()
- for proc in procs:
- proc.terminate()
-
-
-atexit.register(final)
-
-# pylint: disable=c-extension-no-member
-class PulseIn:
- """PulseIn Class to read PWM signals"""
-
- def __init__(self, pin, maxlen=2, idle_state=False):
- """Create a PulseIn object associated with the given pin.
- The object acts as a read-only sequence of pulse lengths with
- a given max length. When it is active, new pulse lengths are
- added to the end of the list. When there is no more room
- (len() == maxlen) the oldest pulse length is removed to make room."""
- self._pin = pin
- self._maxlen = maxlen
- self._idle_state = idle_state
- self._queue_key = random.randint(1, 9999)
- try:
- self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
- if DEBUG:
- print("Message Queue Key: ", self._mq.key)
- queues.append(self._mq)
- except sysv_ipc.ExistentialError:
- raise RuntimeError(
- "Message queue creation failed"
- ) from sysv_ipc.ExistentialError
-
- # Check if OS is 64-bit
- if struct.calcsize("P") * 8 == 64:
- libgpiod_filename = "libgpiod_pulsein64"
- else:
- libgpiod_filename = "libgpiod_pulsein"
-
- dir_path = os.path.dirname(os.path.realpath(__file__))
- cmd = [
- dir_path + "/" + libgpiod_filename,
- "--pulses",
- str(maxlen),
- "--queue",
- str(self._mq.key),
- ]
- if idle_state:
- cmd.append("-i")
- cmd.append("gpiochip0")
- cmd.append(str(pin))
- if DEBUG:
- print(cmd)
-
- self._process = subprocess.Popen(cmd)
- procs.append(self._process)
-
- # wait for it to start up
- if DEBUG:
- print("Waiting for startup success message from subprocess")
- message = self._wait_receive_msg(timeout=0.25)
- if message[0] != b"!":
- raise RuntimeError("Could not establish message queue with subprocess")
- self._paused = False
-
- # pylint: disable=redefined-builtin
- def _wait_receive_msg(self, timeout=0, type=2):
- """Internal helper that will wait for new messages of a given type,
- and throw an exception on timeout"""
- if timeout > 0:
- stamp = time.monotonic()
- while (time.monotonic() - stamp) < timeout:
- try:
- message = self._mq.receive(block=False, type=type)
- return message
- except sysv_ipc.BusyError:
- time.sleep(0.001) # wait a bit then retry!
- # uh-oh timed out
- raise RuntimeError(
- "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
- )
- message = self._mq.receive(block=True, type=type)
- return message
-
- # pylint: enable=redefined-builtin
-
- def deinit(self):
- """Deinitialises the PulseIn and releases any hardware and software
- resources for reuse."""
- # Clean up after ourselves
- self._process.terminate()
- procs.remove(self._process)
- self._mq.remove()
- queues.remove(self._mq)
-
- def __enter__(self):
- """No-op used by Context Managers."""
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- """Automatically deinitializes the hardware when exiting a context."""
- self.deinit()
-
- def resume(self, trigger_duration=0):
- """Resumes pulse capture after an optional trigger pulse."""
- if trigger_duration != 0:
- self._mq.send("t%d" % trigger_duration, True, type=1)
- else:
- self._mq.send("r", True, type=1)
- self._paused = False
-
- def pause(self):
- """Pause pulse capture"""
- self._mq.send("p", True, type=1)
- self._paused = True
-
- @property
- def paused(self):
- """True when pulse capture is paused as a result of pause() or
- an error during capture such as a signal that is too fast."""
- return self._paused
-
- @property
- def maxlen(self):
- """The maximum length of the PulseIn. When len() is equal to maxlen,
- it is unclear which pulses are active and which are idle."""
- return self._maxlen
-
- def clear(self):
- """Clears all captured pulses"""
- self._mq.send("c", True, type=1)
-
- def popleft(self):
- """Removes and returns the oldest read pulse."""
- self._mq.send("^", True, type=1)
- message = self._wait_receive_msg()
- reply = int(message[0].decode("utf-8"))
- # print(reply)
- if reply == -1:
- raise IndexError("pop from empty list")
- return reply
-
- def __len__(self):
- """Returns the current pulse length"""
- self._mq.send("l", True, type=1)
- message = self._wait_receive_msg()
- return int(message[0].decode("utf-8"))
-
- # pylint: disable=redefined-builtin
- def __getitem__(self, index, type=None):
- """Returns the value at the given index or values in slice."""
- self._mq.send("i%d" % index, True, type=1)
- message = self._wait_receive_msg()
- ret = int(message[0].decode("utf-8"))
- if ret == -1:
- raise IndexError("list index out of range")
- return ret
-
- # pylint: enable=redefined-builtin