]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/rp2040_u2if/rp2040_u2if.py
Add support for Jetson Orin
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / rp2040_u2if / rp2040_u2if.py
1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
2 #
3 # SPDX-License-Identifier: MIT
4 """Helper class for use with RP2040 running u2if firmware"""
5 # https://github.com/execuc/u2if
6
7 import os
8 import time
9 import hid
10
11 # Use to set delay between reset and device reopen. if negative, don't reset at all
12 RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
13
14 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
15 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
16
17
18 class RP2040_u2if:
19     """Helper class for use with RP2040 running u2if firmware"""
20
21     # MISC
22     RESP_OK = 0x01
23     SYS_RESET = 0x10
24
25     # GPIO
26     GPIO_INIT_PIN = 0x20
27     GPIO_SET_VALUE = 0x21
28     GPIO_GET_VALUE = 0x22
29
30     # ADC
31     ADC_INIT_PIN = 0x40
32     ADC_GET_VALUE = 0x41
33
34     # I2C
35     I2C0_INIT = 0x80
36     I2C0_DEINIT = 0x81
37     I2C0_WRITE = 0x82
38     I2C0_READ = 0x83
39     I2C0_WRITE_FROM_UART = 0x84
40     I2C1_INIT = I2C0_INIT + 0x10
41     I2C1_DEINIT = I2C0_DEINIT + 0x10
42     I2C1_WRITE = I2C0_WRITE + 0x10
43     I2C1_READ = I2C0_READ + 0x10
44     I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
45
46     # SPI
47     SPI0_INIT = 0x60
48     SPI0_DEINIT = 0x61
49     SPI0_WRITE = 0x62
50     SPI0_READ = 0x63
51     SPI0_WRITE_FROM_UART = 0x64
52     SPI1_INIT = SPI0_INIT + 0x10
53     SPI1_DEINIT = SPI0_DEINIT + 0x10
54     SPI1_WRITE = SPI0_WRITE + 0x10
55     SPI1_READ = SPI0_READ + 0x10
56     SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
57
58     # WS2812B (LED)
59     WS2812B_INIT = 0xA0
60     WS2812B_DEINIT = 0xA1
61     WS2812B_WRITE = 0xA2
62
63     # PWM
64     PWM_INIT_PIN = 0x30
65     PWM_DEINIT_PIN = 0x31
66     PWM_SET_FREQ = 0x32
67     PWM_GET_FREQ = 0x33
68     PWM_SET_DUTY_U16 = 0x34
69     PWM_GET_DUTY_U16 = 0x35
70     PWM_SET_DUTY_NS = 0x36
71     PWM_GET_DUTY_NS = 0x37
72
73     def __init__(self):
74         self._vid = None
75         self._pid = None
76         self._hid = None
77         self._opened = False
78         self._i2c_index = None
79         self._spi_index = None
80         self._serial = None
81         self._neopixel_initialized = False
82         self._uart_rx_buffer = None
83
84     def _hid_xfer(self, report, response=True):
85         """Perform HID Transfer"""
86         # first byte is report ID, which =0
87         # remaing bytes = 64 byte report data
88         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
89         self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
90         if response:
91             # return is 64 byte response report
92             return self._hid.read(64)
93         return None
94
95     def _reset(self):
96         self._hid_xfer(bytes([self.SYS_RESET]), False)
97         self._hid.close()
98         time.sleep(RP2040_U2IF_RESET_DELAY)
99         start = time.monotonic()
100         while time.monotonic() - start < 5:
101             try:
102                 self._hid.open(self._vid, self._pid)
103             except OSError:
104                 time.sleep(0.1)
105                 continue
106             return
107         raise OSError("RP2040 u2if open error.")
108
109     # ----------------------------------------------------------------
110     # MISC
111     # ----------------------------------------------------------------
112     def open(self, vid, pid):
113         """Open HID interface for given USB VID and PID."""
114
115         if self._opened:
116             return
117         self._vid = vid
118         self._pid = pid
119         self._hid = hid.device()
120         self._hid.open(self._vid, self._pid)
121         if RP2040_U2IF_RESET_DELAY >= 0:
122             self._reset()
123         self._opened = True
124
125     # ----------------------------------------------------------------
126     # GPIO
127     # ----------------------------------------------------------------
128     def gpio_init_pin(self, pin_id, direction, pull):
129         """Configure GPIO Pin."""
130         self._hid_xfer(
131             bytes(
132                 [
133                     self.GPIO_INIT_PIN,
134                     pin_id,
135                     direction,
136                     pull,
137                 ]
138             )
139         )
140
141     def gpio_set_pin(self, pin_id, value):
142         """Set Current GPIO Pin Value"""
143         self._hid_xfer(
144             bytes(
145                 [
146                     self.GPIO_SET_VALUE,
147                     pin_id,
148                     int(value),
149                 ]
150             )
151         )
152
153     def gpio_get_pin(self, pin_id):
154         """Get Current GPIO Pin Value"""
155         resp = self._hid_xfer(
156             bytes(
157                 [
158                     self.GPIO_GET_VALUE,
159                     pin_id,
160                 ]
161             ),
162             True,
163         )
164         return resp[3] != 0x00
165
166     # ----------------------------------------------------------------
167     # ADC
168     # ----------------------------------------------------------------
169     def adc_init_pin(self, pin_id):
170         """Configure ADC Pin."""
171         self._hid_xfer(
172             bytes(
173                 [
174                     self.ADC_INIT_PIN,
175                     pin_id,
176                 ]
177             )
178         )
179
180     def adc_get_value(self, pin_id):
181         """Get ADC value for pin."""
182         resp = self._hid_xfer(
183             bytes(
184                 [
185                     self.ADC_GET_VALUE,
186                     pin_id,
187                 ]
188             ),
189             True,
190         )
191         return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
192
193     # ----------------------------------------------------------------
194     # I2C
195     # ----------------------------------------------------------------
196     def i2c_configure(self, baudrate, pullup=False):
197         """Configure I2C."""
198         if self._i2c_index is None:
199             raise RuntimeError("I2C bus not initialized.")
200
201         resp = self._hid_xfer(
202             bytes(
203                 [
204                     self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
205                     0x00 if not pullup else 0x01,
206                 ]
207             )
208             + baudrate.to_bytes(4, byteorder="little"),
209             True,
210         )
211         if resp[1] != self.RESP_OK:
212             raise RuntimeError("I2C init error.")
213
214     def i2c_set_port(self, index):
215         """Set I2C port."""
216         if index not in (0, 1):
217             raise ValueError("I2C index must be 0 or 1.")
218         self._i2c_index = index
219
220     def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
221         """Write data from the buffer to an address"""
222         if self._i2c_index is None:
223             raise RuntimeError("I2C bus not initialized.")
224
225         end = end if end else len(buffer)
226
227         write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
228         stop_flag = 0x01 if stop else 0x00
229
230         while (end - start) > 0:
231             remain_bytes = end - start
232             chunk = min(remain_bytes, 64 - 7)
233             resp = self._hid_xfer(
234                 bytes([write_cmd, address, stop_flag])
235                 + remain_bytes.to_bytes(4, byteorder="little")
236                 + buffer[start : (start + chunk)],
237                 True,
238             )
239             if resp[1] != self.RESP_OK:
240                 raise RuntimeError("I2C write error")
241             start += chunk
242
243     def _i2c_read(self, address, buffer, start=0, end=None):
244         """Read data from an address and into the buffer"""
245         # TODO: support chunkified reads
246         if self._i2c_index is None:
247             raise RuntimeError("I2C bus not initialized.")
248
249         end = end if end else len(buffer)
250
251         read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
252         stop_flag = 0x01  # always stop
253         read_size = end - start
254
255         resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
256         if resp[1] != self.RESP_OK:
257             raise RuntimeError("I2C write error")
258         # move into buffer
259         for i in range(read_size):
260             buffer[start + i] = resp[i + 2]
261
262     def i2c_writeto(self, address, buffer, *, start=0, end=None):
263         """Write data from the buffer to an address"""
264         self._i2c_write(address, buffer, start, end)
265
266     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
267         """Read data from an address and into the buffer"""
268         self._i2c_read(address, buffer, start, end)
269
270     def i2c_writeto_then_readfrom(
271         self,
272         address,
273         out_buffer,
274         in_buffer,
275         *,
276         out_start=0,
277         out_end=None,
278         in_start=0,
279         in_end=None,
280     ):
281         """Write data from buffer_out to an address and then
282         read data from an address and into buffer_in
283         """
284         self._i2c_write(address, out_buffer, out_start, out_end, False)
285         self._i2c_read(address, in_buffer, in_start, in_end)
286
287     def i2c_scan(self, *, start=0, end=0x79):
288         """Perform an I2C Device Scan"""
289         if self._i2c_index is None:
290             raise RuntimeError("I2C bus not initialized.")
291         found = []
292         for addr in range(start, end + 1):
293             # try a write
294             try:
295                 self.i2c_writeto(addr, b"\x00\x00\x00")
296             except RuntimeError:  # no reply!
297                 continue
298             # store if success
299             found.append(addr)
300         return found
301
302     # ----------------------------------------------------------------
303     # SPI
304     # ----------------------------------------------------------------
305     def spi_configure(self, baudrate):
306         """Configure SPI."""
307         if self._spi_index is None:
308             raise RuntimeError("SPI bus not initialized.")
309
310         resp = self._hid_xfer(
311             bytes(
312                 [
313                     self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
314                     0x00,  # mode, not yet implemented
315                 ]
316             )
317             + baudrate.to_bytes(4, byteorder="little"),
318             True,
319         )
320         if resp[1] != self.RESP_OK:
321             raise RuntimeError("SPI init error.")
322
323     def spi_set_port(self, index):
324         """Set SPI port."""
325         if index not in (0, 1):
326             raise ValueError("SPI index must be 0 or 1.")
327         self._spi_index = index
328
329     def spi_write(self, buffer, *, start=0, end=None):
330         """SPI write."""
331         if self._spi_index is None:
332             raise RuntimeError("SPI bus not initialized.")
333
334         end = end if end else len(buffer)
335
336         write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
337
338         while (end - start) > 0:
339             remain_bytes = end - start
340             chunk = min(remain_bytes, 64 - 3)
341             resp = self._hid_xfer(
342                 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
343             )
344             if resp[1] != self.RESP_OK:
345                 raise RuntimeError("SPI write error")
346             start += chunk
347
348     def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
349         """SPI readinto."""
350         if self._spi_index is None:
351             raise RuntimeError("SPI bus not initialized.")
352
353         end = end if end else len(buffer)
354         read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
355         read_size = end - start
356
357         resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
358         if resp[1] != self.RESP_OK:
359             raise RuntimeError("SPI write error")
360         # move into buffer
361         for i in range(read_size):
362             buffer[start + i] = resp[i + 2]
363
364     def spi_write_readinto(
365         self,
366         buffer_out,
367         buffer_in,
368         *,
369         out_start=0,
370         out_end=None,
371         in_start=0,
372         in_end=None,
373     ):
374         """SPI write and readinto."""
375         raise NotImplementedError("SPI write_readinto Not implemented")
376
377     # ----------------------------------------------------------------
378     # NEOPIXEL
379     # ----------------------------------------------------------------
380     def neopixel_write(self, gpio, buf):
381         """NeoPixel write."""
382         # open serial (data is sent over this)
383         if self._serial is None:
384             import serial
385             import serial.tools.list_ports
386
387             ports = serial.tools.list_ports.comports()
388             for port in ports:
389                 if port.vid == self._vid and port.pid == self._pid:
390                     self._serial = serial.Serial(port.device)
391                     break
392         if self._serial is None:
393             raise RuntimeError("Could not find Pico com port.")
394
395         # init
396         if not self._neopixel_initialized:
397             # deinit any current setup
398             # pylint: disable=protected-access
399             self._hid_xfer(bytes([self.WS2812B_DEINIT]))
400             resp = self._hid_xfer(
401                 bytes(
402                     [
403                         self.WS2812B_INIT,
404                         gpio._pin.id,
405                     ]
406                 ),
407                 True,
408             )
409             if resp[1] != self.RESP_OK:
410                 raise RuntimeError("Neopixel init error")
411             self._neopixel_initialized = True
412
413         self._serial.reset_output_buffer()
414
415         # write
416         # command is done over HID
417         remain_bytes = len(buf)
418         resp = self._hid_xfer(
419             bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
420             True,
421         )
422         if resp[1] != self.RESP_OK:
423             # pylint: disable=no-else-raise
424             if resp[2] == 0x01:
425                 raise RuntimeError(
426                     "Neopixel write error : too many pixel for the firmware."
427                 )
428             elif resp[2] == 0x02:
429                 raise RuntimeError(
430                     "Neopixel write error : transfer already in progress."
431                 )
432             else:
433                 raise RuntimeError("Neopixel write error.")
434         # buffer is sent over serial
435         self._serial.write(buf)
436         # hack (see u2if)
437         if len(buf) % 64 == 0:
438             self._serial.write([0])
439         self._serial.flush()
440         # polling loop to wait for write complete?
441         time.sleep(0.1)
442         resp = self._hid.read(64)
443         while resp[0] != self.WS2812B_WRITE:
444             resp = self._hid.read(64)
445         if resp[1] != self.RESP_OK:
446             raise RuntimeError("Neopixel write (flush) error.")
447
448     # ----------------------------------------------------------------
449     # PWM
450     # ----------------------------------------------------------------
451     # pylint: disable=unused-argument
452     def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
453         """Configure PWM."""
454         self.pwm_deinit(pin)
455         resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
456         if resp[1] != self.RESP_OK:
457             raise RuntimeError("PWM init error.")
458
459         self.pwm_set_frequency(pin, frequency)
460         self.pwm_set_duty_cycle(pin, duty_cycle)
461
462     def pwm_deinit(self, pin):
463         """Deinit PWM."""
464         self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
465
466     def pwm_get_frequency(self, pin):
467         """PWM get freq."""
468         resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
469         if resp[1] != self.RESP_OK:
470             raise RuntimeError("PWM get frequency error.")
471         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
472
473     def pwm_set_frequency(self, pin, frequency):
474         """PWM set freq."""
475         resp = self._hid_xfer(
476             bytes([self.PWM_SET_FREQ, pin.id])
477             + frequency.to_bytes(4, byteorder="little"),
478             True,
479         )
480         if resp[1] != self.RESP_OK:
481             # pylint: disable=no-else-raise
482             if resp[3] == 0x01:
483                 raise RuntimeError("PWM different frequency on same slice.")
484             elif resp[3] == 0x02:
485                 raise RuntimeError("PWM frequency too low.")
486             elif resp[3] == 0x03:
487                 raise RuntimeError("PWM frequency too high.")
488             else:
489                 raise RuntimeError("PWM frequency error.")
490
491     def pwm_get_duty_cycle(self, pin):
492         """PWM get duty cycle."""
493         resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
494         if resp[1] != self.RESP_OK:
495             raise RuntimeError("PWM get duty cycle error.")
496         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
497
498     def pwm_set_duty_cycle(self, pin, duty_cycle):
499         """PWM set duty cycle."""
500         resp = self._hid_xfer(
501             bytes([self.PWM_SET_DUTY_U16, pin.id])
502             + duty_cycle.to_bytes(2, byteorder="little"),
503             True,
504         )
505         if resp[1] != self.RESP_OK:
506             raise RuntimeError("PWM set duty cycle error.")
507
508
509 rp2040_u2if = RP2040_u2if()