1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
3 # SPDX-License-Identifier: MIT
4 """Helper class for use with RP2040 running u2if firmware"""
5 # https://github.com/execuc/u2if
11 # Use to set delay between reset and device reopen. if negative, don't reset at all
12 RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
14 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
15 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
19 """Helper class for use with RP2040 running u2if firmware"""
39 I2C0_WRITE_FROM_UART = 0x84
40 I2C1_INIT = I2C0_INIT + 0x10
41 I2C1_DEINIT = I2C0_DEINIT + 0x10
42 I2C1_WRITE = I2C0_WRITE + 0x10
43 I2C1_READ = I2C0_READ + 0x10
44 I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
51 SPI0_WRITE_FROM_UART = 0x64
52 SPI1_INIT = SPI0_INIT + 0x10
53 SPI1_DEINIT = SPI0_DEINIT + 0x10
54 SPI1_WRITE = SPI0_WRITE + 0x10
55 SPI1_READ = SPI0_READ + 0x10
56 SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
68 PWM_SET_DUTY_U16 = 0x34
69 PWM_GET_DUTY_U16 = 0x35
70 PWM_SET_DUTY_NS = 0x36
71 PWM_GET_DUTY_NS = 0x37
78 self._i2c_index = None
79 self._spi_index = None
81 self._neopixel_initialized = False
82 self._uart_rx_buffer = None
def _hid_xfer(self, report, response=True):
    """Perform HID Transfer.

    Writes ``report`` to the HID device as one output report and, when
    requested, reads the reply.

    :param report: command bytes to send; zero-padded up to 64 bytes.
    :param response: when true (the default) read and return a 64 byte
        response report; when false return ``None`` without reading.
    :return: the value of ``self._hid.read(64)``, or ``None`` when
        ``response`` is false.
    """
    # first byte is report ID, which = 0
    # remaining bytes = 64 byte report data
    # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
    self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
    if not response:
        # The caller asked for no reply (the reset command is sent this
        # way), so skip the read entirely.
        return None
    # return is 64 byte response report
    return self._hid.read(64)
96 self._hid_xfer(bytes([self.SYS_RESET]), False)
97 time.sleep(RP2040_U2IF_RESET_DELAY)
98 start = time.monotonic()
99 while time.monotonic() - start < 5:
101 self._hid.open(self._vid, self._pid)
106 raise OSError("RP2040 u2if open error.")
108 # ----------------------------------------------------------------
110 # ----------------------------------------------------------------
111 def open(self, vid, pid):
112 """Open HID interface for given USB VID and PID."""
118 self._hid = hid.device()
119 self._hid.open(self._vid, self._pid)
120 if RP2040_U2IF_RESET_DELAY >= 0:
124 # ----------------------------------------------------------------
126 # ----------------------------------------------------------------
127 def gpio_init_pin(self, pin_id, direction, pull):
128 """Configure GPIO Pin."""
140 def gpio_set_pin(self, pin_id, value):
141 """Set Current GPIO Pin Value"""
152 def gpio_get_pin(self, pin_id):
153 """Get Current GPIO Pin Value"""
154 resp = self._hid_xfer(
163 return resp[3] != 0x00
165 # ----------------------------------------------------------------
167 # ----------------------------------------------------------------
168 def adc_init_pin(self, pin_id):
169 """Configure ADC Pin."""
179 def adc_get_value(self, pin_id):
180 """Get ADC value for pin."""
181 resp = self._hid_xfer(
190 return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
192 # ----------------------------------------------------------------
194 # ----------------------------------------------------------------
195 def i2c_configure(self, baudrate, pullup=False):
197 if self._i2c_index is None:
198 raise RuntimeError("I2C bus not initialized.")
200 resp = self._hid_xfer(
203 self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
204 0x00 if not pullup else 0x01,
207 + baudrate.to_bytes(4, byteorder="little"),
210 if resp[1] != self.RESP_OK:
211 raise RuntimeError("I2C init error.")
def i2c_set_port(self, index):
    """Select the I2C port (0 or 1) used by subsequent I2C calls.

    :raises ValueError: if ``index`` is neither 0 nor 1.
    """
    valid_ports = (0, 1)
    if index in valid_ports:
        self._i2c_index = index
        return
    raise ValueError("I2C index must be 0 or 1.")
219 def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
220 """Write data from the buffer to an address"""
221 if self._i2c_index is None:
222 raise RuntimeError("I2C bus not initialized.")
224 end = end if end else len(buffer)
226 write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
227 stop_flag = 0x01 if stop else 0x00
229 while (end - start) > 0:
230 remain_bytes = end - start
231 chunk = min(remain_bytes, 64 - 7)
232 resp = self._hid_xfer(
233 bytes([write_cmd, address, stop_flag])
234 + remain_bytes.to_bytes(4, byteorder="little")
235 + buffer[start : (start + chunk)],
238 if resp[1] != self.RESP_OK:
239 raise RuntimeError("I2C write error")
def _i2c_read(self, address, buffer, start=0, end=None):
    """Read data from an address and into the buffer.

    A single read command is issued (reads are not chunked), so the
    requested bytes must all arrive in one response report.

    :param address: I2C device address placed in the command.
    :param buffer: writable sequence filled at ``buffer[start:end]``.
    :param start: first index of ``buffer`` to fill.
    :param end: one past the last index to fill; a falsy value means
        ``len(buffer)``.
    :raises RuntimeError: if no I2C port has been selected, or the
        response status is not ``RESP_OK``.
    """
    # TODO: support chunkified reads
    if self._i2c_index is None:
        raise RuntimeError("I2C bus not initialized.")

    end = end if end else len(buffer)

    read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
    stop_flag = 0x01  # always stop
    read_size = end - start

    resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
    if resp[1] != self.RESP_OK:
        # This is the read path; report it as a read failure.
        raise RuntimeError("I2C read error")
    # payload starts at index 2 of the response
    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
def i2c_writeto(self, address, buffer, *, start=0, end=None):
    """Write ``buffer[start:end]`` to the device at ``address``.

    Thin public wrapper that forwards to ``_i2c_write`` and leaves its
    ``stop`` argument at the default.
    """
    self._i2c_write(address, buffer, start=start, end=end)
def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
    """Fill ``buffer[start:end]`` with data read from ``address``.

    Thin public wrapper that forwards to ``_i2c_read``.
    """
    self._i2c_read(address, buffer, start=start, end=end)
269 def i2c_writeto_then_readfrom(
280 """Write data from buffer_out to an address and then
281 read data from an address and into buffer_in
283 self._i2c_write(address, out_buffer, out_start, out_end, False)
284 self._i2c_read(address, in_buffer, in_start, in_end)
286 def i2c_scan(self, *, start=0, end=0x79):
287 """Perform an I2C Device Scan"""
288 if self._i2c_index is None:
289 raise RuntimeError("I2C bus not initialized.")
291 for addr in range(start, end + 1):
294 self.i2c_writeto(addr, b"\x00\x00\x00")
295 except RuntimeError: # no reply!
301 # ----------------------------------------------------------------
303 # ----------------------------------------------------------------
304 def spi_configure(self, baudrate):
306 if self._spi_index is None:
307 raise RuntimeError("SPI bus not initialized.")
309 resp = self._hid_xfer(
312 self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
313 0x00, # mode, not yet implemented
316 + baudrate.to_bytes(4, byteorder="little"),
319 if resp[1] != self.RESP_OK:
320 raise RuntimeError("SPI init error.")
def spi_set_port(self, index):
    """Select the SPI port (0 or 1) used by subsequent SPI calls.

    :raises ValueError: if ``index`` is neither 0 nor 1.
    """
    valid_ports = (0, 1)
    if index in valid_ports:
        self._spi_index = index
        return
    raise ValueError("SPI index must be 0 or 1.")
328 def spi_write(self, buffer, *, start=0, end=None):
330 if self._spi_index is None:
331 raise RuntimeError("SPI bus not initialized.")
333 end = end if end else len(buffer)
335 write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
337 while (end - start) > 0:
338 remain_bytes = end - start
339 chunk = min(remain_bytes, 64 - 3)
340 resp = self._hid_xfer(
341 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
343 if resp[1] != self.RESP_OK:
344 raise RuntimeError("SPI write error")
def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
    """SPI readinto.

    Issues one read command on the selected SPI port and copies the
    returned bytes into ``buffer[start:end]``.

    :param buffer: writable sequence that receives the data.
    :param start: first index of ``buffer`` to fill.
    :param end: one past the last index to fill; a falsy value means
        ``len(buffer)``.
    :param write_value: byte sent in the command right after the opcode
        (presumably the value clocked out while reading; confirm against
        the firmware).
    :raises RuntimeError: if no SPI port has been selected, or the
        response status is not ``RESP_OK``.
    """
    if self._spi_index is None:
        raise RuntimeError("SPI bus not initialized.")

    end = end if end else len(buffer)
    read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
    read_size = end - start

    resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
    if resp[1] != self.RESP_OK:
        # This is the read path; report it as a read failure.
        raise RuntimeError("SPI read error")
    # payload starts at index 2 of the response
    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
363 def spi_write_readinto(
373 """SPI write and readinto."""
374 raise NotImplementedError("SPI write_readinto Not implemented")
376 # ----------------------------------------------------------------
378 # ----------------------------------------------------------------
379 def neopixel_write(self, gpio, buf):
380 """NeoPixel write."""
381 # open serial (data is sent over this)
382 if self._serial is None:
384 import serial.tools.list_ports
386 ports = serial.tools.list_ports.comports()
388 if port.vid == self._vid and port.pid == self._pid:
389 self._serial = serial.Serial(port.device)
391 if self._serial is None:
392 raise RuntimeError("Could not find Pico com port.")
395 if not self._neopixel_initialized:
396 # deinit any current setup
397 # pylint: disable=protected-access
398 self._hid_xfer(bytes([self.WS2812B_DEINIT]))
399 resp = self._hid_xfer(
408 if resp[1] != self.RESP_OK:
409 raise RuntimeError("Neopixel init error")
410 self._neopixel_initialized = True
412 self._serial.reset_output_buffer()
415 # command is done over HID
416 remain_bytes = len(buf)
417 resp = self._hid_xfer(
418 bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
421 if resp[1] != self.RESP_OK:
422 # pylint: disable=no-else-raise
425 "Neopixel write error : too many pixel for the firmware."
427 elif resp[2] == 0x02:
429 "Neopixel write error : transfer already in progress."
432 raise RuntimeError("Neopixel write error.")
433 # buffer is sent over serial
434 self._serial.write(buf)
436 if len(buf) % 64 == 0:
437 self._serial.write([0])
439 # polling loop to wait for write complete?
441 resp = self._hid.read(64)
442 while resp[0] != self.WS2812B_WRITE:
443 resp = self._hid.read(64)
444 if resp[1] != self.RESP_OK:
445 raise RuntimeError("Neopixel write (flush) error.")
447 # ----------------------------------------------------------------
449 # ----------------------------------------------------------------
450 # pylint: disable=unused-argument
451 def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
454 resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
455 if resp[1] != self.RESP_OK:
456 raise RuntimeError("PWM init error.")
458 self.pwm_set_frequency(pin, frequency)
459 self.pwm_set_duty_cycle(pin, duty_cycle)
def pwm_deinit(self, pin):
    """Send the PWM deinit command for ``pin``.

    The command carries ``pin.id``; the reply is not inspected.
    """
    command = bytes((self.PWM_DEINIT_PIN, pin.id))
    self._hid_xfer(command)
def pwm_get_frequency(self, pin):
    """Query the PWM frequency for ``pin``.

    :return: the integer decoded little-endian from response bytes 3..6.
    :raises RuntimeError: if the response status is not ``RESP_OK``.
    """
    request = bytes((self.PWM_GET_FREQ, pin.id))
    reply = self._hid_xfer(request, True)
    if reply[1] == self.RESP_OK:
        return int.from_bytes(reply[3:7], byteorder="little")
    raise RuntimeError("PWM get frequency error.")
472 def pwm_set_frequency(self, pin, frequency):
474 resp = self._hid_xfer(
475 bytes([self.PWM_SET_FREQ, pin.id])
476 + frequency.to_bytes(4, byteorder="little"),
479 if resp[1] != self.RESP_OK:
480 # pylint: disable=no-else-raise
482 raise RuntimeError("PWM different frequency on same slice.")
483 elif resp[3] == 0x02:
484 raise RuntimeError("PWM frequency too low.")
485 elif resp[3] == 0x03:
486 raise RuntimeError("PWM frequency too high.")
488 raise RuntimeError("PWM frequency error.")
def pwm_get_duty_cycle(self, pin):
    """Query the PWM duty cycle for ``pin`` via ``PWM_GET_DUTY_U16``.

    :return: the integer decoded little-endian from response bytes 3..6.
    :raises RuntimeError: if the response status is not ``RESP_OK``.
    """
    request = bytes((self.PWM_GET_DUTY_U16, pin.id))
    reply = self._hid_xfer(request, True)
    status_ok = reply[1] == self.RESP_OK
    if not status_ok:
        raise RuntimeError("PWM get duty cycle error.")
    raw_value = reply[3:7]
    return int.from_bytes(raw_value, byteorder="little")
497 def pwm_set_duty_cycle(self, pin, duty_cycle):
498 """PWM set duty cycle."""
499 resp = self._hid_xfer(
500 bytes([self.PWM_SET_DUTY_U16, pin.id])
501 + duty_cycle.to_bytes(2, byteorder="little"),
504 if resp[1] != self.RESP_OK:
505 raise RuntimeError("PWM set duty cycle error.")
508 rp2040_u2if = RP2040_u2if()