]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py
Clarify docstring and add missing newline.
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / mcp2221 / mcp2221.py
1 """Chip Definition for MCP2221"""
2
3 import os
4 import time
5 import atexit
6
7 import hid
8
9 # Here if you need it
10 MCP2221_HID_DELAY = float(os.environ.get("BLINKA_MCP2221_HID_DELAY", 0))
11 # Use to set delay between reset and device reopen. if negative, don't reset at all
12 MCP2221_RESET_DELAY = float(os.environ.get("BLINKA_MCP2221_RESET_DELAY", 0.5))
13
14 # from the C driver
15 # http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
16 # others (???) determined during driver developement
17 RESP_ERR_NOERR = 0x00
18 RESP_ADDR_NACK = 0x25
19 RESP_READ_ERR = 0x7F
20 RESP_READ_COMPL = 0x55
21 RESP_READ_PARTIAL = 0x54  # ???
22 RESP_I2C_IDLE = 0x00
23 RESP_I2C_START_TOUT = 0x12
24 RESP_I2C_RSTART_TOUT = 0x17
25 RESP_I2C_WRADDRL_TOUT = 0x23
26 RESP_I2C_WRADDRL_WSEND = 0x21
27 RESP_I2C_WRADDRL_NACK = 0x25
28 RESP_I2C_WRDATA_TOUT = 0x44
29 RESP_I2C_RDDATA_TOUT = 0x52
30 RESP_I2C_STOP_TOUT = 0x62
31
32 RESP_I2C_MOREDATA = 0x43  # ???
33 RESP_I2C_PARTIALDATA = 0x41  # ???
34 RESP_I2C_WRITINGNOSTOP = 0x45  # ???
35
36 MCP2221_RETRY_MAX = 50
37 MCP2221_MAX_I2C_DATA_LEN = 60
38 MASK_ADDR_NACK = 0x40
39
40
41 class MCP2221:
42     """MCP2221 Device Class Definition"""
43
44     VID = 0x04D8
45     PID = 0x00DD
46
47     GP_GPIO = 0b000
48     GP_DEDICATED = 0b001
49     GP_ALT0 = 0b010
50     GP_ALT1 = 0b011
51     GP_ALT2 = 0b100
52
53     def __init__(self):
54         self._hid = hid.device()
55         self._hid.open(MCP2221.VID, MCP2221.PID)
56         # make sure the device gets closed before exit
57         atexit.register(self.close)
58         if MCP2221_RESET_DELAY >= 0:
59             self._reset()
60         self._gp_config = [0x07] * 4  # "don't care" initial value
61         for pin in range(4):
62             self.gp_set_mode(pin, self.GP_GPIO)  # set to GPIO mode
63             self.gpio_set_direction(pin, 1)  # set to INPUT
64
65     def close(self):
66         """Close the hid device. Does nothing if the device is not open."""
67         self._hid.close()
68
69     def __del__(self):
70         # try to close the device before destroying the instance
71         self.close()
72
73     def _hid_xfer(self, report, response=True):
74         """Perform HID Transfer"""
75         # first byte is report ID, which =0 for MCP2221
76         # remaing bytes = 64 byte report data
77         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
78         self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
79         time.sleep(MCP2221_HID_DELAY)
80         if response:
81             # return is 64 byte response report
82             return self._hid.read(64)
83         return None
84
85     # ----------------------------------------------------------------
86     # MISC
87     # ----------------------------------------------------------------
88     def gp_get_mode(self, pin):
89         """Get Current Pin Mode"""
90         return self._hid_xfer(b"\x61")[22 + pin] & 0x07
91
92     def gp_set_mode(self, pin, mode):
93         """Set Current Pin Mode"""
94         # already set to that mode?
95         mode &= 0x07
96         if mode == (self._gp_config[pin] & 0x07):
97             return
98         # update GP mode for pin
99         self._gp_config[pin] = mode
100         # empty report, this is safe since 0's = no change
101         report = bytearray(b"\x60" + b"\x00" * 63)
102         # set the alter GP flag byte
103         report[7] = 0xFF
104         # add GP setttings
105         report[8] = self._gp_config[0]
106         report[9] = self._gp_config[1]
107         report[10] = self._gp_config[2]
108         report[11] = self._gp_config[3]
109         # and make it so
110         self._hid_xfer(report)
111
112     def _pretty_report(self, register):
113         report = self._hid_xfer(register)
114         print("     0  1  2  3  4  5  6  7  8  9")
115         index = 0
116         for row in range(7):
117             print("{} : ".format(row), end="")
118             for _ in range(10):
119                 print("{:02x} ".format(report[index]), end="")
120                 index += 1
121                 if index > 63:
122                     break
123             print()
124
125     def _status_dump(self):
126         self._pretty_report(b"\x10")
127
128     def _sram_dump(self):
129         self._pretty_report(b"\x61")
130
131     def _reset(self):
132         self._hid_xfer(b"\x70\xAB\xCD\xEF", response=False)
133         time.sleep(MCP2221_RESET_DELAY)
134         start = time.monotonic()
135         while time.monotonic() - start < 5:
136             try:
137                 self._hid.open(MCP2221.VID, MCP2221.PID)
138             except OSError:
139                 # try again
140                 time.sleep(0.1)
141                 continue
142             return
143         raise OSError("open failed")
144
145     # ----------------------------------------------------------------
146     # GPIO
147     # ----------------------------------------------------------------
148     def gpio_set_direction(self, pin, mode):
149         """Set Current GPIO Pin Direction"""
150         if mode:
151             # set bit 3 for INPUT
152             self._gp_config[pin] |= 1 << 3
153         else:
154             # clear bit 3 for OUTPUT
155             self._gp_config[pin] &= ~(1 << 3)
156         report = bytearray(b"\x50" + b"\x00" * 63)  # empty set GPIO report
157         offset = 4 * (pin + 1)
158         report[offset] = 0x01  # set pin direction
159         report[offset + 1] = mode  # to this
160         self._hid_xfer(report)
161
162     def gpio_set_pin(self, pin, value):
163         """Set Current GPIO Pin Value"""
164         if value:
165             # set bit 4
166             self._gp_config[pin] |= 1 << 4
167         else:
168             # clear bit 4
169             self._gp_config[pin] &= ~(1 << 4)
170         report = bytearray(b"\x50" + b"\x00" * 63)  # empty set GPIO report
171         offset = 2 + 4 * pin
172         report[offset] = 0x01  # set pin value
173         report[offset + 1] = value  # to this
174         self._hid_xfer(report)
175
176     def gpio_get_pin(self, pin):
177         """Get Current GPIO Pin Value"""
178         resp = self._hid_xfer(b"\x51")
179         offset = 2 + 2 * pin
180         if resp[offset] == 0xEE:
181             raise RuntimeError("Pin is not set for GPIO operation.")
182         return resp[offset]
183
184     # ----------------------------------------------------------------
185     # I2C
186     # ----------------------------------------------------------------
187     def _i2c_status(self):
188         resp = self._hid_xfer(b"\x10")
189         if resp[1] != 0:
190             raise RuntimeError("Couldn't get I2C status")
191         return resp
192
193     def _i2c_state(self):
194         return self._i2c_status()[8]
195
196     def _i2c_cancel(self):
197         resp = self._hid_xfer(b"\x10\x00\x10")
198         if resp[1] != 0x00:
199             raise RuntimeError("Couldn't cancel I2C")
200         if resp[2] == 0x10:
201             # bus release will need "a few hundred microseconds"
202             time.sleep(0.001)
203
204     # pylint: disable=too-many-arguments,too-many-branches
205     def _i2c_write(self, cmd, address, buffer, start=0, end=None):
206         if self._i2c_state() != 0x00:
207             self._i2c_cancel()
208
209         end = end if end else len(buffer)
210         length = end - start
211         retries = 0
212
213         while (end - start) > 0 or not buffer:
214             chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
215             # write out current chunk
216             resp = self._hid_xfer(
217                 bytes([cmd, length & 0xFF, (length >> 8) & 0xFF, address << 1])
218                 + buffer[start : (start + chunk)]
219             )
220             # check for success
221             if resp[1] != 0x00:
222                 if resp[2] in (
223                     RESP_I2C_START_TOUT,
224                     RESP_I2C_WRADDRL_TOUT,
225                     RESP_I2C_WRADDRL_NACK,
226                     RESP_I2C_WRDATA_TOUT,
227                     RESP_I2C_STOP_TOUT,
228                 ):
229                     raise RuntimeError("Unrecoverable I2C state failure")
230                 retries += 1
231                 if retries >= MCP2221_RETRY_MAX:
232                     raise RuntimeError("I2C write error, max retries reached.")
233                 time.sleep(0.001)
234                 continue  # try again
235             # yay chunk sent!
236             while self._i2c_state() == RESP_I2C_PARTIALDATA:
237                 time.sleep(0.001)
238             if not buffer:
239                 break
240             start += chunk
241             retries = 0
242
243         # check status in another loop
244         for _ in range(MCP2221_RETRY_MAX):
245             status = self._i2c_status()
246             if status[20] & MASK_ADDR_NACK:
247                 raise RuntimeError("I2C slave address was NACK'd")
248             usb_cmd_status = status[8]
249             if usb_cmd_status == 0:
250                 break
251             if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
252                 break  # this is OK too!
253             if usb_cmd_status in (
254                 RESP_I2C_START_TOUT,
255                 RESP_I2C_WRADDRL_TOUT,
256                 RESP_I2C_WRADDRL_NACK,
257                 RESP_I2C_WRDATA_TOUT,
258                 RESP_I2C_STOP_TOUT,
259             ):
260                 raise RuntimeError("Unrecoverable I2C state failure")
261             time.sleep(0.001)
262         else:
263             raise RuntimeError("I2C write error: max retries reached.")
264         # whew success!
265
266     def _i2c_read(self, cmd, address, buffer, start=0, end=None):
267         if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
268             self._i2c_cancel()
269
270         end = end if end else len(buffer)
271         length = end - start
272
273         # tell it we want to read
274         resp = self._hid_xfer(
275             bytes([cmd, length & 0xFF, (length >> 8) & 0xFF, (address << 1) | 0x01])
276         )
277
278         # check for success
279         if resp[1] != 0x00:
280             raise RuntimeError("Unrecoverable I2C read failure")
281
282         # and now the read part
283         while (end - start) > 0:
284             for _ in range(MCP2221_RETRY_MAX):
285                 # the actual read
286                 resp = self._hid_xfer(b"\x40")
287                 # check for success
288                 if resp[1] == RESP_I2C_PARTIALDATA:
289                     time.sleep(0.001)
290                     continue
291                 if resp[1] != 0x00:
292                     raise RuntimeError("Unrecoverable I2C read failure")
293                 if resp[2] == RESP_ADDR_NACK:
294                     raise RuntimeError("I2C NACK")
295                 if resp[3] == 0x00 and resp[2] == 0x00:
296                     break
297                 if resp[3] == RESP_READ_ERR:
298                     time.sleep(0.001)
299                     continue
300                 if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
301                     break
302             else:
303                 raise RuntimeError("I2C read error: max retries reached.")
304
305             # move data into buffer
306             chunk = min(end - start, 60)
307             for i, k in enumerate(range(start, start + chunk)):
308                 buffer[k] = resp[4 + i]
309             start += chunk
310
311     # pylint: enable=too-many-arguments
312
313     def _i2c_configure(self, baudrate=100000):
314         """Configure I2C"""
315         self._hid_xfer(
316             bytes(
317                 [
318                     0x10,  # set parameters
319                     0x00,  # don't care
320                     0x00,  # no effect
321                     0x20,  # next byte is clock divider
322                     12000000 // baudrate - 3,
323                 ]
324             )
325         )
326
327     def i2c_writeto(self, address, buffer, *, start=0, end=None):
328         """Write data from the buffer to an address"""
329         self._i2c_write(0x90, address, buffer, start, end)
330
331     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
332         """Read data from an address and into the buffer"""
333         self._i2c_read(0x91, address, buffer, start, end)
334
335     def i2c_writeto_then_readfrom(
336         self,
337         address,
338         out_buffer,
339         in_buffer,
340         *,
341         out_start=0,
342         out_end=None,
343         in_start=0,
344         in_end=None,
345     ):
346         """Write data from buffer_out to an address and then
347         read data from an address and into buffer_in
348         """
349         self._i2c_write(0x94, address, out_buffer, out_start, out_end)
350         self._i2c_read(0x93, address, in_buffer, in_start, in_end)
351
352     def i2c_scan(self, *, start=0, end=0x79):
353         """Perform an I2C Device Scan"""
354         found = []
355         for addr in range(start, end + 1):
356             # try a write
357             try:
358                 self.i2c_writeto(addr, b"\x00")
359             except RuntimeError:  # no reply!
360                 continue
361             # store if success
362             found.append(addr)
363         return found
364
365     # ----------------------------------------------------------------
366     # ADC
367     # ----------------------------------------------------------------
368     def adc_configure(self, vref=0):
369         """Configure the Analog-to-Digital Converter"""
370         report = bytearray(b"\x60" + b"\x00" * 63)
371         report[5] = 1 << 7 | (vref & 0b111)
372         self._hid_xfer(report)
373
374     def adc_read(self, pin):
375         """Read from the Analog-to-Digital Converter"""
376         resp = self._hid_xfer(b"\x10")
377         return resp[49 + 2 * pin] << 8 | resp[48 + 2 * pin]
378
379     # ----------------------------------------------------------------
380     # DAC
381     # ----------------------------------------------------------------
382     def dac_configure(self, vref=0):
383         """Configure the Digital-to-Analog Converter"""
384         report = bytearray(b"\x60" + b"\x00" * 63)
385         report[3] = 1 << 7 | (vref & 0b111)
386         self._hid_xfer(report)
387
388     # pylint: disable=unused-argument
389     def dac_write(self, pin, value):
390         """Write to the Digital-to-Analog Converter"""
391         report = bytearray(b"\x60" + b"\x00" * 63)
392         report[4] = 1 << 7 | (value & 0b11111)
393         self._hid_xfer(report)
394
395     # pylint: enable=unused-argument
396
397
398 mcp2221 = MCP2221()