]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/amlogic/a311d/pulseio/PulseIn.py
Merge pull request #621 from twa127/master
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / amlogic / a311d / pulseio / PulseIn.py
1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
2 #
3 # SPDX-License-Identifier: MIT
4 """Custom PulseIn Class to read PWM signals"""
5 import time
6 import subprocess
7 import os
8 import atexit
9 import random
10 import struct
11 import sysv_ipc
12
13 DEBUG = False
14 queues = []
15 procs = []
16
17 # The message queues live outside of python space, and must be formally cleaned!
18 def final():
19     """In case the program is cancelled or quit, we need to clean up the PulseIn
20     helper process and also the message queue, this is called at exit to do so"""
21     if DEBUG:
22         print("Cleaning up message queues", queues)
23         print("Cleaning up processes", procs)
24     for q in queues:
25         q.remove()
26     for proc in procs:
27         proc.terminate()
28
29
30 atexit.register(final)
31
32 # pylint: disable=c-extension-no-member
33 class PulseIn:
34     """PulseIn Class to read PWM signals"""
35
36     def __init__(self, pin, maxlen=2, idle_state=False):
37         """Create a PulseIn object associated with the given pin.
38         The object acts as a read-only sequence of pulse lengths with
39         a given max length. When it is active, new pulse lengths are
40         added to the end of the list. When there is no more room
41         (len() == maxlen) the oldest pulse length is removed to make room."""
42         self._pin = pin
43         self._maxlen = maxlen
44         self._idle_state = idle_state
45         self._queue_key = random.randint(1, 9999)
46         try:
47             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
48             if DEBUG:
49                 print("Message Queue Key: ", self._mq.key)
50             queues.append(self._mq)
51         except sysv_ipc.ExistentialError:
52             raise RuntimeError(
53                 "Message queue creation failed"
54             ) from sysv_ipc.ExistentialError
55
56         # Check if OS is 64-bit
57         if struct.calcsize("P") * 8 == 64:  # pylint: disable=no-member
58             libgpiod_filename = "libgpiod_pulsein64"
59         else:
60             libgpiod_filename = "libgpiod_pulsein"
61
62         dir_path = os.path.dirname(os.path.realpath(__file__))
63         cmd = [
64             dir_path + "/" + libgpiod_filename,
65             "--pulses",
66             str(maxlen),
67             "--queue",
68             str(self._mq.key),
69         ]
70         if idle_state:
71             cmd.append("-i")
72         cmd.append("gpiochip0")
73         cmd.append(str(pin))
74         if DEBUG:
75             print(cmd)
76
77         self._process = subprocess.Popen(cmd)  # pylint: disable=consider-using-with
78         procs.append(self._process)
79
80         # wait for it to start up
81         if DEBUG:
82             print("Waiting for startup success message from subprocess")
83         message = self._wait_receive_msg(timeout=0.25)
84         if message[0] != b"!":
85             raise RuntimeError("Could not establish message queue with subprocess")
86         self._paused = False
87
88     # pylint: disable=redefined-builtin
89     def _wait_receive_msg(self, timeout=0, type=2):
90         """Internal helper that will wait for new messages of a given type,
91         and throw an exception on timeout"""
92         if timeout > 0:
93             stamp = time.monotonic()
94             while (time.monotonic() - stamp) < timeout:
95                 try:
96                     message = self._mq.receive(block=False, type=type)
97                     return message
98                 except sysv_ipc.BusyError:
99                     time.sleep(0.001)  # wait a bit then retry!
100             # uh-oh timed out
101             raise RuntimeError(
102                 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
103             )
104         message = self._mq.receive(block=True, type=type)
105         return message
106
107     # pylint: enable=redefined-builtin
108
109     def deinit(self):
110         """Deinitialises the PulseIn and releases any hardware and software
111         resources for reuse."""
112         # Clean up after ourselves
113         self._process.terminate()
114         procs.remove(self._process)
115         self._mq.remove()
116         queues.remove(self._mq)
117
118     def __enter__(self):
119         """No-op used by Context Managers."""
120         return self
121
122     def __exit__(self, exc_type, exc_value, tb):
123         """Automatically deinitializes the hardware when exiting a context."""
124         self.deinit()
125
126     def resume(self, trigger_duration=0):
127         """Resumes pulse capture after an optional trigger pulse."""
128         if trigger_duration != 0:
129             self._mq.send("t%d" % trigger_duration, True, type=1)
130         else:
131             self._mq.send("r", True, type=1)
132         self._paused = False
133
134     def pause(self):
135         """Pause pulse capture"""
136         self._mq.send("p", True, type=1)
137         self._paused = True
138
139     @property
140     def paused(self):
141         """True when pulse capture is paused as a result of pause() or
142         an error during capture such as a signal that is too fast."""
143         return self._paused
144
145     @property
146     def maxlen(self):
147         """The maximum length of the PulseIn. When len() is equal to maxlen,
148         it is unclear which pulses are active and which are idle."""
149         return self._maxlen
150
151     def clear(self):
152         """Clears all captured pulses"""
153         self._mq.send("c", True, type=1)
154
155     def popleft(self):
156         """Removes and returns the oldest read pulse."""
157         self._mq.send("^", True, type=1)
158         message = self._wait_receive_msg()
159         reply = int(message[0].decode("utf-8"))
160         # print(reply)
161         if reply == -1:
162             raise IndexError("pop from empty list")
163         return reply
164
165     def __len__(self):
166         """Returns the current pulse length"""
167         self._mq.send("l", True, type=1)
168         message = self._wait_receive_msg()
169         return int(message[0].decode("utf-8"))
170
171     # pylint: disable=redefined-builtin
172     def __getitem__(self, index, type=None):
173         """Returns the value at the given index or values in slice."""
174         self._mq.send("i%d" % index, True, type=1)
175         message = self._wait_receive_msg()
176         ret = int(message[0].decode("utf-8"))
177         if ret == -1:
178             raise IndexError("list index out of range")
179         return ret
180
181     # pylint: enable=redefined-builtin