"""
import threading
+import time
from typing import Union
-import fourwire
-import i2cdisplaybus
-from busdisplay import BusDisplay
-from busdisplay._displaybus import _DisplayBus
-from epaperdisplay import EPaperDisplay
from ._bitmap import Bitmap
from ._colorspace import Colorspace
from ._colorconverter import ColorConverter
from ._tilegrid import TileGrid
from ._constants import CIRCUITPY_DISPLAY_LIMIT
-# 8.x Backwards compatibility, remove at 10.x or
-# when compatibility is removed from core displayio
-Display = BusDisplay
-FourWire = fourwire.FourWire
-I2CDisplay = i2cdisplaybus.I2CDisplayBus
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_Blinka_displayio.git"
display_buses = []
-def _background():
+def _background(stop_event):
"""Main thread function to loop through all displays and update them"""
- while True:
+ while not stop_event.is_set():
for display in displays:
display._background() # pylint: disable=protected-access
+ # relax system when _background does nothing
+ # and we are in a while True loop consuming lots of CPU
+ time.sleep(0.0)
+
def release_displays() -> None:
"""Releases any actively used displays so their busses and pins can be used again.
display_buses.clear()
-def allocate_display(new_display: Union[BusDisplay, EPaperDisplay]) -> None:
+def allocate_display(
+ new_display: Union["busdisplay.BusDisplay", "epaperdisplay.EPaperDisplay"]
+) -> None:
"""Add a display to the displays pool and return the new display"""
if len(displays) >= CIRCUITPY_DISPLAY_LIMIT:
raise RuntimeError("Too many displays")
displays.append(new_display)
-def allocate_display_bus(new_display_bus: _DisplayBus) -> None:
+def allocate_display_bus(new_display_bus: "busdisplay._displaybus._DisplayBus") -> None:
"""Add a display bus to the display_buses pool and return the new display bus"""
if len(display_buses) >= CIRCUITPY_DISPLAY_LIMIT:
raise RuntimeError(
display_buses.append(new_display_bus)
-background_thread = threading.Thread(target=_background, daemon=True)
+background_thread_stop_event = threading.Event()
+background_thread = threading.Thread(
+ target=_background, args=(background_thread_stop_event,), daemon=True
+)
# Start the background thread
def _stop_background():
if background_thread.is_alive():
+ background_thread_stop_event.set()
# Stop the thread
background_thread.join()