]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/busio.py
Merge pull request #191 from binhollc/master
[Adafruit_Blinka-hackapet.git] / src / busio.py
1 """
2 `busio` - Bus protocol support like I2C and SPI
3 =================================================
4
5 See `CircuitPython:busio` in CircuitPython for more details.
6
7 * Author(s): cefn
8 """
9
10 import threading
11
12 from adafruit_blinka import Enum, Lockable, agnostic
13 from adafruit_blinka.agnostic import board_id, detector
14 import adafruit_platformdetect.board as ap_board
15
16 class I2C(Lockable):
17     def __init__(self, scl, sda, frequency=400000):
18         self.init(scl, sda, frequency)
19
20     def init(self, scl, sda, frequency):
21         self.deinit()
22         if detector.board.ftdi_ft232h:
23             from adafruit_blinka.microcontroller.ft232h.i2c import I2C
24             self._i2c = I2C()
25             return
26         elif detector.board.binho_nova:
27             from adafruit_blinka.microcontroller.nova.i2c import I2C
28             self._i2c = I2C()
29             return
30         elif detector.board.any_embedded_linux:
31             from adafruit_blinka.microcontroller.generic_linux.i2c import I2C as _I2C
32         else:
33             from machine import I2C as _I2C
34         from microcontroller.pin import i2cPorts
35         for portId, portScl, portSda in i2cPorts:
36             try:
37                 if scl == portScl and sda == portSda:
38                     self._i2c = _I2C(portId, mode=_I2C.MASTER, baudrate=frequency)
39                     break
40             except RuntimeError:
41                 pass
42         else:
43             raise ValueError(
44                 "No Hardware I2C on (scl,sda)={}\nValid I2C ports: {}".format((scl, sda), i2cPorts)
45             )
46
47         self._lock = threading.RLock()
48
49     def deinit(self):
50         try:
51             del self._i2c
52         except AttributeError:
53             pass
54
55     def __enter__(self):
56         self._lock.acquire()
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         self._lock.release()
61         self.deinit()
62
63     def scan(self):
64         return self._i2c.scan()
65
66     def readfrom_into(self, address, buffer, *, start=0, end=None):
67         if start is not 0 or end is not None:
68             if end is None:
69                 end = len(buffer)
70             buffer = memoryview(buffer)[start:end]
71         stop = True  # remove for efficiency later
72         return self._i2c.readfrom_into(address, buffer, stop=stop)
73
74     def writeto(self, address, buffer, *, start=0, end=None, stop=True):
75         if isinstance(buffer, str):
76             buffer = bytes([ord(x) for x in buffer])
77         if start is not 0 or end is not None:
78             if end is None:
79                 return self._i2c.writeto(address, memoryview(buffer)[start:], stop=stop)
80             else:
81                 return self._i2c.writeto(address, memoryview(buffer)[start:end], stop=stop)
82         return self._i2c.writeto(address, buffer, stop=stop)
83
84     def writeto_then_readfrom(self, address, buffer_out, buffer_in, *, out_start=0, out_end=None, in_start=0, in_end=None, stop=False):
85         return self._i2c.writeto_then_readfrom(address, buffer_out, buffer_in,
86                                                out_start=out_start, out_end=out_end,
87                                                in_start=in_start, in_end=in_end, stop=stop)
88
89 class SPI(Lockable):
90     def __init__(self, clock, MOSI=None, MISO=None):
91         self.deinit()
92         if detector.board.ftdi_ft232h:
93             from adafruit_blinka.microcontroller.ft232h.spi import SPI as _SPI
94             from adafruit_blinka.microcontroller.ft232h.pin import SCK, MOSI, MISO
95             self._spi = _SPI()
96             self._pins = (SCK, MOSI, MISO)
97             return
98         elif detector.board.binho_nova:
99             from adafruit_blinka.microcontroller.nova.spi import SPI as _SPI
100             from adafruit_blinka.microcontroller.nova.pin import SCK, MOSI, MISO
101             self._spi = _SPI(clock)
102             self._pins = (SCK, MOSI, MISO)
103             return
104         elif detector.board.any_embedded_linux:
105             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
106         else:
107             from machine import SPI as _SPI
108         from microcontroller.pin import spiPorts
109         for portId, portSck, portMosi, portMiso in spiPorts:
110             if ((clock == portSck) and                   # Clock is required!
111                 (MOSI == portMosi or MOSI == None) and   # But can do with just output
112                 (MISO == portMiso or MISO == None)):      # Or just input
113                 self._spi = _SPI(portId)
114                 self._pins = (portSck, portMosi, portMiso)
115                 break
116         else:
117             raise ValueError(
118                 "No Hardware SPI on (SCLK, MOSI, MISO)={}\nValid SPI ports:{}".
119                 format((clock, MOSI, MISO), spiPorts))
120
121     def configure(self, baudrate=100000, polarity=0, phase=0, bits=8):
122         if detector.board.any_raspberry_pi or detector.board.any_raspberry_pi_40_pin:
123             from adafruit_blinka.microcontroller.bcm283x.pin import Pin
124             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
125         elif detector.board.any_beaglebone:
126             from adafruit_blinka.microcontroller.am335x.pin import Pin
127             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
128         elif board_id == ap_board.ORANGE_PI_PC or board_id == ap_board.ORANGE_PI_R1 or board_id == ap_board.ORANGE_PI_ZERO:
129             from adafruit_blinka.microcontroller.allwinner_h3.pin import Pin
130             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
131         elif board_id == ap_board.GIANT_BOARD:
132             from adafruit_blinka.microcontroller.sama5.pin import Pin
133             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
134         elif board_id == ap_board.CORAL_EDGE_TPU_DEV:
135             from adafruit_blinka.microcontroller.nxp_imx8m.pin import Pin
136             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
137         elif board_id == ap_board.ODROID_C2:
138             from adafruit_blinka.microcontroller.amlogic.s905.pin import Pin
139             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
140         elif board_id == ap_board.DRAGONBOARD_410C:
141             from adafruit_blinka.microcontroller.snapdragon.apq8016.pin import Pin
142             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
143         elif board_id == ap_board.JETSON_NANO:
144             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
145             from adafruit_blinka.microcontroller.tegra.t210.pin import Pin
146         elif board_id == ap_board.JETSON_TX1:
147             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
148             from adafruit_blinka.microcontroller.tegra.t210.pin import Pin
149         elif board_id == ap_board.JETSON_TX2:
150             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
151             from adafruit_blinka.microcontroller.tegra.t186.pin import Pin
152         elif board_id == ap_board.JETSON_XAVIER:
153             from adafruit_blinka.microcontroller.generic_linux.spi import SPI as _SPI
154             from adafruit_blinka.microcontroller.tegra.t194.pin import Pin
155         elif detector.board.ftdi_ft232h:
156             from adafruit_blinka.microcontroller.ft232h.spi import SPI as _SPI
157             from adafruit_blinka.microcontroller.ft232h.pin import Pin
158         elif detector.board.binho_nova:
159             from adafruit_blinka.microcontroller.nova.spi import SPI as _SPI
160             from adafruit_blinka.microcontroller.nova.pin import Pin
161         else:
162             from machine import SPI as _SPI
163             from machine import Pin
164
165         if self._locked:
166             # TODO check if #init ignores MOSI=None rather than unsetting, to save _pinIds attribute
167             self._spi.init(
168                 baudrate=baudrate,
169                 polarity=polarity,
170                 phase=phase,
171                 bits=bits,
172                 firstbit=_SPI.MSB,
173                 sck=Pin(self._pins[0].id),
174                 mosi=Pin(self._pins[1].id),
175                 miso=Pin(self._pins[2].id)
176             )
177         else:
178             raise RuntimeError("First call try_lock()")
179
180     def deinit(self):
181         self._spi = None
182         self._pinIds = None
183
184     @property
185     def frequency(self):
186         try:
187             return self._spi.frequency
188         except AttributeError:
189             raise NotImplementedError("Frequency attribute not implemented for this platform")
190
191     def write(self, buf, start=0, end=None):
192         return self._spi.write(buf, start, end)
193
194     def readinto(self, buf, start=0, end=None, write_value=0):
195         return self._spi.readinto(buf, start, end, write_value=write_value)
196
197     def write_readinto(self, buffer_out, buffer_in,  out_start=0, out_end=None, in_start=0, in_end=None):
198         return self._spi.write_readinto(buffer_out, buffer_in, out_start, out_end, in_start, in_end)
199
200
201 class UART(Lockable):
202     class Parity(Enum):
203         pass
204
205     Parity.ODD = Parity()
206     Parity.EVEN = Parity()
207
208     def __init__(self,
209                  tx,
210                  rx,
211                  baudrate=9600,
212                  bits=8,
213                  parity=None,
214                  stop=1,
215                  timeout=1000,
216                  receiver_buffer_size=64,
217                  flow=None):
218         if detector.board.any_embedded_linux:
219             raise RuntimeError('busio.UART not supported on this platform. Please use pyserial instead.')
220         elif detector.board.binho_nova:
221             from adafruit_blinka.microcontroller.nova.uart import UART as _UART
222         else:
223             from machine import UART as _UART
224
225         if detector.board.binho_nova:
226             from adafruit_blinka.microcontroller.nova.pin import uartPorts
227         else:
228             from microcontroller.pin import uartPorts
229
230         self.baudrate = baudrate
231
232         if flow is not None:  # default 0
233             raise NotImplementedError(
234                 "Parameter '{}' unsupported on {}".format(
235                     "flow", agnostic.board_id))
236
237         # translate parity flag for Micropython
238         if parity is UART.Parity.ODD:
239             parity = 1
240         elif parity is UART.Parity.EVEN:
241             parity = 0
242         elif parity is None:
243             pass
244         else:
245             raise ValueError("Invalid parity")
246
247         # check tx and rx have hardware support
248         for portId, portTx, portRx in uartPorts:  #
249             if portTx == tx and portRx == rx:
250                 self._uart = _UART(
251                     portId,
252                     baudrate,
253                     bits=bits,
254                     parity=parity,
255                     stop=stop,
256                     timeout=timeout,
257                     read_buf_len=receiver_buffer_size
258                 )
259                 break
260         else:
261             raise ValueError(
262                 "No Hardware UART on (tx,rx)={}\nValid UART ports: {}".format((tx, rx), uartPorts)
263             )
264
265     def deinit(self):
266         if detector.board.binho_nova:
267             self._uart.deinit()
268         self._uart = None
269
270     def read(self, nbytes=None):
271         return self._uart.read(nbytes)
272
273     def readinto(self, buf, nbytes=None):
274         return self._uart.readinto(buf, nbytes)
275
276     def readline(self):
277         return self._uart.readline()
278
279     def write(self, buf):
280         return self._uart.write(buf)