]> Repositories - hackapet/Adafruit_Blinka.git/commitdiff
Merge pull request #184 from caternuson/mcp2221
author: Limor "Ladyada" Fried <limor@ladyada.net>
Sat, 7 Dec 2019 02:15:09 +0000 (21:15 -0500)
committer: GitHub <noreply@github.com>
Sat, 7 Dec 2019 02:15:09 +0000 (21:15 -0500)
Add MCP2221 support.

src/adafruit_blinka/board/microchip_mcp2221.py [new file with mode: 0644]
src/adafruit_blinka/microcontroller/mcp2221/__init__.py [new file with mode: 0644]
src/adafruit_blinka/microcontroller/mcp2221/i2c.py [new file with mode: 0644]
src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py [new file with mode: 0644]
src/adafruit_blinka/microcontroller/mcp2221/pin.py [new file with mode: 0644]
src/analogio.py [new file with mode: 0644]
src/board.py
src/busio.py
src/digitalio.py
src/microcontroller/pin.py

diff --git a/src/adafruit_blinka/board/microchip_mcp2221.py b/src/adafruit_blinka/board/microchip_mcp2221.py
new file mode 100644 (file)
index 0000000..bdb3ba2
--- /dev/null
@@ -0,0 +1,9 @@
+from adafruit_blinka.microcontroller.mcp2221 import pin
+
+G0 = pin.G0
+G1 = pin.G1
+G2 = pin.G2
+G3 = pin.G3
+
+SCL = pin.SCL
+SDA = pin.SDA
\ No newline at end of file
diff --git a/src/adafruit_blinka/microcontroller/mcp2221/__init__.py b/src/adafruit_blinka/microcontroller/mcp2221/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/adafruit_blinka/microcontroller/mcp2221/i2c.py b/src/adafruit_blinka/microcontroller/mcp2221/i2c.py
new file mode 100644 (file)
index 0000000..9ebb5b6
--- /dev/null
@@ -0,0 +1,23 @@
+from .mcp2221 import mcp2221
+
+class I2C:
+    """I2C bus object that forwards every call to the shared ``mcp2221`` driver."""
+
+    def __init__(self, *, frequency=100000):
+        """Bind to the module-level ``mcp2221`` object and configure the bus.
+
+        :param frequency: value handed to ``i2c_configure``.
+        """
+        driver = mcp2221
+        self._mcp2221 = driver
+        driver.i2c_configure(frequency)
+
+    def scan(self):
+        """Return whatever the driver's ``i2c_scan`` reports."""
+        found = self._mcp2221.i2c_scan()
+        return found
+
+    def writeto(self, address, buffer, *, start=0, end=None, stop=True):
+        """Write ``buffer[start:end]`` to ``address``.
+
+        ``stop`` is accepted but is not passed on to the driver.
+        """
+        bounds = {"start": start, "end": end}
+        self._mcp2221.i2c_writeto(address, buffer, **bounds)
+
+    def readfrom_into(self, address, buffer, *, start=0, end=None, stop=True):
+        """Read from ``address`` into ``buffer[start:end]``.
+
+        ``stop`` is accepted but is not passed on to the driver.
+        """
+        bounds = {"start": start, "end": end}
+        self._mcp2221.i2c_readfrom_into(address, buffer, **bounds)
+
+    def writeto_then_readfrom(self, address, buffer_out, buffer_in, *,
+                              out_start=0, out_end=None,
+                              in_start=0, in_end=None, stop=False):
+        """Write ``buffer_out`` and then read into ``buffer_in`` on ``address``.
+
+        ``stop`` is accepted but is not passed on to the driver.
+        """
+        bounds = {
+            "out_start": out_start,
+            "out_end": out_end,
+            "in_start": in_start,
+            "in_end": in_end,
+        }
+        self._mcp2221.i2c_writeto_then_readfrom(address, buffer_out, buffer_in,
+                                                **bounds)
diff --git a/src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py b/src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py
new file mode 100644 (file)
index 0000000..bd44474
--- /dev/null
@@ -0,0 +1,315 @@
+import time
+import hid
+
+# from the C driver
+# http://ww1.microchip.com/downloads/en/DeviceDoc/mcp2221_0_1.tar.gz
+# others (???) determined during driver development
+# pylint: disable=bad-whitespace
+RESP_ERR_NOERR              = 0x00
+RESP_ADDR_NACK              = 0x25
+RESP_READ_ERR               = 0x7F
+RESP_READ_COMPL             = 0x55
+RESP_READ_PARTIAL           = 0x54 # ???
+RESP_I2C_IDLE               = 0x00
+RESP_I2C_START_TOUT         = 0x12
+RESP_I2C_RSTART_TOUT        = 0x17
+RESP_I2C_WRADDRL_TOUT       = 0x23
+RESP_I2C_WRADDRL_WSEND      = 0x21
+RESP_I2C_WRADDRL_NACK       = 0x25
+RESP_I2C_WRDATA_TOUT        = 0x44
+RESP_I2C_RDDATA_TOUT        = 0x52
+RESP_I2C_STOP_TOUT          = 0x62
+
+RESP_I2C_MOREDATA           = 0x43 # ???
+RESP_I2C_PARTIALDATA        = 0x41 # ???
+RESP_I2C_WRITINGNOSTOP      = 0x45 # ???
+
+MCP2221_RETRY_MAX           = 50
+MCP2221_MAX_I2C_DATA_LEN    = 60
+MASK_ADDR_NACK              = 0x40
+# pylint: enable=bad-whitespace
+
+class MCP2221:
+
+    VID = 0x04D8
+    PID = 0x00DD
+
+    GP_GPIO = 0b000
+    GP_DEDICATED = 0b001
+    GP_ALT0 = 0b010
+    GP_ALT1 = 0b011
+    GP_ALT2 = 0b100
+
+    def __init__(self):
+        """Open the HID device by VID/PID, reset it, then pause briefly."""
+        usb_ids = (MCP2221.VID, MCP2221.PID)
+        self._hid = hid.device()
+        self._hid.open(*usb_ids)
+        self._reset()
+        # short pause after the reset before the object is used
+        time.sleep(0.25)
+
+    def _hid_xfer(self, report, response=True):
+        """Write one HID report and optionally read the reply.
+
+        :param report: bytes-like report data; zero padded on the right when
+            shorter than 64 bytes.
+        :param response: when true, return the result of reading 64 bytes
+            from the device; otherwise return ``None`` without reading.
+        """
+        # first byte is report ID, which =0 for MCP2221
+        # remaining bytes = 64 byte report data
+        # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
+        padding = b'\0' * (64 - len(report))
+        self._hid.write(b'\0' + report + padding)
+        if not response:
+            return None
+        # return is 64 byte response report
+        return self._hid.read(64)
+
+    #----------------------------------------------------------------
+    # MISC
+    #----------------------------------------------------------------
+    def gp_get_mode(self, pin):
+        """Return the low three bits of byte ``22 + pin`` of the 0x61 reply."""
+        sram = self._hid_xfer(b'\x61')
+        setting = sram[22 + pin]
+        return setting & 0x07
+
+    def gp_set_mode(self, pin, mode):
+        """Set one GP pin's mode while re-sending the other pins' current bytes.
+
+        :param pin: GP pin index (0..3) selecting which settings byte to change.
+        :param mode: new mode; only the low three bits are used.
+        """
+        # read back the present settings
+        sram = self._hid_xfer(b'\x61')
+        # 0x60 report with everything else zero; safe since 0's = no change
+        request = bytearray(64)
+        request[0] = 0x60
+        # flag byte that marks the GP settings as being altered
+        request[7] = 0xFF
+        # all 4 pins get written at once, so copy the current GP0..GP3
+        # bytes (reply offsets 22..25) into request offsets 8..11
+        for gp in range(4):
+            request[8 + gp] = sram[22 + gp]
+        # then override only the requested pin
+        request[8 + pin] = mode & 0x07
+        # and make it so
+        self._hid_xfer(request)
+
+    def _pretty_report(self, report):
+        """Print entries 0..63 of ``report`` as two-digit hex, ten per row."""
+        print("     0  1  2  3  4  5  6  7  8  9")
+        for row in range(7):
+            print("{} : ".format(row), end='')
+            first = row * 10
+            for position in range(first, first + 10):
+                print("{:02x} ".format(report[position]), end='')
+                # entry 63 is the last one shown, so the final row is short
+                if position >= 63:
+                    break
+            print()
+
+    def _status_dump(self):
+        """Print the reply to the 0x10 report via ``_pretty_report``."""
+        reply = self._hid_xfer(b'\x10')
+        self._pretty_report(reply)
+
+    def _sram_dump(self):
+        """Print the reply to the 0x61 report via ``_pretty_report``."""
+        reply = self._hid_xfer(b'\x61')
+        self._pretty_report(reply)
+
+    def _reset(self):
+        """Send the reset report, then retry opening the device for up to 5 seconds.
+
+        :raises OSError: when the device could not be opened within that window.
+        """
+        # no reply is read for the reset report
+        self._hid_xfer(b'\x70\xAB\xCD\xEF', response=False)
+        began = time.monotonic()
+        while time.monotonic() - began < 5:
+            try:
+                self._hid.open(MCP2221.VID, MCP2221.PID)
+            except OSError:
+                # open failed; pause briefly and try again
+                time.sleep(0.1)
+            else:
+                return
+        raise OSError("open failed")
+
+    #----------------------------------------------------------------
+    # GPIO
+    #----------------------------------------------------------------
+    def gpio_set_direction(self, pin, mode):
+        """Send a 0x50 report that changes the direction of one pin.
+
+        :param pin: pin index; its direction bytes sit at offset ``4 * (pin + 1)``.
+        :param mode: direction value written after the change flag.
+        """
+        base = 4 * (pin + 1)
+        request = bytearray(64)
+        request[0] = 0x50               # set GPIO report, everything else zero
+        request[base] = 0x01            # flag: set pin direction
+        request[base + 1] = mode        # to this
+        self._hid_xfer(request)
+
+    def gpio_set_pin(self, pin, value):
+        """Send a 0x50 report that changes the output value of one pin.
+
+        :param pin: pin index; its value bytes sit at offset ``2 + 4 * pin``.
+        :param value: output value written after the change flag.
+        """
+        base = 2 + 4 * pin
+        request = bytearray(64)
+        request[0] = 0x50               # set GPIO report, everything else zero
+        request[base] = 0x01            # flag: set pin value
+        request[base + 1] = value       # to this
+        self._hid_xfer(request)
+
+    def gpio_get_pin(self, pin):
+        """Return byte ``2 + 2 * pin`` of the 0x51 reply.
+
+        :raises RuntimeError: when that byte is 0xEE.
+        """
+        reply = self._hid_xfer(b'\x51')
+        level = reply[2 + 2 * pin]
+        if level == 0xEE:
+            raise RuntimeError("Pin is not set for GPIO operation.")
+        return level
+
+    #----------------------------------------------------------------
+    # I2C
+    #----------------------------------------------------------------
+    def _i2c_status(self):
+        """Return the full reply to the 0x10 report.
+
+        :raises RuntimeError: when byte 1 of the reply is non-zero.
+        """
+        reply = self._hid_xfer(b'\x10')
+        if reply[1] == 0:
+            return reply
+        raise RuntimeError("Couldn't get I2C status")
+
+    def _i2c_state(self):
+        """Return byte 8 of the reply obtained from ``_i2c_status``."""
+        status = self._i2c_status()
+        return status[8]
+
+    def _i2c_cancel(self):
+        """Send the 0x10 report with the cancel byte set.
+
+        :raises RuntimeError: when byte 1 of the reply is non-zero.
+        """
+        reply = self._hid_xfer(b'\x10\x00\x10')
+        if reply[1] != 0x00:
+            raise RuntimeError("Couldn't cancel I2C")
+        if reply[2] != 0x10:
+            return
+        # bus release will need "a few hundred microseconds"
+        time.sleep(0.001)
+
+    def _i2c_write(self, cmd, address, buffer, start=0, end=None):
+        """Write ``buffer[start:end]`` to an I2C device, then wait for completion.
+
+        The data is sent in chunks of at most ``MCP2221_MAX_I2C_DATA_LEN``
+        bytes; afterwards the status is polled until the transfer has
+        finished or failed.
+
+        :param cmd: write command byte placed first in every report; 0x94
+            additionally makes ``RESP_I2C_WRITINGNOSTOP`` a success state.
+        :param address: device address; sent as ``address << 1``.
+        :param buffer: bytes-like data source.
+        :param start: index of the first byte to send.
+        :param end: index one past the last byte to send; a falsy value
+            (``None`` or 0) is replaced by ``len(buffer)``.
+        :raises RuntimeError: on a failure state, an address NACK, or when
+            ``MCP2221_RETRY_MAX`` attempts are used up.
+        """
+        # anything other than state 0 gets cancelled before starting
+        if self._i2c_state() != 0x00:
+            self._i2c_cancel()
+
+        # NOTE(review): ``end=0`` is treated like ``None`` here, i.e. it
+        # selects the whole buffer rather than an empty range - confirm intent
+        end = end if end else len(buffer)
+        # total transfer length; the same value is sent with every chunk
+        length = end - start
+        retries = 0
+
+        while (end - start) > 0:
+            chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
+            # write out current chunk
+            resp = self._hid_xfer(bytes([cmd,
+                                         length & 0xFF,
+                                         (length >> 8) & 0xFF,
+                                         address << 1]) +
+                                         buffer[start:(start+chunk)])
+            # check for success
+            if resp[1] != 0x00:
+                # these states abort immediately instead of being retried
+                if resp[2] in (RESP_I2C_START_TOUT,
+                               RESP_I2C_WRADDRL_TOUT,
+                               RESP_I2C_WRADDRL_NACK,
+                               RESP_I2C_WRDATA_TOUT,
+                               RESP_I2C_STOP_TOUT):
+                    raise RuntimeError("Unrecoverable I2C state failure")
+                # otherwise resend the same chunk, up to MCP2221_RETRY_MAX times
+                retries += 1
+                if retries >= MCP2221_RETRY_MAX:
+                    raise RuntimeError("I2C write error, max retries reached.")
+                time.sleep(0.001)
+                continue # try again
+            # yay chunk sent!
+            # hold off on the next chunk while the state is RESP_I2C_PARTIALDATA
+            while self._i2c_state() == RESP_I2C_PARTIALDATA:
+                time.sleep(0.001)
+            start += chunk
+            # the retry budget applies per chunk
+            retries = 0
+
+        # check status in another loop
+        for _ in range(MCP2221_RETRY_MAX):
+            status = self._i2c_status()
+            # byte 20 carries the address-NACK flag bit
+            if status[20] & MASK_ADDR_NACK:
+                raise RuntimeError("I2C slave address was NACK'd")
+            usb_cmd_status = status[8]
+            if usb_cmd_status == 0:
+                break
+            if usb_cmd_status == RESP_I2C_WRITINGNOSTOP and cmd == 0x94:
+                break   # this is OK too!
+            if usb_cmd_status in (RESP_I2C_START_TOUT,
+                                  RESP_I2C_WRADDRL_TOUT,
+                                  RESP_I2C_WRADDRL_NACK,
+                                  RESP_I2C_WRDATA_TOUT,
+                                  RESP_I2C_STOP_TOUT):
+                raise RuntimeError("Unrecoverable I2C state failure")
+            time.sleep(0.001)
+        else:
+            # the loop ran out without reaching a success state
+            raise RuntimeError("I2C write error: max retries reached.")
+        # whew success!
+
+    def _i2c_read(self, cmd, address, buffer, start=0, end=None):
+        """Read from an I2C device into ``buffer[start:end]``.
+
+        A read request for the whole length is sent first. The data is then
+        fetched with repeated 0x40 reports and copied into ``buffer`` in
+        chunks of at most ``MCP2221_MAX_I2C_DATA_LEN`` bytes.
+
+        :param cmd: read command byte placed first in the request report.
+        :param address: device address; sent as ``(address << 1) | 1``.
+        :param buffer: mutable sequence that receives the data.
+        :param start: index of the first element of ``buffer`` to fill.
+        :param end: index one past the last element to fill; a falsy value
+            (``None`` or 0) is replaced by ``len(buffer)``.
+        :raises RuntimeError: when the request or a fetch reports an error,
+            on a NACK, or when a chunk is still not available after
+            ``MCP2221_RETRY_MAX`` fetch attempts.
+        """
+        # idle (0) and the state left by a write without stop are both fine
+        # to start from; any other state is cancelled first
+        if self._i2c_state() not in (RESP_I2C_WRITINGNOSTOP, 0):
+            self._i2c_cancel()
+
+        end = end if end else len(buffer)
+        length = end - start
+
+        # tell it we want to read
+        resp = self._hid_xfer(bytes([cmd,
+                                     length & 0xFF,
+                                     (length >> 8) & 0xFF,
+                                     (address << 1) | 0x01]))
+
+        # check for success
+        if resp[1] != 0x00:
+            raise RuntimeError("Unrecoverable I2C read failure")
+
+        # and now the read part
+        while (end - start) > 0:
+            for _ in range(MCP2221_RETRY_MAX):
+                # the actual read
+                resp = self._hid_xfer(b'\x40')
+                # check for success
+                if resp[1] == RESP_I2C_PARTIALDATA:
+                    time.sleep(0.001)
+                    continue
+                if resp[1] != 0x00:
+                    raise RuntimeError("Unrecoverable I2C read failure")
+                if resp[2] == RESP_ADDR_NACK:
+                    raise RuntimeError("I2C NACK")
+                if resp[3] == 0x00 and resp[2] == 0x00:
+                    break
+                if resp[3] == RESP_READ_ERR:
+                    time.sleep(0.001)
+                    continue
+                if resp[2] in (RESP_READ_COMPL, RESP_READ_PARTIAL):
+                    break
+            else:
+                # no attempt produced usable data; without this the last
+                # (invalid) reply would be copied into the caller's buffer
+                raise RuntimeError("I2C read error: max retries reached.")
+
+            # move data into buffer; payload starts at byte 4 of the reply
+            chunk = min(end - start, MCP2221_MAX_I2C_DATA_LEN)
+            for i, k in enumerate(range(start, start+chunk)):
+                buffer[k] = resp[4 + i]
+            start += chunk
+
+    def i2c_configure(self, baudrate=100000):
+        """Send the 0x10 report that sets the I2C clock divider.
+
+        :param baudrate: bus speed; the divider byte is
+            ``12000000 // baudrate - 3``.
+        """
+        divider = 12000000 // baudrate - 3
+        request = bytes([
+            0x10,     # set parameters
+            0x00,     # don't care
+            0x00,     # no effect
+            0x20,     # next byte is clock divider
+            divider,
+        ])
+        self._hid_xfer(request)
+
+    def i2c_writeto(self, address, buffer, *, start=0, end=None):
+        """Write ``buffer[start:end]`` to ``address`` using command 0x90."""
+        self._i2c_write(0x90, address, buffer, start=start, end=end)
+
+    def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
+        """Read from ``address`` into ``buffer[start:end]`` using command 0x91."""
+        self._i2c_read(0x91, address, buffer, start=start, end=end)
+
+    def i2c_writeto_then_readfrom(self, address, out_buffer, in_buffer, *,
+                                  out_start=0, out_end=None,
+                                  in_start=0, in_end=None):
+        """Write with command 0x94, then read with command 0x93, on ``address``."""
+        self._i2c_write(0x94, address, out_buffer,
+                        start=out_start, end=out_end)
+        self._i2c_read(0x93, address, in_buffer,
+                       start=in_start, end=in_end)
+
+    def i2c_scan(self, *, start=0, end=0x79):
+        """Return the addresses in ``start..end`` (inclusive) that accept a write.
+
+        Each address is probed by writing a single zero byte to it.
+
+        :param start: first address to probe.
+        :param end: last address to probe.
+        :return: list of the addresses whose probe write succeeded, ascending.
+        """
+        found = []
+        for addr in range(start, end+1):
+            # a failed write (e.g. address NACK) surfaces as RuntimeError, so
+            # an absent device must be caught here instead of ending the scan
+            try:
+                self.i2c_writeto(addr, b'\x00')
+            except RuntimeError:
+                # no reply at this address
+                continue
+            # the write went through, so something answered
+            found.append(addr)
+        # no explicit cancel: the next transfer cancels any stale state itself
+        return found
+
+    #----------------------------------------------------------------
+    # ADC
+    #----------------------------------------------------------------
+    def adc_configure(self, vref=0):
+        """Send a 0x60 report whose byte 5 is ``0x80 | (vref & 0b111)``."""
+        request = bytearray(64)
+        request[0] = 0x60
+        request[5] = (vref & 0b111) | 1 << 7
+        self._hid_xfer(request)
+
+    def adc_read(self, pin):
+        """Combine two bytes of the 0x10 reply into one reading.
+
+        Byte ``49 + 2 * pin`` is the high byte and byte ``48 + 2 * pin``
+        the low byte.
+        """
+        status = self._hid_xfer(b'\x10')
+        high = status[49 + 2 * pin]
+        low = status[48 + 2 * pin]
+        return high << 8 | low
+
+    #----------------------------------------------------------------
+    # DAC
+    #----------------------------------------------------------------
+    def dac_configure(self, vref=0):
+        """Send a 0x60 report whose byte 3 is ``0x80 | (vref & 0b111)``."""
+        request = bytearray(64)
+        request[0] = 0x60
+        request[3] = (vref & 0b111) | 1 << 7
+        self._hid_xfer(request)
+
+    def dac_write(self, pin, value):
+        """Send a 0x60 report whose byte 4 is ``0x80 | (value & 0b11111)``.
+
+        ``pin`` is accepted but does not influence the report.
+        """
+        request = bytearray(64)
+        request[0] = 0x60
+        request[4] = (value & 0b11111) | 1 << 7
+        self._hid_xfer(request)
+
+mcp2221 = MCP2221()
diff --git a/src/adafruit_blinka/microcontroller/mcp2221/pin.py b/src/adafruit_blinka/microcontroller/mcp2221/pin.py
new file mode 100644 (file)
index 0000000..139d8b6
--- /dev/null
@@ -0,0 +1,81 @@
+from .mcp2221 import mcp2221
+
+class Pin:
+    """A basic Pin class for use with MCP2221."""
+
+    # pin modes
+    OUT     = 0
+    IN      = 1
+    ADC     = 2
+    DAC     = 3
+    # pin values
+    LOW     = 0
+    HIGH    = 1
+
+    def __init__(self, pin_id=None):
+        """Store the pin number; the mode stays unset until ``init`` is called."""
+        self._mode = None
+        self.id = pin_id
+
+    def init(self, mode=IN, pull=None):
+        """Put the pin into ``mode`` (IN, OUT, ADC or DAC).
+
+        ``pull`` is accepted but not used.
+
+        :raises RuntimeError: when the pin has no id.
+        :raises ValueError: for an unknown mode, or a mode this pin lacks.
+        """
+        if self.id is None:
+            raise RuntimeError("Can not init a None type pin.")
+        if mode not in (Pin.IN, Pin.OUT, Pin.ADC, Pin.DAC):
+            raise ValueError("Incorrect pin mode: {}".format(mode))
+
+        if mode == Pin.ADC:
+            # ADC only available on these pins
+            if self.id not in (1, 2, 3):
+                raise ValueError("Pin does not have ADC capabilities")
+            mcp2221.gp_set_mode(self.id, mcp2221.GP_ALT0)
+            mcp2221.adc_configure()
+        elif mode == Pin.DAC:
+            # DAC only available on these pins
+            if self.id not in (2, 3):
+                raise ValueError("Pin does not have DAC capabilities")
+            mcp2221.gp_set_mode(self.id, mcp2221.GP_ALT1)
+            mcp2221.dac_configure()
+        else:
+            # All pins can do GPIO
+            mcp2221.gp_set_mode(self.id, mcp2221.GP_GPIO)
+            mcp2221.gpio_set_direction(self.id, mode)
+        self._mode = mode
+
+    def value(self, val=None):
+        """Read the pin (``val`` is None) or write ``val`` to it.
+
+        What happens depends on the mode chosen in ``init``: digital pins
+        read and write LOW/HIGH, ADC pins are read only, DAC pins are
+        write only.
+        """
+        mode = self._mode
+
+        # Digital In / Out
+        if mode in (Pin.IN, Pin.OUT):
+            if val is None:
+                # digital read
+                return mcp2221.gpio_get_pin(self.id)
+            if val not in (Pin.LOW, Pin.HIGH):
+                # nope
+                raise ValueError("Invalid value for pin.")
+            # digital write
+            mcp2221.gpio_set_pin(self.id, val)
+            return None
+
+        # Analog In
+        if mode == Pin.ADC:
+            if val is not None:
+                # read only
+                raise AttributeError("'AnalogIn' object has no attribute 'value'")
+            # MCP2221 ADC is 10 bit, scale to 16 bit per CP API
+            return mcp2221.adc_read(self.id) * 64
+
+        # Analog Out
+        if mode == Pin.DAC:
+            if val is None:
+                # write only
+                raise AttributeError("unreadable attribute")
+            # scale 16 bit value to MCP2221 5 bit DAC (yes 5 bit)
+            mcp2221.dac_write(self.id, val // 2048)
+            return None
+
+        raise RuntimeError("No action for mode {} with value {}".format(self._mode, val))
+
+
+# create pin instances for each pin
+G0 = Pin(0)
+G1 = Pin(1)
+G2 = Pin(2)
+G3 = Pin(3)
+
+SCL = Pin()
+SDA = Pin()
\ No newline at end of file
diff --git a/src/analogio.py b/src/analogio.py
new file mode 100644 (file)
index 0000000..229495d
--- /dev/null
@@ -0,0 +1,52 @@
+"""
+`analogio` - Analog input and output control
+=================================================
+See `CircuitPython:analogio` in CircuitPython for more details.
+* Author(s): Carter Nelson
+"""
+
+from adafruit_blinka.agnostic import board_id, detector
+
+# pylint: disable=ungrouped-imports,wrong-import-position
+
+if detector.board.microchip_mcp2221:
+    from adafruit_blinka.microcontroller.mcp2221.pin import Pin
+else:
+    raise NotImplementedError("analogio not supported for this board.")
+
+from adafruit_blinka import ContextManaged
+
+class AnalogIn(ContextManaged):
+    """Analog input backed by a ``Pin`` initialised in ADC mode."""
+
+    def __init__(self, pin):
+        """Create a fresh ``Pin`` from ``pin.id`` and switch it to ADC mode."""
+        self._pin = Pin(pin.id)
+        self._pin.init(mode=Pin.ADC)
+
+    @property
+    def value(self):
+        """Current reading, as returned by the underlying pin."""
+        reading = self._pin.value()
+        return reading
+
+    @value.setter
+    def value(self, _ignored):
+        # emulate what CircuitPython does
+        raise AttributeError("'AnalogIn' object has no attribute 'value'")
+
+    def deinit(self):
+        """Drop the reference to the underlying pin."""
+        del self._pin
+
+class AnalogOut(ContextManaged):
+    """Analog output backed by a ``Pin`` initialised in DAC mode."""
+
+    def __init__(self, pin):
+        """Create a fresh ``Pin`` from ``pin.id`` and switch it to DAC mode."""
+        self._pin = Pin(pin.id)
+        self._pin.init(mode=Pin.DAC)
+
+    @property
+    def value(self):
+        """Write-only attribute; reading it always raises."""
+        # emulate what CircuitPython does
+        raise AttributeError("unreadable attribute")
+
+    @value.setter
+    def value(self, level):
+        self._pin.value(level)
+
+    def deinit(self):
+        """Drop the reference to the underlying pin."""
+        del self._pin
\ No newline at end of file
index ef2f086c26c40f65d4dfccef3007aa430193b3b3..c9fac91dc7cfb5e903f0ae7e3e3a3b1102d18353 100755 (executable)
@@ -57,7 +57,7 @@ elif detector.board.RASPBERRY_PI_B_REV2:
 
 elif board_id == ap_board.BEAGLEBONE_BLACK:
     from adafruit_blinka.board.beaglebone_black import *
-       
+
 elif board_id == ap_board.BEAGLEBONE_BLACK_INDUSTRIAL:
     from adafruit_blinka.board.beaglebone_black import *
 
@@ -109,6 +109,9 @@ elif board_id == ap_board.FTDI_FT232H:
 elif board_id == ap_board.BINHO_NOVA:
     from adafruit_blinka.board.binho_nova import *
 
+elif board_id == ap_board.MICROCHIP_MCP2221:
+    from adafruit_blinka.board.microchip_mcp2221 import *
+
 elif "sphinx" in sys.modules:
     pass
 
index 74cfa89eb75d2dabe4bc68bedeae80aab87701ea..d58113423c67f1df9a7f069202368efed9c9f033 100755 (executable)
@@ -25,7 +25,9 @@ class I2C(Lockable):
             return
         elif detector.board.binho_nova:
             from adafruit_blinka.microcontroller.nova.i2c import I2C
-            self._i2c = I2C()
+        elif detector.board.microchip_mcp2221:
+            from adafruit_blinka.microcontroller.mcp2221.i2c import I2C
+            self._i2c = I2C(frequency=frequency)
             return
         elif detector.board.any_embedded_linux:
             from adafruit_blinka.microcontroller.generic_linux.i2c import I2C as _I2C
index ed497cc76addd339b7ad68c9a43fab9be11bf58c..bb2a48ef32a2b5ef09c5c81b26b78a7930bf65fb 100755 (executable)
@@ -39,6 +39,8 @@ elif detector.board.binho_nova:
     from adafruit_blinka.microcontroller.nova.pin import Pin
 elif detector.chip.STM32:
     from machine import Pin
+elif detector.board.microchip_mcp2221:
+    from adafruit_blinka.microcontroller.mcp2221.pin import Pin
 from adafruit_blinka import Enum, ContextManaged
 
 class DriveMode(Enum):
index d6e18d8ac50f1218206b63fd961293fb54fbc12f..d1b5b8fea3ec7b8ebe0aceb6f3d9c20901d32030 100755 (executable)
@@ -36,5 +36,7 @@ elif chip_id == ap_chip.FT232H:
     from adafruit_blinka.microcontroller.ft232h.pin import *
 elif chip_id == ap_chip.BINHO:
     from adafruit_blinka.microcontroller.nova.pin import *
+elif chip_id == ap_chip.MCP2221:
+    from adafruit_blinka.microcontroller.mcp2221.pin import *
 else:
     raise NotImplementedError("Microcontroller not supported: ", chip_id)