X-Git-Url: https://git.ayoreis.com/Adafruit_Blinka-hackapet.git/blobdiff_plain/0bc5711e69ff03db0a46adc2ffa1ee2819b0b1ef..c2bc4a90827f28abb6b51eae659a1f80a65c0497:/src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py diff --git a/src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py b/src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py index 0275e66..2dcc539 100644 --- a/src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py +++ b/src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py @@ -1,7 +1,12 @@ +# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries +# +# SPDX-License-Identifier: MIT """Chip Definition for MCP2221""" import os import time +import atexit + import hid # Here if you need it @@ -51,6 +56,8 @@ class MCP2221: def __init__(self): self._hid = hid.device() self._hid.open(MCP2221.VID, MCP2221.PID) + # make sure the device gets closed before exit + atexit.register(self.close) if MCP2221_RESET_DELAY >= 0: self._reset() self._gp_config = [0x07] * 4 # "don't care" initial value @@ -58,6 +65,14 @@ class MCP2221: self.gp_set_mode(pin, self.GP_GPIO) # set to GPIO mode self.gpio_set_direction(pin, 1) # set to INPUT + def close(self): + """Close the hid device. Does nothing if the device is not open.""" + self._hid.close() + + def __del__(self): + # try to close the device before destroying the instance + self.close() + def _hid_xfer(self, report, response=True): """Perform HID Transfer""" # first byte is report ID, which =0 for MCP2221 @@ -118,6 +133,7 @@ class MCP2221: def _reset(self): self._hid_xfer(b"\x70\xAB\xCD\xEF", response=False) + self._hid.close() time.sleep(MCP2221_RESET_DELAY) start = time.monotonic() while time.monotonic() - start < 5: @@ -189,7 +205,7 @@ class MCP2221: # bus release will need "a few hundred microseconds" time.sleep(0.001) - # pylint: disable=too-many-arguments + # pylint: disable=too-many-arguments,too-many-branches def _i2c_write(self, cmd, address, buffer, start=0, end=None): if self._i2c_state() != 0x00: self._i2c_cancel() @@ -198,7 +214,7 @@ class MCP2221: length = end - start retries = 0 - while (end - start) > 0: + while (end - start) > 0 or not buffer: chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN) # write out current chunk resp = self._hid_xfer( @@ -223,6 +239,8 @@ class MCP2221: # yay chunk sent! while self._i2c_state() == RESP_I2C_PARTIALDATA: time.sleep(0.001) + if not buffer: + break start += chunk retries = 0 @@ -230,7 +248,7 @@ class MCP2221: for _ in range(MCP2221_RETRY_MAX): status = self._i2c_status() if status[20] & MASK_ADDR_NACK: - raise RuntimeError("I2C slave address was NACK'd") + raise OSError("I2C slave address was NACK'd") usb_cmd_status = status[8] if usb_cmd_status == 0: break @@ -285,6 +303,8 @@ class MCP2221: continue if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL): break + else: + raise RuntimeError("I2C read error: max retries reached.") # move data into buffer chunk = min(end - start, 60) @@ -294,7 +314,7 @@ class MCP2221: # pylint: enable=too-many-arguments - def i2c_configure(self, baudrate=100000): + def _i2c_configure(self, baudrate=100000): """Configure I2C""" self._hid_xfer( bytes( @@ -325,7 +345,7 @@ class MCP2221: out_start=0, out_end=None, in_start=0, - in_end=None + in_end=None, ): """Write data from buffer_out to an address and then read data from an address and into buffer_in @@ -340,7 +360,8 @@ class MCP2221: # try a write try: self.i2c_writeto(addr, b"\x00") - except RuntimeError: # no reply! + except OSError: # no reply! + # We got a NACK, which could be correct continue # store if success found.append(addr)