5 # http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
6 # others (???) determined during driver developement
7 # pylint: disable=bad-whitespace
11 RESP_READ_COMPL = 0x55
12 RESP_READ_PARTIAL = 0x54 # ???
14 RESP_I2C_START_TOUT = 0x12
15 RESP_I2C_RSTART_TOUT = 0x17
16 RESP_I2C_WRADDRL_TOUT = 0x23
17 RESP_I2C_WRADDRL_WSEND = 0x21
18 RESP_I2C_WRADDRL_NACK = 0x25
19 RESP_I2C_WRDATA_TOUT = 0x44
20 RESP_I2C_RDDATA_TOUT = 0x52
21 RESP_I2C_STOP_TOUT = 0x62
23 RESP_I2C_MOREDATA = 0x43 # ???
24 RESP_I2C_PARTIALDATA = 0x41 # ???
25 RESP_I2C_WRITINGNOSTOP = 0x45 # ???
27 MCP2221_RETRY_MAX = 50
28 MCP2221_MAX_I2C_DATA_LEN = 60
30 # pylint: enable=bad-whitespace
44 self._hid = hid.device()
45 self._hid.open(MCP2221.VID, MCP2221.PID)
49 def _hid_xfer(self, report, response=True):
50 # first byte is report ID, which =0 for MCP2221
51 # remaing bytes = 64 byte report data
52 # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
53 self._hid.write(b'\0' + report + b'\0'*(64-len(report)))
55 # return is 64 byte response report
56 return self._hid.read(64)
58 #----------------------------------------------------------------
60 #----------------------------------------------------------------
61 def gp_get_mode(self, pin):
62 return self._hid_xfer(b'\x61')[22+pin] & 0x07
64 def gp_set_mode(self, pin, mode):
65 # get current settings
66 current = self._hid_xfer(b'\x61')
67 # empty report, this is safe since 0's = no change
68 report = bytearray(b'\x60'+b'\x00'*63)
69 # set the alter GP flag byte
71 # each pin can be set individually
72 # but all 4 get set at once, so we need to
73 # transpose current settings
74 report[8] = current[22] # GP0
75 report[9] = current[23] # GP1
76 report[10] = current[24] # GP2
77 report[11] = current[25] # GP3
78 # then change only the one
79 report[8+pin] = mode & 0x07
81 self._hid_xfer(report)
83 def _pretty_report(self, report):
84 print(" 0 1 2 3 4 5 6 7 8 9")
87 print("{} : ".format(row), end='')
89 print("{:02x} ".format(report[index]), end='')
95 def _status_dump(self):
96 self._pretty_report(self._hid_xfer(b'\x10'))
99 self._pretty_report(self._hid_xfer(b'\x61'))
102 self._hid_xfer(b'\x70\xAB\xCD\xEF', response=False)
103 start = time.monotonic()
104 while time.monotonic() - start < 5:
106 self._hid.open(MCP2221.VID, MCP2221.PID)
112 raise OSError("open failed")
114 #----------------------------------------------------------------
116 #----------------------------------------------------------------
117 def gpio_set_direction(self, pin, mode):
118 report = bytearray(b'\x50'+b'\x00'*63) # empty set GPIO report
119 offset = 4 * (pin + 1)
120 report[offset] = 0x01 # set pin direction
121 report[offset+1] = mode # to this
122 self._hid_xfer(report)
124 def gpio_set_pin(self, pin, value):
125 report = bytearray(b'\x50'+b'\x00'*63) # empty set GPIO report
127 report[offset] = 0x01 # set pin value
128 report[offset+1] = value # to this
129 self._hid_xfer(report)
131 def gpio_get_pin(self, pin):
132 resp = self._hid_xfer(b'\x51')
134 if resp[offset] == 0xEE:
135 raise RuntimeError("Pin is not set for GPIO operation.")
139 #----------------------------------------------------------------
141 #----------------------------------------------------------------
142 def _i2c_status(self):
143 resp = self._hid_xfer(b'\x10')
145 raise RuntimeError("Couldn't get I2C status")
148 def _i2c_state(self):
149 return self._i2c_status()[8]
151 def _i2c_cancel(self):
152 resp = self._hid_xfer(b'\x10\x00\x10')
154 raise RuntimeError("Couldn't cancel I2C")
156 # bus release will need "a few hundred microseconds"
159 def _i2c_write(self, cmd, address, buffer, start=0, end=None):
160 if self._i2c_state() != 0x00:
163 end = end if end else len(buffer)
167 while (end - start) > 0:
168 chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
169 # write out current chunk
170 resp = self._hid_xfer(bytes([cmd,
172 (length >> 8) & 0xFF,
174 buffer[start:(start+chunk)])
177 if resp[2] in (RESP_I2C_START_TOUT,
178 RESP_I2C_WRADDRL_TOUT,
179 RESP_I2C_WRADDRL_NACK,
180 RESP_I2C_WRDATA_TOUT,
182 raise RuntimeError("Unrecoverable I2C state failure")
184 if retries >= MCP2221_RETRY_MAX:
185 raise RuntimeError("I2C write error, max retries reached.")
189 while self._i2c_state() == RESP_I2C_PARTIALDATA:
194 # check status in another loop
195 for _ in range(MCP2221_RETRY_MAX):
196 status = self._i2c_status()
197 if status[20] & MASK_ADDR_NACK:
198 raise RuntimeError("I2C slave address was NACK'd")
199 usb_cmd_status = status[8]
200 if usb_cmd_status == 0:
202 if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
203 break # this is OK too!
204 if usb_cmd_status in (RESP_I2C_START_TOUT,
205 RESP_I2C_WRADDRL_TOUT,
206 RESP_I2C_WRADDRL_NACK,
207 RESP_I2C_WRDATA_TOUT,
209 raise RuntimeError("Unrecoverable I2C state failure")
212 raise RuntimeError("I2C write error: max retries reached.")
215 def _i2c_read(self, cmd, address, buffer, start=0, end=None):
216 if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
219 end = end if end else len(buffer)
222 # tell it we want to read
223 resp = self._hid_xfer(bytes([cmd,
225 (length >> 8) & 0xFF,
226 (address << 1) | 0x01]))
230 raise RuntimeError("Unrecoverable I2C read failure")
232 # and now the read part
233 while (end - start) > 0:
234 for retry in range(MCP2221_RETRY_MAX):
236 resp = self._hid_xfer(b'\x40')
238 if resp[1] == RESP_I2C_PARTIALDATA:
242 raise RuntimeError("Unrecoverable I2C read failure")
243 if resp[2] == RESP_ADDR_NACK:
244 raise RuntimeError("I2C NACK")
245 if resp[3] == 0x00 and resp[2] == 0x00:
247 if resp[3] == RESP_READ_ERR:
250 if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
253 # move data into buffer
254 chunk = min(end - start, 60)
255 for i, k in enumerate(range(start, start+chunk)):
256 buffer[k] = resp[4 + i]
259 def i2c_configure(self, baudrate=100000):
260 self._hid_xfer(bytes([0x10, # set parameters
263 0x20, # next byte is clock divider
264 12000000 // baudrate - 3]))
266 def i2c_writeto(self, address, buffer, *, start=0, end=None):
267 self._i2c_write(0x90, address, buffer, start, end)
269 def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
270 self._i2c_read(0x91, address, buffer, start, end)
272 def i2c_writeto_then_readfrom(self, address, out_buffer, in_buffer, *,
273 out_start=0, out_end=None,
274 in_start=0, in_end=None):
275 self._i2c_write(0x94, address, out_buffer, out_start, out_end)
276 self._i2c_read(0x93, address, in_buffer, in_start, in_end)
278 def i2c_scan(self, *, start=0, end=0x79):
280 for addr in range(start, end+1):
282 self.i2c_writeto(addr, b'\x00')
284 if self._i2c_status() == 0x00:
286 # cancel and continue
290 #----------------------------------------------------------------
292 #----------------------------------------------------------------
293 def adc_configure(self, vref=0):
294 report = bytearray(b'\x60'+b'\x00'*63)
295 report[5] = 1 << 7 | (vref & 0b111)
296 self._hid_xfer(report)
298 def adc_read(self, pin):
299 resp = self._hid_xfer(b'\x10')
300 return resp[49 + 2 * pin] << 8 | resp[48 + 2 * pin]
302 #----------------------------------------------------------------
304 #----------------------------------------------------------------
305 def dac_configure(self, vref=0):
306 report = bytearray(b'\x60'+b'\x00'*63)
307 report[3] = 1 << 7 | (vref & 0b111)
308 self._hid_xfer(report)
310 def dac_write(self, pin, value):
311 report = bytearray(b'\x60'+b'\x00'*63)
312 report[4] = 1 << 7 | (value & 0b11111)
313 self._hid_xfer(report)