1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
 
   3 # SPDX-License-Identifier: MIT
 
   4 """Helper class for use with RP2040 running u2if firmware"""
 
   5 # https://github.com/execuc/u2if
 
# Used to set the delay (in seconds) between reset and device reopen. If negative, don't reset at all.
 
  12 RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
 
  14 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
 
  15 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
 
  19     """Helper class for use with RP2040 running u2if firmware"""
 
  39     I2C0_WRITE_FROM_UART = 0x84
 
  40     I2C1_INIT = I2C0_INIT + 0x10
 
  41     I2C1_DEINIT = I2C0_DEINIT + 0x10
 
  42     I2C1_WRITE = I2C0_WRITE + 0x10
 
  43     I2C1_READ = I2C0_READ + 0x10
 
  44     I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
 
  51     SPI0_WRITE_FROM_UART = 0x64
 
  52     SPI1_INIT = SPI0_INIT + 0x10
 
  53     SPI1_DEINIT = SPI0_DEINIT + 0x10
 
  54     SPI1_WRITE = SPI0_WRITE + 0x10
 
  55     SPI1_READ = SPI0_READ + 0x10
 
  56     SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
 
  68     PWM_SET_DUTY_U16 = 0x34
 
  69     PWM_GET_DUTY_U16 = 0x35
 
  70     PWM_SET_DUTY_NS = 0x36
 
  71     PWM_GET_DUTY_NS = 0x37
 
  78         self._i2c_index = None
 
  79         self._spi_index = None
 
  81         self._neopixel_initialized = False
 
  82         self._uart_rx_buffer = None
 
  84     def _hid_xfer(self, report, response=True):
 
  85         """Perform HID Transfer"""
 
  86         # first byte is report ID, which =0
 
        # remaining bytes = 64 byte report data
 
  88         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
 
  89         self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
 
  91             # return is 64 byte response report
 
  92             return self._hid.read(64)
 
  96         self._hid_xfer(bytes([self.SYS_RESET]), False)
 
  97         time.sleep(RP2040_U2IF_RESET_DELAY)
 
  98         start = time.monotonic()
 
  99         while time.monotonic() - start < 5:
 
 101                 self._hid.open(self._vid, self._pid)
 
 106         raise OSError("RP2040 u2if open error.")
 
 108     # ----------------------------------------------------------------
 
 110     # ----------------------------------------------------------------
 
 111     def open(self, vid, pid):
 
 112         """Open HID interface for given USB VID and PID."""
 
 118         self._hid = hid.device()
 
 119         self._hid.open(self._vid, self._pid)
 
 120         if RP2040_U2IF_RESET_DELAY >= 0:
 
 124     # ----------------------------------------------------------------
 
 126     # ----------------------------------------------------------------
 
 127     def gpio_init_pin(self, pin_id, direction, pull):
 
 128         """Configure GPIO Pin."""
 
 140     def gpio_set_pin(self, pin_id, value):
 
 141         """Set Current GPIO Pin Value"""
 
 152     def gpio_get_pin(self, pin_id):
 
 153         """Get Current GPIO Pin Value"""
 
 154         resp = self._hid_xfer(
 
 163         return resp[3] != 0x00
 
 165     # ----------------------------------------------------------------
 
 167     # ----------------------------------------------------------------
 
 168     def adc_init_pin(self, pin_id):
 
 169         """Configure ADC Pin."""
 
 179     def adc_get_value(self, pin_id):
 
 180         """Get ADC value for pin."""
 
 181         resp = self._hid_xfer(
 
 190         return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
 
 192     # ----------------------------------------------------------------
 
 194     # ----------------------------------------------------------------
 
 195     def i2c_configure(self, baudrate, pullup=False):
 
 197         if self._i2c_index is None:
 
 198             raise RuntimeError("I2C bus not initialized.")
 
 200         resp = self._hid_xfer(
 
 203                     self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
 
 204                     0x00 if not pullup else 0x01,
 
 207             + baudrate.to_bytes(4, byteorder="little"),
 
 210         if resp[1] != self.RESP_OK:
 
 211             raise RuntimeError("I2C init error.")
 
 213     def i2c_set_port(self, index):
 
 215         if index not in (0, 1):
 
 216             raise ValueError("I2C index must be 0 or 1.")
 
 217         self._i2c_index = index
 
 219     def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
 
 220         """Write data from the buffer to an address"""
 
 221         if self._i2c_index is None:
 
 222             raise RuntimeError("I2C bus not initialized.")
 
 224         end = end if end else len(buffer)
 
 226         write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
 
 227         stop_flag = 0x01 if stop else 0x00
 
 229         while (end - start) > 0:
 
 230             remain_bytes = end - start
 
 231             chunk = min(remain_bytes, 64 - 7)
 
 232             resp = self._hid_xfer(
 
 233                 bytes([write_cmd, address, stop_flag])
 
 234                 + remain_bytes.to_bytes(4, byteorder="little")
 
 235                 + buffer[start : (start + chunk)],
 
 238             if resp[1] != self.RESP_OK:
 
 239                 raise RuntimeError("I2C write error")
 
 242     def _i2c_read(self, address, buffer, start=0, end=None):
 
 243         """Read data from an address and into the buffer"""
 
 244         # TODO: support chunkified reads
 
 245         if self._i2c_index is None:
 
 246             raise RuntimeError("I2C bus not initialized.")
 
 248         end = end if end else len(buffer)
 
 250         read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
 
 251         stop_flag = 0x01  # always stop
 
 252         read_size = end - start
 
 254         resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
 
 255         if resp[1] != self.RESP_OK:
 
 256             raise RuntimeError("I2C write error")
 
 258         for i in range(read_size):
 
 259             buffer[start + i] = resp[i + 2]
 
 261     def i2c_writeto(self, address, buffer, *, start=0, end=None):
 
 262         """Write data from the buffer to an address"""
 
 263         self._i2c_write(address, buffer, start, end)
 
 265     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
 
 266         """Read data from an address and into the buffer"""
 
 267         self._i2c_read(address, buffer, start, end)
 
 269     def i2c_writeto_then_readfrom(
 
 280         """Write data from buffer_out to an address and then
 
 281         read data from an address and into buffer_in
 
 283         self._i2c_write(address, out_buffer, out_start, out_end, False)
 
 284         self._i2c_read(address, in_buffer, in_start, in_end)
 
 286     def i2c_scan(self, *, start=0, end=0x79):
 
 287         """Perform an I2C Device Scan"""
 
 288         if self._i2c_index is None:
 
 289             raise RuntimeError("I2C bus not initialized.")
 
 291         for addr in range(start, end + 1):
 
 294                 self.i2c_writeto(addr, b"\x00\x00\x00")
 
 295             except RuntimeError:  # no reply!
 
 301     # ----------------------------------------------------------------
 
 303     # ----------------------------------------------------------------
 
 304     def spi_configure(self, baudrate):
 
 306         if self._spi_index is None:
 
 307             raise RuntimeError("SPI bus not initialized.")
 
 309         resp = self._hid_xfer(
 
 312                     self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
 
 313                     0x00,  # mode, not yet implemented
 
 316             + baudrate.to_bytes(4, byteorder="little"),
 
 319         if resp[1] != self.RESP_OK:
 
 320             raise RuntimeError("SPI init error.")
 
 322     def spi_set_port(self, index):
 
 324         if index not in (0, 1):
 
 325             raise ValueError("SPI index must be 0 or 1.")
 
 326         self._spi_index = index
 
 328     def spi_write(self, buffer, *, start=0, end=None):
 
 330         if self._spi_index is None:
 
 331             raise RuntimeError("SPI bus not initialized.")
 
 333         end = end if end else len(buffer)
 
 335         write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
 
 337         while (end - start) > 0:
 
 338             remain_bytes = end - start
 
 339             chunk = min(remain_bytes, 64 - 3)
 
 340             resp = self._hid_xfer(
 
 341                 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
 
 343             if resp[1] != self.RESP_OK:
 
 344                 raise RuntimeError("SPI write error")
 
 347     def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
 
 349         if self._spi_index is None:
 
 350             raise RuntimeError("SPI bus not initialized.")
 
 352         end = end if end else len(buffer)
 
 353         read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
 
 354         read_size = end - start
 
 356         resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
 
 357         if resp[1] != self.RESP_OK:
 
 358             raise RuntimeError("SPI write error")
 
 360         for i in range(read_size):
 
 361             buffer[start + i] = resp[i + 2]
 
 363     def spi_write_readinto(
 
 373         """SPI write and readinto."""
 
 374         raise NotImplementedError("SPI write_readinto Not implemented")
 
 376     # ----------------------------------------------------------------
 
 378     # ----------------------------------------------------------------
 
 379     def neopixel_write(self, gpio, buf):
 
 380         """NeoPixel write."""
 
 381         # open serial (data is sent over this)
 
 382         if self._serial is None:
 
 384             import serial.tools.list_ports
 
 386             ports = serial.tools.list_ports.comports()
 
 388                 if port.vid == self._vid and port.pid == self._pid:
 
 389                     self._serial = serial.Serial(port.device)
 
 391         if self._serial is None:
 
 392             raise RuntimeError("Could not find Pico com port.")
 
 395         if not self._neopixel_initialized:
 
 396             # deinit any current setup
 
 397             # pylint: disable=protected-access
 
 398             self._hid_xfer(bytes([self.WS2812B_DEINIT]))
 
 399             resp = self._hid_xfer(
 
 408             if resp[1] != self.RESP_OK:
 
 409                 raise RuntimeError("Neopixel init error")
 
 410             self._neopixel_initialized = True
 
 412         self._serial.reset_output_buffer()
 
 415         # command is done over HID
 
 416         remain_bytes = len(buf)
 
 417         resp = self._hid_xfer(
 
 418             bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
 
 421         if resp[1] != self.RESP_OK:
 
 422             # pylint: disable=no-else-raise
 
 425                     "Neopixel write error : too many pixel for the firmware."
 
 427             elif resp[2] == 0x02:
 
 429                     "Neopixel write error : transfer already in progress."
 
 432                 raise RuntimeError("Neopixel write error.")
 
 433         # buffer is sent over serial
 
 434         self._serial.write(buf)
 
 436         if len(buf) % 64 == 0:
 
 437             self._serial.write([0])
 
 439         # polling loop to wait for write complete?
 
 441         resp = self._hid.read(64)
 
 442         while resp[0] != self.WS2812B_WRITE:
 
 443             resp = self._hid.read(64)
 
 444         if resp[1] != self.RESP_OK:
 
 445             raise RuntimeError("Neopixel write (flush) error.")
 
 447     # ----------------------------------------------------------------
 
 449     # ----------------------------------------------------------------
 
 450     # pylint: disable=unused-argument
 
 451     def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
 
 454         resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
 
 455         if resp[1] != self.RESP_OK:
 
 456             raise RuntimeError("PWM init error.")
 
 458         self.pwm_set_frequency(pin, frequency)
 
 459         self.pwm_set_duty_cycle(pin, duty_cycle)
 
 461     def pwm_deinit(self, pin):
 
 463         self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
 
 465     def pwm_get_frequency(self, pin):
 
 467         resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
 
 468         if resp[1] != self.RESP_OK:
 
 469             raise RuntimeError("PWM get frequency error.")
 
 470         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
 
 472     def pwm_set_frequency(self, pin, frequency):
 
 474         resp = self._hid_xfer(
 
 475             bytes([self.PWM_SET_FREQ, pin.id])
 
 476             + frequency.to_bytes(4, byteorder="little"),
 
 479         if resp[1] != self.RESP_OK:
 
 480             # pylint: disable=no-else-raise
 
 482                 raise RuntimeError("PWM different frequency on same slice.")
 
 483             elif resp[3] == 0x02:
 
 484                 raise RuntimeError("PWM frequency too low.")
 
 485             elif resp[3] == 0x03:
 
 486                 raise RuntimeError("PWM frequency too high.")
 
 488                 raise RuntimeError("PWM frequency error.")
 
 490     def pwm_get_duty_cycle(self, pin):
 
 491         """PWM get duty cycle."""
 
 492         resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
 
 493         if resp[1] != self.RESP_OK:
 
 494             raise RuntimeError("PWM get duty cycle error.")
 
 495         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
 
 497     def pwm_set_duty_cycle(self, pin, duty_cycle):
 
 498         """PWM set duty cycle."""
 
 499         resp = self._hid_xfer(
 
 500             bytes([self.PWM_SET_DUTY_U16, pin.id])
 
 501             + duty_cycle.to_bytes(2, byteorder="little"),
 
 504         if resp[1] != self.RESP_OK:
 
 505             raise RuntimeError("PWM set duty cycle error.")
 
 508 rp2040_u2if = RP2040_u2if()