Repositories - hackapet/Adafruit_Blinka_Displayio.git/blobdiff - displayio/_i2cdisplay.py
Run pre-commit
[hackapet/Adafruit_Blinka_Displayio.git] / displayio / _i2cdisplay.py
index 26476599fa61acc0379b3d3b27c5b54163feb86d..9611fb09187a687351c308a3cbb07097997b8eca 100644 (file)
@@ -23,7 +23,7 @@ displayio for Blinka
 import time
 import busio
 import digitalio
 import time
 import busio
 import digitalio
-import circuitpython_typing
+from circuitpython_typing import ReadableBuffer
 from ._constants import CHIP_SELECT_UNTOUCHED, DISPLAY_COMMAND
 
 __version__ = "0.0.0+auto.0"
 from ._constants import CHIP_SELECT_UNTOUCHED, DISPLAY_COMMAND
 
 __version__ = "0.0.0+auto.0"
@@ -53,6 +53,15 @@ class I2CDisplay:
         self._i2c = i2c_bus
         self._dev_addr = device_address
 
         self._i2c = i2c_bus
         self._dev_addr = device_address
 
+    def __new__(cls, *args, **kwargs):
+        from . import (  # pylint: disable=import-outside-toplevel, cyclic-import
+            allocate_display_bus,
+        )
+
+        display_bus_instance = super().__new__(cls)
+        allocate_display_bus(display_bus_instance)
+        return display_bus_instance
+
     def _release(self):
         self.reset()
         self._i2c.deinit()
     def _release(self):
         self.reset()
         self._i2c.deinit()
@@ -71,12 +80,7 @@ class I2CDisplay:
         time.sleep(0.0001)
         self._reset.value = True
 
         time.sleep(0.0001)
         self._reset.value = True
 
-    def _begin_transaction(self) -> None:
-        """Lock the bus before sending data."""
-        while not self._i2c.try_lock():
-            pass
-
-    def send(self, command: int, data: circuitpython_typing.ReadableBuffer) -> None:
+    def send(self, command: int, data: ReadableBuffer) -> None:
         """
         Sends the given command value followed by the full set of data. Display state,
         such as vertical scroll, set via ``send`` may or may not be reset once the code is
         """
         Sends the given command value followed by the full set of data. Display state,
         such as vertical scroll, set via ``send`` may or may not be reset once the code is
@@ -89,10 +93,9 @@ class I2CDisplay:
     def _send(
         self,
         data_type: int,
     def _send(
         self,
         data_type: int,
-        chip_select: int,
-        data: circuitpython_typing.ReadableBuffer,
+        _chip_select: int,  # Chip select behavior
+        data: ReadableBuffer,
     ):
     ):
-        # pylint: disable=unused-argument
         if data_type == DISPLAY_COMMAND:
             n = len(data)
             if n > 0:
         if data_type == DISPLAY_COMMAND:
             n = len(data)
             if n > 0:
@@ -101,12 +104,37 @@ class I2CDisplay:
                     command_bytes[2 * i] = 0x80
                     command_bytes[2 * i + 1] = data[i]
 
                     command_bytes[2 * i] = 0x80
                     command_bytes[2 * i + 1] = data[i]
 
-                self._i2c.writeto(self._dev_addr, buffer=command_bytes, stop=True)
+            try:
+                self._i2c.writeto(self._dev_addr, buffer=command_bytes)
+            except OSError as error:
+                if error.errno == 121:
+                    raise RuntimeError(
+                        f"I2C write error to 0x{self._dev_addr:02x}"
+                    ) from error
+                raise error
         else:
             data_bytes = bytearray(len(data) + 1)
             data_bytes[0] = 0x40
             data_bytes[1:] = data
         else:
             data_bytes = bytearray(len(data) + 1)
             data_bytes[0] = 0x40
             data_bytes[1:] = data
-            self._i2c.writeto(self._dev_addr, buffer=data_bytes, stop=True)
+            try:
+                self._i2c.writeto(self._dev_addr, buffer=data_bytes)
+            except OSError as error:
+                if error.errno == 121:
+                    raise RuntimeError(
+                        f"I2C write error to 0x{self._dev_addr:02x}"
+                    ) from error
+                raise error
+
+    def _free(self) -> bool:
+        """Attempt to free the bus and return False if busy"""
+        if not self._i2c.try_lock():
+            return False
+        self._i2c.unlock()
+        return True
+
+    def _begin_transaction(self) -> bool:
+        """Lock the bus before sending data."""
+        return self._i2c.try_lock()
 
     def _end_transaction(self) -> None:
         """Release the bus after sending data."""
 
     def _end_transaction(self) -> None:
         """Release the bus after sending data."""