def __init__(self, pin, maxlen=2, idle_state=False):
"""Create a PulseIn object associated with the given pin.
The object acts as a read-only sequence of pulse lengths with
def __init__(self, pin, maxlen=2, idle_state=False):
"""Create a PulseIn object associated with the given pin.
The object acts as a read-only sequence of pulse lengths with
raise RuntimeError("Message queue creation failed")
dir_path = os.path.dirname(os.path.realpath(__file__))
raise RuntimeError("Message queue creation failed")
dir_path = os.path.dirname(os.path.realpath(__file__))
- cmd = [dir_path+"/libgpiod_pulsein",
- "--pulses", str(maxlen),
- "--queue", str(self._mq.key)]
+ cmd = [
+ dir_path + "/libgpiod_pulsein",
+ "--pulses",
+ str(maxlen),
+ "--queue",
+ str(self._mq.key),
+ ]
def _wait_receive_msg(self, timeout=0.25, type=2):
"""Internal helper that will wait for new messages of a given type,
and throw an exception on timeout"""
stamp = time.monotonic()
while (time.monotonic() - stamp) < timeout:
try:
def _wait_receive_msg(self, timeout=0.25, type=2):
"""Internal helper that will wait for new messages of a given type,
and throw an exception on timeout"""
stamp = time.monotonic()
while (time.monotonic() - stamp) < timeout:
try:
"""Removes and returns the oldest read pulse."""
self._mq.send("^", True, type=1)
message = self._wait_receive_msg()
"""Removes and returns the oldest read pulse."""
self._mq.send("^", True, type=1)
message = self._wait_receive_msg()
"""Returns the current pulse length"""
self._mq.send("l", True, type=1)
message = self._wait_receive_msg()
"""Returns the current pulse length"""
self._mq.send("l", True, type=1)
message = self._wait_receive_msg()
def __getitem__(self, index, type=None):
"""Returns the value at the given index or values in slice."""
self._mq.send("i%d" % index, True, type=1)
message = self._wait_receive_msg()
def __getitem__(self, index, type=None):
"""Returns the value at the given index or values in slice."""
self._mq.send("i%d" % index, True, type=1)
message = self._wait_receive_msg()