X-Git-Url: https://git.ayoreis.com/Adafruit_Blinka-hackapet.git/blobdiff_plain/120c7a7f4c7559ede6a7d098e4800663381fc93d..050c6e5bdbe0d75d4b0f053addb368421835a3a7:/src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py diff --git a/src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py b/src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py deleted file mode 100644 index 687bfc0..0000000 --- a/src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py +++ /dev/null @@ -1,492 +0,0 @@ -"""Chip Definition for Pico with u2if firmware""" -# https://github.com/execuc/u2if - -import os -import time -import hid - -# Use to set delay between reset and device reopen. if negative, don't reset at all -PICO_U2IF_RESET_DELAY = float(os.environ.get("PICO_U2IF_RESET_DELAY", 1)) - -# pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements -# pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods - - -class Pico_u2if: - """MCP2221 Device Class Definition""" - - VID = 0xCAFE - PID = 0x4005 - - # MISC - RESP_OK = 0x01 - SYS_RESET = 0x10 - - # GPIO - GPIO_INIT_PIN = 0x20 - GPIO_SET_VALUE = 0x21 - GPIO_GET_VALUE = 0x22 - - # ADC - ADC_INIT_PIN = 0x40 - ADC_GET_VALUE = 0x41 - - # I2C - I2C0_INIT = 0x80 - I2C0_DEINIT = 0x81 - I2C0_WRITE = 0x82 - I2C0_READ = 0x83 - I2C0_WRITE_FROM_UART = 0x84 - I2C1_INIT = I2C0_INIT + 0x10 - I2C1_DEINIT = I2C0_DEINIT + 0x10 - I2C1_WRITE = I2C0_WRITE + 0x10 - I2C1_READ = I2C0_READ + 0x10 - I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10 - - # SPI - SPI0_INIT = 0x60 - SPI0_DEINIT = 0x61 - SPI0_WRITE = 0x62 - SPI0_READ = 0x63 - SPI0_WRITE_FROM_UART = 0x64 - SPI1_INIT = SPI0_INIT + 0x10 - SPI1_DEINIT = SPI0_DEINIT + 0x10 - SPI1_WRITE = SPI0_WRITE + 0x10 - SPI1_READ = SPI0_READ + 0x10 - SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10 - - # WS2812B (LED) - WS2812B_INIT = 0xA0 - WS2812B_DEINIT = 0xA1 - WS2812B_WRITE = 0xA2 - - # PWM - PWM_INIT_PIN = 0x30 - PWM_DEINIT_PIN = 0x31 - PWM_SET_FREQ = 0x32 - PWM_GET_FREQ = 0x33 - PWM_SET_DUTY_U16 = 0x34 - PWM_GET_DUTY_U16 = 0x35 - PWM_SET_DUTY_NS = 0x36 - PWM_GET_DUTY_NS = 0x37 - - def __init__(self): - self._hid = hid.device() - self._hid.open(Pico_u2if.VID, Pico_u2if.PID) - if PICO_U2IF_RESET_DELAY >= 0: - self._reset() - self._i2c_index = None - self._spi_index = None - self._serial = None - self._neopixel_initialized = False - self._uart_rx_buffer = None - - def _hid_xfer(self, report, response=True): - """Perform HID Transfer""" - # first byte is report ID, which =0 - # remaing bytes = 64 byte report data - # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185 - self._hid.write(b"\0" + report + b"\0" * (64 - len(report))) - if response: - # return is 64 byte response report - return self._hid.read(64) - return None - - def _reset(self): - self._hid_xfer(bytes([self.SYS_RESET]), False) - time.sleep(PICO_U2IF_RESET_DELAY) - start = time.monotonic() - while time.monotonic() - start < 5: - try: - self._hid.open(Pico_u2if.VID, Pico_u2if.PID) - except OSError: - time.sleep(0.1) - continue - return - raise OSError("Pico open error.") - - # ---------------------------------------------------------------- - # GPIO - # ---------------------------------------------------------------- - def gpio_init_pin(self, pin_id, direction, pull): - """Configure GPIO Pin.""" - self._hid_xfer( - bytes( - [ - self.GPIO_INIT_PIN, - pin_id, - direction, - pull, - ] - ) - ) - - def gpio_set_pin(self, pin_id, value): - """Set Current GPIO Pin Value""" - self._hid_xfer( - bytes( - [ - self.GPIO_SET_VALUE, - pin_id, - int(value), - ] - ) - ) - - def gpio_get_pin(self, pin_id): - """Get Current GPIO Pin Value""" - resp = self._hid_xfer( - bytes( - [ - self.GPIO_GET_VALUE, - pin_id, - ] - ), - True, - ) - return resp[3] != 0x00 - - # ---------------------------------------------------------------- - # ADC - # ---------------------------------------------------------------- - def adc_init_pin(self, pin_id): - """Configure ADC Pin.""" - self._hid_xfer( - bytes( - [ - self.ADC_INIT_PIN, - pin_id, - ] - ) - ) - - def adc_get_value(self, pin_id): - """Get ADC value for pin.""" - resp = self._hid_xfer( - bytes( - [ - self.ADC_GET_VALUE, - pin_id, - ] - ), - True, - ) - return int.from_bytes(resp[3 : 3 + 2], byteorder="little") - - # ---------------------------------------------------------------- - # I2C - # ---------------------------------------------------------------- - def i2c_configure(self, baudrate, pullup=False): - """Configure I2C.""" - if self._i2c_index is None: - raise RuntimeError("I2C bus not initialized.") - - resp = self._hid_xfer( - bytes( - [ - self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT, - 0x00 if not pullup else 0x01, - ] - ) - + baudrate.to_bytes(4, byteorder="little"), - True, - ) - if resp[1] != self.RESP_OK: - raise RuntimeError("I2C init error.") - - def i2c_set_port(self, index): - """Set I2C port.""" - if index not in (0, 1): - raise ValueError("I2C index must be 0 or 1.") - self._i2c_index = index - - def _i2c_write(self, address, buffer, start=0, end=None, stop=True): - """Write data from the buffer to an address""" - if self._i2c_index is None: - raise RuntimeError("I2C bus not initialized.") - - end = end if end else len(buffer) - - write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE - stop_flag = 0x01 if stop else 0x00 - - while (end - start) > 0: - remain_bytes = end - start - chunk = min(remain_bytes, 64 - 7) - resp = self._hid_xfer( - bytes([write_cmd, address, stop_flag]) - + remain_bytes.to_bytes(4, byteorder="little") - + buffer[start : (start + chunk)], - True, - ) - if resp[1] != self.RESP_OK: - raise RuntimeError("I2C write error") - start += chunk - - def _i2c_read(self, address, buffer, start=0, end=None): - """Read data from an address and into the buffer""" - # TODO: support chunkified reads - if self._i2c_index is None: - raise RuntimeError("I2C bus not initialized.") - - end = end if end else len(buffer) - - read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ - stop_flag = 0x01 # always stop - read_size = end - start - - resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True) - if resp[1] != self.RESP_OK: - raise RuntimeError("I2C write error") - # move into buffer - for i in range(read_size): - buffer[start + i] = resp[i + 2] - - def i2c_writeto(self, address, buffer, *, start=0, end=None): - """Write data from the buffer to an address""" - self._i2c_write(address, buffer, start, end) - - def i2c_readfrom_into(self, address, buffer, *, start=0, end=None): - """Read data from an address and into the buffer""" - self._i2c_read(address, buffer, start, end) - - def i2c_writeto_then_readfrom( - self, - address, - out_buffer, - in_buffer, - *, - out_start=0, - out_end=None, - in_start=0, - in_end=None - ): - """Write data from buffer_out to an address and then - read data from an address and into buffer_in - """ - self._i2c_write(address, out_buffer, out_start, out_end, False) - self._i2c_read(address, in_buffer, in_start, in_end) - - def i2c_scan(self, *, start=0, end=0x79): - """Perform an I2C Device Scan""" - if self._i2c_index is None: - raise RuntimeError("I2C bus not initialized.") - found = [] - for addr in range(start, end + 1): - # try a write - try: - self.i2c_writeto(addr, b"\x00\x00\x00") - except RuntimeError: # no reply! - continue - # store if success - found.append(addr) - return found - - # ---------------------------------------------------------------- - # SPI - # ---------------------------------------------------------------- - def spi_configure(self, baudrate): - """Configure SPI.""" - if self._spi_index is None: - raise RuntimeError("SPI bus not initialized.") - - resp = self._hid_xfer( - bytes( - [ - self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT, - 0x00, # mode, not yet implemented - ] - ) - + baudrate.to_bytes(4, byteorder="little"), - True, - ) - if resp[1] != self.RESP_OK: - raise RuntimeError("SPI init error.") - - def spi_set_port(self, index): - """Set SPI port.""" - if index not in (0, 1): - raise ValueError("SPI index must be 0 or 1.") - self._spi_index = index - - def spi_write(self, buffer, *, start=0, end=None): - """SPI write.""" - if self._spi_index is None: - raise RuntimeError("SPI bus not initialized.") - - end = end if end else len(buffer) - - write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE - - while (end - start) > 0: - remain_bytes = end - start - chunk = min(remain_bytes, 64 - 3) - resp = self._hid_xfer( - bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True - ) - if resp[1] != self.RESP_OK: - raise RuntimeError("SPI write error") - start += chunk - - def spi_readinto(self, buffer, *, start=0, end=None, write_value=0): - """SPI readinto.""" - if self._spi_index is None: - raise RuntimeError("SPI bus not initialized.") - - end = end if end else len(buffer) - read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ - read_size = end - start - - resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True) - if resp[1] != self.RESP_OK: - raise RuntimeError("SPI write error") - # move into buffer - for i in range(read_size): - buffer[start + i] = resp[i + 2] - - def spi_write_readinto( - self, - buffer_out, - buffer_in, - *, - out_start=0, - out_end=None, - in_start=0, - in_end=None - ): - """SPI write and readinto.""" - raise NotImplementedError("SPI write_readinto Not implemented") - - # ---------------------------------------------------------------- - # NEOPIXEL - # ---------------------------------------------------------------- - def neopixel_write(self, gpio, buf): - """NeoPixel write.""" - # open serial (data is sent over this) - if self._serial is None: - import serial - import serial.tools.list_ports - - ports = serial.tools.list_ports.comports() - for port in ports: - if port.vid == self.VID and port.pid == self.PID: - self._serial = serial.Serial(port.device) - break - if self._serial is None: - raise RuntimeError("Could not find Pico com port.") - - # init - if not self._neopixel_initialized: - # deinit any current setup - # pylint: disable=protected-access - self._hid_xfer(bytes([self.WS2812B_DEINIT])) - resp = self._hid_xfer( - bytes( - [ - self.WS2812B_INIT, - gpio._pin.id, - ] - ), - True, - ) - if resp[1] != self.RESP_OK: - raise RuntimeError("Neopixel init error") - self._neopixel_initialized = True - - self._serial.reset_output_buffer() - - # write - # command is done over HID - remain_bytes = len(buf) - resp = self._hid_xfer( - bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"), - True, - ) - if resp[1] != self.RESP_OK: - # pylint: disable=no-else-raise - if resp[2] == 0x01: - raise RuntimeError( - "Neopixel write error : too many pixel for the firmware." - ) - elif resp[2] == 0x02: - print(resp[0:10]) - raise RuntimeError( - "Neopixel write error : transfer already in progress." - ) - else: - raise RuntimeError("Neopixel write error.") - # buffer is sent over serial - self._serial.write(buf) - # hack (see u2if) - if len(buf) % 64 == 0: - self._serial.write([0]) - self._serial.flush() - # polling loop to wait for write complete? - resp = self._hid.read(64) - while resp[0] != self.WS2812B_WRITE: - resp = self._hid.read(64) - if resp[1] != self.RESP_OK: - raise RuntimeError("Neopixel write (flush) error.") - - # ---------------------------------------------------------------- - # PWM - # ---------------------------------------------------------------- - # pylint: disable=unused-argument - def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False): - """Configure PWM.""" - self.pwm_deinit(pin) - resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True) - if resp[1] != self.RESP_OK: - raise RuntimeError("PWM init error.") - - self.pwm_set_frequency(pin, frequency) - self.pwm_set_duty_cycle(pin, duty_cycle) - - def pwm_deinit(self, pin): - """Deinit PWM.""" - self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id])) - - def pwm_get_frequency(self, pin): - """PWM get freq.""" - resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True) - if resp[1] != self.RESP_OK: - raise RuntimeError("PWM get frequency error.") - return int.from_bytes(resp[3 : 3 + 4], byteorder="little") - - def pwm_set_frequency(self, pin, frequency): - """PWM set freq.""" - resp = self._hid_xfer( - bytes([self.PWM_SET_FREQ, pin.id]) - + frequency.to_bytes(4, byteorder="little"), - True, - ) - if resp[1] != self.RESP_OK: - # pylint: disable=no-else-raise - if resp[3] == 0x01: - raise RuntimeError("PWM different frequency on same slice.") - elif resp[3] == 0x02: - raise RuntimeError("PWM frequency too low.") - elif resp[3] == 0x03: - raise RuntimeError("PWM frequency too high.") - else: - raise RuntimeError("PWM frequency error.") - - def pwm_get_duty_cycle(self, pin): - """PWM get duty cycle.""" - resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True) - if resp[1] != self.RESP_OK: - raise RuntimeError("PWM get duty cycle error.") - return int.from_bytes(resp[3 : 3 + 4], byteorder="little") - - def pwm_set_duty_cycle(self, pin, duty_cycle): - """PWM set duty cycle.""" - resp = self._hid_xfer( - bytes([self.PWM_SET_DUTY_U16, pin.id]) - + duty_cycle.to_bytes(2, byteorder="little"), - True, - ) - if resp[1] != self.RESP_OK: - raise RuntimeError("PWM set duty cycle error.") - - -pico_u2if = Pico_u2if()