1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
3 # SPDX-License-Identifier: MIT
4 """Helper class for use with RP2040 running u2if firmware"""
5 # https://github.com/execuc/u2if
11 # Use to set delay between reset and device reopen. if negative, don't reset at all
12 RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
14 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
15 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
19 """Helper class for use with RP2040 running u2if firmware"""
39 I2C0_WRITE_FROM_UART = 0x84
40 I2C1_INIT = I2C0_INIT + 0x10
41 I2C1_DEINIT = I2C0_DEINIT + 0x10
42 I2C1_WRITE = I2C0_WRITE + 0x10
43 I2C1_READ = I2C0_READ + 0x10
44 I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
51 SPI0_WRITE_FROM_UART = 0x64
52 SPI1_INIT = SPI0_INIT + 0x10
53 SPI1_DEINIT = SPI0_DEINIT + 0x10
54 SPI1_WRITE = SPI0_WRITE + 0x10
55 SPI1_READ = SPI0_READ + 0x10
56 SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
68 PWM_SET_DUTY_U16 = 0x34
69 PWM_GET_DUTY_U16 = 0x35
70 PWM_SET_DUTY_NS = 0x36
71 PWM_GET_DUTY_NS = 0x37
78 self._i2c_index = None
79 self._spi_index = None
81 self._neopixel_initialized = False
82 self._uart_rx_buffer = None
84 def _hid_xfer(self, report, response=True):
85 """Perform HID Transfer"""
86 # first byte is report ID, which =0
87 # remaing bytes = 64 byte report data
88 # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
89 self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
91 # return is 64 byte response report
92 return self._hid.read(64)
96 self._hid_xfer(bytes([self.SYS_RESET]), False)
98 time.sleep(RP2040_U2IF_RESET_DELAY)
99 start = time.monotonic()
100 while time.monotonic() - start < 5:
102 self._hid.open(self._vid, self._pid)
107 raise OSError("RP2040 u2if open error.")
109 # ----------------------------------------------------------------
111 # ----------------------------------------------------------------
112 def open(self, vid, pid):
113 """Open HID interface for given USB VID and PID."""
119 self._hid = hid.device()
120 self._hid.open(self._vid, self._pid)
121 if RP2040_U2IF_RESET_DELAY >= 0:
125 # ----------------------------------------------------------------
127 # ----------------------------------------------------------------
128 def gpio_init_pin(self, pin_id, direction, pull):
129 """Configure GPIO Pin."""
141 def gpio_set_pin(self, pin_id, value):
142 """Set Current GPIO Pin Value"""
153 def gpio_get_pin(self, pin_id):
154 """Get Current GPIO Pin Value"""
155 resp = self._hid_xfer(
164 return resp[3] != 0x00
166 # ----------------------------------------------------------------
168 # ----------------------------------------------------------------
169 def adc_init_pin(self, pin_id):
170 """Configure ADC Pin."""
180 def adc_get_value(self, pin_id):
181 """Get ADC value for pin."""
182 resp = self._hid_xfer(
191 return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
193 # ----------------------------------------------------------------
195 # ----------------------------------------------------------------
196 def i2c_configure(self, baudrate, pullup=False):
198 if self._i2c_index is None:
199 raise RuntimeError("I2C bus not initialized.")
201 resp = self._hid_xfer(
204 self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
205 0x00 if not pullup else 0x01,
208 + baudrate.to_bytes(4, byteorder="little"),
211 if resp[1] != self.RESP_OK:
212 raise RuntimeError("I2C init error.")
214 def i2c_set_port(self, index):
216 if index not in (0, 1):
217 raise ValueError("I2C index must be 0 or 1.")
218 self._i2c_index = index
220 def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
221 """Write data from the buffer to an address"""
222 if self._i2c_index is None:
223 raise RuntimeError("I2C bus not initialized.")
225 end = end if end else len(buffer)
227 write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
228 stop_flag = 0x01 if stop else 0x00
230 while (end - start) > 0:
231 remain_bytes = end - start
232 chunk = min(remain_bytes, 64 - 7)
233 resp = self._hid_xfer(
234 bytes([write_cmd, address, stop_flag])
235 + remain_bytes.to_bytes(4, byteorder="little")
236 + buffer[start : (start + chunk)],
239 if resp[1] != self.RESP_OK:
240 raise RuntimeError("I2C write error")
243 def _i2c_read(self, address, buffer, start=0, end=None):
244 """Read data from an address and into the buffer"""
245 # TODO: support chunkified reads
246 if self._i2c_index is None:
247 raise RuntimeError("I2C bus not initialized.")
249 end = end if end else len(buffer)
251 read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
252 stop_flag = 0x01 # always stop
253 read_size = end - start
255 resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
256 if resp[1] != self.RESP_OK:
257 raise RuntimeError("I2C write error")
259 for i in range(read_size):
260 buffer[start + i] = resp[i + 2]
262 def i2c_writeto(self, address, buffer, *, start=0, end=None):
263 """Write data from the buffer to an address"""
264 self._i2c_write(address, buffer, start, end)
266 def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
267 """Read data from an address and into the buffer"""
268 self._i2c_read(address, buffer, start, end)
270 def i2c_writeto_then_readfrom(
281 """Write data from buffer_out to an address and then
282 read data from an address and into buffer_in
284 self._i2c_write(address, out_buffer, out_start, out_end, False)
285 self._i2c_read(address, in_buffer, in_start, in_end)
287 def i2c_scan(self, *, start=0, end=0x79):
288 """Perform an I2C Device Scan"""
289 if self._i2c_index is None:
290 raise RuntimeError("I2C bus not initialized.")
292 for addr in range(start, end + 1):
295 self.i2c_writeto(addr, b"\x00\x00\x00")
296 except RuntimeError: # no reply!
302 # ----------------------------------------------------------------
304 # ----------------------------------------------------------------
305 def spi_configure(self, baudrate):
307 if self._spi_index is None:
308 raise RuntimeError("SPI bus not initialized.")
310 resp = self._hid_xfer(
313 self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
314 0x00, # mode, not yet implemented
317 + baudrate.to_bytes(4, byteorder="little"),
320 if resp[1] != self.RESP_OK:
321 raise RuntimeError("SPI init error.")
323 def spi_set_port(self, index):
325 if index not in (0, 1):
326 raise ValueError("SPI index must be 0 or 1.")
327 self._spi_index = index
329 def spi_write(self, buffer, *, start=0, end=None):
331 if self._spi_index is None:
332 raise RuntimeError("SPI bus not initialized.")
334 end = end if end else len(buffer)
336 write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
338 while (end - start) > 0:
339 remain_bytes = end - start
340 chunk = min(remain_bytes, 64 - 3)
341 resp = self._hid_xfer(
342 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
344 if resp[1] != self.RESP_OK:
345 raise RuntimeError("SPI write error")
348 def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
350 if self._spi_index is None:
351 raise RuntimeError("SPI bus not initialized.")
353 end = end if end else len(buffer)
354 read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
355 read_size = end - start
357 resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
358 if resp[1] != self.RESP_OK:
359 raise RuntimeError("SPI write error")
361 for i in range(read_size):
362 buffer[start + i] = resp[i + 2]
364 def spi_write_readinto(
374 """SPI write and readinto."""
375 raise NotImplementedError("SPI write_readinto Not implemented")
377 # ----------------------------------------------------------------
379 # ----------------------------------------------------------------
380 def neopixel_write(self, gpio, buf):
381 """NeoPixel write."""
382 # open serial (data is sent over this)
383 if self._serial is None:
385 import serial.tools.list_ports
387 ports = serial.tools.list_ports.comports()
389 if port.vid == self._vid and port.pid == self._pid:
390 self._serial = serial.Serial(port.device)
392 if self._serial is None:
393 raise RuntimeError("Could not find Pico com port.")
396 if not self._neopixel_initialized:
397 # deinit any current setup
398 # pylint: disable=protected-access
399 self._hid_xfer(bytes([self.WS2812B_DEINIT]))
400 resp = self._hid_xfer(
409 if resp[1] != self.RESP_OK:
410 raise RuntimeError("Neopixel init error")
411 self._neopixel_initialized = True
413 self._serial.reset_output_buffer()
416 # command is done over HID
417 remain_bytes = len(buf)
418 resp = self._hid_xfer(
419 bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
422 if resp[1] != self.RESP_OK:
423 # pylint: disable=no-else-raise
426 "Neopixel write error : too many pixel for the firmware."
428 elif resp[2] == 0x02:
430 "Neopixel write error : transfer already in progress."
433 raise RuntimeError("Neopixel write error.")
434 # buffer is sent over serial
435 self._serial.write(buf)
437 if len(buf) % 64 == 0:
438 self._serial.write([0])
440 # polling loop to wait for write complete?
442 resp = self._hid.read(64)
443 while resp[0] != self.WS2812B_WRITE:
444 resp = self._hid.read(64)
445 if resp[1] != self.RESP_OK:
446 raise RuntimeError("Neopixel write (flush) error.")
448 # ----------------------------------------------------------------
450 # ----------------------------------------------------------------
451 # pylint: disable=unused-argument
452 def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
455 resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
456 if resp[1] != self.RESP_OK:
457 raise RuntimeError("PWM init error.")
459 self.pwm_set_frequency(pin, frequency)
460 self.pwm_set_duty_cycle(pin, duty_cycle)
462 def pwm_deinit(self, pin):
464 self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
466 def pwm_get_frequency(self, pin):
468 resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
469 if resp[1] != self.RESP_OK:
470 raise RuntimeError("PWM get frequency error.")
471 return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
473 def pwm_set_frequency(self, pin, frequency):
475 resp = self._hid_xfer(
476 bytes([self.PWM_SET_FREQ, pin.id])
477 + frequency.to_bytes(4, byteorder="little"),
480 if resp[1] != self.RESP_OK:
481 # pylint: disable=no-else-raise
483 raise RuntimeError("PWM different frequency on same slice.")
484 elif resp[3] == 0x02:
485 raise RuntimeError("PWM frequency too low.")
486 elif resp[3] == 0x03:
487 raise RuntimeError("PWM frequency too high.")
489 raise RuntimeError("PWM frequency error.")
491 def pwm_get_duty_cycle(self, pin):
492 """PWM get duty cycle."""
493 resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
494 if resp[1] != self.RESP_OK:
495 raise RuntimeError("PWM get duty cycle error.")
496 return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
498 def pwm_set_duty_cycle(self, pin, duty_cycle):
499 """PWM set duty cycle."""
500 resp = self._hid_xfer(
501 bytes([self.PWM_SET_DUTY_U16, pin.id])
502 + duty_cycle.to_bytes(2, byteorder="little"),
505 if resp[1] != self.RESP_OK:
506 raise RuntimeError("PWM set duty cycle error.")
509 rp2040_u2if = RP2040_u2if()