Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py
Bumping requirements for release
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / mcp2221 / mcp2221.py
1 import os
2 import time
3 import hid
4
# Optional pause (seconds) after every HID write; here if you need it.
MCP2221_HID_DELAY = float(os.getenv('BLINKA_MCP2221_HID_DELAY', '0'))
# Pause (seconds) between issuing a reset and reopening the device.
MCP2221_RESET_DELAY = float(os.getenv('BLINKA_MCP2221_RESET_DELAY', '0.5'))

# Response / state codes.
# Most come from the C driver:
#   http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
# Entries tagged "???" are not from that driver; they were determined
# during driver development.
RESP_ERR_NOERR = 0x00
RESP_ADDR_NACK = 0x25
RESP_READ_ERR = 0x7F
RESP_READ_COMPL = 0x55
RESP_READ_PARTIAL = 0x54  # ???
RESP_I2C_IDLE = 0x00
RESP_I2C_START_TOUT = 0x12
RESP_I2C_RSTART_TOUT = 0x17
RESP_I2C_WRADDRL_TOUT = 0x23
RESP_I2C_WRADDRL_WSEND = 0x21
RESP_I2C_WRADDRL_NACK = 0x25
RESP_I2C_WRDATA_TOUT = 0x44
RESP_I2C_RDDATA_TOUT = 0x52
RESP_I2C_STOP_TOUT = 0x62

RESP_I2C_MOREDATA = 0x43  # ???
RESP_I2C_PARTIALDATA = 0x41  # ???
RESP_I2C_WRITINGNOSTOP = 0x45  # ???

# Retry budget, per-report I2C payload limit, and NACK bit mask.
MCP2221_RETRY_MAX = 50
MCP2221_MAX_I2C_DATA_LEN = 60
MASK_ADDR_NACK = 0x40
37
class MCP2221:
    """HID driver for the MCP2221 USB bridge: pin modes, GPIO, I2C, ADC, DAC.

    Every operation is a 64-byte HID report exchange performed by
    ``_hid_xfer``.  Constructing an instance opens the device identified by
    ``VID``/``PID`` and immediately resets it.
    """

    # USB vendor / product ID used to open the device
    VID = 0x04D8
    PID = 0x00DD

    # pin mode values for gp_set_mode() / gp_get_mode()
    GP_GPIO = 0b000
    GP_DEDICATED = 0b001
    GP_ALT0 = 0b010
    GP_ALT1 = 0b011
    GP_ALT2 = 0b100

    def __init__(self):
        """Open the HID device and reset it."""
        self._hid = hid.device()
        self._hid.open(MCP2221.VID, MCP2221.PID)
        self._reset()

    def _hid_xfer(self, report, response=True):
        """Send one HID report and optionally read the reply.

        Args:
            report: command bytes; zero-padded here to 64 bytes.
            response: when true, read and return a 64-byte response report.

        Returns:
            The result of ``self._hid.read(64)``, or ``None`` when
            ``response`` is false.
        """
        # first byte is report ID, which =0 for MCP2221
        # remaining bytes = 64 byte report data
        # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
        self._hid.write(b'\0' + report + b'\0' * (64 - len(report)))
        time.sleep(MCP2221_HID_DELAY)
        if response:
            # return is 64 byte response report
            return self._hid.read(64)
        return None

    #----------------------------------------------------------------
    # MISC
    #----------------------------------------------------------------
    def gp_get_mode(self, pin):
        """Return the 3-bit mode currently configured for GP pin ``pin``."""
        return self._hid_xfer(b'\x61')[22 + pin] & 0x07

    def gp_set_mode(self, pin, mode):
        """Set the mode of one GP pin, leaving the other three as they are.

        Args:
            pin: GP pin index (0-3).
            mode: new mode; only the low three bits are used
                (see the ``GP_*`` class constants).
        """
        # get current settings
        current = self._hid_xfer(b'\x61')
        # empty report, this is safe since 0's = no change
        report = bytearray(b'\x60' + b'\x00' * 63)
        # set the alter GP flag byte
        report[7] = 0xFF
        # each pin can be set individually
        # but all 4 get set at once, so we need to
        # transpose current settings
        report[8] = current[22]   # GP0
        report[9] = current[23]   # GP1
        report[10] = current[24]  # GP2
        report[11] = current[25]  # GP3
        # then change only the one
        report[8 + pin] = mode & 0x07
        # and make it so
        self._hid_xfer(report)

    def _pretty_report(self, report):
        """Print the first 64 bytes of ``report`` as hex, ten per row."""
        print("     0  1  2  3  4  5  6  7  8  9")
        index = 0
        for row in range(7):
            print("{} : ".format(row), end='')
            for _ in range(10):
                print("{:02x} ".format(report[index]), end='')
                index += 1
                if index > 63:
                    # last row is short: only bytes 60-63 remain
                    break
            print()

    def _status_dump(self):
        """Debug helper: print the response to the 0x10 (status) command."""
        self._pretty_report(self._hid_xfer(b'\x10'))

    def _sram_dump(self):
        """Debug helper: print the response to the 0x61 command."""
        self._pretty_report(self._hid_xfer(b'\x61'))

    def _reset(self):
        """Send the reset command, then reopen the device.

        Waits ``MCP2221_RESET_DELAY`` seconds, then retries ``open`` every
        0.1 s for up to 5 seconds.

        Raises:
            OSError: if the device could not be reopened within 5 seconds.
        """
        self._hid_xfer(b'\x70\xAB\xCD\xEF', response=False)
        # NOTE(review): the handle opened in __init__ is not closed before
        # being re-opened below -- confirm the hid binding tolerates this.
        time.sleep(MCP2221_RESET_DELAY)
        start = time.monotonic()
        while time.monotonic() - start < 5:
            try:
                self._hid.open(MCP2221.VID, MCP2221.PID)
            except OSError:
                # try again
                time.sleep(0.1)
                continue
            return
        raise OSError("open failed")

    #----------------------------------------------------------------
    # GPIO
    #----------------------------------------------------------------
    def gpio_set_direction(self, pin, mode):
        """Set the direction of GPIO ``pin``; ``mode`` is written as-is."""
        report = bytearray(b'\x50' + b'\x00' * 63)  # empty set GPIO report
        offset = 4 * (pin + 1)
        report[offset] = 0x01                       # set pin direction
        report[offset + 1] = mode                   # to this
        self._hid_xfer(report)

    def gpio_set_pin(self, pin, value):
        """Set the output value of GPIO ``pin``; ``value`` is written as-is."""
        report = bytearray(b'\x50' + b'\x00' * 63)  # empty set GPIO report
        offset = 2 + 4 * pin
        report[offset] = 0x01                       # set pin value
        report[offset + 1] = value                  # to this
        self._hid_xfer(report)

    def gpio_get_pin(self, pin):
        """Return the value reported for GPIO ``pin``.

        Raises:
            RuntimeError: if the device reports 0xEE for the pin, i.e. it is
                not set for GPIO operation.
        """
        resp = self._hid_xfer(b'\x51')
        value = resp[2 + 2 * pin]
        if value == 0xEE:
            raise RuntimeError("Pin is not set for GPIO operation.")
        return value

    #----------------------------------------------------------------
    # I2C
    #----------------------------------------------------------------
    @staticmethod
    def _i2c_state_is_fatal(state):
        """Return True if ``state`` is a timeout/NACK code treated as fatal."""
        return state in (RESP_I2C_START_TOUT,
                         RESP_I2C_WRADDRL_TOUT,
                         RESP_I2C_WRADDRL_NACK,
                         RESP_I2C_WRDATA_TOUT,
                         RESP_I2C_STOP_TOUT)

    def _i2c_status(self):
        """Return the full status response.

        Raises:
            RuntimeError: if the response's result byte is non-zero.
        """
        resp = self._hid_xfer(b'\x10')
        if resp[1] != 0:
            raise RuntimeError("Couldn't get I2C status")
        return resp

    def _i2c_state(self):
        """Return byte 8 of the status response (the I2C state code)."""
        return self._i2c_status()[8]

    def _i2c_cancel(self):
        """Ask the device to cancel the current I2C transfer.

        Raises:
            RuntimeError: if the response's result byte is non-zero.
        """
        resp = self._hid_xfer(b'\x10\x00\x10')
        if resp[1] != 0x00:
            raise RuntimeError("Couldn't cancel I2C")
        if resp[2] == 0x10:
            # bus release will need "a few hundred microseconds"
            time.sleep(0.001)

    def _i2c_write(self, cmd, address, buffer, start=0, end=None):
        """Write ``buffer[start:end]`` to ``address`` using command ``cmd``.

        The data is sent in chunks of at most ``MCP2221_MAX_I2C_DATA_LEN``
        bytes; the total transfer length is repeated in every chunk header.

        Args:
            cmd: HID command byte for the write.
            address: 7-bit I2C address (shifted left by one on the wire).
            buffer: data source.
            start: index of the first byte to send.
            end: index one past the last byte to send; ``None`` means
                ``len(buffer)``.  An explicit ``0`` is an empty slice.

        Raises:
            RuntimeError: on a fatal I2C state, an address NACK, or when
                ``MCP2221_RETRY_MAX`` attempts are used up.
        """
        if self._i2c_state() != 0x00:
            self._i2c_cancel()

        # `is None`, not truthiness: end=0 must stay an empty slice
        if end is None:
            end = len(buffer)
        length = end - start
        retries = 0

        while (end - start) > 0:
            chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
            # write out current chunk
            header = bytes([cmd,
                            length & 0xFF,
                            (length >> 8) & 0xFF,
                            address << 1])
            resp = self._hid_xfer(header + buffer[start:(start + chunk)])
            # check for success
            if resp[1] != 0x00:
                if self._i2c_state_is_fatal(resp[2]):
                    raise RuntimeError("Unrecoverable I2C state failure")
                retries += 1
                if retries >= MCP2221_RETRY_MAX:
                    raise RuntimeError("I2C write error, max retries reached.")
                time.sleep(0.001)
                continue  # try again
            # yay chunk sent!
            while self._i2c_state() == RESP_I2C_PARTIALDATA:
                time.sleep(0.001)
            start += chunk
            retries = 0

        # check status in another loop
        for _ in range(MCP2221_RETRY_MAX):
            status = self._i2c_status()
            if status[20] & MASK_ADDR_NACK:
                raise RuntimeError("I2C slave address was NACK'd")
            usb_cmd_status = status[8]
            if usb_cmd_status == 0:
                break
            if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
                break   # this is OK too!
            if self._i2c_state_is_fatal(usb_cmd_status):
                raise RuntimeError("Unrecoverable I2C state failure")
            time.sleep(0.001)
        else:
            raise RuntimeError("I2C write error: max retries reached.")
        # whew success!

    def _i2c_read(self, cmd, address, buffer, start=0, end=None):
        """Read from ``address`` into ``buffer[start:end]`` using ``cmd``.

        Args:
            cmd: HID command byte for the read.
            address: 7-bit I2C address (shifted left, read bit set).
            buffer: writable destination, filled in place.
            start: index of the first byte to fill.
            end: index one past the last byte to fill; ``None`` means
                ``len(buffer)``.  An explicit ``0`` is an empty slice.

        Raises:
            RuntimeError: if the device reports a failure or NACK, or if a
                chunk is still not available after ``MCP2221_RETRY_MAX``
                polls.
        """
        if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
            self._i2c_cancel()

        # `is None`, not truthiness: end=0 must stay an empty slice
        if end is None:
            end = len(buffer)
        length = end - start

        # tell it we want to read
        resp = self._hid_xfer(bytes([cmd,
                                     length & 0xFF,
                                     (length >> 8) & 0xFF,
                                     (address << 1) | 0x01]))

        # check for success
        if resp[1] != 0x00:
            raise RuntimeError("Unrecoverable I2C read failure")

        # and now the read part
        while (end - start) > 0:
            for _ in range(MCP2221_RETRY_MAX):
                # the actual read
                resp = self._hid_xfer(b'\x40')
                # check for success
                if resp[1] == RESP_I2C_PARTIALDATA:
                    time.sleep(0.001)
                    continue
                if resp[1] != 0x00:
                    raise RuntimeError("Unrecoverable I2C read failure")
                if resp[2] == RESP_ADDR_NACK:
                    raise RuntimeError("I2C NACK")
                if resp[3] == 0x00 and resp[2] == 0x00:
                    break
                if resp[3] == RESP_READ_ERR:
                    time.sleep(0.001)
                    continue
                if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
                    break
            else:
                # never got a usable response: do not copy stale bytes
                raise RuntimeError("I2C read error: max retries reached.")

            # move data into buffer
            chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
            for i, k in enumerate(range(start, start + chunk)):
                buffer[k] = resp[4 + i]
            start += chunk

    def i2c_configure(self, baudrate=100000):
        """Set the I2C clock divider to ``12000000 // baudrate - 3``."""
        self._hid_xfer(bytes([0x10,  # set parameters
                              0x00,  # don't care
                              0x00,  # no effect
                              0x20,  # next byte is clock divider
                              12000000 // baudrate - 3]))

    def i2c_writeto(self, address, buffer, *, start=0, end=None):
        """Write ``buffer[start:end]`` to the device at ``address``."""
        self._i2c_write(0x90, address, buffer, start, end)

    def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
        """Read from the device at ``address`` into ``buffer[start:end]``."""
        self._i2c_read(0x91, address, buffer, start, end)

    def i2c_writeto_then_readfrom(self, address, out_buffer, in_buffer, *,
                                  out_start=0, out_end=None,
                                  in_start=0, in_end=None):
        """Write ``out_buffer`` then read into ``in_buffer`` at ``address``.

        Uses write command 0x94 followed by read command 0x93.
        """
        self._i2c_write(0x94, address, out_buffer, out_start, out_end)
        self._i2c_read(0x93, address, in_buffer, in_start, in_end)

    def i2c_scan(self, *, start=0, end=0x79):
        """Return the addresses in ``start..end`` (inclusive) that respond.

        Each address is probed with a one-byte write of 0x00; an address
        counts as absent when that write raises ``RuntimeError``.
        """
        found = []
        for addr in range(start, end + 1):
            # try a write
            try:
                self.i2c_writeto(addr, b'\x00')
            except RuntimeError:  # no reply!
                continue
            # store if success
            found.append(addr)
        return found

    #----------------------------------------------------------------
    # ADC
    #----------------------------------------------------------------
    def adc_configure(self, vref=0):
        """Write the ADC reference setting (low three bits of ``vref``)."""
        report = bytearray(b'\x60' + b'\x00' * 63)
        report[5] = 0x80 | (vref & 0b111)
        self._hid_xfer(report)

    def adc_read(self, pin):
        """Return the ADC reading for ``pin`` from the status response.

        The value is assembled little-endian from bytes ``48 + 2*pin`` and
        ``49 + 2*pin``.
        """
        resp = self._hid_xfer(b'\x10')
        return resp[49 + 2 * pin] << 8 | resp[48 + 2 * pin]

    #----------------------------------------------------------------
    # DAC
    #----------------------------------------------------------------
    def dac_configure(self, vref=0):
        """Write the DAC reference setting (low three bits of ``vref``)."""
        report = bytearray(b'\x60' + b'\x00' * 63)
        report[3] = 0x80 | (vref & 0b111)
        self._hid_xfer(report)

    def dac_write(self, pin, value):
        """Write the low five bits of ``value`` as the DAC output value.

        ``pin`` is accepted but not used by this method.
        """
        report = bytearray(b'\x60' + b'\x00' * 63)
        report[4] = 0x80 | (value & 0b11111)
        self._hid_xfer(report)
321
# Module-level shared instance. Constructing it opens the HID device and
# resets it, so both happen as a side effect of importing this module.
mcp2221 = MCP2221()