]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py
Updated libgpiod to reduce CPU Usage
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / bcm283x / pulseio / PulseIn.py
1 """Custom PulseIn Class to read PWM signals"""
2 import time
3 import subprocess
4 import os
5 import atexit
6 import random
7 import sysv_ipc
8
9 DEBUG = False
10 queues = []
11 procs = []
12
13 # The message queues live outside of python space, and must be formally cleaned!
14 def final():
15     """In case the program is cancelled or quit, we need to clean up the PulseIn
16     helper process and also the message queue, this is called at exit to do so"""
17     if DEBUG:
18         print("Cleaning up message queues", queues)
19         print("Cleaning up processes", procs)
20     for q in queues:
21         q.remove()
22     for proc in procs:
23         proc.terminate()
24
25
26 atexit.register(final)
27
28 # pylint: disable=c-extension-no-member
29 class PulseIn:
30     """PulseIn Class to read PWM signals"""
31
32     def __init__(self, pin, maxlen=2, idle_state=False):
33         """Create a PulseIn object associated with the given pin.
34         The object acts as a read-only sequence of pulse lengths with
35         a given max length. When it is active, new pulse lengths are
36         added to the end of the list. When there is no more room
37         (len() == maxlen) the oldest pulse length is removed to make room."""
38         self._pin = pin
39         self._maxlen = maxlen
40         self._idle_state = idle_state
41         self._queue_key = random.randint(1, 9999)
42         try:
43             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
44             if DEBUG:
45                 print("Message Queue Key: ", self._mq.key)
46             queues.append(self._mq)
47         except sysv_ipc.ExistentialError:
48             raise RuntimeError("Message queue creation failed")
49
50         dir_path = os.path.dirname(os.path.realpath(__file__))
51         cmd = [
52             dir_path + "/libgpiod_pulsein",
53             "--pulses",
54             str(maxlen),
55             "--queue",
56             str(self._mq.key),
57         ]
58         if idle_state:
59             cmd.append("-i")
60         cmd.append("gpiochip0")
61         cmd.append(str(pin))
62         if DEBUG:
63             print(cmd)
64
65         self._process = subprocess.Popen(cmd)
66         procs.append(self._process)
67
68         # wait for it to start up
69         if DEBUG:
70             print("Waiting for startup success message from subprocess")
71         message = self._wait_receive_msg()
72         if message[0] != b"!":
73             raise RuntimeError("Could not establish message queue with subprocess")
74         self._paused = False
75
76     # pylint: disable=redefined-builtin
77     def _wait_receive_msg(self, timeout=0.25, type=2):
78         """Internal helper that will wait for new messages of a given type,
79         and throw an exception on timeout"""
80         stamp = time.monotonic()
81         while (time.monotonic() - stamp) < timeout:
82             try:
83                 message = self._mq.receive(block=False, type=type)
84                 return message
85             except sysv_ipc.BusyError:
86                 time.sleep(0.001)  # wait a bit then retry!
87         # uh-oh timed out
88         raise RuntimeError(
89             "Timed out waiting for PulseIn message. Be sure libgpiod is installed."
90         )
91
92     # pylint: enable=redefined-builtin
93
94     def deinit(self):
95         """Deinitialises the PulseIn and releases any hardware and software
96         resources for reuse."""
97         # Clean up after ourselves
98         self._process.terminate()
99         procs.remove(self._process)
100         self._mq.remove()
101         queues.remove(self._mq)
102
103     def __enter__(self):
104         """No-op used by Context Managers."""
105         return self
106
107     def __exit__(self, exc_type, exc_value, tb):
108         """Automatically deinitializes the hardware when exiting a context."""
109         self.deinit()
110
111     def resume(self, trigger_duration=0):
112         """Resumes pulse capture after an optional trigger pulse."""
113         if trigger_duration != 0:
114             self._mq.send("t%d" % trigger_duration, True, type=1)
115         else:
116             self._mq.send("r", True, type=1)
117         self._paused = False
118
119     def pause(self):
120         """Pause pulse capture"""
121         self._mq.send("p", True, type=1)
122         self._paused = True
123
124     @property
125     def paused(self):
126         """True when pulse capture is paused as a result of pause() or
127         an error during capture such as a signal that is too fast."""
128         return self._paused
129
130     @property
131     def maxlen(self):
132         """The maximum length of the PulseIn. When len() is equal to maxlen,
133         it is unclear which pulses are active and which are idle."""
134         return self._maxlen
135
136     def clear(self):
137         """Clears all captured pulses"""
138         self._mq.send("c", True, type=1)
139
140     def popleft(self):
141         """Removes and returns the oldest read pulse."""
142         self._mq.send("^", True, type=1)
143         message = self._wait_receive_msg()
144         reply = int(message[0].decode("utf-8"))
145         # print(reply)
146         if reply == -1:
147             raise IndexError("pop from empty list")
148         return reply
149
150     def __len__(self):
151         """Returns the current pulse length"""
152         self._mq.send("l", True, type=1)
153         message = self._wait_receive_msg()
154         return int(message[0].decode("utf-8"))
155
156     # pylint: disable=redefined-builtin
157     def __getitem__(self, index, type=None):
158         """Returns the value at the given index or values in slice."""
159         self._mq.send("i%d" % index, True, type=1)
160         message = self._wait_receive_msg()
161         ret = int(message[0].decode("utf-8"))
162         if ret == -1:
163             raise IndexError("list index out of range")
164         return ret
165
166     # pylint: enable=redefined-builtin