1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
 
   3 # SPDX-License-Identifier: MIT
 
   4 """Custom PulseIn Class to read PWM signals"""
 
  18 # The message queues live outside of python space, and must be formally cleaned!
 
  20     """In case the program is cancelled or quit, we need to clean up the PulseIn
 
  21     helper process and also the message queue, this is called at exit to do so"""
 
  23         print("Cleaning up message queues", queues)
 
  24         print("Cleaning up processes", procs)
 
  31 atexit.register(final)
 
  34 # pylint: disable=c-extension-no-member
 
  36     """PulseIn Class to read PWM signals"""
 
  38     def __init__(self, pin, maxlen=2, idle_state=False):
 
  39         """Create a PulseIn object associated with the given pin.
 
  40         The object acts as a read-only sequence of pulse lengths with
 
  41         a given max length. When it is active, new pulse lengths are
 
  42         added to the end of the list. When there is no more room
 
  43         (len() == maxlen) the oldest pulse length is removed to make room."""
 
  46         self._idle_state = idle_state
 
  47         self._queue_key = random.randint(1, 9999)
 
  49             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
 
  51                 print("Message Queue Key: ", self._mq.key)
 
  52             queues.append(self._mq)
 
  53         except sysv_ipc.ExistentialError:
 
  55                 "Message queue creation failed"
 
  56             ) from sysv_ipc.ExistentialError
 
  58         # Check if OS is 64-bit
 
  59         if struct.calcsize("P") * 8 == 64:  # pylint: disable=no-member
 
  60             libgpiod_filename = "libgpiod_pulsein64"
 
  62             libgpiod_filename = "libgpiod_pulsein"
 
  64         dir_path = os.path.dirname(os.path.realpath(__file__))
 
  66             dir_path + "/" + libgpiod_filename,
 
  74         if isinstance(pin.id, tuple):
 
  75             cmd.append(f"gpiochip{pin.id[0]}")
 
  76             cmd.append(str(pin.id[1]))
 
  78             cmd.append("gpiochip0")
 
  83         self._process = subprocess.Popen(cmd)  # pylint: disable=consider-using-with
 
  84         procs.append(self._process)
 
  86         # wait for it to start up
 
  88             print("Waiting for startup success message from subprocess")
 
  89         message = self._wait_receive_msg(timeout=0.25)
 
  90         if message[0] != b"!":
 
  91             raise RuntimeError("Could not establish message queue with subprocess")
 
  94     # pylint: disable=redefined-builtin
 
  95     def _wait_receive_msg(self, timeout=0, type=2):
 
  96         """Internal helper that will wait for new messages of a given type,
 
  97         and throw an exception on timeout"""
 
  99             stamp = time.monotonic()
 
 100             while (time.monotonic() - stamp) < timeout:
 
 102                     message = self._mq.receive(block=False, type=type)
 
 104                 except sysv_ipc.BusyError:
 
 105                     time.sleep(0.001)  # wait a bit then retry!
 
 108                 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
 
 110         message = self._mq.receive(block=True, type=type)
 
 113     # pylint: enable=redefined-builtin
 
 116         """Deinitialises the PulseIn and releases any hardware and software
 
 117         resources for reuse."""
 
 118         # Clean up after ourselves
 
 119         self._process.terminate()
 
 120         procs.remove(self._process)
 
 122         queues.remove(self._mq)
 
 125         """No-op used by Context Managers."""
 
 128     def __exit__(self, exc_type, exc_value, tb):
 
 129         """Automatically deinitializes the hardware when exiting a context."""
 
 132     def resume(self, trigger_duration=0):
 
 133         """Resumes pulse capture after an optional trigger pulse."""
 
 134         if trigger_duration != 0:
 
 135             self._mq.send("t%d" % trigger_duration, True, type=1)
 
 137             self._mq.send("r", True, type=1)
 
 141         """Pause pulse capture"""
 
 142         self._mq.send("p", True, type=1)
 
 147         """True when pulse capture is paused as a result of pause() or
 
 148         an error during capture such as a signal that is too fast."""
 
 153         """The maximum length of the PulseIn. When len() is equal to maxlen,
 
 154         it is unclear which pulses are active and which are idle."""
 
 158         """Clears all captured pulses"""
 
 159         self._mq.send("c", True, type=1)
 
 162         """Removes and returns the oldest read pulse."""
 
 163         self._mq.send("^", True, type=1)
 
 164         message = self._wait_receive_msg()
 
 165         reply = int(message[0].decode("utf-8"))
 
 168             raise IndexError("pop from empty list")
 
 172         """Returns the current pulse length"""
 
 173         self._mq.send("l", True, type=1)
 
 174         message = self._wait_receive_msg()
 
 175         return int(message[0].decode("utf-8"))
 
 177     # pylint: disable=redefined-builtin
 
 178     def __getitem__(self, index, type=None):
 
 179         """Returns the value at the given index or values in slice."""
 
 180         self._mq.send("i%d" % index, True, type=1)
 
 181         message = self._wait_receive_msg()
 
 182         ret = int(message[0].decode("utf-8"))
 
 184             raise IndexError("list index out of range")
 
 187     # pylint: enable=redefined-builtin