+from adafruit_blinka.agnostic import board_id, detector
+
+# pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
+# pylint: disable=too-many-arguments,too-many-function-args,too-many-return-statements
+
+
class I2C(Lockable):
    """
    Busio I2C Class for CircuitPython Compatibility. Used
    for both MicroPython and Linux.

    The concrete bus implementation is selected in :meth:`init` from the
    detected board/chip and stored in ``self._i2c``; the transfer methods
    on this class delegate to that object.
    """

    def __init__(self, scl, sda, frequency=100000):
        """Create the bus on the given pins.

        :param scl: clock pin, forwarded to :meth:`init`
        :param sda: data pin, forwarded to :meth:`init`
        :param frequency: bus frequency, forwarded to :meth:`init`
        """
        self.init(scl, sda, frequency)

    def init(self, scl, sda, frequency):
        """Select and construct the backend for the detected board.

        Any previously created backend is dropped first (see :meth:`deinit`).

        :param scl: clock pin; ignored by the FT232H, Binho Nova, MCP2221 and
            GreatFET One backends, which are constructed from ``frequency``
            alone
        :param sda: data pin; ignored by the same backends as ``scl``
        :param frequency: bus frequency handed to the backend
        :raises ValueError: on the generic (port table) path, when no entry
            of ``i2cPorts`` matches ``(scl, sda)`` or constructing the
            matching port raised ``RuntimeError``
        """
        self.deinit()

        # Create the lock before dispatching. Most branches below return
        # early; creating it only at the end of this method left those
        # backends without ``_lock``, so ``with I2C(...)`` failed with
        # AttributeError in __enter__/__exit__.
        if threading is not None:
            self._lock = threading.RLock()

        # --- USB/adapter backends that take only a frequency ---------------
        if detector.board.ftdi_ft232h:
            from adafruit_blinka.microcontroller.ftdi_mpsse.mpsse.i2c import I2C as _I2C

            self._i2c = _I2C(frequency=frequency)
            return
        if detector.board.binho_nova:
            from adafruit_blinka.microcontroller.nova.i2c import I2C as _I2C

            self._i2c = _I2C(frequency=frequency)
            return
        if detector.board.microchip_mcp2221:
            from adafruit_blinka.microcontroller.mcp2221.i2c import I2C as _I2C

            self._i2c = _I2C(frequency=frequency)
            return
        if detector.board.greatfet_one:
            from adafruit_blinka.microcontroller.nxp_lpc4330.i2c import I2C as _I2C

            self._i2c = _I2C(frequency=frequency)
            return

        # --- u2if boards: one backend class per board, pins are forwarded --
        if detector.board.pico_u2if:
            from adafruit_blinka.microcontroller.rp2040_u2if.i2c import I2C_Pico as _I2C

            self._i2c = _I2C(scl, sda, frequency=frequency)
            return
        if detector.board.feather_u2if:
            from adafruit_blinka.microcontroller.rp2040_u2if.i2c import (
                I2C_Feather as _I2C,
            )

            self._i2c = _I2C(scl, sda, frequency=frequency)
            return
        if detector.board.qtpy_u2if:
            from adafruit_blinka.microcontroller.rp2040_u2if.i2c import I2C_QTPY as _I2C

            self._i2c = _I2C(scl, sda, frequency=frequency)
            return
        if detector.board.itsybitsy_u2if:
            from adafruit_blinka.microcontroller.rp2040_u2if.i2c import (
                I2C_ItsyBitsy as _I2C,
            )

            self._i2c = _I2C(scl, sda, frequency=frequency)
            return
        if detector.board.macropad_u2if:
            from adafruit_blinka.microcontroller.rp2040_u2if.i2c import (
                I2C_MacroPad as _I2C,
            )

            self._i2c = _I2C(scl, sda, frequency=frequency)
            return
        if detector.board.qt2040_trinkey_u2if:
            from adafruit_blinka.microcontroller.rp2040_u2if.i2c import (
                I2C_QT2040_Trinkey as _I2C,
            )

            self._i2c = _I2C(scl, sda, frequency=frequency)
            return

        # --- RP2040 chip: pins are forwarded --------------------------------
        if detector.chip.id == ap_chip.RP2040:
            from adafruit_blinka.microcontroller.rp2040.i2c import I2C as _I2C

            self._i2c = _I2C(scl, sda, frequency=frequency)
            return

        # --- Generic path: resolve (scl, sda) through the board port table --
        if detector.board.any_embedded_linux:
            from adafruit_blinka.microcontroller.generic_linux.i2c import I2C as _I2C
        elif detector.board.ftdi_ft2232h:
            from adafruit_blinka.microcontroller.ftdi_mpsse.mpsse.i2c import I2C as _I2C
        else:
            from adafruit_blinka.microcontroller.generic_micropython.i2c import (
                I2C as _I2C,
            )
        from microcontroller.pin import i2cPorts

        for portId, portScl, portSda in i2cPorts:
            try:
                # pylint: disable=unexpected-keyword-arg
                if scl == portScl and sda == portSda:
                    self._i2c = _I2C(portId, mode=_I2C.MASTER, baudrate=frequency)
                    break
                # pylint: enable=unexpected-keyword-arg
            except RuntimeError:
                # A port that fails to open is skipped; later table entries
                # are still tried.
                pass
        else:
            # Loop finished without ``break``: nothing was constructed.
            raise ValueError(
                "No Hardware I2C on (scl,sda)={}\nValid I2C ports: {}".format(
                    (scl, sda), i2cPorts
                )
            )

    def deinit(self):
        """Deinitialization: drop the backend object if one is set."""
        try:
            del self._i2c
        except AttributeError:
            # Never initialised, or already deinitialised.
            pass

    def __enter__(self):
        """Acquire the instance lock (when threading is available).

        :return: this object
        """
        if threading is not None:
            self._lock.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the instance lock (when threading is available), then
        deinitialise the bus. Returns ``None``, so exceptions propagate.
        """
        if threading is not None:
            self._lock.release()
        self.deinit()

    def scan(self):
        """Scan for attached devices; returns the backend's ``scan()`` result."""
        return self._i2c.scan()

    def readfrom_into(self, address, buffer, *, start=0, end=None):
        """Read from a device at specified address into a buffer

        :param address: device address, passed to the backend
        :param buffer: destination buffer
        :param start: first index of ``buffer`` to fill
        :param end: index one past the last to fill; ``None`` means
            ``len(buffer)``
        :return: whatever the backend's ``readfrom_into`` returns
        """
        if start != 0 or end is not None:
            if end is None:
                end = len(buffer)
            # Slice through a memoryview so the backend writes into the
            # caller's buffer rather than into a copy.
            buffer = memoryview(buffer)[start:end]
        # ``stop`` is always True here; remove for efficiency later.
        return self._i2c.readfrom_into(address, buffer, stop=True)

    def writeto(self, address, buffer, *, start=0, end=None, stop=True):
        """Write to a device at specified address from a buffer

        :param address: device address, passed to the backend
        :param buffer: data to send; a ``str`` is converted to ``bytes``
            using the code point of each character
        :param start: first index of ``buffer`` to send
        :param end: index one past the last to send; ``None`` means through
            the end of ``buffer``
        :param stop: forwarded to the backend as ``stop``
        :return: whatever the backend's ``writeto`` returns
        """
        if isinstance(buffer, str):
            buffer = bytes([ord(x) for x in buffer])
        if start != 0 or end is not None:
            # ``[start:None]`` is the same slice as ``[start:]``, so a single
            # expression covers both the bounded and the open-ended case.
            buffer = memoryview(buffer)[start:end]
        return self._i2c.writeto(address, buffer, stop=stop)

    def writeto_then_readfrom(
        self,
        address,
        buffer_out,
        buffer_in,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None,
        stop=False,
    ):
        """Write to a device at specified address from a buffer then read
        from a device at specified address into a buffer

        All arguments are forwarded unchanged to the backend's
        ``writeto_then_readfrom``, whose result is returned.
        """
        return self._i2c.writeto_then_readfrom(
            address,
            buffer_out,
            buffer_in,
            out_start=out_start,
            out_end=out_end,
            in_start=in_start,
            in_end=in_end,
            stop=stop,
        )