]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py
Merge pull request #243 from caternuson/win_hid_fix
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / mcp2221 / mcp2221.py
1 import os
2 import time
3 import hid
4
5 # Small values seem to help on some Windows setups
6 MCP2221_HID_DELAY = float(os.environ.get('BLINKA_MCP2221_HID_DELAY', 0))
7
8 # from the C driver
9 # http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
10 # others (???) determined during driver developement
11 # pylint: disable=bad-whitespace
12 RESP_ERR_NOERR              = 0x00
13 RESP_ADDR_NACK              = 0x25
14 RESP_READ_ERR               = 0x7F
15 RESP_READ_COMPL             = 0x55
16 RESP_READ_PARTIAL           = 0x54 # ???
17 RESP_I2C_IDLE               = 0x00
18 RESP_I2C_START_TOUT         = 0x12
19 RESP_I2C_RSTART_TOUT        = 0x17
20 RESP_I2C_WRADDRL_TOUT       = 0x23
21 RESP_I2C_WRADDRL_WSEND      = 0x21
22 RESP_I2C_WRADDRL_NACK       = 0x25
23 RESP_I2C_WRDATA_TOUT        = 0x44
24 RESP_I2C_RDDATA_TOUT        = 0x52
25 RESP_I2C_STOP_TOUT          = 0x62
26
27 RESP_I2C_MOREDATA           = 0x43 # ???
28 RESP_I2C_PARTIALDATA        = 0x41 # ???
29 RESP_I2C_WRITINGNOSTOP      = 0x45 # ???
30
31 MCP2221_RETRY_MAX           = 50
32 MCP2221_MAX_I2C_DATA_LEN    = 60
33 MASK_ADDR_NACK              = 0x40
34 # pylint: enable=bad-whitespace
35
36 class MCP2221:
37
38     VID = 0x04D8
39     PID = 0x00DD
40
41     GP_GPIO = 0b000
42     GP_DEDICATED = 0b001
43     GP_ALT0 = 0b010
44     GP_ALT1 = 0b011
45     GP_ALT2 = 0b100
46
47     def __init__(self):
48         self._hid = hid.device()
49         self._hid.open(MCP2221.VID, MCP2221.PID)
50         self._reset()
51         time.sleep(0.25)
52
53     def _hid_xfer(self, report, response=True):
54         # first byte is report ID, which =0 for MCP2221
55         # remaing bytes = 64 byte report data
56         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
57         self._hid.write(b'\0' + report + b'\0'*(64-len(report)))
58         time.sleep(MCP2221_HID_DELAY)
59         if response:
60             # return is 64 byte response report
61             return self._hid.read(64)
62
63     #----------------------------------------------------------------
64     # MISC
65     #----------------------------------------------------------------
66     def gp_get_mode(self, pin):
67         return self._hid_xfer(b'\x61')[22+pin] & 0x07
68
69     def gp_set_mode(self, pin, mode):
70         # get current settings
71         current = self._hid_xfer(b'\x61')
72         # empty report, this is safe since 0's = no change
73         report = bytearray(b'\x60'+b'\x00'*63)
74         # set the alter GP flag byte
75         report[7] = 0xFF
76         # each pin can be set individually
77         # but all 4 get set at once, so we need to
78         # transpose current settings
79         report[8]  = current[22]  # GP0
80         report[9]  = current[23]  # GP1
81         report[10] = current[24]  # GP2
82         report[11] = current[25]  # GP3
83         # then change only the one
84         report[8+pin] = mode & 0x07
85         # and make it so
86         self._hid_xfer(report)
87
88     def _pretty_report(self, report):
89         print("     0  1  2  3  4  5  6  7  8  9")
90         index = 0
91         for row in range(7):
92             print("{} : ".format(row), end='')
93             for _ in range(10):
94                 print("{:02x} ".format(report[index]), end='')
95                 index += 1
96                 if index > 63:
97                     break
98             print()
99
100     def _status_dump(self):
101         self._pretty_report(self._hid_xfer(b'\x10'))
102
103     def _sram_dump(self):
104         self._pretty_report(self._hid_xfer(b'\x61'))
105
106     def _reset(self):
107         self._hid_xfer(b'\x70\xAB\xCD\xEF', response=False)
108         start = time.monotonic()
109         while time.monotonic() - start < 5:
110             try:
111                 self._hid.open(MCP2221.VID, MCP2221.PID)
112             except OSError:
113                 # try again
114                 time.sleep(0.1)
115                 continue
116             return
117         raise OSError("open failed")
118
119     #----------------------------------------------------------------
120     # GPIO
121     #----------------------------------------------------------------
122     def gpio_set_direction(self, pin, mode):
123         report = bytearray(b'\x50'+b'\x00'*63)  # empty set GPIO report
124         offset = 4 * (pin + 1)
125         report[offset] = 0x01                   # set pin direction
126         report[offset+1] = mode                 # to this
127         self._hid_xfer(report)
128
129     def gpio_set_pin(self, pin, value):
130         report = bytearray(b'\x50'+b'\x00'*63)  # empty set GPIO report
131         offset = 2 + 4 * pin
132         report[offset] = 0x01                   # set pin value
133         report[offset+1] = value                # to this
134         self._hid_xfer(report)
135
136     def gpio_get_pin(self, pin):
137         resp = self._hid_xfer(b'\x51')
138         offset = 2 + 2 * pin
139         if resp[offset] == 0xEE:
140             raise RuntimeError("Pin is not set for GPIO operation.")
141         else:
142             return resp[offset]
143
144     #----------------------------------------------------------------
145     # I2C
146     #----------------------------------------------------------------
147     def _i2c_status(self):
148         resp = self._hid_xfer(b'\x10')
149         if resp[1] != 0:
150             raise RuntimeError("Couldn't get I2C status")
151         return resp
152
153     def _i2c_state(self):
154         return self._i2c_status()[8]
155
156     def _i2c_cancel(self):
157         resp = self._hid_xfer(b'\x10\x00\x10')
158         if resp[1] != 0x00:
159             raise RuntimeError("Couldn't cancel I2C")
160         if resp[2] == 0x10:
161             # bus release will need "a few hundred microseconds"
162             time.sleep(0.001)
163
164     def _i2c_write(self, cmd, address, buffer, start=0, end=None):
165         if self._i2c_state() != 0x00:
166             self._i2c_cancel()
167
168         end = end if end else len(buffer)
169         length = end - start
170         retries = 0
171
172         while (end - start) > 0:
173             chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
174             # write out current chunk
175             resp = self._hid_xfer(bytes([cmd,
176                                          length & 0xFF,
177                                          (length >> 8) & 0xFF,
178                                          address << 1]) +
179                                          buffer[start:(start+chunk)])
180             # check for success
181             if resp[1] != 0x00:
182                 if resp[2] in (RESP_I2C_START_TOUT,
183                                RESP_I2C_WRADDRL_TOUT,
184                                RESP_I2C_WRADDRL_NACK,
185                                RESP_I2C_WRDATA_TOUT,
186                                RESP_I2C_STOP_TOUT):
187                     raise RuntimeError("Unrecoverable I2C state failure")
188                 retries += 1
189                 if retries >= MCP2221_RETRY_MAX:
190                     raise RuntimeError("I2C write error, max retries reached.")
191                 time.sleep(0.001)
192                 continue # try again
193             # yay chunk sent!
194             while self._i2c_state() == RESP_I2C_PARTIALDATA:
195                 time.sleep(0.001)
196             start += chunk
197             retries = 0
198
199         # check status in another loop
200         for _ in range(MCP2221_RETRY_MAX):
201             status = self._i2c_status()
202             if status[20] & MASK_ADDR_NACK:
203                 raise RuntimeError("I2C slave address was NACK'd")
204             usb_cmd_status = status[8]
205             if usb_cmd_status == 0:
206                 break
207             if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
208                 break   # this is OK too!
209             if usb_cmd_status in (RESP_I2C_START_TOUT,
210                                   RESP_I2C_WRADDRL_TOUT,
211                                   RESP_I2C_WRADDRL_NACK,
212                                   RESP_I2C_WRDATA_TOUT,
213                                   RESP_I2C_STOP_TOUT):
214                 raise RuntimeError("Unrecoverable I2C state failure")
215             time.sleep(0.001)
216         else:
217             raise RuntimeError("I2C write error: max retries reached.")
218         # whew success!
219
220     def _i2c_read(self, cmd, address, buffer, start=0, end=None):
221         if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
222             self._i2c_cancel()
223
224         end = end if end else len(buffer)
225         length = end - start
226
227         # tell it we want to read
228         resp = self._hid_xfer(bytes([cmd,
229                                      length & 0xFF,
230                                      (length >> 8) & 0xFF,
231                                      (address << 1) | 0x01]))
232
233         # check for success
234         if resp[1] != 0x00:
235             raise RuntimeError("Unrecoverable I2C read failure")
236
237         # and now the read part
238         while (end - start) > 0:
239             for retry in range(MCP2221_RETRY_MAX):
240                 # the actual read
241                 resp = self._hid_xfer(b'\x40')
242                 # check for success
243                 if resp[1] == RESP_I2C_PARTIALDATA:
244                     time.sleep(0.001)
245                     continue
246                 if resp[1] != 0x00:
247                     raise RuntimeError("Unrecoverable I2C read failure")
248                 if resp[2] == RESP_ADDR_NACK:
249                     raise RuntimeError("I2C NACK")
250                 if resp[3] == 0x00 and resp[2] == 0x00:
251                     break
252                 if resp[3] == RESP_READ_ERR:
253                     time.sleep(0.001)
254                     continue
255                 if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
256                     break
257
258             # move data into buffer
259             chunk = min(end - start, 60)
260             for i, k in enumerate(range(start, start+chunk)):
261                 buffer[k] = resp[4 + i]
262             start += chunk
263
264     def i2c_configure(self, baudrate=100000):
265         self._hid_xfer(bytes([0x10,  # set parameters
266                               0x00,  # don't care
267                               0x00,  # no effect
268                               0x20,  # next byte is clock divider
269                               12000000 // baudrate - 3]))
270
271     def i2c_writeto(self, address, buffer, *, start=0, end=None):
272         self._i2c_write(0x90, address, buffer, start, end)
273
274     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
275         self._i2c_read(0x91, address, buffer, start, end)
276
277     def i2c_writeto_then_readfrom(self, address, out_buffer, in_buffer, *,
278                                   out_start=0, out_end=None,
279                                   in_start=0, in_end=None):
280         self._i2c_write(0x94, address, out_buffer, out_start, out_end)
281         self._i2c_read(0x93, address, in_buffer, in_start, in_end)
282
283     def i2c_scan(self, *, start=0, end=0x79):
284         found = []
285         for addr in range(start, end+1):
286             # try a write
287             try:
288                 self.i2c_writeto(addr, b'\x00')
289             except RuntimeError: # no reply!
290                 continue
291             # store if success
292             found.append(addr)
293         return found
294
295     #----------------------------------------------------------------
296     # ADC
297     #----------------------------------------------------------------
298     def adc_configure(self, vref=0):
299         report = bytearray(b'\x60'+b'\x00'*63)
300         report[5] = 1 << 7 | (vref & 0b111)
301         self._hid_xfer(report)
302
303     def adc_read(self, pin):
304         resp = self._hid_xfer(b'\x10')
305         return resp[49 + 2 * pin] << 8 | resp[48 + 2 * pin]
306
307     #----------------------------------------------------------------
308     # DAC
309     #----------------------------------------------------------------
310     def dac_configure(self, vref=0):
311         report = bytearray(b'\x60'+b'\x00'*63)
312         report[3] = 1 << 7 | (vref & 0b111)
313         self._hid_xfer(report)
314
315     def dac_write(self, pin, value):
316         report = bytearray(b'\x60'+b'\x00'*63)
317         report[4] = 1 << 7 | (value & 0b11111)
318         self._hid_xfer(report)
319
320 mcp2221 = MCP2221()