]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/rp2040_u2if/rp2040_u2if.py
Update __init__.py
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / rp2040_u2if / rp2040_u2if.py
1 """Helper class for use with RP2040 running u2if firmware"""
2 # https://github.com/execuc/u2if
3
4 import os
5 import time
6 import hid
7
8 # Use to set delay between reset and device reopen. if negative, don't reset at all
9 RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
10
11 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
12 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
13
14
15 class RP2040_u2if:
16     """Helper class for use with RP2040 running u2if firmware"""
17
18     # MISC
19     RESP_OK = 0x01
20     SYS_RESET = 0x10
21
22     # GPIO
23     GPIO_INIT_PIN = 0x20
24     GPIO_SET_VALUE = 0x21
25     GPIO_GET_VALUE = 0x22
26
27     # ADC
28     ADC_INIT_PIN = 0x40
29     ADC_GET_VALUE = 0x41
30
31     # I2C
32     I2C0_INIT = 0x80
33     I2C0_DEINIT = 0x81
34     I2C0_WRITE = 0x82
35     I2C0_READ = 0x83
36     I2C0_WRITE_FROM_UART = 0x84
37     I2C1_INIT = I2C0_INIT + 0x10
38     I2C1_DEINIT = I2C0_DEINIT + 0x10
39     I2C1_WRITE = I2C0_WRITE + 0x10
40     I2C1_READ = I2C0_READ + 0x10
41     I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
42
43     # SPI
44     SPI0_INIT = 0x60
45     SPI0_DEINIT = 0x61
46     SPI0_WRITE = 0x62
47     SPI0_READ = 0x63
48     SPI0_WRITE_FROM_UART = 0x64
49     SPI1_INIT = SPI0_INIT + 0x10
50     SPI1_DEINIT = SPI0_DEINIT + 0x10
51     SPI1_WRITE = SPI0_WRITE + 0x10
52     SPI1_READ = SPI0_READ + 0x10
53     SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
54
55     # WS2812B (LED)
56     WS2812B_INIT = 0xA0
57     WS2812B_DEINIT = 0xA1
58     WS2812B_WRITE = 0xA2
59
60     # PWM
61     PWM_INIT_PIN = 0x30
62     PWM_DEINIT_PIN = 0x31
63     PWM_SET_FREQ = 0x32
64     PWM_GET_FREQ = 0x33
65     PWM_SET_DUTY_U16 = 0x34
66     PWM_GET_DUTY_U16 = 0x35
67     PWM_SET_DUTY_NS = 0x36
68     PWM_GET_DUTY_NS = 0x37
69
70     def __init__(self):
71         self._vid = None
72         self._pid = None
73         self._hid = None
74         self._opened = False
75         self._i2c_index = None
76         self._spi_index = None
77         self._serial = None
78         self._neopixel_initialized = False
79         self._uart_rx_buffer = None
80
81     def _hid_xfer(self, report, response=True):
82         """Perform HID Transfer"""
83         # first byte is report ID, which =0
84         # remaing bytes = 64 byte report data
85         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
86         self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
87         if response:
88             # return is 64 byte response report
89             return self._hid.read(64)
90         return None
91
92     def _reset(self):
93         self._hid_xfer(bytes([self.SYS_RESET]), False)
94         time.sleep(RP2040_U2IF_RESET_DELAY)
95         start = time.monotonic()
96         while time.monotonic() - start < 5:
97             try:
98                 self._hid.open(self._vid, self._pid)
99             except OSError:
100                 time.sleep(0.1)
101                 continue
102             return
103         raise OSError("RP2040 u2if open error.")
104
105     # ----------------------------------------------------------------
106     # MISC
107     # ----------------------------------------------------------------
108     def open(self, vid, pid):
109         """Open HID interface for given USB VID and PID."""
110
111         if self._opened:
112             return
113         self._vid = vid
114         self._pid = pid
115         self._hid = hid.device()
116         self._hid.open(self._vid, self._pid)
117         if RP2040_U2IF_RESET_DELAY >= 0:
118             self._reset()
119         self._opened = True
120
121     # ----------------------------------------------------------------
122     # GPIO
123     # ----------------------------------------------------------------
124     def gpio_init_pin(self, pin_id, direction, pull):
125         """Configure GPIO Pin."""
126         self._hid_xfer(
127             bytes(
128                 [
129                     self.GPIO_INIT_PIN,
130                     pin_id,
131                     direction,
132                     pull,
133                 ]
134             )
135         )
136
137     def gpio_set_pin(self, pin_id, value):
138         """Set Current GPIO Pin Value"""
139         self._hid_xfer(
140             bytes(
141                 [
142                     self.GPIO_SET_VALUE,
143                     pin_id,
144                     int(value),
145                 ]
146             )
147         )
148
149     def gpio_get_pin(self, pin_id):
150         """Get Current GPIO Pin Value"""
151         resp = self._hid_xfer(
152             bytes(
153                 [
154                     self.GPIO_GET_VALUE,
155                     pin_id,
156                 ]
157             ),
158             True,
159         )
160         return resp[3] != 0x00
161
162     # ----------------------------------------------------------------
163     # ADC
164     # ----------------------------------------------------------------
165     def adc_init_pin(self, pin_id):
166         """Configure ADC Pin."""
167         self._hid_xfer(
168             bytes(
169                 [
170                     self.ADC_INIT_PIN,
171                     pin_id,
172                 ]
173             )
174         )
175
176     def adc_get_value(self, pin_id):
177         """Get ADC value for pin."""
178         resp = self._hid_xfer(
179             bytes(
180                 [
181                     self.ADC_GET_VALUE,
182                     pin_id,
183                 ]
184             ),
185             True,
186         )
187         return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
188
189     # ----------------------------------------------------------------
190     # I2C
191     # ----------------------------------------------------------------
192     def i2c_configure(self, baudrate, pullup=False):
193         """Configure I2C."""
194         if self._i2c_index is None:
195             raise RuntimeError("I2C bus not initialized.")
196
197         resp = self._hid_xfer(
198             bytes(
199                 [
200                     self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
201                     0x00 if not pullup else 0x01,
202                 ]
203             )
204             + baudrate.to_bytes(4, byteorder="little"),
205             True,
206         )
207         if resp[1] != self.RESP_OK:
208             raise RuntimeError("I2C init error.")
209
210     def i2c_set_port(self, index):
211         """Set I2C port."""
212         if index not in (0, 1):
213             raise ValueError("I2C index must be 0 or 1.")
214         self._i2c_index = index
215
216     def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
217         """Write data from the buffer to an address"""
218         if self._i2c_index is None:
219             raise RuntimeError("I2C bus not initialized.")
220
221         end = end if end else len(buffer)
222
223         write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
224         stop_flag = 0x01 if stop else 0x00
225
226         while (end - start) > 0:
227             remain_bytes = end - start
228             chunk = min(remain_bytes, 64 - 7)
229             resp = self._hid_xfer(
230                 bytes([write_cmd, address, stop_flag])
231                 + remain_bytes.to_bytes(4, byteorder="little")
232                 + buffer[start : (start + chunk)],
233                 True,
234             )
235             if resp[1] != self.RESP_OK:
236                 raise RuntimeError("I2C write error")
237             start += chunk
238
239     def _i2c_read(self, address, buffer, start=0, end=None):
240         """Read data from an address and into the buffer"""
241         # TODO: support chunkified reads
242         if self._i2c_index is None:
243             raise RuntimeError("I2C bus not initialized.")
244
245         end = end if end else len(buffer)
246
247         read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
248         stop_flag = 0x01  # always stop
249         read_size = end - start
250
251         resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
252         if resp[1] != self.RESP_OK:
253             raise RuntimeError("I2C write error")
254         # move into buffer
255         for i in range(read_size):
256             buffer[start + i] = resp[i + 2]
257
258     def i2c_writeto(self, address, buffer, *, start=0, end=None):
259         """Write data from the buffer to an address"""
260         self._i2c_write(address, buffer, start, end)
261
262     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
263         """Read data from an address and into the buffer"""
264         self._i2c_read(address, buffer, start, end)
265
266     def i2c_writeto_then_readfrom(
267         self,
268         address,
269         out_buffer,
270         in_buffer,
271         *,
272         out_start=0,
273         out_end=None,
274         in_start=0,
275         in_end=None
276     ):
277         """Write data from buffer_out to an address and then
278         read data from an address and into buffer_in
279         """
280         self._i2c_write(address, out_buffer, out_start, out_end, False)
281         self._i2c_read(address, in_buffer, in_start, in_end)
282
283     def i2c_scan(self, *, start=0, end=0x79):
284         """Perform an I2C Device Scan"""
285         if self._i2c_index is None:
286             raise RuntimeError("I2C bus not initialized.")
287         found = []
288         for addr in range(start, end + 1):
289             # try a write
290             try:
291                 self.i2c_writeto(addr, b"\x00\x00\x00")
292             except RuntimeError:  # no reply!
293                 continue
294             # store if success
295             found.append(addr)
296         return found
297
298     # ----------------------------------------------------------------
299     # SPI
300     # ----------------------------------------------------------------
301     def spi_configure(self, baudrate):
302         """Configure SPI."""
303         if self._spi_index is None:
304             raise RuntimeError("SPI bus not initialized.")
305
306         resp = self._hid_xfer(
307             bytes(
308                 [
309                     self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
310                     0x00,  # mode, not yet implemented
311                 ]
312             )
313             + baudrate.to_bytes(4, byteorder="little"),
314             True,
315         )
316         if resp[1] != self.RESP_OK:
317             raise RuntimeError("SPI init error.")
318
319     def spi_set_port(self, index):
320         """Set SPI port."""
321         if index not in (0, 1):
322             raise ValueError("SPI index must be 0 or 1.")
323         self._spi_index = index
324
325     def spi_write(self, buffer, *, start=0, end=None):
326         """SPI write."""
327         if self._spi_index is None:
328             raise RuntimeError("SPI bus not initialized.")
329
330         end = end if end else len(buffer)
331
332         write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
333
334         while (end - start) > 0:
335             remain_bytes = end - start
336             chunk = min(remain_bytes, 64 - 3)
337             resp = self._hid_xfer(
338                 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
339             )
340             if resp[1] != self.RESP_OK:
341                 raise RuntimeError("SPI write error")
342             start += chunk
343
344     def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
345         """SPI readinto."""
346         if self._spi_index is None:
347             raise RuntimeError("SPI bus not initialized.")
348
349         end = end if end else len(buffer)
350         read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
351         read_size = end - start
352
353         resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
354         if resp[1] != self.RESP_OK:
355             raise RuntimeError("SPI write error")
356         # move into buffer
357         for i in range(read_size):
358             buffer[start + i] = resp[i + 2]
359
360     def spi_write_readinto(
361         self,
362         buffer_out,
363         buffer_in,
364         *,
365         out_start=0,
366         out_end=None,
367         in_start=0,
368         in_end=None
369     ):
370         """SPI write and readinto."""
371         raise NotImplementedError("SPI write_readinto Not implemented")
372
373     # ----------------------------------------------------------------
374     # NEOPIXEL
375     # ----------------------------------------------------------------
376     def neopixel_write(self, gpio, buf):
377         """NeoPixel write."""
378         # open serial (data is sent over this)
379         if self._serial is None:
380             import serial
381             import serial.tools.list_ports
382
383             ports = serial.tools.list_ports.comports()
384             for port in ports:
385                 if port.vid == self._vid and port.pid == self._pid:
386                     self._serial = serial.Serial(port.device)
387                     break
388         if self._serial is None:
389             raise RuntimeError("Could not find Pico com port.")
390
391         # init
392         if not self._neopixel_initialized:
393             # deinit any current setup
394             # pylint: disable=protected-access
395             self._hid_xfer(bytes([self.WS2812B_DEINIT]))
396             resp = self._hid_xfer(
397                 bytes(
398                     [
399                         self.WS2812B_INIT,
400                         gpio._pin.id,
401                     ]
402                 ),
403                 True,
404             )
405             if resp[1] != self.RESP_OK:
406                 raise RuntimeError("Neopixel init error")
407             self._neopixel_initialized = True
408
409         self._serial.reset_output_buffer()
410
411         # write
412         # command is done over HID
413         remain_bytes = len(buf)
414         resp = self._hid_xfer(
415             bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
416             True,
417         )
418         if resp[1] != self.RESP_OK:
419             # pylint: disable=no-else-raise
420             if resp[2] == 0x01:
421                 raise RuntimeError(
422                     "Neopixel write error : too many pixel for the firmware."
423                 )
424             elif resp[2] == 0x02:
425                 raise RuntimeError(
426                     "Neopixel write error : transfer already in progress."
427                 )
428             else:
429                 raise RuntimeError("Neopixel write error.")
430         # buffer is sent over serial
431         self._serial.write(buf)
432         # hack (see u2if)
433         if len(buf) % 64 == 0:
434             self._serial.write([0])
435         self._serial.flush()
436         # polling loop to wait for write complete?
437         time.sleep(0.1)
438         resp = self._hid.read(64)
439         while resp[0] != self.WS2812B_WRITE:
440             resp = self._hid.read(64)
441         if resp[1] != self.RESP_OK:
442             raise RuntimeError("Neopixel write (flush) error.")
443
444     # ----------------------------------------------------------------
445     # PWM
446     # ----------------------------------------------------------------
447     # pylint: disable=unused-argument
448     def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
449         """Configure PWM."""
450         self.pwm_deinit(pin)
451         resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
452         if resp[1] != self.RESP_OK:
453             raise RuntimeError("PWM init error.")
454
455         self.pwm_set_frequency(pin, frequency)
456         self.pwm_set_duty_cycle(pin, duty_cycle)
457
458     def pwm_deinit(self, pin):
459         """Deinit PWM."""
460         self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
461
462     def pwm_get_frequency(self, pin):
463         """PWM get freq."""
464         resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
465         if resp[1] != self.RESP_OK:
466             raise RuntimeError("PWM get frequency error.")
467         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
468
469     def pwm_set_frequency(self, pin, frequency):
470         """PWM set freq."""
471         resp = self._hid_xfer(
472             bytes([self.PWM_SET_FREQ, pin.id])
473             + frequency.to_bytes(4, byteorder="little"),
474             True,
475         )
476         if resp[1] != self.RESP_OK:
477             # pylint: disable=no-else-raise
478             if resp[3] == 0x01:
479                 raise RuntimeError("PWM different frequency on same slice.")
480             elif resp[3] == 0x02:
481                 raise RuntimeError("PWM frequency too low.")
482             elif resp[3] == 0x03:
483                 raise RuntimeError("PWM frequency too high.")
484             else:
485                 raise RuntimeError("PWM frequency error.")
486
487     def pwm_get_duty_cycle(self, pin):
488         """PWM get duty cycle."""
489         resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
490         if resp[1] != self.RESP_OK:
491             raise RuntimeError("PWM get duty cycle error.")
492         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
493
494     def pwm_set_duty_cycle(self, pin, duty_cycle):
495         """PWM set duty cycle."""
496         resp = self._hid_xfer(
497             bytes([self.PWM_SET_DUTY_U16, pin.id])
498             + duty_cycle.to_bytes(2, byteorder="little"),
499             True,
500         )
501         if resp[1] != self.RESP_OK:
502             raise RuntimeError("PWM set duty cycle error.")
503
504
505 rp2040_u2if = RP2040_u2if()