1 """Helper class for use with RP2040 running u2if firmware"""
2 # https://github.com/execuc/u2if
8 # Use to set delay between reset and device reopen. if negative, don't reset at all
9 RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
11 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
12 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
16 """Helper class for use with RP2040 running u2if firmware"""
36 I2C0_WRITE_FROM_UART = 0x84
37 I2C1_INIT = I2C0_INIT + 0x10
38 I2C1_DEINIT = I2C0_DEINIT + 0x10
39 I2C1_WRITE = I2C0_WRITE + 0x10
40 I2C1_READ = I2C0_READ + 0x10
41 I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
48 SPI0_WRITE_FROM_UART = 0x64
49 SPI1_INIT = SPI0_INIT + 0x10
50 SPI1_DEINIT = SPI0_DEINIT + 0x10
51 SPI1_WRITE = SPI0_WRITE + 0x10
52 SPI1_READ = SPI0_READ + 0x10
53 SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
65 PWM_SET_DUTY_U16 = 0x34
66 PWM_GET_DUTY_U16 = 0x35
67 PWM_SET_DUTY_NS = 0x36
68 PWM_GET_DUTY_NS = 0x37
72 self._i2c_index = None
73 self._spi_index = None
75 self._neopixel_initialized = False
78 # self._hid = hid.device()
79 # self._hid.open(self._vid, self._pid)
80 # if RP2040_U2IF_RESET_DELAY >= 0:
82 # self._i2c_index = None
83 # self._spi_index = None
85 # self._neopixel_initialized = False
86 # self._uart_rx_buffer = None
88 def _hid_xfer(self, report, response=True):
89 """Perform HID Transfer"""
90 # first byte is report ID, which =0
91 # remaing bytes = 64 byte report data
92 # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
93 self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
95 # return is 64 byte response report
96 return self._hid.read(64)
100 self._hid_xfer(bytes([self.SYS_RESET]), False)
101 time.sleep(RP2040_U2IF_RESET_DELAY)
102 start = time.monotonic()
103 while time.monotonic() - start < 5:
105 self._hid.open(self._vid, self._pid)
110 raise OSError("RP2040 u2if open error.")
112 # ----------------------------------------------------------------
114 # ----------------------------------------------------------------
115 def open(self, vid, pid):
120 self._hid = hid.device()
121 self._hid.open(self._vid, self._pid)
122 if RP2040_U2IF_RESET_DELAY >= 0:
126 # ----------------------------------------------------------------
128 # ----------------------------------------------------------------
129 def gpio_init_pin(self, pin_id, direction, pull):
130 """Configure GPIO Pin."""
142 def gpio_set_pin(self, pin_id, value):
143 """Set Current GPIO Pin Value"""
154 def gpio_get_pin(self, pin_id):
155 """Get Current GPIO Pin Value"""
156 resp = self._hid_xfer(
165 return resp[3] != 0x00
167 # ----------------------------------------------------------------
169 # ----------------------------------------------------------------
170 def adc_init_pin(self, pin_id):
171 """Configure ADC Pin."""
181 def adc_get_value(self, pin_id):
182 """Get ADC value for pin."""
183 resp = self._hid_xfer(
192 return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
194 # ----------------------------------------------------------------
196 # ----------------------------------------------------------------
197 def i2c_configure(self, baudrate, pullup=False):
199 if self._i2c_index is None:
200 raise RuntimeError("I2C bus not initialized.")
202 resp = self._hid_xfer(
205 self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
206 0x00 if not pullup else 0x01,
209 + baudrate.to_bytes(4, byteorder="little"),
212 if resp[1] != self.RESP_OK:
213 raise RuntimeError("I2C init error.")
215 def i2c_set_port(self, index):
217 if index not in (0, 1):
218 raise ValueError("I2C index must be 0 or 1.")
219 self._i2c_index = index
221 def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
222 """Write data from the buffer to an address"""
223 if self._i2c_index is None:
224 raise RuntimeError("I2C bus not initialized.")
226 end = end if end else len(buffer)
228 write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
229 stop_flag = 0x01 if stop else 0x00
231 while (end - start) > 0:
232 remain_bytes = end - start
233 chunk = min(remain_bytes, 64 - 7)
234 resp = self._hid_xfer(
235 bytes([write_cmd, address, stop_flag])
236 + remain_bytes.to_bytes(4, byteorder="little")
237 + buffer[start : (start + chunk)],
240 if resp[1] != self.RESP_OK:
241 raise RuntimeError("I2C write error")
244 def _i2c_read(self, address, buffer, start=0, end=None):
245 """Read data from an address and into the buffer"""
246 # TODO: support chunkified reads
247 if self._i2c_index is None:
248 raise RuntimeError("I2C bus not initialized.")
250 end = end if end else len(buffer)
252 read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
253 stop_flag = 0x01 # always stop
254 read_size = end - start
256 resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
257 if resp[1] != self.RESP_OK:
258 raise RuntimeError("I2C write error")
260 for i in range(read_size):
261 buffer[start + i] = resp[i + 2]
263 def i2c_writeto(self, address, buffer, *, start=0, end=None):
264 """Write data from the buffer to an address"""
265 self._i2c_write(address, buffer, start, end)
267 def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
268 """Read data from an address and into the buffer"""
269 self._i2c_read(address, buffer, start, end)
271 def i2c_writeto_then_readfrom(
282 """Write data from buffer_out to an address and then
283 read data from an address and into buffer_in
285 self._i2c_write(address, out_buffer, out_start, out_end, False)
286 self._i2c_read(address, in_buffer, in_start, in_end)
288 def i2c_scan(self, *, start=0, end=0x79):
289 """Perform an I2C Device Scan"""
290 if self._i2c_index is None:
291 raise RuntimeError("I2C bus not initialized.")
293 for addr in range(start, end + 1):
296 self.i2c_writeto(addr, b"\x00\x00\x00")
297 except RuntimeError: # no reply!
303 # ----------------------------------------------------------------
305 # ----------------------------------------------------------------
306 def spi_configure(self, baudrate):
308 if self._spi_index is None:
309 raise RuntimeError("SPI bus not initialized.")
311 resp = self._hid_xfer(
314 self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
315 0x00, # mode, not yet implemented
318 + baudrate.to_bytes(4, byteorder="little"),
321 if resp[1] != self.RESP_OK:
322 raise RuntimeError("SPI init error.")
324 def spi_set_port(self, index):
326 if index not in (0, 1):
327 raise ValueError("SPI index must be 0 or 1.")
328 self._spi_index = index
330 def spi_write(self, buffer, *, start=0, end=None):
332 if self._spi_index is None:
333 raise RuntimeError("SPI bus not initialized.")
335 end = end if end else len(buffer)
337 write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
339 while (end - start) > 0:
340 remain_bytes = end - start
341 chunk = min(remain_bytes, 64 - 3)
342 resp = self._hid_xfer(
343 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
345 if resp[1] != self.RESP_OK:
346 raise RuntimeError("SPI write error")
349 def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
351 if self._spi_index is None:
352 raise RuntimeError("SPI bus not initialized.")
354 end = end if end else len(buffer)
355 read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
356 read_size = end - start
358 resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
359 if resp[1] != self.RESP_OK:
360 raise RuntimeError("SPI write error")
362 for i in range(read_size):
363 buffer[start + i] = resp[i + 2]
365 def spi_write_readinto(
375 """SPI write and readinto."""
376 raise NotImplementedError("SPI write_readinto Not implemented")
378 # ----------------------------------------------------------------
380 # ----------------------------------------------------------------
381 def neopixel_write(self, gpio, buf):
382 """NeoPixel write."""
384 # open serial (data is sent over this)
385 if self._serial is None:
387 import serial.tools.list_ports
389 ports = serial.tools.list_ports.comports()
391 if port.vid == self._vid and port.pid == self._pid:
392 self._serial = serial.Serial(port.device)
394 if self._serial is None:
395 raise RuntimeError("Could not find Pico com port.")
399 if not self._neopixel_initialized:
400 # deinit any current setup
401 # pylint: disable=protected-access
402 self._hid_xfer(bytes([self.WS2812B_DEINIT]))
403 resp = self._hid_xfer(
412 if resp[1] != self.RESP_OK:
413 raise RuntimeError("Neopixel init error")
414 self._neopixel_initialized = True
416 self._serial.reset_output_buffer()
420 # command is done over HID
421 remain_bytes = len(buf)
422 resp = self._hid_xfer(
423 bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
426 if resp[1] != self.RESP_OK:
427 # pylint: disable=no-else-raise
430 "Neopixel write error : too many pixel for the firmware."
432 elif resp[2] == 0x02:
435 "Neopixel write error : transfer already in progress."
438 raise RuntimeError("Neopixel write error.")
440 # buffer is sent over serial
441 self._serial.write(buf)
444 if len(buf) % 64 == 0:
445 self._serial.write([0])
447 # polling loop to wait for write complete?
450 resp = self._hid.read(64)
452 while resp[0] != self.WS2812B_WRITE:
453 resp = self._hid.read(64)
455 if resp[1] != self.RESP_OK:
456 raise RuntimeError("Neopixel write (flush) error.")
458 # ----------------------------------------------------------------
460 # ----------------------------------------------------------------
461 # pylint: disable=unused-argument
462 def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
465 resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
466 if resp[1] != self.RESP_OK:
467 raise RuntimeError("PWM init error.")
469 self.pwm_set_frequency(pin, frequency)
470 self.pwm_set_duty_cycle(pin, duty_cycle)
472 def pwm_deinit(self, pin):
474 self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
476 def pwm_get_frequency(self, pin):
478 resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
479 if resp[1] != self.RESP_OK:
480 raise RuntimeError("PWM get frequency error.")
481 return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
483 def pwm_set_frequency(self, pin, frequency):
485 resp = self._hid_xfer(
486 bytes([self.PWM_SET_FREQ, pin.id])
487 + frequency.to_bytes(4, byteorder="little"),
490 if resp[1] != self.RESP_OK:
491 # pylint: disable=no-else-raise
493 raise RuntimeError("PWM different frequency on same slice.")
494 elif resp[3] == 0x02:
495 raise RuntimeError("PWM frequency too low.")
496 elif resp[3] == 0x03:
497 raise RuntimeError("PWM frequency too high.")
499 raise RuntimeError("PWM frequency error.")
501 def pwm_get_duty_cycle(self, pin):
502 """PWM get duty cycle."""
503 resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
504 if resp[1] != self.RESP_OK:
505 raise RuntimeError("PWM get duty cycle error.")
506 return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
508 def pwm_set_duty_cycle(self, pin, duty_cycle):
509 """PWM set duty cycle."""
510 resp = self._hid_xfer(
511 bytes([self.PWM_SET_DUTY_U16, pin.id])
512 + duty_cycle.to_bytes(2, byteorder="little"),
515 if resp[1] != self.RESP_OK:
516 raise RuntimeError("PWM set duty cycle error.")
518 rp2040_u2if = RP2040_u2if()