]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/generic_linux/sysfs_pin.py
Merge pull request #400 from luxarf/fix/rockpis-pins
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / generic_linux / sysfs_pin.py
1 """
2 Much code from https://github.com/vsergeev/python-periphery/blob/master/periphery/gpio.py
3 Copyright (c) 2015-2019 vsergeev / Ivan (Vanya) A. Sergeev
4 License: MIT
5 """
6
7 import os
8 import errno
9 import time
10
11 # pylint: disable=unnecessary-pass
12 class GPIOError(IOError):
13     """Base class for GPIO errors."""
14
15     pass
16
17
18 # pylint: enable=unnecessary-pass
19
20
21 class Pin:
22     """SysFS GPIO Pin Class"""
23
24     # Number of retries to check for GPIO export or direction write on open
25     GPIO_OPEN_RETRIES = 10
26     # Delay between check for GPIO export or direction write on open (100ms)
27     GPIO_OPEN_DELAY = 0.1
28
29     IN = "in"
30     OUT = "out"
31     LOW = 0
32     HIGH = 1
33     PULL_NONE = 0
34     PULL_UP = 1
35     PULL_DOWN = 2
36
37     id = None
38     _value = LOW
39     _mode = IN
40
41     # Sysfs paths
42     _sysfs_path = "/sys/class/gpio/"
43     _channel_path = "gpiochip{}"
44
45     # Channel paths
46     _export_path = "export"
47     _unexport_path = "unexport"
48     _pin_path = "gpio{}"
49
50     def __init__(self, pin_id):
51         """Instantiate a Pin object and open the sysfs GPIO with the specified
52         pin number.
53
54         Args:
55             pin_id (int): GPIO pin number.
56
57         Returns:
58             SysfsGPIO: GPIO object.
59
60         Raises:
61             GPIOError: if an I/O or OS error occurs.
62             TypeError: if `line` or `direction`  types are invalid.
63             ValueError: if `direction` value is invalid.
64             TimeoutError: if waiting for GPIO export times out.
65
66         """
67
68         if not isinstance(pin_id, int):
69             raise TypeError("Invalid Pin ID, should be integer.")
70
71         self.id = pin_id
72         self._fd = None
73         self._line = None
74         self._path = None
75
76     def __del__(self):
77         self._close()
78
79     def __enter__(self):
80         return self
81
82     def __exit__(self, t, value, traceback):
83         self._close()
84
85     def init(self, mode=IN, pull=None):
86         """Initialize the Pin"""
87         if mode is not None:
88             if mode == self.IN:
89                 self._mode = self.IN
90                 self._open(self.IN)
91             elif mode == self.OUT:
92                 self._mode = self.OUT
93                 self._open(self.OUT)
94             else:
95                 raise RuntimeError("Invalid mode for pin: %s" % self.id)
96
97             if pull is not None:
98                 if pull == self.PULL_UP:
99                     raise NotImplementedError(
100                         "Internal pullups not supported in periphery, "
101                         "use physical resistor instead!"
102                     )
103                 if pull == self.PULL_DOWN:
104                     raise NotImplementedError(
105                         "Internal pulldowns not supported in periphery, "
106                         "use physical resistor instead!"
107                     )
108                 raise RuntimeError("Invalid pull for pin: %s" % self.id)
109
110     def value(self, val=None):
111         """Set or return the Pin Value"""
112         if val is not None:
113             if val == self.LOW:
114                 self._value = val
115                 self._write(False)
116                 return None
117             if val == self.HIGH:
118                 self._value = val
119                 self._write(True)
120                 return None
121             raise RuntimeError("Invalid value for pin")
122         return self.HIGH if self._read() else self.LOW
123
124     # pylint: disable=too-many-branches
125     def _open(self, direction):
126         if not isinstance(direction, str):
127             raise TypeError("Invalid direction type, should be string.")
128         if direction.lower() not in ["in", "out", "high", "low"]:
129             raise ValueError('Invalid direction, can be: "in", "out", "high", "low".')
130         gpio_path = "/sys/class/gpio/gpio{:d}".format(self.id)
131
132         if not os.path.isdir(gpio_path):
133             # Export the line
134             try:
135                 with open("/sys/class/gpio/export", "w") as f_export:
136                     f_export.write("{:d}\n".format(self.id))
137             except IOError as e:
138                 raise GPIOError(e.errno, "Exporting GPIO: " + e.strerror) from IOError
139
140             # Loop until GPIO is exported
141             exported = False
142             for i in range(self.GPIO_OPEN_RETRIES):
143                 if os.path.isdir(gpio_path):
144                     exported = True
145                     break
146
147                 time.sleep(self.GPIO_OPEN_DELAY)
148
149             if not exported:
150                 raise TimeoutError(
151                     'Exporting GPIO: waiting for "{:s}" timed out'.format(gpio_path)
152                 )
153
154             # Write direction, looping in case of EACCES errors due to delayed udev
155             # permission rule application after export
156             for i in range(self.GPIO_OPEN_RETRIES):
157                 try:
158                     with open(os.path.join(gpio_path, "direction"), "w") as f_direction:
159                         f_direction.write(direction.lower() + "\n")
160                     break
161                 except IOError as e:
162                     if e.errno != errno.EACCES or (
163                         e.errno == errno.EACCES and i == self.GPIO_OPEN_RETRIES - 1
164                     ):
165                         raise GPIOError(
166                             e.errno, "Setting GPIO direction: " + e.strerror
167                         ) from IOError
168
169                 time.sleep(self.GPIO_OPEN_DELAY)
170         else:
171             # Write direction
172             try:
173                 with open(os.path.join(gpio_path, "direction"), "w") as f_direction:
174                     f_direction.write(direction.lower() + "\n")
175             except IOError as e:
176                 raise GPIOError(
177                     e.errno, "Setting GPIO direction: " + e.strerror
178                 ) from IOError
179
180         # Open value
181         try:
182             self._fd = os.open(os.path.join(gpio_path, "value"), os.O_RDWR)
183         except OSError as e:
184             raise GPIOError(e.errno, "Opening GPIO: " + e.strerror) from OSError
185
186         self._path = gpio_path
187
188     # pylint: enable=too-many-branches
189
190     def _close(self):
191         if self._fd is None:
192             return
193
194         try:
195             os.close(self._fd)
196         except OSError as e:
197             raise GPIOError(e.errno, "Closing GPIO: " + e.strerror) from OSError
198
199         self._fd = None
200
201         # Unexport the line
202         try:
203             unexport_fd = os.open("/sys/class/gpio/unexport", os.O_WRONLY)
204             os.write(unexport_fd, "{:d}\n".format(self.id).encode())
205             os.close(unexport_fd)
206         except OSError as e:
207             raise GPIOError(e.errno, "Unexporting GPIO: " + e.strerror) from OSError
208
209     def _read(self):
210         # Read value
211         try:
212             buf = os.read(self._fd, 2)
213         except OSError as e:
214             raise GPIOError(e.errno, "Reading GPIO: " + e.strerror) from OSError
215
216         # Rewind
217         try:
218             os.lseek(self._fd, 0, os.SEEK_SET)
219         except OSError as e:
220             raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror) from OSError
221
222         if buf[0] == b"0"[0]:
223             return False
224         if buf[0] == b"1"[0]:
225             return True
226
227         raise GPIOError(None, "Unknown GPIO value: {}".format(buf))
228
229     def _write(self, value):
230         if not isinstance(value, bool):
231             raise TypeError("Invalid value type, should be bool.")
232
233         # Write value
234         try:
235             if value:
236                 os.write(self._fd, b"1\n")
237             else:
238                 os.write(self._fd, b"0\n")
239         except OSError as e:
240             raise GPIOError(e.errno, "Writing GPIO: " + e.strerror) from OSError
241
242         # Rewind
243         try:
244             os.lseek(self._fd, 0, os.SEEK_SET)
245         except OSError as e:
246             raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror) from OSError
247
248     @property
249     def chip_name(self):
250         """Return the Chip Name"""
251         gpio_path = os.path.join(self._path, "device")
252
253         gpiochip_path = os.readlink(gpio_path)
254
255         if "/" not in gpiochip_path:
256             raise GPIOError(
257                 None,
258                 'Reading gpiochip name: invalid device symlink "{:s}"'.format(
259                     gpiochip_path
260                 ),
261             )
262
263         return gpiochip_path.split("/")[-1]
264
265     @property
266     def chip_label(self):
267         """Return the Chip Label"""
268         gpio_path = "/sys/class/gpio/{:s}/label".format(self.chip_name)
269
270         try:
271             with open(gpio_path, "r") as f_label:
272                 label = f_label.read()
273         except (GPIOError, IOError) as e:
274             if isinstance(e, IOError):
275                 raise GPIOError(
276                     e.errno, "Reading gpiochip label: " + e.strerror
277                 ) from IOError
278
279             raise GPIOError(
280                 None, "Reading gpiochip label: " + e.strerror
281             ) from GPIOError
282
283         return label.strip()
284
285     # Mutable properties
286
287     def _get_direction(self):
288         # Read direction
289         try:
290             with open(os.path.join(self._path, "direction"), "r") as f_direction:
291                 direction = f_direction.read()
292         except IOError as e:
293             raise GPIOError(
294                 e.errno, "Getting GPIO direction: " + e.strerror
295             ) from IOError
296
297         return direction.strip()
298
299     def _set_direction(self, direction):
300         if not isinstance(direction, str):
301             raise TypeError("Invalid direction type, should be string.")
302         if direction.lower() not in ["in", "out", "high", "low"]:
303             raise ValueError('Invalid direction, can be: "in", "out", "high", "low".')
304
305         # Write direction
306         try:
307             with open(os.path.join(self._path, "direction"), "w") as f_direction:
308                 f_direction.write(direction.lower() + "\n")
309         except IOError as e:
310             raise GPIOError(
311                 e.errno, "Setting GPIO direction: " + e.strerror
312             ) from IOError
313
314     direction = property(_get_direction, _set_direction)
315
316     def __str__(self):
317         try:
318             str_direction = self.direction
319         except GPIOError:
320             str_direction = "<error>"
321
322         try:
323             str_chip_name = self.chip_name
324         except GPIOError:
325             str_chip_name = "<error>"
326
327         try:
328             str_chip_label = self.chip_label
329         except GPIOError:
330             str_chip_label = "<error>"
331
332         output = "Pin {:d} (dev={:s}, fd={:d}, dir={:s}, chip_name='{:s}', chip_label='{:s}')"
333         return output.format(
334             self.id, self._path, self._fd, str_direction, str_chip_name, str_chip_label
335         )