+import os
import time
import hid
+# Delay in seconds after each HID write; small values seem to help on some Windows setups
+MCP2221_HID_DELAY = float(os.environ.get('BLINKA_MCP2221_HID_DELAY', 0))
+
+# from the C driver
+# http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
+# others (???) determined during driver development
+# pylint: disable=bad-whitespace
+RESP_ERR_NOERR = 0x00
+RESP_ADDR_NACK = 0x25
+RESP_READ_ERR = 0x7F
+RESP_READ_COMPL = 0x55
+RESP_READ_PARTIAL = 0x54 # ???
+RESP_I2C_IDLE = 0x00
+RESP_I2C_START_TOUT = 0x12
+RESP_I2C_RSTART_TOUT = 0x17
+RESP_I2C_WRADDRL_TOUT = 0x23
+RESP_I2C_WRADDRL_WSEND = 0x21
+RESP_I2C_WRADDRL_NACK = 0x25
+RESP_I2C_WRDATA_TOUT = 0x44
+RESP_I2C_RDDATA_TOUT = 0x52
+RESP_I2C_STOP_TOUT = 0x62
+
+RESP_I2C_MOREDATA = 0x43 # ???
+RESP_I2C_PARTIALDATA = 0x41 # ???
+RESP_I2C_WRITINGNOSTOP = 0x45 # ???
+
+MCP2221_RETRY_MAX = 50
+MCP2221_MAX_I2C_DATA_LEN = 60
+MASK_ADDR_NACK = 0x40
+# pylint: enable=bad-whitespace
+
class MCP2221:
VID = 0x04D8
self._hid = hid.device()
self._hid.open(MCP2221.VID, MCP2221.PID)
self._reset()
+ time.sleep(0.25)
def _hid_xfer(self, report, response=True):
# first byte is report ID, which =0 for MCP2221
# remaining bytes = 64 byte report data
# https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
self._hid.write(b'\0' + report + b'\0'*(64-len(report)))
+ time.sleep(MCP2221_HID_DELAY)
if response:
# return is 64 byte response report
return self._hid.read(64)
#----------------------------------------------------------------
# I2C
- #
- # cribbed from the C driver
- # http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
- # define RESP_I2C_IDLE 0x00
- # define RESP_I2C_START_TOUT 0x12
- # define RESP_I2C_RSTART_TOUT 0x17
- # define RESP_I2C_WRADDRL_TOUT 0x23
- # define RESP_I2C_WRADDRL_WSEND 0x21
- # define RESP_I2C_WRADDRL_NACK 0x25
- # define RESP_I2C_WRDATA_TOUT 0x44
- # define RESP_I2C_RDDATA_TOUT 0x52
- # define RESP_I2C_STOP_TOUT 0x62
#----------------------------------------------------------------
def _i2c_status(self):
- return self._hid_xfer(b'\x10')[8]
+ resp = self._hid_xfer(b'\x10')
+ if resp[1] != 0:
+ raise RuntimeError("Couldn't get I2C status")
+ return resp
+
+ def _i2c_state(self):
+ return self._i2c_status()[8]
def _i2c_cancel(self):
resp = self._hid_xfer(b'\x10\x00\x10')
+ if resp[1] != 0x00:
+ raise RuntimeError("Couldn't cancel I2C")
if resp[2] == 0x10:
# bus release will need "a few hundred microseconds"
time.sleep(0.001)
def _i2c_write(self, cmd, address, buffer, start=0, end=None):
- if self._i2c_status():
+ if self._i2c_state() != 0x00:
self._i2c_cancel()
+
end = end if end else len(buffer)
length = end - start
retries = 0
+
while (end - start) > 0:
- chunk = min(end - start, 60)
+ chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
# write out current chunk
resp = self._hid_xfer(bytes([cmd,
length & 0xFF,
buffer[start:(start+chunk)])
# check for success
if resp[1] != 0x00:
+ if resp[2] in (RESP_I2C_START_TOUT,
+ RESP_I2C_WRADDRL_TOUT,
+ RESP_I2C_WRADDRL_NACK,
+ RESP_I2C_WRDATA_TOUT,
+ RESP_I2C_STOP_TOUT):
+ raise RuntimeError("Unrecoverable I2C state failure")
retries += 1
- if retries >= 5:
+ if retries >= MCP2221_RETRY_MAX:
raise RuntimeError("I2C write error, max retries reached.")
time.sleep(0.001)
continue # try again
+ # yay chunk sent!
+ while self._i2c_state() == RESP_I2C_PARTIALDATA:
+ time.sleep(0.001)
start += chunk
retries = 0
+ # check status in another loop
+ for _ in range(MCP2221_RETRY_MAX):
+ status = self._i2c_status()
+ if status[20] & MASK_ADDR_NACK:
+ raise RuntimeError("I2C slave address was NACK'd")
+ usb_cmd_status = status[8]
+ if usb_cmd_status == 0:
+ break
+ if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
+ break # this is OK too!
+ if usb_cmd_status in (RESP_I2C_START_TOUT,
+ RESP_I2C_WRADDRL_TOUT,
+ RESP_I2C_WRADDRL_NACK,
+ RESP_I2C_WRDATA_TOUT,
+ RESP_I2C_STOP_TOUT):
+ raise RuntimeError("Unrecoverable I2C state failure")
+ time.sleep(0.001)
+ else:
+ raise RuntimeError("I2C write error: max retries reached.")
+ # whew success!
+
def _i2c_read(self, cmd, address, buffer, start=0, end=None):
- if self._i2c_status():
+ if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
self._i2c_cancel()
+
end = end if end else len(buffer)
length = end - start
- retries = 0
+
+ # tell it we want to read
+ resp = self._hid_xfer(bytes([cmd,
+ length & 0xFF,
+ (length >> 8) & 0xFF,
+ (address << 1) | 0x01]))
+
+ # check for success
+ if resp[1] != 0x00:
+ raise RuntimeError("Unrecoverable I2C read failure")
+
+ # and now the read part
while (end - start) > 0:
- # tell it we want to read
- resp = self._hid_xfer(bytes([cmd,
- length & 0xFF,
- (length >> 8) & 0xFF,
- (address << 1) | 0x01]))
- # and then actually read
- resp = self._hid_xfer(b'\x40')
- # check for success
- if resp[1] != 0x00:
- retries += 1
- if retries >= 5:
- raise RuntimeError("I2C write error, max retries reached.")
- time.sleep(0.001)
- continue
+ for retry in range(MCP2221_RETRY_MAX):
+ # the actual read
+ resp = self._hid_xfer(b'\x40')
+ # check for success
+ if resp[1] == RESP_I2C_PARTIALDATA:
+ time.sleep(0.001)
+ continue
+ if resp[1] != 0x00:
+ raise RuntimeError("Unrecoverable I2C read failure")
+ if resp[2] == RESP_ADDR_NACK:
+ raise RuntimeError("I2C NACK")
+ if resp[3] == 0x00 and resp[2] == 0x00:
+ break
+ if resp[3] == RESP_READ_ERR:
+ time.sleep(0.001)
+ continue
+ if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
+ break
+
# move data into buffer
chunk = min(end - start, 60)
for i, k in enumerate(range(start, start+chunk)):
found = []
for addr in range(start, end+1):
# try a write
- self.i2c_writeto(addr, b'\x00')
+ try:
+ self.i2c_writeto(addr, b'\x00')
+ except RuntimeError: # no reply!
+ continue
# store if success
- if self._i2c_status() == 0x00:
- found.append(addr)
- # cancel and continue
- self._i2c_cancel()
+ found.append(addr)
return found
#----------------------------------------------------------------