]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/amlogic/meson_g12_common/pulseio/PulseIn.py
libgpio 2.x support for odroid c4
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / amlogic / meson_g12_common / pulseio / PulseIn.py
1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
2 #
3 # SPDX-License-Identifier: MIT
4 """Custom PulseIn Class to read PWM signals"""
5 import time
6 import subprocess
7 import os
8 import atexit
9 import random
10 import sysv_ipc
11
12 DEBUG = False
13 queues = []
14 procs = []
15
16
17 # The message queues live outside of python space, and must be formally cleaned!
18 def final():
19     """In case the program is cancelled or quit, we need to clean up the PulseIn
20     helper process and also the message queue, this is called at exit to do so"""
21     if DEBUG:
22         print("Cleaning up message queues", queues)
23         print("Cleaning up processes", procs)
24     for q in queues:
25         q.remove()
26     for proc in procs:
27         proc.terminate()
28
29
30 atexit.register(final)
31
32
33 # pylint: disable=c-extension-no-member
34 class PulseIn:
35     """PulseIn Class to read PWM signals"""
36
37     def __init__(self, pin, maxlen=2, idle_state=False):
38         """Create a PulseIn object associated with the given pin.
39         The object acts as a read-only sequence of pulse lengths with
40         a given max length. When it is active, new pulse lengths are
41         added to the end of the list. When there is no more room
42         (len() == maxlen) the oldest pulse length is removed to make room."""
43
44         if isinstance(pin.id, tuple):
45             self._pin = str(pin.id[1])
46             self._chip = "gpiochip{}".format(pin.id[0])
47         else:
48             self._pin = str(pin.id)
49             self._chip = "gpiochip0"
50
51         self._maxlen = maxlen
52         self._idle_state = idle_state
53         self._queue_key = random.randint(1, 9999)
54         try:
55             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
56             if DEBUG:
57                 print("Message Queue Key: ", self._mq.key)
58             queues.append(self._mq)
59         except sysv_ipc.ExistentialError:
60             raise RuntimeError(
61                 "Message queue creation failed"
62             ) from sysv_ipc.ExistentialError
63
64         # Check if OS is 64-bit
65         libgpiod_filename = "libgpiod_pulsein"
66         dir_path = os.path.dirname(os.path.realpath(__file__))
67         cmd = [
68             dir_path + "/" + libgpiod_filename,
69             "--pulses",
70             str(maxlen),
71             "--queue",
72             str(self._mq.key),
73         ]
74         if idle_state:
75             cmd.append("-i")
76         cmd.append(self._chip)
77         cmd.append(self._pin)
78         if DEBUG:
79             print(cmd)
80
81         self._process = subprocess.Popen(cmd)  # pylint: disable=consider-using-with
82         procs.append(self._process)
83
84         # wait for it to start up
85         if DEBUG:
86             print("Waiting for startup success message from subprocess")
87         message = self._wait_receive_msg(timeout=0.25)
88         if message[0] != b"!":
89             raise RuntimeError("Could not establish message queue with subprocess")
90         self._paused = False
91
92     # pylint: disable=redefined-builtin
93     def _wait_receive_msg(self, timeout=0, type=2):
94         """Internal helper that will wait for new messages of a given type,
95         and throw an exception on timeout"""
96         if timeout > 0:
97             stamp = time.monotonic()
98             while (time.monotonic() - stamp) < timeout:
99                 try:
100                     message = self._mq.receive(block=False, type=type)
101                     return message
102                 except sysv_ipc.BusyError:
103                     time.sleep(0.001)  # wait a bit then retry!
104             # uh-oh timed out
105             raise RuntimeError(
106                 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
107             )
108         message = self._mq.receive(block=True, type=type)
109         return message
110
111     # pylint: enable=redefined-builtin
112
113     def deinit(self):
114         """Deinitialises the PulseIn and releases any hardware and software
115         resources for reuse."""
116         # Clean up after ourselves
117         self._process.terminate()
118         procs.remove(self._process)
119         self._mq.remove()
120         queues.remove(self._mq)
121
122     def __enter__(self):
123         """No-op used by Context Managers."""
124         return self
125
126     def __exit__(self, exc_type, exc_value, tb):
127         """Automatically deinitializes the hardware when exiting a context."""
128         self.deinit()
129
130     def resume(self, trigger_duration=0):
131         """Resumes pulse capture after an optional trigger pulse."""
132         if trigger_duration != 0:
133             self._mq.send("t%d" % trigger_duration, True, type=1)
134         else:
135             self._mq.send("r", True, type=1)
136         self._paused = False
137
138     def pause(self):
139         """Pause pulse capture"""
140         self._mq.send("p", True, type=1)
141         self._paused = True
142
143     @property
144     def paused(self):
145         """True when pulse capture is paused as a result of pause() or
146         an error during capture such as a signal that is too fast."""
147         return self._paused
148
149     @property
150     def maxlen(self):
151         """The maximum length of the PulseIn. When len() is equal to maxlen,
152         it is unclear which pulses are active and which are idle."""
153         return self._maxlen
154
155     def clear(self):
156         """Clears all captured pulses"""
157         self._mq.send("c", True, type=1)
158
159     def popleft(self):
160         """Removes and returns the oldest read pulse."""
161         self._mq.send("^", True, type=1)
162         message = self._wait_receive_msg()
163         reply = int(message[0].decode("utf-8"))
164         # print(reply)
165         if reply == -1:
166             raise IndexError("pop from empty list")
167         return reply
168
169     def __len__(self):
170         """Returns the current pulse length"""
171         self._mq.send("l", True, type=1)
172         message = self._wait_receive_msg()
173         return int(message[0].decode("utf-8"))
174
175     # pylint: disable=redefined-builtin
176     def __getitem__(self, index, type=None):
177         """Returns the value at the given index or values in slice."""
178         self._mq.send("i%d" % index, True, type=1)
179         message = self._wait_receive_msg()
180         ret = int(message[0].decode("utf-8"))
181         if ret == -1:
182             raise IndexError("list index out of range")
183         return ret
184
185     # pylint: enable=redefined-builtin