X-Git-Url: https://git.ayoreis.com/Adafruit_Blinka-hackapet.git/blobdiff_plain/a3770186a94117be56a8d9a8038a2d540bd2884c..80a2f37a06c217e101373df892bf4a083c03012a:/src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py diff --git a/src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py b/src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py new file mode 100644 index 0000000..c5b56e6 --- /dev/null +++ b/src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py @@ -0,0 +1,325 @@ +""" +Much code from https://github.com/vsergeev/python-periphery/blob/master/periphery/gpio.py +Copyright (c) 2015-2019 vsergeev / Ivan (Vanya) A. Sergeev +License: MIT +""" + +import os +import errno +import time + +# pylint: disable=unnecessary-pass +class GPIOError(IOError): + """Base class for GPIO errors.""" + + pass + + +# pylint: enable=unnecessary-pass + + +class Pin: + """SysFS GPIO Pin Class""" + + # Number of retries to check for GPIO export or direction write on open + GPIO_OPEN_RETRIES = 10 + # Delay between check for GPIO export or direction write on open (100ms) + GPIO_OPEN_DELAY = 0.1 + + IN = "in" + OUT = "out" + LOW = 0 + HIGH = 1 + PULL_NONE = 0 + PULL_UP = 1 + PULL_DOWN = 2 + + id = None + _value = LOW + _mode = IN + + # Sysfs paths + _sysfs_path = "/sys/class/gpio/" + _channel_path = "gpiochip{}" + + # Channel paths + _export_path = "export" + _unexport_path = "unexport" + _pin_path = "gpio{}" + + def __init__(self, pin_id): + """Instantiate a Pin object and open the sysfs GPIO with the specified + pin number. + + Args: + pin_id (int): GPIO pin number. + + Returns: + SysfsGPIO: GPIO object. + + Raises: + GPIOError: if an I/O or OS error occurs. + TypeError: if `line` or `direction` types are invalid. + ValueError: if `direction` value is invalid. + TimeoutError: if waiting for GPIO export times out. + + """ + + if not isinstance(pin_id, int): + raise TypeError("Invalid Pin ID, should be integer.") + + self.id = pin_id + self._fd = None + self._line = None + self._path = None + + def __del__(self): + self._close() + + def __enter__(self): + return self + + def __exit__(self, t, value, traceback): + self._close() + + def init(self, mode=IN, pull=None): + """Initialize the Pin""" + if mode is not None: + if mode == self.IN: + self._mode = self.IN + self._open(self.IN) + elif mode == self.OUT: + self._mode = self.OUT + self._open(self.OUT) + else: + raise RuntimeError("Invalid mode for pin: %s" % self.id) + + if pull is not None: + if pull == self.PULL_UP: + raise NotImplementedError( + "Internal pullups not supported in periphery, " + "use physical resistor instead!" + ) + if pull == self.PULL_DOWN: + raise NotImplementedError( + "Internal pulldowns not supported in periphery, " + "use physical resistor instead!" + ) + raise RuntimeError("Invalid pull for pin: %s" % self.id) + + def value(self, val=None): + """Set or return the Pin Value""" + if val is not None: + if val == self.LOW: + self._value = val + self._write(False) + return None + if val == self.HIGH: + self._value = val + self._write(True) + return None + raise RuntimeError("Invalid value for pin") + return self.HIGH if self._read() else self.LOW + + # pylint: disable=too-many-branches + def _open(self, direction): + if not isinstance(direction, str): + raise TypeError("Invalid direction type, should be string.") + if direction.lower() not in ["in", "out", "high", "low"]: + raise ValueError('Invalid direction, can be: "in", "out", "high", "low".') + gpio_path = "/sys/class/gpio/gpio{:d}".format(self.id) + + if not os.path.isdir(gpio_path): + # Export the line + try: + with open("/sys/class/gpio/export", "w") as f_export: + f_export.write("{:d}\n".format(self.id)) + except IOError as e: + raise GPIOError(e.errno, "Exporting GPIO: " + e.strerror) + + # Loop until GPIO is exported + exported = False + for i in range(self.GPIO_OPEN_RETRIES): + if os.path.isdir(gpio_path): + exported = True + break + + time.sleep(self.GPIO_OPEN_DELAY) + + if not exported: + raise TimeoutError( + 'Exporting GPIO: waiting for "{:s}" timed out'.format(gpio_path) + ) + + # Write direction, looping in case of EACCES errors due to delayed udev + # permission rule application after export + for i in range(self.GPIO_OPEN_RETRIES): + try: + with open(os.path.join(gpio_path, "direction"), "w") as f_direction: + f_direction.write(direction.lower() + "\n") + break + except IOError as e: + if e.errno != errno.EACCES or ( + e.errno == errno.EACCES and i == self.GPIO_OPEN_RETRIES - 1 + ): + raise GPIOError( + e.errno, "Setting GPIO direction: " + e.strerror + ) + + time.sleep(self.GPIO_OPEN_DELAY) + else: + # Write direction + try: + with open(os.path.join(gpio_path, "direction"), "w") as f_direction: + f_direction.write(direction.lower() + "\n") + except IOError as e: + raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror) + + # Open value + try: + self._fd = os.open(os.path.join(gpio_path, "value"), os.O_RDWR) + except OSError as e: + raise GPIOError(e.errno, "Opening GPIO: " + e.strerror) + + self._path = gpio_path + + # pylint: enable=too-many-branches + + def _close(self): + if self._fd is None: + return + + try: + os.close(self._fd) + except OSError as e: + raise GPIOError(e.errno, "Closing GPIO: " + e.strerror) + + self._fd = None + + # Unexport the line + try: + unexport_fd = os.open("/sys/class/gpio/unexport", os.O_WRONLY) + os.write(unexport_fd, "{:d}\n".format(self.id).encode()) + os.close(unexport_fd) + except OSError as e: + raise GPIOError(e.errno, "Unexporting GPIO: " + e.strerror) + + def _read(self): + # Read value + try: + buf = os.read(self._fd, 2) + except OSError as e: + raise GPIOError(e.errno, "Reading GPIO: " + e.strerror) + + # Rewind + try: + os.lseek(self._fd, 0, os.SEEK_SET) + except OSError as e: + raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror) + + if buf[0] == b"0"[0]: + return False + if buf[0] == b"1"[0]: + return True + + raise GPIOError(None, "Unknown GPIO value: {}".format(buf)) + + def _write(self, value): + if not isinstance(value, bool): + raise TypeError("Invalid value type, should be bool.") + + # Write value + try: + if value: + os.write(self._fd, b"1\n") + else: + os.write(self._fd, b"0\n") + except OSError as e: + raise GPIOError(e.errno, "Writing GPIO: " + e.strerror) + + # Rewind + try: + os.lseek(self._fd, 0, os.SEEK_SET) + except OSError as e: + raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror) + + @property + def chip_name(self): + """Return the Chip Name""" + gpio_path = os.path.join(self._path, "device") + + gpiochip_path = os.readlink(gpio_path) + + if "/" not in gpiochip_path: + raise GPIOError( + None, + 'Reading gpiochip name: invalid device symlink "{:s}"'.format( + gpiochip_path + ), + ) + + return gpiochip_path.split("/")[-1] + + @property + def chip_label(self): + """Return the Chip Label""" + gpio_path = "/sys/class/gpio/{:s}/label".format(self.chip_name) + + try: + with open(gpio_path, "r") as f_label: + label = f_label.read() + except (GPIOError, IOError) as e: + if isinstance(e, IOError): + raise GPIOError(e.errno, "Reading gpiochip label: " + e.strerror) + + raise GPIOError(None, "Reading gpiochip label: " + e.strerror) + + return label.strip() + + # Mutable properties + + def _get_direction(self): + # Read direction + try: + with open(os.path.join(self._path, "direction"), "r") as f_direction: + direction = f_direction.read() + except IOError as e: + raise GPIOError(e.errno, "Getting GPIO direction: " + e.strerror) + + return direction.strip() + + def _set_direction(self, direction): + if not isinstance(direction, str): + raise TypeError("Invalid direction type, should be string.") + if direction.lower() not in ["in", "out", "high", "low"]: + raise ValueError('Invalid direction, can be: "in", "out", "high", "low".') + + # Write direction + try: + with open(os.path.join(self._path, "direction"), "w") as f_direction: + f_direction.write(direction.lower() + "\n") + except IOError as e: + raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror) + + direction = property(_get_direction, _set_direction) + + def __str__(self): + try: + str_direction = self.direction + except GPIOError: + str_direction = "" + + try: + str_chip_name = self.chip_name + except GPIOError: + str_chip_name = "" + + try: + str_chip_label = self.chip_label + except GPIOError: + str_chip_label = "" + + output = "Pin {:d} (dev={:s}, fd={:d}, dir={:s}, chip_name='{:s}', chip_label='{:s}')" + return output.format( + self.id, self._path, self._fd, str_direction, str_chip_name, str_chip_label + )