]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py
updates for pyftdi
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / mcp2221 / mcp2221.py
1 import time
2 import hid
3
4 # from the C driver
5 # http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
6 # others (???) determined during driver developement
7 # pylint: disable=bad-whitespace
8 RESP_ERR_NOERR              = 0x00
9 RESP_ADDR_NACK              = 0x25
10 RESP_READ_ERR               = 0x7F
11 RESP_READ_COMPL             = 0x55
12 RESP_READ_PARTIAL           = 0x54 # ???
13 RESP_I2C_IDLE               = 0x00
14 RESP_I2C_START_TOUT         = 0x12
15 RESP_I2C_RSTART_TOUT        = 0x17
16 RESP_I2C_WRADDRL_TOUT       = 0x23
17 RESP_I2C_WRADDRL_WSEND      = 0x21
18 RESP_I2C_WRADDRL_NACK       = 0x25
19 RESP_I2C_WRDATA_TOUT        = 0x44
20 RESP_I2C_RDDATA_TOUT        = 0x52
21 RESP_I2C_STOP_TOUT          = 0x62
22
23 RESP_I2C_MOREDATA           = 0x43 # ???
24 RESP_I2C_PARTIALDATA        = 0x41 # ???
25 RESP_I2C_WRITINGNOSTOP      = 0x45 # ???
26
27 MCP2221_RETRY_MAX           = 50
28 MCP2221_MAX_I2C_DATA_LEN    = 60
29 MASK_ADDR_NACK              = 0x40
30 # pylint: enable=bad-whitespace
31
32 class MCP2221:
33
34     VID = 0x04D8
35     PID = 0x00DD
36
37     GP_GPIO = 0b000
38     GP_DEDICATED = 0b001
39     GP_ALT0 = 0b010
40     GP_ALT1 = 0b011
41     GP_ALT2 = 0b100
42
43     def __init__(self):
44         self._hid = hid.device()
45         self._hid.open(MCP2221.VID, MCP2221.PID)
46         self._reset()
47         time.sleep(0.25)
48
49     def _hid_xfer(self, report, response=True):
50         # first byte is report ID, which =0 for MCP2221
51         # remaing bytes = 64 byte report data
52         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
53         self._hid.write(b'\0' + report + b'\0'*(64-len(report)))
54         if response:
55             # return is 64 byte response report
56             return self._hid.read(64)
57
58     #----------------------------------------------------------------
59     # MISC
60     #----------------------------------------------------------------
61     def gp_get_mode(self, pin):
62         return self._hid_xfer(b'\x61')[22+pin] & 0x07
63
64     def gp_set_mode(self, pin, mode):
65         # get current settings
66         current = self._hid_xfer(b'\x61')
67         # empty report, this is safe since 0's = no change
68         report = bytearray(b'\x60'+b'\x00'*63)
69         # set the alter GP flag byte
70         report[7] = 0xFF
71         # each pin can be set individually
72         # but all 4 get set at once, so we need to
73         # transpose current settings
74         report[8]  = current[22]  # GP0
75         report[9]  = current[23]  # GP1
76         report[10] = current[24]  # GP2
77         report[11] = current[25]  # GP3
78         # then change only the one
79         report[8+pin] = mode & 0x07
80         # and make it so
81         self._hid_xfer(report)
82
83     def _pretty_report(self, report):
84         print("     0  1  2  3  4  5  6  7  8  9")
85         index = 0
86         for row in range(7):
87             print("{} : ".format(row), end='')
88             for _ in range(10):
89                 print("{:02x} ".format(report[index]), end='')
90                 index += 1
91                 if index > 63:
92                     break
93             print()
94
95     def _status_dump(self):
96         self._pretty_report(self._hid_xfer(b'\x10'))
97
98     def _sram_dump(self):
99         self._pretty_report(self._hid_xfer(b'\x61'))
100
101     def _reset(self):
102         self._hid_xfer(b'\x70\xAB\xCD\xEF', response=False)
103         start = time.monotonic()
104         while time.monotonic() - start < 5:
105             try:
106                 self._hid.open(MCP2221.VID, MCP2221.PID)
107             except OSError:
108                 # try again
109                 time.sleep(0.1)
110                 continue
111             return
112         raise OSError("open failed")
113
114     #----------------------------------------------------------------
115     # GPIO
116     #----------------------------------------------------------------
117     def gpio_set_direction(self, pin, mode):
118         report = bytearray(b'\x50'+b'\x00'*63)  # empty set GPIO report
119         offset = 4 * (pin + 1)
120         report[offset] = 0x01                   # set pin direction
121         report[offset+1] = mode                 # to this
122         self._hid_xfer(report)
123
124     def gpio_set_pin(self, pin, value):
125         report = bytearray(b'\x50'+b'\x00'*63)  # empty set GPIO report
126         offset = 2 + 4 * pin
127         report[offset] = 0x01                   # set pin value
128         report[offset+1] = value                # to this
129         self._hid_xfer(report)
130
131     def gpio_get_pin(self, pin):
132         resp = self._hid_xfer(b'\x51')
133         offset = 2 + 2 * pin
134         if resp[offset] == 0xEE:
135             raise RuntimeError("Pin is not set for GPIO operation.")
136         else:
137             return resp[offset]
138
139     #----------------------------------------------------------------
140     # I2C
141     #----------------------------------------------------------------
142     def _i2c_status(self):
143         resp = self._hid_xfer(b'\x10')
144         if resp[1] != 0:
145             raise RuntimeError("Couldn't get I2C status")
146         return resp
147
148     def _i2c_state(self):
149         return self._i2c_status()[8]
150
151     def _i2c_cancel(self):
152         resp = self._hid_xfer(b'\x10\x00\x10')
153         if resp[1] != 0x00:
154             raise RuntimeError("Couldn't cancel I2C")
155         if resp[2] == 0x10:
156             # bus release will need "a few hundred microseconds"
157             time.sleep(0.001)
158
159     def _i2c_write(self, cmd, address, buffer, start=0, end=None):
160         if self._i2c_state() != 0x00:
161             self._i2c_cancel()
162
163         end = end if end else len(buffer)
164         length = end - start
165         retries = 0
166
167         while (end - start) > 0:
168             chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
169             # write out current chunk
170             resp = self._hid_xfer(bytes([cmd,
171                                          length & 0xFF,
172                                          (length >> 8) & 0xFF,
173                                          address << 1]) +
174                                          buffer[start:(start+chunk)])
175             # check for success
176             if resp[1] != 0x00:
177                 if resp[2] in (RESP_I2C_START_TOUT,
178                                RESP_I2C_WRADDRL_TOUT,
179                                RESP_I2C_WRADDRL_NACK,
180                                RESP_I2C_WRDATA_TOUT,
181                                RESP_I2C_STOP_TOUT):
182                     raise RuntimeError("Unrecoverable I2C state failure")
183                 retries += 1
184                 if retries >= MCP2221_RETRY_MAX:
185                     raise RuntimeError("I2C write error, max retries reached.")
186                 time.sleep(0.001)
187                 continue # try again
188             # yay chunk sent!
189             while self._i2c_state() == RESP_I2C_PARTIALDATA:
190                 time.sleep(0.001)
191             start += chunk
192             retries = 0
193
194         # check status in another loop
195         for _ in range(MCP2221_RETRY_MAX):
196             status = self._i2c_status()
197             if status[20] & MASK_ADDR_NACK:
198                 raise RuntimeError("I2C slave address was NACK'd")
199             usb_cmd_status = status[8]
200             if usb_cmd_status == 0:
201                 break
202             if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
203                 break   # this is OK too!
204             if usb_cmd_status in (RESP_I2C_START_TOUT,
205                                   RESP_I2C_WRADDRL_TOUT,
206                                   RESP_I2C_WRADDRL_NACK,
207                                   RESP_I2C_WRDATA_TOUT,
208                                   RESP_I2C_STOP_TOUT):
209                 raise RuntimeError("Unrecoverable I2C state failure")
210             time.sleep(0.001)
211         else:
212             raise RuntimeError("I2C write error: max retries reached.")
213         # whew success!
214
215     def _i2c_read(self, cmd, address, buffer, start=0, end=None):
216         if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
217             self._i2c_cancel()
218
219         end = end if end else len(buffer)
220         length = end - start
221
222         # tell it we want to read
223         resp = self._hid_xfer(bytes([cmd,
224                                      length & 0xFF,
225                                      (length >> 8) & 0xFF,
226                                      (address << 1) | 0x01]))
227
228         # check for success
229         if resp[1] != 0x00:
230             raise RuntimeError("Unrecoverable I2C read failure")
231
232         # and now the read part
233         while (end - start) > 0:
234             for retry in range(MCP2221_RETRY_MAX):
235                 # the actual read
236                 resp = self._hid_xfer(b'\x40')
237                 # check for success
238                 if resp[1] == RESP_I2C_PARTIALDATA:
239                     time.sleep(0.001)
240                     continue
241                 if resp[1] != 0x00:
242                     raise RuntimeError("Unrecoverable I2C read failure")
243                 if resp[2] == RESP_ADDR_NACK:
244                     raise RuntimeError("I2C NACK")
245                 if resp[3] == 0x00 and resp[2] == 0x00:
246                     break
247                 if resp[3] == RESP_READ_ERR:
248                     time.sleep(0.001)
249                     continue
250                 if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
251                     break
252
253             # move data into buffer
254             chunk = min(end - start, 60)
255             for i, k in enumerate(range(start, start+chunk)):
256                 buffer[k] = resp[4 + i]
257             start += chunk
258
259     def i2c_configure(self, baudrate=100000):
260         self._hid_xfer(bytes([0x10,  # set parameters
261                               0x00,  # don't care
262                               0x00,  # no effect
263                               0x20,  # next byte is clock divider
264                               12000000 // baudrate - 3]))
265
266     def i2c_writeto(self, address, buffer, *, start=0, end=None):
267         self._i2c_write(0x90, address, buffer, start, end)
268
269     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
270         self._i2c_read(0x91, address, buffer, start, end)
271
272     def i2c_writeto_then_readfrom(self, address, out_buffer, in_buffer, *,
273                                   out_start=0, out_end=None,
274                                   in_start=0, in_end=None):
275         self._i2c_write(0x94, address, out_buffer, out_start, out_end)
276         self._i2c_read(0x93, address, in_buffer, in_start, in_end)
277
278     def i2c_scan(self, *, start=0, end=0x79):
279         found = []
280         for addr in range(start, end+1):
281             # try a write
282             try:
283                 self.i2c_writeto(addr, b'\x00')
284             except RuntimeError: # no reply!
285                 continue
286             # store if success
287             found.append(addr)
288         return found
289
290     #----------------------------------------------------------------
291     # ADC
292     #----------------------------------------------------------------
293     def adc_configure(self, vref=0):
294         report = bytearray(b'\x60'+b'\x00'*63)
295         report[5] = 1 << 7 | (vref & 0b111)
296         self._hid_xfer(report)
297
298     def adc_read(self, pin):
299         resp = self._hid_xfer(b'\x10')
300         return resp[49 + 2 * pin] << 8 | resp[48 + 2 * pin]
301
302     #----------------------------------------------------------------
303     # DAC
304     #----------------------------------------------------------------
305     def dac_configure(self, vref=0):
306         report = bytearray(b'\x60'+b'\x00'*63)
307         report[3] = 1 << 7 | (vref & 0b111)
308         self._hid_xfer(report)
309
310     def dac_write(self, pin, value):
311         report = bytearray(b'\x60'+b'\x00'*63)
312         report[4] = 1 << 7 | (value & 0b11111)
313         self._hid_xfer(report)
314
315 mcp2221 = MCP2221()