]> Repositories - Adafruit_Blinka-hackapet.git/blobdiff - src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py
Added Rock Pi S
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / generic_linux / sysfs_pin.py
diff --git a/src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py b/src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py
new file mode 100644 (file)
index 0000000..c5b56e6
--- /dev/null
@@ -0,0 +1,325 @@
+"""
+Much code from https://github.com/vsergeev/python-periphery/blob/master/periphery/gpio.py
+Copyright (c) 2015-2019 vsergeev / Ivan (Vanya) A. Sergeev
+License: MIT
+"""
+
+import os
+import errno
+import time
+
+# pylint: disable=unnecessary-pass
+class GPIOError(IOError):
+    """Base class for GPIO errors."""
+
+    pass
+
+
+# pylint: enable=unnecessary-pass
+
+
+class Pin:
+    """SysFS GPIO Pin Class"""
+
+    # Number of retries to check for GPIO export or direction write on open
+    GPIO_OPEN_RETRIES = 10
+    # Delay between check for GPIO export or direction write on open (100ms)
+    GPIO_OPEN_DELAY = 0.1
+
+    IN = "in"
+    OUT = "out"
+    LOW = 0
+    HIGH = 1
+    PULL_NONE = 0
+    PULL_UP = 1
+    PULL_DOWN = 2
+
+    id = None
+    _value = LOW
+    _mode = IN
+
+    # Sysfs paths
+    _sysfs_path = "/sys/class/gpio/"
+    _channel_path = "gpiochip{}"
+
+    # Channel paths
+    _export_path = "export"
+    _unexport_path = "unexport"
+    _pin_path = "gpio{}"
+
+    def __init__(self, pin_id):
+        """Instantiate a Pin object and open the sysfs GPIO with the specified
+        pin number.
+
+        Args:
+            pin_id (int): GPIO pin number.
+
+        Returns:
+            SysfsGPIO: GPIO object.
+
+        Raises:
+            GPIOError: if an I/O or OS error occurs.
+            TypeError: if `line` or `direction`  types are invalid.
+            ValueError: if `direction` value is invalid.
+            TimeoutError: if waiting for GPIO export times out.
+
+        """
+
+        if not isinstance(pin_id, int):
+            raise TypeError("Invalid Pin ID, should be integer.")
+
+        self.id = pin_id
+        self._fd = None
+        self._line = None
+        self._path = None
+
+    def __del__(self):
+        self._close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, t, value, traceback):
+        self._close()
+
+    def init(self, mode=IN, pull=None):
+        """Initialize the Pin"""
+        if mode is not None:
+            if mode == self.IN:
+                self._mode = self.IN
+                self._open(self.IN)
+            elif mode == self.OUT:
+                self._mode = self.OUT
+                self._open(self.OUT)
+            else:
+                raise RuntimeError("Invalid mode for pin: %s" % self.id)
+
+            if pull is not None:
+                if pull == self.PULL_UP:
+                    raise NotImplementedError(
+                        "Internal pullups not supported in periphery, "
+                        "use physical resistor instead!"
+                    )
+                if pull == self.PULL_DOWN:
+                    raise NotImplementedError(
+                        "Internal pulldowns not supported in periphery, "
+                        "use physical resistor instead!"
+                    )
+                raise RuntimeError("Invalid pull for pin: %s" % self.id)
+
+    def value(self, val=None):
+        """Set or return the Pin Value"""
+        if val is not None:
+            if val == self.LOW:
+                self._value = val
+                self._write(False)
+                return None
+            if val == self.HIGH:
+                self._value = val
+                self._write(True)
+                return None
+            raise RuntimeError("Invalid value for pin")
+        return self.HIGH if self._read() else self.LOW
+
+    # pylint: disable=too-many-branches
+    def _open(self, direction):
+        if not isinstance(direction, str):
+            raise TypeError("Invalid direction type, should be string.")
+        if direction.lower() not in ["in", "out", "high", "low"]:
+            raise ValueError('Invalid direction, can be: "in", "out", "high", "low".')
+        gpio_path = "/sys/class/gpio/gpio{:d}".format(self.id)
+
+        if not os.path.isdir(gpio_path):
+            # Export the line
+            try:
+                with open("/sys/class/gpio/export", "w") as f_export:
+                    f_export.write("{:d}\n".format(self.id))
+            except IOError as e:
+                raise GPIOError(e.errno, "Exporting GPIO: " + e.strerror)
+
+            # Loop until GPIO is exported
+            exported = False
+            for i in range(self.GPIO_OPEN_RETRIES):
+                if os.path.isdir(gpio_path):
+                    exported = True
+                    break
+
+                time.sleep(self.GPIO_OPEN_DELAY)
+
+            if not exported:
+                raise TimeoutError(
+                    'Exporting GPIO: waiting for "{:s}" timed out'.format(gpio_path)
+                )
+
+            # Write direction, looping in case of EACCES errors due to delayed udev
+            # permission rule application after export
+            for i in range(self.GPIO_OPEN_RETRIES):
+                try:
+                    with open(os.path.join(gpio_path, "direction"), "w") as f_direction:
+                        f_direction.write(direction.lower() + "\n")
+                    break
+                except IOError as e:
+                    if e.errno != errno.EACCES or (
+                        e.errno == errno.EACCES and i == self.GPIO_OPEN_RETRIES - 1
+                    ):
+                        raise GPIOError(
+                            e.errno, "Setting GPIO direction: " + e.strerror
+                        )
+
+                time.sleep(self.GPIO_OPEN_DELAY)
+        else:
+            # Write direction
+            try:
+                with open(os.path.join(gpio_path, "direction"), "w") as f_direction:
+                    f_direction.write(direction.lower() + "\n")
+            except IOError as e:
+                raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror)
+
+        # Open value
+        try:
+            self._fd = os.open(os.path.join(gpio_path, "value"), os.O_RDWR)
+        except OSError as e:
+            raise GPIOError(e.errno, "Opening GPIO: " + e.strerror)
+
+        self._path = gpio_path
+
+    # pylint: enable=too-many-branches
+
+    def _close(self):
+        if self._fd is None:
+            return
+
+        try:
+            os.close(self._fd)
+        except OSError as e:
+            raise GPIOError(e.errno, "Closing GPIO: " + e.strerror)
+
+        self._fd = None
+
+        # Unexport the line
+        try:
+            unexport_fd = os.open("/sys/class/gpio/unexport", os.O_WRONLY)
+            os.write(unexport_fd, "{:d}\n".format(self.id).encode())
+            os.close(unexport_fd)
+        except OSError as e:
+            raise GPIOError(e.errno, "Unexporting GPIO: " + e.strerror)
+
+    def _read(self):
+        # Read value
+        try:
+            buf = os.read(self._fd, 2)
+        except OSError as e:
+            raise GPIOError(e.errno, "Reading GPIO: " + e.strerror)
+
+        # Rewind
+        try:
+            os.lseek(self._fd, 0, os.SEEK_SET)
+        except OSError as e:
+            raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)
+
+        if buf[0] == b"0"[0]:
+            return False
+        if buf[0] == b"1"[0]:
+            return True
+
+        raise GPIOError(None, "Unknown GPIO value: {}".format(buf))
+
+    def _write(self, value):
+        if not isinstance(value, bool):
+            raise TypeError("Invalid value type, should be bool.")
+
+        # Write value
+        try:
+            if value:
+                os.write(self._fd, b"1\n")
+            else:
+                os.write(self._fd, b"0\n")
+        except OSError as e:
+            raise GPIOError(e.errno, "Writing GPIO: " + e.strerror)
+
+        # Rewind
+        try:
+            os.lseek(self._fd, 0, os.SEEK_SET)
+        except OSError as e:
+            raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)
+
+    @property
+    def chip_name(self):
+        """Return the Chip Name"""
+        gpio_path = os.path.join(self._path, "device")
+
+        gpiochip_path = os.readlink(gpio_path)
+
+        if "/" not in gpiochip_path:
+            raise GPIOError(
+                None,
+                'Reading gpiochip name: invalid device symlink "{:s}"'.format(
+                    gpiochip_path
+                ),
+            )
+
+        return gpiochip_path.split("/")[-1]
+
+    @property
+    def chip_label(self):
+        """Return the Chip Label"""
+        gpio_path = "/sys/class/gpio/{:s}/label".format(self.chip_name)
+
+        try:
+            with open(gpio_path, "r") as f_label:
+                label = f_label.read()
+        except (GPIOError, IOError) as e:
+            if isinstance(e, IOError):
+                raise GPIOError(e.errno, "Reading gpiochip label: " + e.strerror)
+
+            raise GPIOError(None, "Reading gpiochip label: " + e.strerror)
+
+        return label.strip()
+
+    # Mutable properties
+
+    def _get_direction(self):
+        # Read direction
+        try:
+            with open(os.path.join(self._path, "direction"), "r") as f_direction:
+                direction = f_direction.read()
+        except IOError as e:
+            raise GPIOError(e.errno, "Getting GPIO direction: " + e.strerror)
+
+        return direction.strip()
+
+    def _set_direction(self, direction):
+        if not isinstance(direction, str):
+            raise TypeError("Invalid direction type, should be string.")
+        if direction.lower() not in ["in", "out", "high", "low"]:
+            raise ValueError('Invalid direction, can be: "in", "out", "high", "low".')
+
+        # Write direction
+        try:
+            with open(os.path.join(self._path, "direction"), "w") as f_direction:
+                f_direction.write(direction.lower() + "\n")
+        except IOError as e:
+            raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror)
+
+    direction = property(_get_direction, _set_direction)
+
+    def __str__(self):
+        try:
+            str_direction = self.direction
+        except GPIOError:
+            str_direction = "<error>"
+
+        try:
+            str_chip_name = self.chip_name
+        except GPIOError:
+            str_chip_name = "<error>"
+
+        try:
+            str_chip_label = self.chip_label
+        except GPIOError:
+            str_chip_label = "<error>"
+
+        output = "Pin {:d} (dev={:s}, fd={:d}, dir={:s}, chip_name='{:s}', chip_label='{:s}')"
+        return output.format(
+            self.id, self._path, self._fd, str_direction, str_chip_name, str_chip_label
+        )