Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py
Massive pylinting session and added Github Actions
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / mcp2221 / mcp2221.py
1 """Chip Definition for MCP2221"""
2
3 import os
4 import time
5 import hid
6
# Tunables, overridable through the environment.
# Pause (seconds) applied after every HID write -- here if you need it.
MCP2221_HID_DELAY = float(os.getenv("BLINKA_MCP2221_HID_DELAY", "0"))
# Pause (seconds) between issuing a chip reset and reopening the device.
MCP2221_RESET_DELAY = float(os.getenv("BLINKA_MCP2221_RESET_DELAY", "0.5"))

# Response / state codes taken from the C driver:
# http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
# Entries marked (???) were determined during driver development.
# pylint: disable=bad-whitespace

# Generic response codes
RESP_ERR_NOERR = 0x00
RESP_ADDR_NACK = 0x25
RESP_READ_ERR = 0x7F
RESP_READ_COMPL = 0x55
RESP_READ_PARTIAL = 0x54  # ???

# I2C engine states
RESP_I2C_IDLE = 0x00
RESP_I2C_START_TOUT = 0x12
RESP_I2C_RSTART_TOUT = 0x17
RESP_I2C_WRADDRL_WSEND = 0x21
RESP_I2C_WRADDRL_TOUT = 0x23
RESP_I2C_WRADDRL_NACK = 0x25
RESP_I2C_WRDATA_TOUT = 0x44
RESP_I2C_RDDATA_TOUT = 0x52
RESP_I2C_STOP_TOUT = 0x62

# I2C engine states observed in practice
RESP_I2C_PARTIALDATA = 0x41  # ???
RESP_I2C_MOREDATA = 0x43  # ???
RESP_I2C_WRITINGNOSTOP = 0x45  # ???

# Driver limits and masks
MCP2221_RETRY_MAX = 50
MCP2221_MAX_I2C_DATA_LEN = 60
MASK_ADDR_NACK = 0x40
# pylint: enable=bad-whitespace
39
40
class MCP2221:
    """MCP2221 Device Class Definition

    Talks to the chip through a ``hid.device`` handle using 64-byte HID
    reports.  Every public method builds a command report, sends it with
    ``_hid_xfer`` and, where applicable, decodes the 64-byte response.
    """

    # USB identifiers used to open the HID device
    VID = 0x04D8
    PID = 0x00DD

    # Pin function selectors accepted by gp_set_mode / returned by gp_get_mode
    GP_GPIO = 0b000
    GP_DEDICATED = 0b001
    GP_ALT0 = 0b010
    GP_ALT1 = 0b011
    GP_ALT2 = 0b100

    def __init__(self):
        """Open the HID device and reset the chip."""
        self._hid = hid.device()
        self._hid.open(MCP2221.VID, MCP2221.PID)
        self._reset()

    def _hid_xfer(self, report, response=True):
        """Perform HID Transfer

        :param report: command bytes (at most 64); zero-padded to 64 bytes.
        :param response: when True, read and return the 64 byte response
            report; when False, return None without reading.
        """
        # first byte is report ID, which =0 for MCP2221
        # remaining bytes = 64 byte report data
        # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
        self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
        time.sleep(MCP2221_HID_DELAY)
        if response:
            # return is 64 byte response report
            return self._hid.read(64)
        return None

    # ----------------------------------------------------------------
    # MISC
    # ----------------------------------------------------------------
    def gp_get_mode(self, pin):
        """Get Current Pin Mode

        Returns the low three bits of the settings byte for ``pin`` taken
        from the response to command 0x61.
        """
        return self._hid_xfer(b"\x61")[22 + pin] & 0x07

    def gp_set_mode(self, pin, mode):
        """Set Current Pin Mode

        Only the low three bits of ``mode`` are used.
        """
        # get current settings
        current = self._hid_xfer(b"\x61")
        # empty report, this is safe since 0's = no change
        report = bytearray(b"\x60" + b"\x00" * 63)
        # set the alter GP flag byte
        report[7] = 0xFF
        # each pin can be set individually
        # but all 4 get set at once, so we need to
        # transpose current settings (GP0..GP3)
        report[8:12] = current[22:26]
        # then change only the one
        report[8 + pin] = mode & 0x07
        # and make it so
        self._hid_xfer(report)

    def _pretty_report(self, register):
        """Debug helper: send ``register`` and print the response as hex.

        Output is a header row followed by seven rows of up to ten bytes
        (64 bytes in total).
        """
        report = self._hid_xfer(register)
        print("     0  1  2  3  4  5  6  7  8  9")
        for row in range(7):
            first = row * 10
            # the last row is cut short at byte 63
            cells = report[first : min(first + 10, 64)]
            print(
                "{} : ".format(row) + "".join("{:02x} ".format(b) for b in cells)
            )

    def _status_dump(self):
        """Debug helper: print the response to command 0x10."""
        self._pretty_report(b"\x10")

    def _sram_dump(self):
        """Debug helper: print the response to command 0x61."""
        self._pretty_report(b"\x61")

    def _reset(self):
        """Send the reset command and reopen the HID device.

        Reopening is retried for up to five seconds.

        :raises OSError: if the device could not be reopened in time.
        """
        self._hid_xfer(b"\x70\xAB\xCD\xEF", response=False)
        # Release the pre-reset handle before reopening: otherwise it is
        # leaked, and HID bindings that refuse to open() an already-open
        # device object would make every retry below fail.
        self._hid.close()
        time.sleep(MCP2221_RESET_DELAY)
        start = time.monotonic()
        while time.monotonic() - start < 5:
            try:
                self._hid.open(MCP2221.VID, MCP2221.PID)
            except OSError:
                # try again
                time.sleep(0.1)
                continue
            return
        raise OSError("open failed")

    # ----------------------------------------------------------------
    # GPIO
    # ----------------------------------------------------------------
    def gpio_set_direction(self, pin, mode):
        """Set Current GPIO Pin Direction"""
        report = bytearray(b"\x50" + b"\x00" * 63)  # empty set GPIO report
        offset = 4 * (pin + 1)
        report[offset] = 0x01  # set pin direction
        report[offset + 1] = mode  # to this
        self._hid_xfer(report)

    def gpio_set_pin(self, pin, value):
        """Set Current GPIO Pin Value"""
        report = bytearray(b"\x50" + b"\x00" * 63)  # empty set GPIO report
        offset = 2 + 4 * pin
        report[offset] = 0x01  # set pin value
        report[offset + 1] = value  # to this
        self._hid_xfer(report)

    def gpio_get_pin(self, pin):
        """Get Current GPIO Pin Value

        :raises RuntimeError: if the response byte for ``pin`` is 0xEE,
            i.e. the pin is not set for GPIO operation.
        """
        resp = self._hid_xfer(b"\x51")
        offset = 2 + 2 * pin
        if resp[offset] == 0xEE:
            raise RuntimeError("Pin is not set for GPIO operation.")
        return resp[offset]

    # ----------------------------------------------------------------
    # I2C
    # ----------------------------------------------------------------
    @staticmethod
    def _i2c_state_is_fatal(state):
        """Return True if ``state`` is one of the unrecoverable write states."""
        return state in (
            RESP_I2C_START_TOUT,
            RESP_I2C_WRADDRL_TOUT,
            RESP_I2C_WRADDRL_NACK,
            RESP_I2C_WRDATA_TOUT,
            RESP_I2C_STOP_TOUT,
        )

    def _i2c_status(self):
        """Return the raw response to the status command (0x10).

        :raises RuntimeError: if the response reports a failure.
        """
        resp = self._hid_xfer(b"\x10")
        if resp[1] != 0:
            raise RuntimeError("Couldn't get I2C status")
        return resp

    def _i2c_state(self):
        """Return the I2C state byte (index 8) of the status response."""
        return self._i2c_status()[8]

    def _i2c_cancel(self):
        """Ask the chip to cancel the current I2C transfer.

        :raises RuntimeError: if the response reports a failure.
        """
        resp = self._hid_xfer(b"\x10\x00\x10")
        if resp[1] != 0x00:
            raise RuntimeError("Couldn't cancel I2C")
        if resp[2] == 0x10:
            # bus release will need "a few hundred microseconds"
            time.sleep(0.001)

    # pylint: disable=too-many-arguments
    def _i2c_write(self, cmd, address, buffer, start=0, end=None):
        """Write ``buffer[start:end]`` to ``address`` using HID command ``cmd``.

        Data is sent in chunks of at most MCP2221_MAX_I2C_DATA_LEN bytes and
        the engine status is polled afterwards until the write has settled.

        :param end: exclusive end index; ``None`` means ``len(buffer)``.
        :raises RuntimeError: on an unrecoverable state, an address NACK, or
            when MCP2221_RETRY_MAX attempts are exhausted.
        """
        if self._i2c_state() != 0x00:
            self._i2c_cancel()

        # Only None means "to the end"; an explicit 0 is an empty slice.
        if end is None:
            end = len(buffer)
        length = end - start
        retries = 0

        while (end - start) > 0:
            chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
            # write out current chunk
            resp = self._hid_xfer(
                bytes([cmd, length & 0xFF, (length >> 8) & 0xFF, address << 1])
                + buffer[start : (start + chunk)]
            )
            # check for success
            if resp[1] != 0x00:
                if self._i2c_state_is_fatal(resp[2]):
                    raise RuntimeError("Unrecoverable I2C state failure")
                retries += 1
                if retries >= MCP2221_RETRY_MAX:
                    raise RuntimeError("I2C write error, max retries reached.")
                time.sleep(0.001)
                continue  # try again
            # yay chunk sent!
            while self._i2c_state() == RESP_I2C_PARTIALDATA:
                time.sleep(0.001)
            start += chunk
            retries = 0

        # check status in another loop
        for _ in range(MCP2221_RETRY_MAX):
            status = self._i2c_status()
            if status[20] & MASK_ADDR_NACK:
                raise RuntimeError("I2C slave address was NACK'd")
            usb_cmd_status = status[8]
            if usb_cmd_status == 0:
                break
            if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
                break  # this is OK too!
            if self._i2c_state_is_fatal(usb_cmd_status):
                raise RuntimeError("Unrecoverable I2C state failure")
            time.sleep(0.001)
        else:
            raise RuntimeError("I2C write error: max retries reached.")
        # whew success!

    def _i2c_read(self, cmd, address, buffer, start=0, end=None):
        """Read from ``address`` into ``buffer[start:end]`` using ``cmd``.

        :param end: exclusive end index; ``None`` means ``len(buffer)``.
        :raises RuntimeError: on a read failure, a NACK, or when a chunk is
            still not available after MCP2221_RETRY_MAX polls.
        """
        if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
            self._i2c_cancel()

        # Only None means "to the end"; an explicit 0 is an empty slice.
        if end is None:
            end = len(buffer)
        length = end - start

        # tell it we want to read
        resp = self._hid_xfer(
            bytes([cmd, length & 0xFF, (length >> 8) & 0xFF, (address << 1) | 0x01])
        )

        # check for success
        if resp[1] != 0x00:
            raise RuntimeError("Unrecoverable I2C read failure")

        # and now the read part
        while (end - start) > 0:
            for _ in range(MCP2221_RETRY_MAX):
                # the actual read
                resp = self._hid_xfer(b"\x40")
                # check for success
                if resp[1] == RESP_I2C_PARTIALDATA:
                    time.sleep(0.001)
                    continue
                if resp[1] != 0x00:
                    raise RuntimeError("Unrecoverable I2C read failure")
                if resp[2] == RESP_ADDR_NACK:
                    raise RuntimeError("I2C NACK")
                if resp[3] == 0x00 and resp[2] == 0x00:
                    break
                if resp[3] == RESP_READ_ERR:
                    time.sleep(0.001)
                    continue
                if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
                    break
            else:
                # Never got a usable chunk: fail loudly instead of copying
                # stale response bytes into the caller's buffer.
                raise RuntimeError("I2C read error: max retries reached.")

            # move data into buffer
            chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
            for i, k in enumerate(range(start, start + chunk)):
                buffer[k] = resp[4 + i]
            start += chunk

    # pylint: enable=too-many-arguments

    def i2c_configure(self, baudrate=100000):
        """Configure I2C

        The divider byte sent is ``12000000 // baudrate - 3``; a value that
        does not fit in one byte makes ``bytes()`` raise ValueError.
        """
        self._hid_xfer(
            bytes(
                [
                    0x10,  # set parameters
                    0x00,  # don't care
                    0x00,  # no effect
                    0x20,  # next byte is clock divider
                    12000000 // baudrate - 3,
                ]
            )
        )

    def i2c_writeto(self, address, buffer, *, start=0, end=None):
        """Write data from the buffer to an address"""
        self._i2c_write(0x90, address, buffer, start, end)

    def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
        """Read data from an address and into the buffer"""
        self._i2c_read(0x91, address, buffer, start, end)

    def i2c_writeto_then_readfrom(
        self,
        address,
        out_buffer,
        in_buffer,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None
    ):
        """Write data from buffer_out to an address and then
        read data from an address and into buffer_in
        """
        self._i2c_write(0x94, address, out_buffer, out_start, out_end)
        self._i2c_read(0x93, address, in_buffer, in_start, in_end)

    def i2c_scan(self, *, start=0, end=0x79):
        """Perform an I2C Device Scan

        Writes a single zero byte to every address in ``start..end``
        (inclusive) and returns the addresses for which the write did not
        raise RuntimeError.
        """
        found = []
        for addr in range(start, end + 1):
            # try a write
            try:
                self.i2c_writeto(addr, b"\x00")
            except RuntimeError:  # no reply!
                continue
            # store if success
            found.append(addr)
        return found

    # ----------------------------------------------------------------
    # ADC
    # ----------------------------------------------------------------
    def adc_configure(self, vref=0):
        """Configure the Analog-to-Digital Converter

        Only the low three bits of ``vref`` are used.
        """
        report = bytearray(b"\x60" + b"\x00" * 63)
        report[5] = 1 << 7 | (vref & 0b111)
        self._hid_xfer(report)

    def adc_read(self, pin):
        """Read from the Analog-to-Digital Converter

        Combines two bytes of the status response (low byte at
        ``48 + 2 * pin``, high byte at ``49 + 2 * pin``) into one integer.
        """
        resp = self._hid_xfer(b"\x10")
        return resp[49 + 2 * pin] << 8 | resp[48 + 2 * pin]

    # ----------------------------------------------------------------
    # DAC
    # ----------------------------------------------------------------
    def dac_configure(self, vref=0):
        """Configure the Digital-to-Analog Converter

        Only the low three bits of ``vref`` are used.
        """
        report = bytearray(b"\x60" + b"\x00" * 63)
        report[3] = 1 << 7 | (vref & 0b111)
        self._hid_xfer(report)

    # pylint: disable=unused-argument
    def dac_write(self, pin, value):
        """Write to the Digital-to-Analog Converter

        ``pin`` is accepted but not used; only the low five bits of
        ``value`` are sent.
        """
        report = bytearray(b"\x60" + b"\x00" * 63)
        report[4] = 1 << 7 | (value & 0b11111)
        self._hid_xfer(report)

    # pylint: enable=unused-argument
366
# Module-level shared instance. Constructing MCP2221 here runs __init__,
# which opens the HID device and resets the chip when this module is imported.
mcp2221 = MCP2221()