]> Repositories - hackapet/Adafruit_Blinka.git/blobdiff - src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py
Merge pull request #461 from makermelissa/master
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / pico_u2if / pico_u2if.py
index 25ce962d1ed47c308d774d6b9adedf36d0313754..687bfc033585b4212b5bd44945fee65c4140461f 100644 (file)
@@ -1,8 +1,16 @@
 """Chip Definition for Pico with u2if firmware"""
 # https://github.com/execuc/u2if
 
+import os
+import time
 import hid
 
+# Use to set delay between reset and device reopen. if negative, don't reset at all
+PICO_U2IF_RESET_DELAY = float(os.environ.get("PICO_U2IF_RESET_DELAY", 1))
+
+# pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
+# pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
+
 
 class Pico_u2if:
     """MCP2221 Device Class Definition"""
@@ -12,6 +20,7 @@ class Pico_u2if:
 
     # MISC
     RESP_OK = 0x01
+    SYS_RESET = 0x10
 
     # GPIO
     GPIO_INIT_PIN = 0x20
@@ -61,15 +70,11 @@ class Pico_u2if:
     PWM_SET_DUTY_NS = 0x36
     PWM_GET_DUTY_NS = 0x37
 
-    # UART
-    UART0_INIT = 0x50
-    UART0_DEINIT = 0x51
-    UART0_WRITE = 0x52
-    UART0_READ = 0x53
-
     def __init__(self):
         self._hid = hid.device()
         self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
+        if PICO_U2IF_RESET_DELAY >= 0:
+            self._reset()
         self._i2c_index = None
         self._spi_index = None
         self._serial = None
@@ -87,6 +92,19 @@ class Pico_u2if:
             return self._hid.read(64)
         return None
 
+    def _reset(self):
+        self._hid_xfer(bytes([self.SYS_RESET]), False)
+        time.sleep(PICO_U2IF_RESET_DELAY)
+        start = time.monotonic()
+        while time.monotonic() - start < 5:
+            try:
+                self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
+            except OSError:
+                time.sleep(0.1)
+                continue
+            return
+        raise OSError("Pico open error.")
+
     # ----------------------------------------------------------------
     # GPIO
     # ----------------------------------------------------------------
@@ -159,6 +177,7 @@ class Pico_u2if:
     # I2C
     # ----------------------------------------------------------------
     def i2c_configure(self, baudrate, pullup=False):
+        """Configure I2C."""
         if self._i2c_index is None:
             raise RuntimeError("I2C bus not initialized.")
 
@@ -176,6 +195,7 @@ class Pico_u2if:
             raise RuntimeError("I2C init error.")
 
     def i2c_set_port(self, index):
+        """Set I2C port."""
         if index not in (0, 1):
             raise ValueError("I2C index must be 0 or 1.")
         self._i2c_index = index
@@ -266,6 +286,7 @@ class Pico_u2if:
     # SPI
     # ----------------------------------------------------------------
     def spi_configure(self, baudrate):
+        """Configure SPI."""
         if self._spi_index is None:
             raise RuntimeError("SPI bus not initialized.")
 
@@ -283,11 +304,13 @@ class Pico_u2if:
             raise RuntimeError("SPI init error.")
 
     def spi_set_port(self, index):
+        """Set SPI port."""
         if index not in (0, 1):
             raise ValueError("SPI index must be 0 or 1.")
         self._spi_index = index
 
     def spi_write(self, buffer, *, start=0, end=None):
+        """SPI write."""
         if self._spi_index is None:
             raise RuntimeError("SPI bus not initialized.")
 
@@ -306,6 +329,7 @@ class Pico_u2if:
             start += chunk
 
     def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
+        """SPI readinto."""
         if self._spi_index is None:
             raise RuntimeError("SPI bus not initialized.")
 
@@ -330,12 +354,14 @@ class Pico_u2if:
         in_start=0,
         in_end=None
     ):
+        """SPI write and readinto."""
         raise NotImplementedError("SPI write_readinto Not implemented")
 
     # ----------------------------------------------------------------
     # NEOPIXEL
     # ----------------------------------------------------------------
     def neopixel_write(self, gpio, buf):
+        """NeoPixel write."""
         # open serial (data is sent over this)
         if self._serial is None:
             import serial
@@ -352,6 +378,7 @@ class Pico_u2if:
         # init
         if not self._neopixel_initialized:
             # deinit any current setup
+            # pylint: disable=protected-access
             self._hid_xfer(bytes([self.WS2812B_DEINIT]))
             resp = self._hid_xfer(
                 bytes(
@@ -366,6 +393,8 @@ class Pico_u2if:
                 raise RuntimeError("Neopixel init error")
             self._neopixel_initialized = True
 
+        self._serial.reset_output_buffer()
+
         # write
         # command is done over HID
         remain_bytes = len(buf)
@@ -374,24 +403,37 @@ class Pico_u2if:
             True,
         )
         if resp[1] != self.RESP_OK:
+            # pylint: disable=no-else-raise
             if resp[2] == 0x01:
                 raise RuntimeError(
                     "Neopixel write error : too many pixel for the firmware."
                 )
             elif resp[2] == 0x02:
+                print(resp[0:10])
                 raise RuntimeError(
                     "Neopixel write error : transfer already in progress."
                 )
             else:
-                raise RuntimeError("Neopixel write error")
+                raise RuntimeError("Neopixel write error.")
         # buffer is sent over serial
         self._serial.write(buf)
+        # hack (see u2if)
+        if len(buf) % 64 == 0:
+            self._serial.write([0])
         self._serial.flush()
+        # polling loop to wait for write complete?
+        resp = self._hid.read(64)
+        while resp[0] != self.WS2812B_WRITE:
+            resp = self._hid.read(64)
+        if resp[1] != self.RESP_OK:
+            raise RuntimeError("Neopixel write (flush) error.")
 
     # ----------------------------------------------------------------
     # PWM
     # ----------------------------------------------------------------
+    # pylint: disable=unused-argument
     def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
+        """Configure PWM."""
         self.pwm_deinit(pin)
         resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
         if resp[1] != self.RESP_OK:
@@ -401,25 +443,25 @@ class Pico_u2if:
         self.pwm_set_duty_cycle(pin, duty_cycle)
 
     def pwm_deinit(self, pin):
+        """Deinit PWM."""
         self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
 
     def pwm_get_frequency(self, pin):
-        resp = self._hid_xfer(
-            bytes([self.PWM_GET_FREQ, pin.id])
-            + frequency.to_bytes(4, byteorder="little"),
-            True,
-        )
+        """PWM get freq."""
+        resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
         if resp[1] != self.RESP_OK:
             raise RuntimeError("PWM get frequency error.")
         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
 
     def pwm_set_frequency(self, pin, frequency):
+        """PWM set freq."""
         resp = self._hid_xfer(
             bytes([self.PWM_SET_FREQ, pin.id])
             + frequency.to_bytes(4, byteorder="little"),
             True,
         )
         if resp[1] != self.RESP_OK:
+            # pylint: disable=no-else-raise
             if resp[3] == 0x01:
                 raise RuntimeError("PWM different frequency on same slice.")
             elif resp[3] == 0x02:
@@ -430,12 +472,14 @@ class Pico_u2if:
                 raise RuntimeError("PWM frequency error.")
 
     def pwm_get_duty_cycle(self, pin):
+        """PWM get duty cycle."""
         resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
         if resp[1] != self.RESP_OK:
             raise RuntimeError("PWM get duty cycle error.")
         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
 
     def pwm_set_duty_cycle(self, pin, duty_cycle):
+        """PWM set duty cycle."""
         resp = self._hid_xfer(
             bytes([self.PWM_SET_DUTY_U16, pin.id])
             + duty_cycle.to_bytes(2, byteorder="little"),