]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py
Run pre-commit and fix issues
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / bcm283x / pulseio / PulseIn.py
1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
2 #
3 # SPDX-License-Identifier: MIT
4 """Custom PulseIn Class to read PWM signals"""
5 import time
6 import subprocess
7 import os
8 import atexit
9 import random
10 import struct
11 import sysv_ipc
12
13 DEBUG = False
14 queues = []
15 procs = []
16
17
18 # The message queues live outside of python space, and must be formally cleaned!
19 def final():
20     """In case the program is cancelled or quit, we need to clean up the PulseIn
21     helper process and also the message queue, this is called at exit to do so"""
22     if DEBUG:
23         print("Cleaning up message queues", queues)
24         print("Cleaning up processes", procs)
25     for q in queues:
26         q.remove()
27     for proc in procs:
28         proc.terminate()
29
30
31 atexit.register(final)
32
33
34 # pylint: disable=c-extension-no-member
35 class PulseIn:
36     """PulseIn Class to read PWM signals"""
37
38     def __init__(self, pin, maxlen=2, idle_state=False):
39         """Create a PulseIn object associated with the given pin.
40         The object acts as a read-only sequence of pulse lengths with
41         a given max length. When it is active, new pulse lengths are
42         added to the end of the list. When there is no more room
43         (len() == maxlen) the oldest pulse length is removed to make room."""
44         self._pin = pin
45         self._maxlen = maxlen
46         self._idle_state = idle_state
47         self._queue_key = random.randint(1, 9999)
48         try:
49             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
50             if DEBUG:
51                 print("Message Queue Key: ", self._mq.key)
52             queues.append(self._mq)
53         except sysv_ipc.ExistentialError:
54             raise RuntimeError(
55                 "Message queue creation failed"
56             ) from sysv_ipc.ExistentialError
57
58         # Check if OS is 64-bit
59         if struct.calcsize("P") * 8 == 64:  # pylint: disable=no-member
60             libgpiod_filename = "libgpiod_pulsein64"
61         else:
62             libgpiod_filename = "libgpiod_pulsein"
63
64         dir_path = os.path.dirname(os.path.realpath(__file__))
65         cmd = [
66             dir_path + "/" + libgpiod_filename,
67             "--pulses",
68             str(maxlen),
69             "--queue",
70             str(self._mq.key),
71         ]
72         if idle_state:
73             cmd.append("-i")
74         if isinstance(pin.id, tuple):
75             cmd.append(f"gpiochip{pin.id[0]}")
76             cmd.append(str(pin.id[1]))
77         else:
78             cmd.append("gpiochip0")
79             cmd.append(str(pin))
80         if DEBUG:
81             print(cmd)
82
83         self._process = subprocess.Popen(cmd)  # pylint: disable=consider-using-with
84         procs.append(self._process)
85
86         # wait for it to start up
87         if DEBUG:
88             print("Waiting for startup success message from subprocess")
89         message = self._wait_receive_msg(timeout=0.25)
90         if message[0] != b"!":
91             raise RuntimeError("Could not establish message queue with subprocess")
92         self._paused = False
93
94     # pylint: disable=redefined-builtin
95     def _wait_receive_msg(self, timeout=0, type=2):
96         """Internal helper that will wait for new messages of a given type,
97         and throw an exception on timeout"""
98         if timeout > 0:
99             stamp = time.monotonic()
100             while (time.monotonic() - stamp) < timeout:
101                 try:
102                     message = self._mq.receive(block=False, type=type)
103                     return message
104                 except sysv_ipc.BusyError:
105                     time.sleep(0.001)  # wait a bit then retry!
106             # uh-oh timed out
107             raise RuntimeError(
108                 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
109             )
110         message = self._mq.receive(block=True, type=type)
111         return message
112
113     # pylint: enable=redefined-builtin
114
115     def deinit(self):
116         """Deinitialises the PulseIn and releases any hardware and software
117         resources for reuse."""
118         # Clean up after ourselves
119         self._process.terminate()
120         procs.remove(self._process)
121         self._mq.remove()
122         queues.remove(self._mq)
123
124     def __enter__(self):
125         """No-op used by Context Managers."""
126         return self
127
128     def __exit__(self, exc_type, exc_value, tb):
129         """Automatically deinitializes the hardware when exiting a context."""
130         self.deinit()
131
132     def resume(self, trigger_duration=0):
133         """Resumes pulse capture after an optional trigger pulse."""
134         if trigger_duration != 0:
135             self._mq.send("t%d" % trigger_duration, True, type=1)
136         else:
137             self._mq.send("r", True, type=1)
138         self._paused = False
139
140     def pause(self):
141         """Pause pulse capture"""
142         self._mq.send("p", True, type=1)
143         self._paused = True
144
145     @property
146     def paused(self):
147         """True when pulse capture is paused as a result of pause() or
148         an error during capture such as a signal that is too fast."""
149         return self._paused
150
151     @property
152     def maxlen(self):
153         """The maximum length of the PulseIn. When len() is equal to maxlen,
154         it is unclear which pulses are active and which are idle."""
155         return self._maxlen
156
157     def clear(self):
158         """Clears all captured pulses"""
159         self._mq.send("c", True, type=1)
160
161     def popleft(self):
162         """Removes and returns the oldest read pulse."""
163         self._mq.send("^", True, type=1)
164         message = self._wait_receive_msg()
165         reply = int(message[0].decode("utf-8"))
166         # print(reply)
167         if reply == -1:
168             raise IndexError("pop from empty list")
169         return reply
170
171     def __len__(self):
172         """Returns the current pulse length"""
173         self._mq.send("l", True, type=1)
174         message = self._wait_receive_msg()
175         return int(message[0].decode("utf-8"))
176
177     # pylint: disable=redefined-builtin
178     def __getitem__(self, index, type=None):
179         """Returns the value at the given index or values in slice."""
180         self._mq.send("i%d" % index, True, type=1)
181         message = self._wait_receive_msg()
182         ret = int(message[0].decode("utf-8"))
183         if ret == -1:
184             raise IndexError("list index out of range")
185         return ret
186
187     # pylint: enable=redefined-builtin