1 """Helper class for use with RP2040 running u2if firmware"""
2 # https://github.com/execuc/u2if
# Delay, in seconds, between resetting the device and reopening it.
# Taken from the RP2040_U2IF_RESET_DELAY environment variable (default 1);
# a negative value means the device is not reset at all.
RP2040_U2IF_RESET_DELAY = float(os.getenv("RP2040_U2IF_RESET_DELAY", "1"))
11 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
12 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
16 """Helper class for use with RP2040 running u2if firmware"""
36 I2C0_WRITE_FROM_UART = 0x84
37 I2C1_INIT = I2C0_INIT + 0x10
38 I2C1_DEINIT = I2C0_DEINIT + 0x10
39 I2C1_WRITE = I2C0_WRITE + 0x10
40 I2C1_READ = I2C0_READ + 0x10
41 I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
48 SPI0_WRITE_FROM_UART = 0x64
49 SPI1_INIT = SPI0_INIT + 0x10
50 SPI1_DEINIT = SPI0_DEINIT + 0x10
51 SPI1_WRITE = SPI0_WRITE + 0x10
52 SPI1_READ = SPI0_READ + 0x10
53 SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
65 PWM_SET_DUTY_U16 = 0x34
66 PWM_GET_DUTY_U16 = 0x35
67 PWM_SET_DUTY_NS = 0x36
68 PWM_GET_DUTY_NS = 0x37
75 self._i2c_index = None
76 self._spi_index = None
78 self._neopixel_initialized = False
79 self._uart_rx_buffer = None
81 def _hid_xfer(self, report, response=True):
82 """Perform HID Transfer"""
83 # first byte is report ID, which =0
# remaining bytes = 64 byte report data
85 # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
86 self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
88 # return is 64 byte response report
89 return self._hid.read(64)
93 self._hid_xfer(bytes([self.SYS_RESET]), False)
94 time.sleep(RP2040_U2IF_RESET_DELAY)
95 start = time.monotonic()
96 while time.monotonic() - start < 5:
98 self._hid.open(self._vid, self._pid)
103 raise OSError("RP2040 u2if open error.")
105 # ----------------------------------------------------------------
107 # ----------------------------------------------------------------
108 def open(self, vid, pid):
109 """Open HID interface for given USB VID and PID."""
115 self._hid = hid.device()
116 self._hid.open(self._vid, self._pid)
117 if RP2040_U2IF_RESET_DELAY >= 0:
121 # ----------------------------------------------------------------
123 # ----------------------------------------------------------------
124 def gpio_init_pin(self, pin_id, direction, pull):
125 """Configure GPIO Pin."""
137 def gpio_set_pin(self, pin_id, value):
138 """Set Current GPIO Pin Value"""
149 def gpio_get_pin(self, pin_id):
150 """Get Current GPIO Pin Value"""
151 resp = self._hid_xfer(
160 return resp[3] != 0x00
162 # ----------------------------------------------------------------
164 # ----------------------------------------------------------------
165 def adc_init_pin(self, pin_id):
166 """Configure ADC Pin."""
176 def adc_get_value(self, pin_id):
177 """Get ADC value for pin."""
178 resp = self._hid_xfer(
187 return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
189 # ----------------------------------------------------------------
191 # ----------------------------------------------------------------
def i2c_configure(self, baudrate, pullup=False):
    """Configure the currently selected I2C port.

    Sends the init command of the selected port, followed by a pull-up
    flag byte and the baud rate encoded as a 4-byte little-endian integer.

    Raises:
        RuntimeError: if no port has been selected yet, or if the device
            response does not report ``RESP_OK``.
    """
    if self._i2c_index is None:
        raise RuntimeError("I2C bus not initialized.")

    # Port 0 has its own command code; any other selected index uses port 1.
    init_cmd = self.I2C1_INIT if self._i2c_index != 0 else self.I2C0_INIT
    pullup_flag = 0x01 if pullup else 0x00
    header = bytes([init_cmd, pullup_flag])
    report = header + baudrate.to_bytes(4, byteorder="little")

    resp = self._hid_xfer(report, True)
    # Byte 1 of the response carries the status code.
    if resp[1] != self.RESP_OK:
        raise RuntimeError("I2C init error.")
def i2c_set_port(self, index):
    """Select the I2C port (0 or 1) used by the other I2C methods.

    Raises:
        ValueError: if ``index`` is neither 0 nor 1.
    """
    if index in (0, 1):
        self._i2c_index = index
        return
    raise ValueError("I2C index must be 0 or 1.")
216 def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
217 """Write data from the buffer to an address"""
218 if self._i2c_index is None:
219 raise RuntimeError("I2C bus not initialized.")
221 end = end if end else len(buffer)
223 write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
224 stop_flag = 0x01 if stop else 0x00
226 while (end - start) > 0:
227 remain_bytes = end - start
228 chunk = min(remain_bytes, 64 - 7)
229 resp = self._hid_xfer(
230 bytes([write_cmd, address, stop_flag])
231 + remain_bytes.to_bytes(4, byteorder="little")
232 + buffer[start : (start + chunk)],
235 if resp[1] != self.RESP_OK:
236 raise RuntimeError("I2C write error")
def _i2c_read(self, address, buffer, start=0, end=None):
    """Read data from an address and into the buffer.

    Issues a single read command on the selected I2C port and copies the
    returned bytes into ``buffer[start:end]``. A falsy ``end`` (``None`` or
    0) means ``len(buffer)``.

    Raises:
        RuntimeError: if no I2C port has been selected, or if the device
            response does not report ``RESP_OK``.
    """
    # TODO: support chunkified reads
    if self._i2c_index is None:
        raise RuntimeError("I2C bus not initialized.")

    end = end if end else len(buffer)

    read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
    stop_flag = 0x01  # always stop
    # The length is sent as one byte, so it must fit in 0..255.
    read_size = end - start

    resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
    if resp[1] != self.RESP_OK:
        raise RuntimeError("I2C read error")

    # The payload starts at offset 2 of the response.
    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
def i2c_writeto(self, address, buffer, *, start=0, end=None):
    """Write ``buffer[start:end]`` to the device at ``address``.

    Thin public wrapper that forwards to ``_i2c_write`` with its default
    stop behaviour.
    """
    self._i2c_write(address, buffer, start=start, end=end)
def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
    """Read from the device at ``address`` into ``buffer[start:end]``.

    Thin public wrapper that forwards to ``_i2c_read``.
    """
    self._i2c_read(address, buffer, start=start, end=end)
266 def i2c_writeto_then_readfrom(
277 """Write data from buffer_out to an address and then
278 read data from an address and into buffer_in
280 self._i2c_write(address, out_buffer, out_start, out_end, False)
281 self._i2c_read(address, in_buffer, in_start, in_end)
283 def i2c_scan(self, *, start=0, end=0x79):
284 """Perform an I2C Device Scan"""
285 if self._i2c_index is None:
286 raise RuntimeError("I2C bus not initialized.")
288 for addr in range(start, end + 1):
291 self.i2c_writeto(addr, b"\x00\x00\x00")
292 except RuntimeError: # no reply!
298 # ----------------------------------------------------------------
300 # ----------------------------------------------------------------
def spi_configure(self, baudrate):
    """Configure the currently selected SPI port.

    Sends the init command of the selected port, followed by a mode byte
    (always 0x00) and the baud rate encoded as a 4-byte little-endian
    integer.

    Raises:
        RuntimeError: if no port has been selected yet, or if the device
            response does not report ``RESP_OK``.
    """
    if self._spi_index is None:
        raise RuntimeError("SPI bus not initialized.")

    # Port 0 has its own command code; any other selected index uses port 1.
    init_cmd = self.SPI1_INIT if self._spi_index != 0 else self.SPI0_INIT
    mode = 0x00  # mode, not yet implemented
    header = bytes([init_cmd, mode])
    report = header + baudrate.to_bytes(4, byteorder="little")

    resp = self._hid_xfer(report, True)
    # Byte 1 of the response carries the status code.
    if resp[1] != self.RESP_OK:
        raise RuntimeError("SPI init error.")
def spi_set_port(self, index):
    """Select the SPI port (0 or 1) used by the other SPI methods.

    Raises:
        ValueError: if ``index`` is neither 0 nor 1.
    """
    if index in (0, 1):
        self._spi_index = index
        return
    raise ValueError("SPI index must be 0 or 1.")
325 def spi_write(self, buffer, *, start=0, end=None):
327 if self._spi_index is None:
328 raise RuntimeError("SPI bus not initialized.")
330 end = end if end else len(buffer)
332 write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
334 while (end - start) > 0:
335 remain_bytes = end - start
336 chunk = min(remain_bytes, 64 - 3)
337 resp = self._hid_xfer(
338 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
340 if resp[1] != self.RESP_OK:
341 raise RuntimeError("SPI write error")
def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
    """Read from the selected SPI port into ``buffer[start:end]``.

    Issues a single read command carrying ``write_value`` and the number of
    bytes to read, then copies the returned bytes into the buffer. A falsy
    ``end`` (``None`` or 0) means ``len(buffer)``.

    Raises:
        RuntimeError: if no SPI port has been selected, or if the device
            response does not report ``RESP_OK``.
    """
    if self._spi_index is None:
        raise RuntimeError("SPI bus not initialized.")

    end = end if end else len(buffer)
    read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
    # The length is sent as one byte, so it must fit in 0..255.
    read_size = end - start

    resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
    if resp[1] != self.RESP_OK:
        raise RuntimeError("SPI read error")

    # The payload starts at offset 2 of the response.
    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
360 def spi_write_readinto(
370 """SPI write and readinto."""
371 raise NotImplementedError("SPI write_readinto Not implemented")
373 # ----------------------------------------------------------------
375 # ----------------------------------------------------------------
376 def neopixel_write(self, gpio, buf):
377 """NeoPixel write."""
378 # open serial (data is sent over this)
379 if self._serial is None:
381 import serial.tools.list_ports
383 ports = serial.tools.list_ports.comports()
385 if port.vid == self._vid and port.pid == self._pid:
386 self._serial = serial.Serial(port.device)
388 if self._serial is None:
389 raise RuntimeError("Could not find Pico com port.")
392 if not self._neopixel_initialized:
393 # deinit any current setup
394 # pylint: disable=protected-access
395 self._hid_xfer(bytes([self.WS2812B_DEINIT]))
396 resp = self._hid_xfer(
405 if resp[1] != self.RESP_OK:
406 raise RuntimeError("Neopixel init error")
407 self._neopixel_initialized = True
409 self._serial.reset_output_buffer()
412 # command is done over HID
413 remain_bytes = len(buf)
414 resp = self._hid_xfer(
415 bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
418 if resp[1] != self.RESP_OK:
419 # pylint: disable=no-else-raise
422 "Neopixel write error : too many pixel for the firmware."
424 elif resp[2] == 0x02:
426 "Neopixel write error : transfer already in progress."
429 raise RuntimeError("Neopixel write error.")
430 # buffer is sent over serial
431 self._serial.write(buf)
433 if len(buf) % 64 == 0:
434 self._serial.write([0])
436 # polling loop to wait for write complete?
438 resp = self._hid.read(64)
439 while resp[0] != self.WS2812B_WRITE:
440 resp = self._hid.read(64)
441 if resp[1] != self.RESP_OK:
442 raise RuntimeError("Neopixel write (flush) error.")
444 # ----------------------------------------------------------------
446 # ----------------------------------------------------------------
447 # pylint: disable=unused-argument
448 def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
451 resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
452 if resp[1] != self.RESP_OK:
453 raise RuntimeError("PWM init error.")
455 self.pwm_set_frequency(pin, frequency)
456 self.pwm_set_duty_cycle(pin, duty_cycle)
def pwm_deinit(self, pin):
    """Send the PWM deinit command for ``pin``.

    The device response is not checked.
    """
    report = bytes([self.PWM_DEINIT_PIN, pin.id])
    self._hid_xfer(report)
def pwm_get_frequency(self, pin):
    """Query the PWM frequency for ``pin``.

    Returns:
        The frequency decoded from four little-endian bytes starting at
        offset 3 of the response.

    Raises:
        RuntimeError: if the device response does not report ``RESP_OK``.
    """
    request = bytes([self.PWM_GET_FREQ, pin.id])
    resp = self._hid_xfer(request, True)
    if resp[1] == self.RESP_OK:
        return int.from_bytes(resp[3:7], byteorder="little")
    raise RuntimeError("PWM get frequency error.")
469 def pwm_set_frequency(self, pin, frequency):
471 resp = self._hid_xfer(
472 bytes([self.PWM_SET_FREQ, pin.id])
473 + frequency.to_bytes(4, byteorder="little"),
476 if resp[1] != self.RESP_OK:
477 # pylint: disable=no-else-raise
479 raise RuntimeError("PWM different frequency on same slice.")
480 elif resp[3] == 0x02:
481 raise RuntimeError("PWM frequency too low.")
482 elif resp[3] == 0x03:
483 raise RuntimeError("PWM frequency too high.")
485 raise RuntimeError("PWM frequency error.")
def pwm_get_duty_cycle(self, pin):
    """Query the PWM duty cycle for ``pin``.

    Returns:
        The duty cycle decoded from four little-endian bytes starting at
        offset 3 of the response.

    Raises:
        RuntimeError: if the device response does not report ``RESP_OK``.
    """
    request = bytes([self.PWM_GET_DUTY_U16, pin.id])
    resp = self._hid_xfer(request, True)
    if resp[1] == self.RESP_OK:
        # NOTE(review): four bytes are decoded although the command is a
        # "_U16" one - confirm the response layout against the firmware.
        return int.from_bytes(resp[3:7], byteorder="little")
    raise RuntimeError("PWM get duty cycle error.")
def pwm_set_duty_cycle(self, pin, duty_cycle):
    """Set the PWM duty cycle for ``pin``.

    ``duty_cycle`` is sent as a 2-byte little-endian integer after the
    command and pin id.

    Raises:
        RuntimeError: if the device response does not report ``RESP_OK``.
    """
    header = bytes([self.PWM_SET_DUTY_U16, pin.id])
    payload = duty_cycle.to_bytes(2, byteorder="little")
    resp = self._hid_xfer(header + payload, True)
    if resp[1] != self.RESP_OK:
        raise RuntimeError("PWM set duty cycle error.")
# Module-level instance of the helper, created when this module is imported.
rp2040_u2if = RP2040_u2if()