]> Repositories - Adafruit_Blinka-hackapet.git/blobdiff - src/adafruit_blinka/microcontroller/bcm2711/pulseio/PulseIn.py
Combine CM4 and remove duplicated files
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / bcm2711 / pulseio / PulseIn.py
diff --git a/src/adafruit_blinka/microcontroller/bcm2711/pulseio/PulseIn.py b/src/adafruit_blinka/microcontroller/bcm2711/pulseio/PulseIn.py
deleted file mode 100644 (file)
index 700d8de..0000000
+++ /dev/null
@@ -1,178 +0,0 @@
-"""Custom PulseIn Class to read PWM signals"""
-import time
-import subprocess
-import os
-import atexit
-import random
-import struct
-import sysv_ipc
-
-DEBUG = False
-queues = []
-procs = []
-
-# The message queues live outside of python space, and must be formally cleaned!
-def final():
-    """In case the program is cancelled or quit, we need to clean up the PulseIn
-    helper process and also the message queue, this is called at exit to do so"""
-    if DEBUG:
-        print("Cleaning up message queues", queues)
-        print("Cleaning up processes", procs)
-    for q in queues:
-        q.remove()
-    for proc in procs:
-        proc.terminate()
-
-
-atexit.register(final)
-
-# pylint: disable=c-extension-no-member
-class PulseIn:
-    """PulseIn Class to read PWM signals"""
-
-    def __init__(self, pin, maxlen=2, idle_state=False):
-        """Create a PulseIn object associated with the given pin.
-        The object acts as a read-only sequence of pulse lengths with
-        a given max length. When it is active, new pulse lengths are
-        added to the end of the list. When there is no more room
-        (len() == maxlen) the oldest pulse length is removed to make room."""
-        self._pin = pin
-        self._maxlen = maxlen
-        self._idle_state = idle_state
-        self._queue_key = random.randint(1, 9999)
-        try:
-            self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
-            if DEBUG:
-                print("Message Queue Key: ", self._mq.key)
-            queues.append(self._mq)
-        except sysv_ipc.ExistentialError:
-            raise RuntimeError(
-                "Message queue creation failed"
-            ) from sysv_ipc.ExistentialError
-
-        # Check if OS is 64-bit
-        if struct.calcsize("P") * 8 == 64:
-            libgpiod_filename = "libgpiod_pulsein64"
-        else:
-            libgpiod_filename = "libgpiod_pulsein"
-
-        dir_path = os.path.dirname(os.path.realpath(__file__))
-        cmd = [
-            dir_path + "/" + libgpiod_filename,
-            "--pulses",
-            str(maxlen),
-            "--queue",
-            str(self._mq.key),
-        ]
-        if idle_state:
-            cmd.append("-i")
-        cmd.append("gpiochip0")
-        cmd.append(str(pin))
-        if DEBUG:
-            print(cmd)
-
-        self._process = subprocess.Popen(cmd)
-        procs.append(self._process)
-
-        # wait for it to start up
-        if DEBUG:
-            print("Waiting for startup success message from subprocess")
-        message = self._wait_receive_msg(timeout=0.25)
-        if message[0] != b"!":
-            raise RuntimeError("Could not establish message queue with subprocess")
-        self._paused = False
-
-    # pylint: disable=redefined-builtin
-    def _wait_receive_msg(self, timeout=0, type=2):
-        """Internal helper that will wait for new messages of a given type,
-        and throw an exception on timeout"""
-        if timeout > 0:
-            stamp = time.monotonic()
-            while (time.monotonic() - stamp) < timeout:
-                try:
-                    message = self._mq.receive(block=False, type=type)
-                    return message
-                except sysv_ipc.BusyError:
-                    time.sleep(0.001)  # wait a bit then retry!
-            # uh-oh timed out
-            raise RuntimeError(
-                "Timed out waiting for PulseIn message. Make sure libgpiod is installed."
-            )
-        message = self._mq.receive(block=True, type=type)
-        return message
-
-    # pylint: enable=redefined-builtin
-
-    def deinit(self):
-        """Deinitialises the PulseIn and releases any hardware and software
-        resources for reuse."""
-        # Clean up after ourselves
-        self._process.terminate()
-        procs.remove(self._process)
-        self._mq.remove()
-        queues.remove(self._mq)
-
-    def __enter__(self):
-        """No-op used by Context Managers."""
-        return self
-
-    def __exit__(self, exc_type, exc_value, tb):
-        """Automatically deinitializes the hardware when exiting a context."""
-        self.deinit()
-
-    def resume(self, trigger_duration=0):
-        """Resumes pulse capture after an optional trigger pulse."""
-        if trigger_duration != 0:
-            self._mq.send("t%d" % trigger_duration, True, type=1)
-        else:
-            self._mq.send("r", True, type=1)
-        self._paused = False
-
-    def pause(self):
-        """Pause pulse capture"""
-        self._mq.send("p", True, type=1)
-        self._paused = True
-
-    @property
-    def paused(self):
-        """True when pulse capture is paused as a result of pause() or
-        an error during capture such as a signal that is too fast."""
-        return self._paused
-
-    @property
-    def maxlen(self):
-        """The maximum length of the PulseIn. When len() is equal to maxlen,
-        it is unclear which pulses are active and which are idle."""
-        return self._maxlen
-
-    def clear(self):
-        """Clears all captured pulses"""
-        self._mq.send("c", True, type=1)
-
-    def popleft(self):
-        """Removes and returns the oldest read pulse."""
-        self._mq.send("^", True, type=1)
-        message = self._wait_receive_msg()
-        reply = int(message[0].decode("utf-8"))
-        # print(reply)
-        if reply == -1:
-            raise IndexError("pop from empty list")
-        return reply
-
-    def __len__(self):
-        """Returns the current pulse length"""
-        self._mq.send("l", True, type=1)
-        message = self._wait_receive_msg()
-        return int(message[0].decode("utf-8"))
-
-    # pylint: disable=redefined-builtin
-    def __getitem__(self, index, type=None):
-        """Returns the value at the given index or values in slice."""
-        self._mq.send("i%d" % index, True, type=1)
-        message = self._wait_receive_msg()
-        ret = int(message[0].decode("utf-8"))
-        if ret == -1:
-            raise IndexError("list index out of range")
-        return ret
-
-    # pylint: enable=redefined-builtin