1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
3 # SPDX-License-Identifier: MIT
4 """Custom PulseIn Class to read PWM signals"""
18 # The message queues live outside of python space, and must be formally cleaned!
20 """In case the program is cancelled or quit, we need to clean up the PulseIn
21 helper process and also the message queue, this is called at exit to do so"""
23 print("Cleaning up message queues", queues)
24 print("Cleaning up processes", procs)
31 atexit.register(final)
34 # pylint: disable=c-extension-no-member
36 """PulseIn Class to read PWM signals"""
38 def __init__(self, pin, maxlen=2, idle_state=False):
39 """Create a PulseIn object associated with the given pin.
40 The object acts as a read-only sequence of pulse lengths with
41 a given max length. When it is active, new pulse lengths are
42 added to the end of the list. When there is no more room
43 (len() == maxlen) the oldest pulse length is removed to make room."""
46 self._idle_state = idle_state
47 self._queue_key = random.randint(1, 9999)
49 self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
51 print("Message Queue Key: ", self._mq.key)
52 queues.append(self._mq)
53 except sysv_ipc.ExistentialError:
55 "Message queue creation failed"
56 ) from sysv_ipc.ExistentialError
58 # Check if OS is 64-bit
59 if struct.calcsize("P") * 8 == 64: # pylint: disable=no-member
60 libgpiod_filename = "libgpiod_pulsein64"
62 libgpiod_filename = "libgpiod_pulsein"
64 dir_path = os.path.dirname(os.path.realpath(__file__))
66 dir_path + "/" + libgpiod_filename,
74 if isinstance(pin.id, tuple):
75 cmd.append(f"gpiochip{pin.id[0]}")
76 cmd.append(str(pin.id[1]))
78 cmd.append("gpiochip0")
83 self._process = subprocess.Popen(cmd) # pylint: disable=consider-using-with
84 procs.append(self._process)
86 # wait for it to start up
88 print("Waiting for startup success message from subprocess")
89 message = self._wait_receive_msg(timeout=0.25)
90 if message[0] != b"!":
91 raise RuntimeError("Could not establish message queue with subprocess")
94 # pylint: disable=redefined-builtin
95 def _wait_receive_msg(self, timeout=0, type=2):
96 """Internal helper that will wait for new messages of a given type,
97 and throw an exception on timeout"""
99 stamp = time.monotonic()
100 while (time.monotonic() - stamp) < timeout:
102 message = self._mq.receive(block=False, type=type)
104 except sysv_ipc.BusyError:
105 time.sleep(0.001) # wait a bit then retry!
108 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
110 message = self._mq.receive(block=True, type=type)
113 # pylint: enable=redefined-builtin
116 """Deinitialises the PulseIn and releases any hardware and software
117 resources for reuse."""
118 # Clean up after ourselves
119 self._process.terminate()
120 procs.remove(self._process)
122 queues.remove(self._mq)
125 """No-op used by Context Managers."""
128 def __exit__(self, exc_type, exc_value, tb):
129 """Automatically deinitializes the hardware when exiting a context."""
132 def resume(self, trigger_duration=0):
133 """Resumes pulse capture after an optional trigger pulse."""
134 if trigger_duration != 0:
135 self._mq.send("t%d" % trigger_duration, True, type=1)
137 self._mq.send("r", True, type=1)
141 """Pause pulse capture"""
142 self._mq.send("p", True, type=1)
147 """True when pulse capture is paused as a result of pause() or
148 an error during capture such as a signal that is too fast."""
153 """The maximum length of the PulseIn. When len() is equal to maxlen,
154 it is unclear which pulses are active and which are idle."""
158 """Clears all captured pulses"""
159 self._mq.send("c", True, type=1)
162 """Removes and returns the oldest read pulse."""
163 self._mq.send("^", True, type=1)
164 message = self._wait_receive_msg()
165 reply = int(message[0].decode("utf-8"))
168 raise IndexError("pop from empty list")
172 """Returns the current pulse length"""
173 self._mq.send("l", True, type=1)
174 message = self._wait_receive_msg()
175 return int(message[0].decode("utf-8"))
177 # pylint: disable=redefined-builtin
178 def __getitem__(self, index, type=None):
179 """Returns the value at the given index or values in slice."""
180 self._mq.send("i%d" % index, True, type=1)
181 message = self._wait_receive_msg()
182 ret = int(message[0].decode("utf-8"))
184 raise IndexError("list index out of range")
187 # pylint: enable=redefined-builtin