]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py
Merge pull request #322 from dmanla/master
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / bcm283x / pulseio / PulseIn.py
1 """Custom PulseIn Class to read PWM signals"""
2 import time
3 import subprocess
4 import os
5 import atexit
6 import random
7 import sysv_ipc
8
9 DEBUG = False
10 queues = []
11 procs = []
12
13 # The message queues live outside of python space, and must be formally cleaned!
14 def final():
15     """In case the program is cancelled or quit, we need to clean up the PulseIn
16     helper process and also the message queue, this is called at exit to do so"""
17     if DEBUG:
18         print("Cleaning up message queues", queues)
19         print("Cleaning up processes", procs)
20     for q in queues:
21         q.remove()
22     for proc in procs:
23         proc.terminate()
24
25
26 atexit.register(final)
27
28 # pylint: disable=c-extension-no-member
29 class PulseIn:
30     """PulseIn Class to read PWM signals"""
31
32     def __init__(self, pin, maxlen=2, idle_state=False):
33         """Create a PulseIn object associated with the given pin.
34         The object acts as a read-only sequence of pulse lengths with
35         a given max length. When it is active, new pulse lengths are
36         added to the end of the list. When there is no more room
37         (len() == maxlen) the oldest pulse length is removed to make room."""
38         self._pin = pin
39         self._maxlen = maxlen
40         self._idle_state = idle_state
41         self._queue_key = random.randint(1, 9999)
42         try:
43             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
44             if DEBUG:
45                 print("Message Queue Key: ", self._mq.key)
46             queues.append(self._mq)
47         except sysv_ipc.ExistentialError:
48             raise RuntimeError(
49                 "Message queue creation failed"
50             ) from sysv_ipc.ExistentialError
51
52         dir_path = os.path.dirname(os.path.realpath(__file__))
53         cmd = [
54             dir_path + "/libgpiod_pulsein",
55             "--pulses",
56             str(maxlen),
57             "--queue",
58             str(self._mq.key),
59         ]
60         if idle_state:
61             cmd.append("-i")
62         cmd.append("gpiochip0")
63         cmd.append(str(pin))
64         if DEBUG:
65             print(cmd)
66
67         self._process = subprocess.Popen(cmd)
68         procs.append(self._process)
69
70         # wait for it to start up
71         if DEBUG:
72             print("Waiting for startup success message from subprocess")
73         message = self._wait_receive_msg(timeout=0.25)
74         if message[0] != b"!":
75             raise RuntimeError("Could not establish message queue with subprocess")
76         self._paused = False
77
78     # pylint: disable=redefined-builtin
79     def _wait_receive_msg(self, timeout=0, type=2):
80         """Internal helper that will wait for new messages of a given type,
81         and throw an exception on timeout"""
82         if timeout > 0:
83             stamp = time.monotonic()
84             while (time.monotonic() - stamp) < timeout:
85                 try:
86                     message = self._mq.receive(block=False, type=type)
87                     return message
88                 except sysv_ipc.BusyError:
89                     time.sleep(0.001)  # wait a bit then retry!
90             # uh-oh timed out
91             raise RuntimeError(
92                 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
93             )
94         message = self._mq.receive(block=True, type=type)
95         return message
96
97     # pylint: enable=redefined-builtin
98
99     def deinit(self):
100         """Deinitialises the PulseIn and releases any hardware and software
101         resources for reuse."""
102         # Clean up after ourselves
103         self._process.terminate()
104         procs.remove(self._process)
105         self._mq.remove()
106         queues.remove(self._mq)
107
108     def __enter__(self):
109         """No-op used by Context Managers."""
110         return self
111
112     def __exit__(self, exc_type, exc_value, tb):
113         """Automatically deinitializes the hardware when exiting a context."""
114         self.deinit()
115
116     def resume(self, trigger_duration=0):
117         """Resumes pulse capture after an optional trigger pulse."""
118         if trigger_duration != 0:
119             self._mq.send("t%d" % trigger_duration, True, type=1)
120         else:
121             self._mq.send("r", True, type=1)
122         self._paused = False
123
124     def pause(self):
125         """Pause pulse capture"""
126         self._mq.send("p", True, type=1)
127         self._paused = True
128
129     @property
130     def paused(self):
131         """True when pulse capture is paused as a result of pause() or
132         an error during capture such as a signal that is too fast."""
133         return self._paused
134
135     @property
136     def maxlen(self):
137         """The maximum length of the PulseIn. When len() is equal to maxlen,
138         it is unclear which pulses are active and which are idle."""
139         return self._maxlen
140
141     def clear(self):
142         """Clears all captured pulses"""
143         self._mq.send("c", True, type=1)
144
145     def popleft(self):
146         """Removes and returns the oldest read pulse."""
147         self._mq.send("^", True, type=1)
148         message = self._wait_receive_msg()
149         reply = int(message[0].decode("utf-8"))
150         # print(reply)
151         if reply == -1:
152             raise IndexError("pop from empty list")
153         return reply
154
155     def __len__(self):
156         """Returns the current pulse length"""
157         self._mq.send("l", True, type=1)
158         message = self._wait_receive_msg()
159         return int(message[0].decode("utf-8"))
160
161     # pylint: disable=redefined-builtin
162     def __getitem__(self, index, type=None):
163         """Returns the value at the given index or values in slice."""
164         self._mq.send("i%d" % index, True, type=1)
165         message = self._wait_receive_msg()
166         ret = int(message[0].decode("utf-8"))
167         if ret == -1:
168             raise IndexError("list index out of range")
169         return ret
170
171     # pylint: enable=redefined-builtin