]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/amlogic/meson_g12_common/pulseio/PulseIn.py
Merge branch 'main' of github.com:janvolck/Adafruit_Blinka
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / amlogic / meson_g12_common / pulseio / PulseIn.py
1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
2 #
3 # SPDX-License-Identifier: MIT
4 """Custom PulseIn Class to read PWM signals"""
5 import time
6 import subprocess
7 import os
8 import atexit
9 import random
10 import struct
11 import sysv_ipc
12
13 DEBUG = False
14 queues = []
15 procs = []
16
17 # The message queues live outside of python space, and must be formally cleaned!
18 def final():
19     """In case the program is cancelled or quit, we need to clean up the PulseIn
20     helper process and also the message queue, this is called at exit to do so"""
21     if DEBUG:
22         print("Cleaning up message queues", queues)
23         print("Cleaning up processes", procs)
24     for q in queues:
25         q.remove()
26     for proc in procs:
27         proc.terminate()
28
29
30 atexit.register(final)
31
32 # pylint: disable=c-extension-no-member
33 class PulseIn:
34     """PulseIn Class to read PWM signals"""
35
36     def __init__(self, pin, maxlen=2, idle_state=False):
37         """Create a PulseIn object associated with the given pin.
38         The object acts as a read-only sequence of pulse lengths with
39         a given max length. When it is active, new pulse lengths are
40         added to the end of the list. When there is no more room
41         (len() == maxlen) the oldest pulse length is removed to make room."""
42                 
43         if isinstance(pin.id, tuple):
44             self._pin = str(pin.id[1])
45             self._chip = "gpiochip{}".format(pin.id[0])
46         else:
47             self._pin = str(pin.id)
48             self._chip = "gpiochip0"
49         
50         self._maxlen = maxlen
51         self._idle_state = idle_state
52         self._queue_key = random.randint(1, 9999)
53         try:
54             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
55             if DEBUG:
56                 print("Message Queue Key: ", self._mq.key)
57             queues.append(self._mq)
58         except sysv_ipc.ExistentialError:
59             raise RuntimeError(
60                 "Message queue creation failed"
61             ) from sysv_ipc.ExistentialError
62
63         # Check if OS is 64-bit
64         libgpiod_filename = "libgpiod_pulsein"
65         dir_path = os.path.dirname(os.path.realpath(__file__))
66         cmd = [
67             dir_path + "/" + libgpiod_filename,
68             "--pulses",
69             str(maxlen),
70             "--queue",
71             str(self._mq.key),
72         ]
73         if idle_state:
74             cmd.append("-i")
75         cmd.append(self._chip)
76         cmd.append(self._pin)
77         if DEBUG:
78             print(cmd)
79
80         self._process = subprocess.Popen(cmd)  # pylint: disable=consider-using-with
81         procs.append(self._process)
82
83         # wait for it to start up
84         if DEBUG:
85             print("Waiting for startup success message from subprocess")
86         message = self._wait_receive_msg(timeout=0.25)
87         if message[0] != b"!":
88             raise RuntimeError("Could not establish message queue with subprocess")
89         self._paused = False
90
91     # pylint: disable=redefined-builtin
92     def _wait_receive_msg(self, timeout=0, type=2):
93         """Internal helper that will wait for new messages of a given type,
94         and throw an exception on timeout"""
95         if timeout > 0:
96             stamp = time.monotonic()
97             while (time.monotonic() - stamp) < timeout:
98                 try:
99                     message = self._mq.receive(block=False, type=type)
100                     return message
101                 except sysv_ipc.BusyError:
102                     time.sleep(0.001)  # wait a bit then retry!
103             # uh-oh timed out
104             raise RuntimeError(
105                 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
106             )
107         message = self._mq.receive(block=True, type=type)
108         return message
109
110     # pylint: enable=redefined-builtin
111
112     def deinit(self):
113         """Deinitialises the PulseIn and releases any hardware and software
114         resources for reuse."""
115         # Clean up after ourselves
116         self._process.terminate()
117         procs.remove(self._process)
118         self._mq.remove()
119         queues.remove(self._mq)
120
121     def __enter__(self):
122         """No-op used by Context Managers."""
123         return self
124
125     def __exit__(self, exc_type, exc_value, tb):
126         """Automatically deinitializes the hardware when exiting a context."""
127         self.deinit()
128
129     def resume(self, trigger_duration=0):
130         """Resumes pulse capture after an optional trigger pulse."""
131         if trigger_duration != 0:
132             self._mq.send("t%d" % trigger_duration, True, type=1)
133         else:
134             self._mq.send("r", True, type=1)
135         self._paused = False
136
137     def pause(self):
138         """Pause pulse capture"""
139         self._mq.send("p", True, type=1)
140         self._paused = True
141
142     @property
143     def paused(self):
144         """True when pulse capture is paused as a result of pause() or
145         an error during capture such as a signal that is too fast."""
146         return self._paused
147
148     @property
149     def maxlen(self):
150         """The maximum length of the PulseIn. When len() is equal to maxlen,
151         it is unclear which pulses are active and which are idle."""
152         return self._maxlen
153
154     def clear(self):
155         """Clears all captured pulses"""
156         self._mq.send("c", True, type=1)
157
158     def popleft(self):
159         """Removes and returns the oldest read pulse."""
160         self._mq.send("^", True, type=1)
161         message = self._wait_receive_msg()
162         reply = int(message[0].decode("utf-8"))
163         # print(reply)
164         if reply == -1:
165             raise IndexError("pop from empty list")
166         return reply
167
168     def __len__(self):
169         """Returns the current pulse length"""
170         self._mq.send("l", True, type=1)
171         message = self._wait_receive_msg()
172         return int(message[0].decode("utf-8"))
173
174     # pylint: disable=redefined-builtin
175     def __getitem__(self, index, type=None):
176         """Returns the value at the given index or values in slice."""
177         self._mq.send("i%d" % index, True, type=1)
178         message = self._wait_receive_msg()
179         ret = int(message[0].decode("utf-8"))
180         if ret == -1:
181             raise IndexError("list index out of range")
182         return ret
183
184     # pylint: enable=redefined-builtin