+ if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
+ break # this is OK too!
+ if usb_cmd_status in (
+ RESP_I2C_START_TOUT,
+ RESP_I2C_WRADDRL_TOUT,
+ RESP_I2C_WRADDRL_NACK,
+ RESP_I2C_WRDATA_TOUT,
+ RESP_I2C_STOP_TOUT,
+ ):
+ raise RuntimeError("Unrecoverable I2C state failure")
+ time.sleep(0.001)
+ else:
+ raise RuntimeError("I2C write error: max retries reached.")
+ # whew success!
+
def _i2c_read(self, cmd, address, buffer, start=0, end=None):
    """Read bytes from an I2C device into ``buffer`` between ``start`` and ``end``.

    Sends the read request ``cmd`` (with the byte count and the shifted
    address), then repeatedly issues the ``0x40`` report and copies each
    returned payload into ``buffer`` until the requested range is filled.

    :param cmd: first byte of the read-request report.
    :param address: I2C device address; it is shifted left by one and the
        low bit is set before being sent.
    :param buffer: writable, indexable byte buffer that receives the data.
    :param start: index of the first element of ``buffer`` to fill.
    :param end: index one past the last element to fill; ``None`` means
        ``len(buffer)``. An explicit ``0`` is honoured as an empty range.
    :raises RuntimeError: if the request or a data report signals failure,
        the device NACKs, or ``MCP2221_RETRY_MAX`` polls give no usable data.
    """
    # Any state other than 0 or RESP_I2C_WRITINGNOSTOP is cancelled before
    # starting (presumably the latter is left by a preceding write without
    # STOP, which a following read is meant to continue -- confirm).
    if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
        self._i2c_cancel()

    # Compare against None rather than relying on truthiness: an explicit
    # end=0 describes an empty range, not "the whole buffer".
    if end is None:
        end = len(buffer)
    length = end - start

    # tell it we want to read: command, length as two bytes (low byte
    # first), then the address shifted left with the low bit set
    resp = self._hid_xfer(
        bytes([cmd, length & 0xFF, (length >> 8) & 0xFF, (address << 1) | 0x01])
    )

    # check for success
    if resp[1] != 0x00:
        raise RuntimeError("Unrecoverable I2C read failure")

    # and now the read part: one accepted report per pass of the outer loop
    while (end - start) > 0:
        for _ in range(MCP2221_RETRY_MAX):
            # the actual read
            resp = self._hid_xfer(b"\x40")
            # presumably "data not ready yet": pause briefly and poll again
            if resp[1] == RESP_I2C_PARTIALDATA:
                time.sleep(0.001)
                continue
            # any other non-zero status is fatal
            if resp[1] != 0x00:
                raise RuntimeError("Unrecoverable I2C read failure")
            if resp[2] == RESP_ADDR_NACK:
                raise RuntimeError("I2C NACK")
            if resp[3] == 0x00 and resp[2] == 0x00:
                break
            # read error marker in resp[3]: pause briefly and poll again
            if resp[3] == RESP_READ_ERR:
                time.sleep(0.001)
                continue
            if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
                break
        else:
            # the for loop ran out of retries without hitting a break
            raise RuntimeError("I2C read error: max retries reached.")

        # move data into buffer
        # NOTE(review): this assumes each accepted report carries
        # min(remaining, 60) payload bytes starting at resp[4]; resp[3] is
        # not used as the byte count -- confirm against the datasheet.
        chunk = min(end - start, 60)
        for i, k in enumerate(range(start, start + chunk)):
            buffer[k] = resp[4 + i]
        start += chunk
+
+ # pylint: enable=too-many-arguments
+
+ def _i2c_configure(self, baudrate=100000):
+ """Configure I2C"""
+ self._hid_xfer(
+ bytes(
+ [
+ 0x10, # set parameters
+ 0x00, # don't care
+ 0x00, # no effect
+ 0x20, # next byte is clock divider
+ 12000000 // baudrate - 3,
+ ]
+ )
+ )
+
def i2c_writeto(self, address, buffer, *, start=0, end=None):
    """Write data from ``buffer`` to the device at ``address``.

    Thin wrapper: hands ``address``, ``buffer``, ``start`` and ``end``
    unchanged to ``_i2c_write`` together with command byte ``0x90``.
    """
    write_cmd = 0x90
    self._i2c_write(write_cmd, address, buffer, start, end)
+
def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
    """Read data from the device at ``address`` into ``buffer``.

    Thin wrapper: hands ``address``, ``buffer``, ``start`` and ``end``
    unchanged to ``_i2c_read`` together with command byte ``0x91``.
    Leaving ``end`` as ``None`` selects ``len(buffer)``.
    """
    read_cmd = 0x91
    self._i2c_read(read_cmd, address, buffer, start, end)
+
def i2c_writeto_then_readfrom(
    self,
    address,
    out_buffer,
    in_buffer,
    *,
    out_start=0,
    out_end=None,
    in_start=0,
    in_end=None,
):
    """Write from ``out_buffer`` to ``address``, then read into ``in_buffer``.

    The write goes through ``_i2c_write`` with command byte ``0x94`` and the
    ``out_start``/``out_end`` bounds; the read follows through ``_i2c_read``
    with command byte ``0x93`` and the ``in_start``/``in_end`` bounds.
    (Presumably ``0x94`` is a write that withholds the STOP condition and
    ``0x93`` a read that begins with a repeated START -- confirm against the
    device datasheet.)
    """
    write_cmd, read_cmd = 0x94, 0x93
    # Order matters: the outgoing bytes must be sent before the read starts.
    self._i2c_write(write_cmd, address, out_buffer, out_start, out_end)
    self._i2c_read(read_cmd, address, in_buffer, in_start, in_end)