+ def _i2c_status(self):
+ return self._hid_xfer(b'\x10')[8]
+
+ def _i2c_cancel(self):
+ resp = self._hid_xfer(b'\x10\x00\x10')
+ if resp[2] == 0x10:
+ # bus release will need "a few hundred microseconds"
+ time.sleep(0.001)
+
+ def _i2c_write(self, cmd, address, buffer, start=0, end=None):
+ if self._i2c_status():
+ self._i2c_cancel()
+ end = end if end else len(buffer)
+ length = end - start
+ retries = 0
+ while (end - start) > 0:
+ chunk = min(end - start, 60)
+ # write out current chunk
+ resp = self._hid_xfer(bytes([cmd,
+ length & 0xFF,
+ (length >> 8) & 0xFF,
+ address << 1]) +
+ buffer[start:(start+chunk)])
+ # check for success
+ if resp[1] != 0x00:
+ retries += 1
+ if retries >= 5:
+ raise RuntimeError("I2C write error, max retries reached.")
+ time.sleep(0.001)
+ continue # try again
+ start += chunk
+ retries = 0
+
+ def _i2c_read(self, cmd, address, buffer, start=0, end=None):
+ if self._i2c_status():
+ self._i2c_cancel()
+ end = end if end else len(buffer)
+ length = end - start
+ retries = 0
+ while (end - start) > 0:
+ # tell it we want to read
+ resp = self._hid_xfer(bytes([cmd,
+ length & 0xFF,
+ (length >> 8) & 0xFF,
+ (address << 1) | 0x01]))
+ # and then actually read
+ resp = self._hid_xfer(b'\x40')
+ # check for success
+ if resp[1] != 0x00:
+ retries += 1
+ if retries >= 5:
+ raise RuntimeError("I2C write error, max retries reached.")
+ time.sleep(0.001)
+ continue
+ # move data into buffer
+ chunk = min(end - start, 60)
+ for i, k in enumerate(range(start, start+chunk)):
+ buffer[k] = resp[4 + i]
+ start += chunk
+