+++ /dev/null
-import array
-import time
-import subprocess
-import os, signal
-import traceback
-import signal
-import sysv_ipc
-import atexit
-import random
-
-DEBUG = False
-queues = []
-procs = []
-
-# The message queues live outside of python space, and must be formally cleaned!
-def final():
- if DEBUG:
- print("Cleaning up message queues", queues)
- print("Cleaning up processes", procs)
- for q in queues:
- q.remove()
- for proc in procs:
- proc.terminate()
-atexit.register(final)
-
-class PulseIn:
- def __init__(self, pin, maxlen=2, idle_state=False):
- self._pin = pin
- self._maxlen = maxlen
- self._idle_state = idle_state
- self._queue_key = random.randint(1, 9999)
- try:
- self._mq = sysv_ipc.MessageQueue(None, flags=sysv_ipc.IPC_CREX)
- if DEBUG:
- print("Message Queue Key: ", self._mq.key)
- queues.append(self._mq)
- except sysv_ipc.ExistentialError:
- raise RuntimeError("Message queue creation failed")
-
- cmd = ["/home/pi/libgpiod_pulsein/src/libgpiod_pulsein",
- "--pulses", str(maxlen),
- "--queue", str(self._mq.key)]
- if not idle_state:
- cmd.append("-i")
- cmd.append("gpiochip0")
- cmd.append(str(pin))
- if DEBUG:
- print(cmd)
-
- self._process = subprocess.Popen(cmd)
- procs.append(self._process)
-
- # wait for it to start up
- if DEBUG:
- print("Waiting for startup success message from subprocess")
- message = self._mq.receive(type=2)
- if message[0] != b'!':
- raise RuntimeError("Could not establish message queue with subprocess")
-
-
- def deinit(self):
- # Clean up after ourselves
- self._process.terminate()
- procs.remove(self._process)
- self._mq.remove()
- queues.remove(self._mq)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- self.deinit()
-
- def resume(self, trigger_duration=0):
- if trigger_duration != 0:
- self._mq.send("t%d" % trigger_duration, True, type=1)
- else:
- self._mq.send("r", True, type=1)
-
- def pause(self):
- self._mq.send("p", True, type=1)
-
- def clear(self):
- self._mq.send("c", True, type=1)
-
- def popleft(self):
- self._mq.send("^", True, type=1)
- message = self._mq.receive(type=2)
- reply = int(message[0].decode('utf-8'))
- if reply == -1:
- reply = None
- return reply
-
- def __len__(self):
- self._mq.send("l", True, type=1)
- message = self._mq.receive(type=2)
- return int(message[0].decode('utf-8'))