def _i2c_status(self):
    """Send the one-byte ``0x10`` HID request and return the raw reply.

    The whole reply is handed back so callers can pick out the bytes
    they care about by index.
    NOTE(review): 0x10 is presumably the MCP2221 status/set-parameters
    command -- confirm against the datasheet.

    Returns:
        The reply object produced by ``self._hid_xfer``.

    Raises:
        RuntimeError: if byte 1 of the reply is non-zero.
    """
    report = self._hid_xfer(b'\x10')
    request_ok = report[1] == 0
    if not request_ok:
        raise RuntimeError("Couldn't get I2C status")
    return report
+
def _i2c_state(self):
    """Return byte 8 of the status reply.

    Other methods in this file compare this value against ``0`` and the
    ``RESP_I2C_*`` codes to decide whether a transfer is still pending.
    """
    status_report = self._i2c_status()
    return status_report[8]
+
def _i2c_cancel(self):
    """Send the ``0x10 0x00 0x10`` HID request to cancel the I2C transfer.

    When byte 2 of the reply echoes ``0x10`` the method pauses for one
    millisecond before returning so the bus has time to be released.

    Raises:
        RuntimeError: if byte 1 of the reply is non-zero.
    """
    reply = self._hid_xfer(b'\x10\x00\x10')
    if reply[1] != 0x00:
        raise RuntimeError("Couldn't cancel I2C")
    if reply[2] != 0x10:
        # nothing to wait for
        return
    # bus release will need "a few hundred microseconds"
    time.sleep(0.001)
+
def _i2c_write(self, cmd, address, buffer, start=0, end=None):
    """Write ``buffer[start:end]`` to the I2C device at ``address``.

    The data is sent in chunks of at most ``MCP2221_MAX_I2C_DATA_LEN``
    bytes. Every HID report starts with the same four-byte header:
    ``cmd``, the total transfer length (low byte, high byte) and the
    address shifted left by one. After the last chunk the status reply
    is polled until the transfer is reported finished.

    Args:
        cmd: HID command byte placed first in every report.
        address: I2C address; sent as ``address << 1``.
        buffer: Data to send. Anything ``bytes()`` can consume after
            slicing (bytes, bytearray, memoryview, list of ints).
        start: Index of the first byte to send.
        end: Index one past the last byte to send. ``None`` means
            ``len(buffer)``; an explicit ``0`` means an empty range.

    Raises:
        RuntimeError: on a fatal state code, a NACK'd address, or when
            ``MCP2221_RETRY_MAX`` attempts are used up.
    """
    if self._i2c_state() != 0x00:
        self._i2c_cancel()

    # Compare with None explicitly: "end if end else ..." would turn an
    # explicit end=0 into "send the whole buffer".
    if end is None:
        end = len(buffer)
    length = end - start
    retries = 0

    # State codes after which retrying is pointless.
    fatal_states = (RESP_I2C_START_TOUT,
                    RESP_I2C_WRADDRL_TOUT,
                    RESP_I2C_WRADDRL_NACK,
                    RESP_I2C_WRDATA_TOUT,
                    RESP_I2C_STOP_TOUT)

    # The header carries the TOTAL length, so it is identical for every chunk.
    header = bytes([cmd,
                    length & 0xFF,
                    (length >> 8) & 0xFF,
                    address << 1])

    while (end - start) > 0:
        chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
        # write out current chunk; bytes() lets non-bytes buffers through
        resp = self._hid_xfer(header + bytes(buffer[start:start + chunk]))
        # check for success
        if resp[1] != 0x00:
            if resp[2] in fatal_states:
                raise RuntimeError("Unrecoverable I2C state failure")
            retries += 1
            if retries >= MCP2221_RETRY_MAX:
                raise RuntimeError("I2C write error, max retries reached.")
            time.sleep(0.001)
            continue  # try the same chunk again
        # chunk accepted: wait until the device stops reporting partial data
        # NOTE(review): this wait has no upper bound.
        while self._i2c_state() == RESP_I2C_PARTIALDATA:
            time.sleep(0.001)
        start += chunk
        retries = 0

    # poll the status reply until the transfer is reported finished
    for _ in range(MCP2221_RETRY_MAX):
        status = self._i2c_status()
        if status[20] & MASK_ADDR_NACK:
            raise RuntimeError("I2C slave address was NACK'd")
        usb_cmd_status = status[8]
        if usb_cmd_status == 0:
            break
        if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
            # acceptable end state for command 0x94
            # (presumably the write-without-stop command -- confirm)
            break
        if usb_cmd_status in fatal_states:
            raise RuntimeError("Unrecoverable I2C state failure")
        time.sleep(0.001)
    else:
        raise RuntimeError("I2C write error: max retries reached.")
    # whew success!
+
def _i2c_read(self, cmd, address, buffer, start=0, end=None):
    """Read from the I2C device at ``address`` into ``buffer[start:end]``.

    One HID report (``cmd``, total length low/high byte, and
    ``(address << 1) | 1``) announces the read. The data is then fetched
    with repeated ``0x40`` requests, at most 60 bytes per reply, and
    copied into ``buffer`` starting at ``start``.

    Args:
        cmd: HID command byte placed first in the read request.
        address: I2C address; sent as ``(address << 1) | 0x01``.
        buffer: Mutable, index-assignable buffer that receives the data.
        start: Index of the first byte to fill.
        end: Index one past the last byte to fill. ``None`` means
            ``len(buffer)``; an explicit ``0`` means an empty range.

    Raises:
        RuntimeError: if the read request or a data fetch is rejected,
            if the reply signals a NACK, or if no usable data arrives
            within ``MCP2221_RETRY_MAX`` attempts.
    """
    if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
        self._i2c_cancel()

    # Compare with None explicitly: "end if end else ..." would turn an
    # explicit end=0 into "fill the whole buffer".
    if end is None:
        end = len(buffer)
    length = end - start

    # tell it we want to read
    resp = self._hid_xfer(bytes([cmd,
                                 length & 0xFF,
                                 (length >> 8) & 0xFF,
                                 (address << 1) | 0x01]))

    # check for success
    if resp[1] != 0x00:
        raise RuntimeError("Unrecoverable I2C read failure")

    # and now the read part
    while (end - start) > 0:
        for _ in range(MCP2221_RETRY_MAX):
            # the actual read
            resp = self._hid_xfer(b'\x40')
            # check for success
            if resp[1] == RESP_I2C_PARTIALDATA:
                time.sleep(0.001)
                continue
            if resp[1] != 0x00:
                raise RuntimeError("Unrecoverable I2C read failure")
            if resp[2] == RESP_ADDR_NACK:
                raise RuntimeError("I2C NACK")
            if resp[3] == 0x00 and resp[2] == 0x00:
                break
            if resp[3] == RESP_READ_ERR:
                time.sleep(0.001)
                continue
            if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
                break
        else:
            # Every attempt was used up without a usable reply. Without
            # this, bytes from a failed reply would be copied into
            # ``buffer`` and the read would look successful.
            raise RuntimeError("I2C read error: max retries reached.")

        # move data into buffer (payload starts at byte 4 of the reply)
        # NOTE(review): 60 is presumably MCP2221_MAX_I2C_DATA_LEN -- confirm.
        chunk = min(end - start, 60)
        for offset in range(chunk):
            buffer[start + offset] = resp[4 + offset]
        start += chunk
+