def __init__(self):
    """Open the HID device identified by MCP2221.VID / MCP2221.PID and reset it."""
    self._hid = hid.device()
    vid, pid = MCP2221.VID, MCP2221.PID
    self._hid.open(vid, pid)
    # start from a known state: _reset() re-opens the handle afterwards
    self._reset()
def _hid_xfer(self, report, response=True):
# first byte is report ID, which =0 for MCP2221
# MISC
#----------------------------------------------------------------
def gp_get_mode(self, pin):
    """Return the low three bits of byte ``22 + pin`` of the 0x61 response.

    The 0x61 report is the same one used elsewhere in this file to read the
    current settings.
    """
    settings = self._hid_xfer(b'\x61')
    mode_byte = settings[22 + pin]
    return mode_byte & 0x07
def gp_set_mode(self, pin, mode):
# get current settings
- current = self._hid_xfer(bytes([0x61]))
+ current = self._hid_xfer(b'\x61')
# empty report, this is safe since 0's = no change
- report = bytearray([0x60]+[0]*63)
+ report = bytearray(b'\x60'+b'\x00'*63)
# set the alter GP flag byte
report[7] = 0xFF
# each pin can be set individually
print()
def _status_dump(self):
- self._pretty_report(self._hid_xfer(bytes([0x10])))
+ self._pretty_report(self._hid_xfer(b'\x10'))
def _sram_dump(self):
- self._pretty_report(self._hid_xfer(bytes([0x61])))
+ self._pretty_report(self._hid_xfer(b'\x61'))
def _reset(self):
    """Send the reset report, then re-open the HID handle.

    The reset report (0x70 0xAB 0xCD 0xEF) is sent without waiting for a
    response. Re-opening is retried every 0.1 s for up to 5 seconds.

    Raises:
        OSError: if the device could not be re-opened within 5 seconds.
    """
    self._hid_xfer(b'\x70\xAB\xCD\xEF', response=False)
    began = time.monotonic()
    while time.monotonic() - began < 5:
        try:
            self._hid.open(MCP2221.VID, MCP2221.PID)
        except OSError:
            # not available yet; wait a little and try again
            time.sleep(0.1)
        else:
            return
    raise OSError("open failed")
#----------------------------------------------------------------
# GPIO
#----------------------------------------------------------------
def gpio_set_direction(self, pin, mode):
    """Send a 0x50 (set GPIO) report that changes the direction of one pin.

    Args:
        pin: pin index; its direction fields start at byte ``4 * (pin + 1)``.
        mode: value written to the pin's direction byte.
    """
    base = 4 * (pin + 1)
    payload = bytearray(64)   # all zeros: an empty set GPIO report
    payload[0] = 0x50
    payload[base] = 0x01      # flag: set pin direction
    payload[base + 1] = mode  # the direction to set
    self._hid_xfer(payload)
def gpio_set_pin(self, pin, value):
    """Send a 0x50 (set GPIO) report that changes the output value of one pin.

    Args:
        pin: pin index; its value fields start at byte ``2 + 4 * pin``.
        value: value written to the pin's output byte.
    """
    base = 2 + 4 * pin
    payload = bytearray(64)    # all zeros: an empty set GPIO report
    payload[0] = 0x50
    payload[base] = 0x01       # flag: set pin value
    payload[base + 1] = value  # the value to set
    self._hid_xfer(payload)
def gpio_get_pin(self, pin):
- resp = self._hid_xfer(bytes([0x51]))
+ resp = self._hid_xfer(b'\x51')
offset = 2 + 2 * pin
if resp[offset] == 0xEE:
raise RuntimeError("Pin is not set for GPIO operation.")
#----------------------------------------------------------------
# I2C
#
- # cribbed from the C driver:
- # define RESP_ERR_NOERR 0x00
- # define RESP_ADDR_NACK 0x25
- # define RESP_READ_ERR 0x7F
- # define RESP_READ_COMPL 0x55
+ # cribbed from the C driver
+ # http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
# define RESP_I2C_IDLE 0x00
# define RESP_I2C_START_TOUT 0x12
# define RESP_I2C_RSTART_TOUT 0x17
# define RESP_I2C_RDDATA_TOUT 0x52
# define RESP_I2C_STOP_TOUT 0x62
#----------------------------------------------------------------
+ def _i2c_status(self):
+ return self._hid_xfer(b'\x10')[8]
+
+ def _i2c_cancel(self):
+ resp = self._hid_xfer(b'\x10\x00\x10')
+ if resp[2] == 0x10:
+ # bus release will need "a few hundred microseconds"
+ time.sleep(0.001)
+
+ def _i2c_write(self, cmd, address, buffer, start=0, end=None):
+ if self._i2c_status():
+ self._i2c_cancel()
+ end = end if end else len(buffer)
+ length = end - start
+ retries = 0
+ while (end - start) > 0:
+ chunk = min(end - start, 60)
+ # write out current chunk
+ resp = self._hid_xfer(bytes([cmd,
+ length & 0xFF,
+ (length >> 8) & 0xFF,
+ address << 1]) +
+ buffer[start:(start+chunk)])
+ # check for success
+ if resp[1] != 0x00:
+ retries += 1
+ if retries >= 5:
+ raise RuntimeError("I2C write error, max retries reached.")
+ time.sleep(0.001)
+ continue # try again
+ start += chunk
+ retries = 0
+
+ def _i2c_read(self, cmd, address, buffer, start=0, end=None):
+ if self._i2c_status():
+ self._i2c_cancel()
+ end = end if end else len(buffer)
+ length = end - start
+ retries = 0
+ while (end - start) > 0:
+ # tell it we want to read
+ resp = self._hid_xfer(bytes([cmd,
+ length & 0xFF,
+ (length >> 8) & 0xFF,
+ (address << 1) | 0x01]))
+ # and then actually read
+ resp = self._hid_xfer(b'\x40')
+ # check for success
+ if resp[1] != 0x00:
+ retries += 1
+ if retries >= 5:
+ raise RuntimeError("I2C write error, max retries reached.")
+ time.sleep(0.001)
+ continue
+ # move data into buffer
+ chunk = min(end - start, 60)
+ for i, k in enumerate(range(start, start+chunk)):
+ buffer[k] = resp[4 + i]
+ start += chunk
+
def i2c_configure(self, baudrate=100000):
    """Set the I2C clock speed via the 0x10 (set parameters) report.

    Args:
        baudrate: desired bus speed in Hz; the divider sent to the chip is
            ``12000000 // baudrate - 3`` and must fit in one byte.
    """
    self._hid_xfer(bytes([0x10,  # set parameters
                          0x00,  # don't care
                          0x00,  # cancel field: leave at 0 (0x10 would cancel a transfer)
                          0x20,  # flag: the next byte is a new clock divider
                          12000000 // baudrate - 3]))
def i2c_writeto(self, address, buffer, *, start=0, end=None):
    """Write ``buffer[start:end]`` to ``address`` using the 0x90 write command."""
    self._i2c_write(0x90, address, buffer, start=start, end=end)
def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
    """Read from ``address`` into ``buffer[start:end]`` using the 0x91 read command."""
    self._i2c_read(0x91, address, buffer, start=start, end=end)
def i2c_writeto_then_readfrom(self, address, out_buffer, in_buffer, *,
                              out_start=0, out_end=None,
                              in_start=0, in_end=None):
    """Write ``out_buffer[out_start:out_end]``, then read into ``in_buffer[in_start:in_end]``.

    The write uses command 0x94 and the read uses command 0x93, both
    addressed to ``address``.
    """
    self._i2c_write(0x94, address, out_buffer, start=out_start, end=out_end)
    self._i2c_read(0x93, address, in_buffer, start=in_start, end=in_end)
def i2c_scan(self, *, start=0, end=0x79):
found = []
# try a write
self.i2c_writeto(addr, b'\x00')
# store if success
- if self._hid_xfer(b'\x10')[8] == 0x00:
+ if self._i2c_status() == 0x00:
found.append(addr)
# cancel and continue
- self._hid_xfer(b'\x10\x00\x10')
+ self._i2c_cancel()
return found
#----------------------------------------------------------------
# ADC
#----------------------------------------------------------------
def adc_configure(self, vref=0):
    """Send a 0x60 report whose byte 5 carries the ADC reference selection.

    Args:
        vref: reference selector; only its low three bits are used.
    """
    payload = bytearray(64)  # all zeros; only the fields set below are non-zero
    payload[0] = 0x60
    payload[5] = 0x80 | (vref & 0x07)  # bit 7 set alongside the 3-bit vref
    self._hid_xfer(payload)
def adc_read(self, pin):
    """Return the two-byte reading for ``pin`` from the 0x10 response.

    The high byte is at index ``49 + 2 * pin`` and the low byte at
    ``48 + 2 * pin``.
    """
    status_report = self._hid_xfer(b'\x10')
    high = status_report[49 + 2 * pin]
    low = status_report[48 + 2 * pin]
    return (high << 8) | low
#----------------------------------------------------------------
# DAC
#----------------------------------------------------------------
def dac_configure(self, vref=0):
    """Send a 0x60 report whose byte 3 carries the DAC reference selection.

    Args:
        vref: reference selector; only its low three bits are used.
    """
    payload = bytearray(64)  # all zeros; only the fields set below are non-zero
    payload[0] = 0x60
    payload[3] = 0x80 | (vref & 0x07)  # bit 7 set alongside the 3-bit vref
    self._hid_xfer(payload)
def dac_write(self, pin, value):
    """Send a 0x60 report whose byte 4 carries the DAC output value.

    Args:
        pin: accepted for interface symmetry; not used by this method.
        value: output value; only its low five bits are used.
    """
    payload = bytearray(64)  # all zeros; only the fields set below are non-zero
    payload[0] = 0x60
    payload[4] = 0x80 | (value & 0x1F)  # bit 7 set alongside the 5-bit value
    self._hid_xfer(payload)