Repositories - Adafruit_Blinka-hackapet.git/blobdiff - src/adafruit_blinka/microcontroller/ftdi_mpsse/mpsse/i2c.py
Add support for FT2232H "microcontroller"
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / ftdi_mpsse / mpsse / i2c.py
diff --git a/src/adafruit_blinka/microcontroller/ftdi_mpsse/mpsse/i2c.py b/src/adafruit_blinka/microcontroller/ftdi_mpsse/mpsse/i2c.py
new file mode 100644 (file)
index 0000000..762dd71
--- /dev/null
@@ -0,0 +1,73 @@
+"""I2C Class for FTDI MPSSE"""
+from adafruit_blinka.microcontroller.ftdi_mpsse.mpsse.pin import Pin
+
+class I2C:
+    """Custom I2C Class for FTDI MPSSE
+
+    Thin adapter around pyftdi's ``I2cController``. Only master mode is
+    implemented; any other ``mode`` raises ``NotImplementedError``.
+    """
+
+    MASTER = 0
+    SLAVE = 1
+    # Bus role of the instance; None until __init__ has run.
+    _mode = None
+
+    # pylint: disable=unused-argument
+    def __init__(self, id=None, mode=MASTER, baudrate=None, frequency=400000):
+        """Open the FTDI device and configure it as an I2C master.
+
+        :param id: zero-based FT2232H interface index; ``id + 1`` is used as
+            the interface number in the device URL. When ``None``, interface
+            1 of an FT232H is opened instead.
+        :param mode: must be ``I2C.MASTER``.
+        :param baudrate: accepted but not used.
+        :param frequency: forwarded to ``I2cController.configure``.
+        :raises NotImplementedError: if ``mode`` is not ``I2C.MASTER``.
+        """
+        if mode != self.MASTER:
+            raise NotImplementedError("Only I2C Master supported!")
+        # Store on the instance; a bare ``_mode = ...`` would only bind a
+        # local name and leave the attribute at its class default.
+        self._mode = self.MASTER
+
+        # change GPIO controller to I2C
+        # pylint: disable=import-outside-toplevel
+        from pyftdi.i2c import I2cController
+
+        # pylint: enable=import-outside-toplevel
+
+        self._i2c = I2cController()
+        if id is None:
+            url = "ftdi://ftdi:ft232h/1"
+        else:
+            url = "ftdi://ftdi:ft2232h/{}".format(id + 1)
+        self._i2c.configure(url, frequency=frequency)
+        # Share the controller's GPIO object with the Pin class.
+        Pin.mpsse_gpio = self._i2c.get_gpio()
+
+    def scan(self):
+        """Perform an I2C Device Scan
+
+        :return: list of addresses in ``range(0x79)`` for which the
+            controller's ``poll`` returns a truthy value.
+        """
+        return [addr for addr in range(0x79) if self._i2c.poll(addr)]
+
+    def writeto(self, address, buffer, *, start=0, end=None, stop=True):
+        """Write data from the buffer to an address
+
+        :param address: target device address.
+        :param buffer: source of the bytes to send.
+        :param start: index of the first byte of ``buffer`` to send.
+        :param end: index one past the last byte to send; ``None`` means
+            ``len(buffer)``. An explicit ``0`` selects an empty slice.
+        :param stop: forwarded to pyftdi as the ``relax`` flag.
+        """
+        # ``is None`` (not truthiness) so that end=0 is honoured.
+        end = len(buffer) if end is None else end
+        port = self._i2c.get_port(address)
+        port.write(buffer[start:end], relax=stop)
+
+    def readfrom_into(self, address, buffer, *, start=0, end=None, stop=True):
+        """Read data from an address and into the buffer
+
+        :param address: target device address.
+        :param buffer: mutable buffer that receives the bytes read.
+        :param start: index in ``buffer`` of the first byte to fill.
+        :param end: index one past the last byte to fill; ``None`` means
+            ``len(buffer)``. An explicit ``0`` selects an empty slice.
+        :param stop: forwarded to pyftdi as the ``relax`` flag.
+        """
+        # ``is None`` (not truthiness) so that end=0 is honoured.
+        end = len(buffer) if end is None else end
+        port = self._i2c.get_port(address)
+        # The slice length is the byte count, so out-of-range bounds are
+        # clamped the same way slicing clamps them.
+        result = port.read(len(buffer[start:end]), relax=stop)
+        for i, b in enumerate(result):
+            buffer[start + i] = b
+
+    # pylint: disable=unused-argument
+    def writeto_then_readfrom(
+        self,
+        address,
+        buffer_out,
+        buffer_in,
+        *,
+        out_start=0,
+        out_end=None,
+        in_start=0,
+        in_end=None,
+        stop=False
+    ):
+        """Write data from buffer_out to an address and then
+        read data from an address and into buffer_in
+
+        :param address: target device address.
+        :param buffer_out: source of the bytes to send.
+        :param buffer_in: mutable buffer that receives the bytes read.
+        :param out_start: index of the first byte of ``buffer_out`` to send.
+        :param out_end: index one past the last byte to send; ``None`` means
+            ``len(buffer_out)``.
+        :param in_start: index in ``buffer_in`` of the first byte to fill.
+        :param in_end: index one past the last byte to fill; ``None`` means
+            ``len(buffer_in)``.
+        :param stop: accepted but not used; the exchange is always issued
+            with ``relax=True``.
+        """
+        # ``is None`` (not truthiness) so that an explicit 0 is honoured.
+        out_end = len(buffer_out) if out_end is None else out_end
+        in_end = len(buffer_in) if in_end is None else in_end
+        port = self._i2c.get_port(address)
+        result = port.exchange(
+            buffer_out[out_start:out_end], in_end - in_start, relax=True
+        )
+        for i, b in enumerate(result):
+            buffer_in[in_start + i] = b
+
+    # pylint: enable=unused-argument