]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/bcm283x/pulseio/PulseIn.py
update setup.py and docs conf to specify python 3.7
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / bcm283x / pulseio / PulseIn.py
1 """Custom PulseIn Class to read PWM signals"""
2 import time
3 import subprocess
4 import os
5 import atexit
6 import random
7 import struct
8 import sysv_ipc
9
10 DEBUG = False
11 queues = []
12 procs = []
13
14 # The message queues live outside of python space, and must be formally cleaned!
15 def final():
16     """In case the program is cancelled or quit, we need to clean up the PulseIn
17     helper process and also the message queue, this is called at exit to do so"""
18     if DEBUG:
19         print("Cleaning up message queues", queues)
20         print("Cleaning up processes", procs)
21     for q in queues:
22         q.remove()
23     for proc in procs:
24         proc.terminate()
25
26
27 atexit.register(final)
28
29 # pylint: disable=c-extension-no-member
30 class PulseIn:
31     """PulseIn Class to read PWM signals"""
32
33     def __init__(self, pin, maxlen=2, idle_state=False):
34         """Create a PulseIn object associated with the given pin.
35         The object acts as a read-only sequence of pulse lengths with
36         a given max length. When it is active, new pulse lengths are
37         added to the end of the list. When there is no more room
38         (len() == maxlen) the oldest pulse length is removed to make room."""
39         self._pin = pin
40         self._maxlen = maxlen
41         self._idle_state = idle_state
42         self._queue_key = random.randint(1, 9999)
43         try:
44             self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
45             if DEBUG:
46                 print("Message Queue Key: ", self._mq.key)
47             queues.append(self._mq)
48         except sysv_ipc.ExistentialError:
49             raise RuntimeError(
50                 "Message queue creation failed"
51             ) from sysv_ipc.ExistentialError
52
53         # Check if OS is 64-bit
54         if struct.calcsize("P") * 8 == 64:
55             libgpiod_filename = "libgpiod_pulsein64"
56         else:
57             libgpiod_filename = "libgpiod_pulsein"
58
59         dir_path = os.path.dirname(os.path.realpath(__file__))
60         cmd = [
61             dir_path + "/" + libgpiod_filename,
62             "--pulses",
63             str(maxlen),
64             "--queue",
65             str(self._mq.key),
66         ]
67         if idle_state:
68             cmd.append("-i")
69         cmd.append("gpiochip0")
70         cmd.append(str(pin))
71         if DEBUG:
72             print(cmd)
73
74         self._process = subprocess.Popen(cmd)
75         procs.append(self._process)
76
77         # wait for it to start up
78         if DEBUG:
79             print("Waiting for startup success message from subprocess")
80         message = self._wait_receive_msg(timeout=0.25)
81         if message[0] != b"!":
82             raise RuntimeError("Could not establish message queue with subprocess")
83         self._paused = False
84
85     # pylint: disable=redefined-builtin
86     def _wait_receive_msg(self, timeout=0, type=2):
87         """Internal helper that will wait for new messages of a given type,
88         and throw an exception on timeout"""
89         if timeout > 0:
90             stamp = time.monotonic()
91             while (time.monotonic() - stamp) < timeout:
92                 try:
93                     message = self._mq.receive(block=False, type=type)
94                     return message
95                 except sysv_ipc.BusyError:
96                     time.sleep(0.001)  # wait a bit then retry!
97             # uh-oh timed out
98             raise RuntimeError(
99                 "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
100             )
101         message = self._mq.receive(block=True, type=type)
102         return message
103
104     # pylint: enable=redefined-builtin
105
106     def deinit(self):
107         """Deinitialises the PulseIn and releases any hardware and software
108         resources for reuse."""
109         # Clean up after ourselves
110         self._process.terminate()
111         procs.remove(self._process)
112         self._mq.remove()
113         queues.remove(self._mq)
114
115     def __enter__(self):
116         """No-op used by Context Managers."""
117         return self
118
119     def __exit__(self, exc_type, exc_value, tb):
120         """Automatically deinitializes the hardware when exiting a context."""
121         self.deinit()
122
123     def resume(self, trigger_duration=0):
124         """Resumes pulse capture after an optional trigger pulse."""
125         if trigger_duration != 0:
126             self._mq.send("t%d" % trigger_duration, True, type=1)
127         else:
128             self._mq.send("r", True, type=1)
129         self._paused = False
130
131     def pause(self):
132         """Pause pulse capture"""
133         self._mq.send("p", True, type=1)
134         self._paused = True
135
136     @property
137     def paused(self):
138         """True when pulse capture is paused as a result of pause() or
139         an error during capture such as a signal that is too fast."""
140         return self._paused
141
142     @property
143     def maxlen(self):
144         """The maximum length of the PulseIn. When len() is equal to maxlen,
145         it is unclear which pulses are active and which are idle."""
146         return self._maxlen
147
148     def clear(self):
149         """Clears all captured pulses"""
150         self._mq.send("c", True, type=1)
151
152     def popleft(self):
153         """Removes and returns the oldest read pulse."""
154         self._mq.send("^", True, type=1)
155         message = self._wait_receive_msg()
156         reply = int(message[0].decode("utf-8"))
157         # print(reply)
158         if reply == -1:
159             raise IndexError("pop from empty list")
160         return reply
161
162     def __len__(self):
163         """Returns the current pulse length"""
164         self._mq.send("l", True, type=1)
165         message = self._wait_receive_msg()
166         return int(message[0].decode("utf-8"))
167
168     # pylint: disable=redefined-builtin
169     def __getitem__(self, index, type=None):
170         """Returns the value at the given index or values in slice."""
171         self._mq.send("i%d" % index, True, type=1)
172         message = self._wait_receive_msg()
173         ret = int(message[0].decode("utf-8"))
174         if ret == -1:
175             raise IndexError("list index out of range")
176         return ret
177
178     # pylint: enable=redefined-builtin