--- /dev/null
+"""Chip Definition for Pico with u2if firmware"""
+# https://github.com/execuc/u2if
+
+import hid
+
+
+class Pico_u2if:
+ """MCP2221 Device Class Definition"""
+
+ VID = 0xCAFE
+ PID = 0x4005
+
+ # MISC
+ RESP_OK = 0x01
+
+ # GPIO
+ GPIO_INIT_PIN = 0x20
+ GPIO_SET_VALUE = 0x21
+ GPIO_GET_VALUE = 0x22
+
+ # ADC
+ ADC_INIT_PIN = 0x40
+ ADC_GET_VALUE = 0x41
+
+ # I2C
+ I2C0_INIT = 0x80
+ I2C0_DEINIT = 0x81
+ I2C0_WRITE = 0x82
+ I2C0_READ = 0x83
+ I2C0_WRITE_FROM_UART = 0x84
+ I2C1_INIT = I2C0_INIT + 0x10
+ I2C1_DEINIT = I2C0_DEINIT + 0x10
+ I2C1_WRITE = I2C0_WRITE + 0x10
+ I2C1_READ = I2C0_READ + 0x10
+ I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
+
+ # SPI
+ SPI0_INIT = 0x60
+ SPI0_DEINIT = 0x61
+ SPI0_WRITE = 0x62
+ SPI0_READ = 0x63
+ SPI0_WRITE_FROM_UART = 0x64
+ SPI1_INIT = SPI0_INIT + 0x10
+ SPI1_DEINIT = SPI0_DEINIT + 0x10
+ SPI1_WRITE = SPI0_WRITE + 0x10
+ SPI1_READ = SPI0_READ + 0x10
+ SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
+
+ # WS2812B (LED)
+ WS2812B_INIT = 0xA0
+ WS2812B_DEINIT = 0xA1
+ WS2812B_WRITE = 0xA2
+
+ # PWM
+ PWM_INIT_PIN = 0x30
+ PWM_DEINIT_PIN = 0x31
+ PWM_SET_FREQ = 0x32
+ PWM_GET_FREQ = 0x33
+ PWM_SET_DUTY_U16 = 0x34
+ PWM_GET_DUTY_U16 = 0x35
+ PWM_SET_DUTY_NS = 0x36
+ PWM_GET_DUTY_NS = 0x37
+
+ # UART
+ UART0_INIT = 0x50
+ UART0_DEINIT = 0x51
+ UART0_WRITE = 0x52
+ UART0_READ = 0x53
+
+ def __init__(self):
+ self._hid = hid.device()
+ self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
+ self._i2c_index = None
+ self._spi_index = None
+ self._serial = None
+ self._neopixel_initialized = False
+ self._uart_rx_buffer = None
+
+ def _hid_xfer(self, report, response=True):
+ """Perform HID Transfer"""
+ # first byte is report ID, which =0
+ # remaing bytes = 64 byte report data
+ # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
+ self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
+ if response:
+ # return is 64 byte response report
+ return self._hid.read(64)
+ return None
+
+ # ----------------------------------------------------------------
+ # GPIO
+ # ----------------------------------------------------------------
+ def gpio_init_pin(self, pin_id, direction, pull):
+ """Configure GPIO Pin."""
+ self._hid_xfer(
+ bytes(
+ [
+ self.GPIO_INIT_PIN,
+ pin_id,
+ direction,
+ pull,
+ ]
+ )
+ )
+
+ def gpio_set_pin(self, pin_id, value):
+ """Set Current GPIO Pin Value"""
+ self._hid_xfer(
+ bytes(
+ [
+ self.GPIO_SET_VALUE,
+ pin_id,
+ int(value),
+ ]
+ )
+ )
+
+ def gpio_get_pin(self, pin_id):
+ """Get Current GPIO Pin Value"""
+ resp = self._hid_xfer(
+ bytes(
+ [
+ self.GPIO_GET_VALUE,
+ pin_id,
+ ]
+ ),
+ True,
+ )
+ return resp[3] != 0x00
+
+ # ----------------------------------------------------------------
+ # ADC
+ # ----------------------------------------------------------------
+ def adc_init_pin(self, pin_id):
+ """Configure ADC Pin."""
+ self._hid_xfer(
+ bytes(
+ [
+ self.ADC_INIT_PIN,
+ pin_id,
+ ]
+ )
+ )
+
+ def adc_get_value(self, pin_id):
+ """Get ADC value for pin."""
+ resp = self._hid_xfer(
+ bytes(
+ [
+ self.ADC_GET_VALUE,
+ pin_id,
+ ]
+ ),
+ True,
+ )
+ return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
+
+ # ----------------------------------------------------------------
+ # I2C
+ # ----------------------------------------------------------------
+ def i2c_configure(self, baudrate, pullup=False):
+ if self._i2c_index is None:
+ raise RuntimeError("I2C bus not initialized.")
+
+ resp = self._hid_xfer(
+ bytes(
+ [
+ self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
+ 0x00 if not pullup else 0x01,
+ ]
+ )
+ + baudrate.to_bytes(4, byteorder="little"),
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("I2C init error.")
+
+ def i2c_set_port(self, index):
+ if index not in (0, 1):
+ raise ValueError("I2C index must be 0 or 1.")
+ self._i2c_index = index
+
+ def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
+ """Write data from the buffer to an address"""
+ if self._i2c_index is None:
+ raise RuntimeError("I2C bus not initialized.")
+
+ end = end if end else len(buffer)
+
+ write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
+ stop_flag = 0x01 if stop else 0x00
+
+ while (end - start) > 0:
+ remain_bytes = end - start
+ chunk = min(remain_bytes, 64 - 7)
+ resp = self._hid_xfer(
+ bytes([write_cmd, address, stop_flag])
+ + remain_bytes.to_bytes(4, byteorder="little")
+ + buffer[start : (start + chunk)],
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("I2C write error")
+ start += chunk
+
+ def _i2c_read(self, address, buffer, start=0, end=None):
+ """Read data from an address and into the buffer"""
+ # TODO: support chunkified reads
+ if self._i2c_index is None:
+ raise RuntimeError("I2C bus not initialized.")
+
+ end = end if end else len(buffer)
+
+ read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
+ stop_flag = 0x01 # always stop
+ read_size = end - start
+
+ resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("I2C write error")
+ # move into buffer
+ for i in range(read_size):
+ buffer[start + i] = resp[i + 2]
+
+ def i2c_writeto(self, address, buffer, *, start=0, end=None):
+ """Write data from the buffer to an address"""
+ self._i2c_write(address, buffer, start, end)
+
+ def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
+ """Read data from an address and into the buffer"""
+ self._i2c_read(address, buffer, start, end)
+
+ def i2c_writeto_then_readfrom(
+ self,
+ address,
+ out_buffer,
+ in_buffer,
+ *,
+ out_start=0,
+ out_end=None,
+ in_start=0,
+ in_end=None
+ ):
+ """Write data from buffer_out to an address and then
+ read data from an address and into buffer_in
+ """
+ self._i2c_write(address, out_buffer, out_start, out_end, False)
+ self._i2c_read(address, in_buffer, in_start, in_end)
+
+ def i2c_scan(self, *, start=0, end=0x79):
+ """Perform an I2C Device Scan"""
+ if self._i2c_index is None:
+ raise RuntimeError("I2C bus not initialized.")
+ found = []
+ for addr in range(start, end + 1):
+ # try a write
+ try:
+ self.i2c_writeto(addr, b"\x00\x00\x00")
+ except RuntimeError: # no reply!
+ continue
+ # store if success
+ found.append(addr)
+ return found
+
+ # ----------------------------------------------------------------
+ # SPI
+ # ----------------------------------------------------------------
+ def spi_configure(self, baudrate):
+ if self._spi_index is None:
+ raise RuntimeError("SPI bus not initialized.")
+
+ resp = self._hid_xfer(
+ bytes(
+ [
+ self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
+ 0x00, # mode, not yet implemented
+ ]
+ )
+ + baudrate.to_bytes(4, byteorder="little"),
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("SPI init error.")
+
+ def spi_set_port(self, index):
+ if index not in (0, 1):
+ raise ValueError("SPI index must be 0 or 1.")
+ self._spi_index = index
+
+ def spi_write(self, buffer, *, start=0, end=None):
+ if self._spi_index is None:
+ raise RuntimeError("SPI bus not initialized.")
+
+ end = end if end else len(buffer)
+
+ write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
+
+ while (end - start) > 0:
+ remain_bytes = end - start
+ chunk = min(remain_bytes, 64 - 3)
+ resp = self._hid_xfer(
+ bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
+ )
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("SPI write error")
+ start += chunk
+
+ def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
+ if self._spi_index is None:
+ raise RuntimeError("SPI bus not initialized.")
+
+ end = end if end else len(buffer)
+ read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
+ read_size = end - start
+
+ resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("SPI write error")
+ # move into buffer
+ for i in range(read_size):
+ buffer[start + i] = resp[i + 2]
+
+ def spi_write_readinto(
+ self,
+ buffer_out,
+ buffer_in,
+ *,
+ out_start=0,
+ out_end=None,
+ in_start=0,
+ in_end=None
+ ):
+ raise NotImplementedError("SPI write_readinto Not implemented")
+
+ # ----------------------------------------------------------------
+ # NEOPIXEL
+ # ----------------------------------------------------------------
+ def neopixel_write(self, gpio, buf):
+ # open serial (data is sent over this)
+ if self._serial is None:
+ import serial
+ import serial.tools.list_ports
+
+ ports = serial.tools.list_ports.comports()
+ for port in ports:
+ if port.vid == self.VID and port.pid == self.PID:
+ self._serial = serial.Serial(port.device)
+ break
+ if self._serial is None:
+ raise RuntimeError("Could not find Pico com port.")
+
+ # init
+ if not self._neopixel_initialized:
+ # deinit any current setup
+ self._hid_xfer(bytes([self.WS2812B_DEINIT]))
+ resp = self._hid_xfer(
+ bytes(
+ [
+ self.WS2812B_INIT,
+ gpio._pin.id,
+ ]
+ ),
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("Neopixel init error")
+ self._neopixel_initialized = True
+
+ # write
+ # command is done over HID
+ remain_bytes = len(buf)
+ resp = self._hid_xfer(
+ bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ if resp[2] == 0x01:
+ raise RuntimeError(
+ "Neopixel write error : too many pixel for the firmware."
+ )
+ elif resp[2] == 0x02:
+ raise RuntimeError(
+ "Neopixel write error : transfer already in progress."
+ )
+ else:
+ raise RuntimeError("Neopixel write error")
+ # buffer is sent over serial
+ self._serial.write(buf)
+ self._serial.flush()
+
+ # ----------------------------------------------------------------
+ # PWM
+ # ----------------------------------------------------------------
+ def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
+ self.pwm_deinit(pin)
+ resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("PWM init error.")
+
+ self.pwm_set_frequency(pin, frequency)
+ self.pwm_set_duty_cycle(pin, duty_cycle)
+
+ def pwm_deinit(self, pin):
+ self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
+
+ def pwm_get_frequency(self, pin):
+ resp = self._hid_xfer(
+ bytes([self.PWM_GET_FREQ, pin.id])
+ + frequency.to_bytes(4, byteorder="little"),
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("PWM get frequency error.")
+ return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
+
+ def pwm_set_frequency(self, pin, frequency):
+ resp = self._hid_xfer(
+ bytes([self.PWM_SET_FREQ, pin.id])
+ + frequency.to_bytes(4, byteorder="little"),
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ if resp[3] == 0x01:
+ raise RuntimeError("PWM different frequency on same slice.")
+ elif resp[3] == 0x02:
+ raise RuntimeError("PWM frequency too low.")
+ elif resp[3] == 0x03:
+ raise RuntimeError("PWM frequency too high.")
+ else:
+ raise RuntimeError("PWM frequency error.")
+
+ def pwm_get_duty_cycle(self, pin):
+ resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("PWM get duty cycle error.")
+ return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
+
+ def pwm_set_duty_cycle(self, pin, duty_cycle):
+ resp = self._hid_xfer(
+ bytes([self.PWM_SET_DUTY_U16, pin.id])
+ + duty_cycle.to_bytes(2, byteorder="little"),
+ True,
+ )
+ if resp[1] != self.RESP_OK:
+ raise RuntimeError("PWM set duty cycle error.")
+
+
+pico_u2if = Pico_u2if()