]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/rp2040_u2if/rp2040_u2if.py
Merge pull request #540 from ezio-melotti/close-mcp2221
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / rp2040_u2if / rp2040_u2if.py
1 # SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries
2 #
3 # SPDX-License-Identifier: MIT
4 """Helper class for use with RP2040 running u2if firmware"""
5 # https://github.com/execuc/u2if
6
7 import os
8 import time
9 import hid
10
# Delay, in seconds, between resetting the device and reopening it. The value
# is read from the RP2040_U2IF_RESET_DELAY environment variable (default: 1).
# If negative, the device is not reset at all when it is opened.
RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
13
14 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
15 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
16
17
class RP2040_u2if:
    """Helper class for use with RP2040 running u2if firmware.

    Commands are sent to the board as HID reports carrying 64 bytes of data
    and, when a reply is requested, a 64-byte response report is read back.
    For the commands that check a status, byte 1 of the response is compared
    against ``RESP_OK``.
    """

    # MISC
    RESP_OK = 0x01
    SYS_RESET = 0x10

    # GPIO
    GPIO_INIT_PIN = 0x20
    GPIO_SET_VALUE = 0x21
    GPIO_GET_VALUE = 0x22

    # ADC
    ADC_INIT_PIN = 0x40
    ADC_GET_VALUE = 0x41

    # I2C
    I2C0_INIT = 0x80
    I2C0_DEINIT = 0x81
    I2C0_WRITE = 0x82
    I2C0_READ = 0x83
    I2C0_WRITE_FROM_UART = 0x84
    I2C1_INIT = I2C0_INIT + 0x10
    I2C1_DEINIT = I2C0_DEINIT + 0x10
    I2C1_WRITE = I2C0_WRITE + 0x10
    I2C1_READ = I2C0_READ + 0x10
    I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10

    # SPI
    SPI0_INIT = 0x60
    SPI0_DEINIT = 0x61
    SPI0_WRITE = 0x62
    SPI0_READ = 0x63
    SPI0_WRITE_FROM_UART = 0x64
    SPI1_INIT = SPI0_INIT + 0x10
    SPI1_DEINIT = SPI0_DEINIT + 0x10
    SPI1_WRITE = SPI0_WRITE + 0x10
    SPI1_READ = SPI0_READ + 0x10
    SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10

    # WS2812B (LED)
    WS2812B_INIT = 0xA0
    WS2812B_DEINIT = 0xA1
    WS2812B_WRITE = 0xA2

    # PWM
    PWM_INIT_PIN = 0x30
    PWM_DEINIT_PIN = 0x31
    PWM_SET_FREQ = 0x32
    PWM_GET_FREQ = 0x33
    PWM_SET_DUTY_U16 = 0x34
    PWM_GET_DUTY_U16 = 0x35
    PWM_SET_DUTY_NS = 0x36
    PWM_GET_DUTY_NS = 0x37

    # Size in bytes of the data part of every HID report (request and response).
    _REPORT_SIZE = 64
    # Largest read that fits in one response: read data starts at index 2.
    _MAX_READ_SIZE = _REPORT_SIZE - 2

    def __init__(self):
        """Create an unopened helper; call :meth:`open` before any transfer."""
        self._vid = None
        self._pid = None
        self._hid = None
        self._opened = False
        self._i2c_index = None
        self._spi_index = None
        self._serial = None
        self._neopixel_initialized = False
        self._uart_rx_buffer = None

    def _hid_xfer(self, report, response=True):
        """Perform HID Transfer.

        :param report: command bytes to send (at most 64); zero padded to 64.
        :param response: when true, read and return the 64-byte response.
        :return: the response as returned by ``hid.read`` or ``None``.
        :raises ValueError: if ``report`` is longer than 64 bytes.
        """
        if len(report) > self._REPORT_SIZE:
            # Without this check the padding below would silently be empty and
            # an oversized report would be handed to the HID layer.
            raise ValueError("HID report must be at most 64 bytes.")
        # first byte is report ID, which =0
        # remaining bytes = 64 byte report data
        # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
        self._hid.write(b"\0" + report + b"\0" * (self._REPORT_SIZE - len(report)))
        if response:
            # return is 64 byte response report
            return self._hid.read(self._REPORT_SIZE)
        return None

    def _reset(self):
        """Reset the board and reopen the HID device.

        Sends ``SYS_RESET`` without waiting for a reply, waits
        ``RP2040_U2IF_RESET_DELAY`` seconds and then retries opening the device
        for up to 5 seconds.

        :raises OSError: if the device could not be reopened in time.
        """
        self._hid_xfer(bytes([self.SYS_RESET]), False)
        # Release the handle to the device that is about to reset; reopening an
        # already-open hid device is rejected by newer hidapi bindings.
        self._hid.close()
        time.sleep(RP2040_U2IF_RESET_DELAY)
        start = time.monotonic()
        while time.monotonic() - start < 5:
            try:
                self._hid.open(self._vid, self._pid)
            except OSError:
                time.sleep(0.1)
                continue
            return
        raise OSError("RP2040 u2if open error.")

    # ----------------------------------------------------------------
    # MISC
    # ----------------------------------------------------------------
    def open(self, vid, pid):
        """Open HID interface for given USB VID and PID.

        Does nothing if the interface was already opened. The board is reset
        after opening unless ``RP2040_U2IF_RESET_DELAY`` is negative.
        """

        if self._opened:
            return
        self._vid = vid
        self._pid = pid
        self._hid = hid.device()
        self._hid.open(self._vid, self._pid)
        if RP2040_U2IF_RESET_DELAY >= 0:
            self._reset()
        self._opened = True

    # ----------------------------------------------------------------
    # GPIO
    # ----------------------------------------------------------------
    def gpio_init_pin(self, pin_id, direction, pull):
        """Configure GPIO Pin."""
        self._hid_xfer(
            bytes(
                [
                    self.GPIO_INIT_PIN,
                    pin_id,
                    direction,
                    pull,
                ]
            )
        )

    def gpio_set_pin(self, pin_id, value):
        """Set Current GPIO Pin Value"""
        self._hid_xfer(
            bytes(
                [
                    self.GPIO_SET_VALUE,
                    pin_id,
                    int(value),
                ]
            )
        )

    def gpio_get_pin(self, pin_id):
        """Get Current GPIO Pin Value as a bool (byte 3 of the response)."""
        resp = self._hid_xfer(
            bytes(
                [
                    self.GPIO_GET_VALUE,
                    pin_id,
                ]
            ),
            True,
        )
        return resp[3] != 0x00

    # ----------------------------------------------------------------
    # ADC
    # ----------------------------------------------------------------
    def adc_init_pin(self, pin_id):
        """Configure ADC Pin."""
        self._hid_xfer(
            bytes(
                [
                    self.ADC_INIT_PIN,
                    pin_id,
                ]
            )
        )

    def adc_get_value(self, pin_id):
        """Get ADC value for pin (little-endian 16 bit value at bytes 3-4)."""
        resp = self._hid_xfer(
            bytes(
                [
                    self.ADC_GET_VALUE,
                    pin_id,
                ]
            ),
            True,
        )
        return int.from_bytes(resp[3 : 3 + 2], byteorder="little")

    # ----------------------------------------------------------------
    # I2C
    # ----------------------------------------------------------------
    def i2c_configure(self, baudrate, pullup=False):
        """Configure I2C.

        :raises RuntimeError: if no port was selected with
            :meth:`i2c_set_port` or the device reports an error.
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        resp = self._hid_xfer(
            bytes(
                [
                    self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
                    0x00 if not pullup else 0x01,
                ]
            )
            + baudrate.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("I2C init error.")

    def i2c_set_port(self, index):
        """Set I2C port.

        :raises ValueError: if ``index`` is not 0 or 1.
        """
        if index not in (0, 1):
            raise ValueError("I2C index must be 0 or 1.")
        self._i2c_index = index

    def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
        """Write data from the buffer to an address.

        ``buffer[start:end]`` is sent in chunks of at most 57 bytes (a 64-byte
        report minus the 7-byte header). ``end=None`` means ``len(buffer)``;
        an empty range sends nothing.

        :raises RuntimeError: if no port is selected or a chunk is rejected.
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        # Only default when end was omitted, so an explicit end=0 is honoured.
        if end is None:
            end = len(buffer)

        write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
        stop_flag = 0x01 if stop else 0x00

        while (end - start) > 0:
            remain_bytes = end - start
            chunk = min(remain_bytes, self._REPORT_SIZE - 7)
            # Header: command, address, stop flag, then the number of bytes
            # still to be written (including this chunk) as 4 bytes LE.
            resp = self._hid_xfer(
                bytes([write_cmd, address, stop_flag])
                + remain_bytes.to_bytes(4, byteorder="little")
                + buffer[start : (start + chunk)],
                True,
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("I2C write error")
            start += chunk

    def _i2c_read(self, address, buffer, start=0, end=None):
        """Read data from an address and into the buffer.

        Fills ``buffer[start:end]``; ``end=None`` means ``len(buffer)``. The
        whole read must fit in a single response (at most 62 bytes).

        :raises RuntimeError: if no port is selected or the read fails.
        :raises ValueError: if more than 62 bytes are requested.
        """
        # TODO: support chunkified reads
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        # Only default when end was omitted, so an explicit end=0 is honoured.
        if end is None:
            end = len(buffer)

        read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
        stop_flag = 0x01  # always stop
        read_size = end - start
        if read_size > self._MAX_READ_SIZE:
            # Fail before talking to the device instead of raising IndexError
            # halfway through copying a response that cannot hold the data.
            raise ValueError("I2C read size must be at most 62 bytes.")

        resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("I2C read error")
        # move into buffer (data starts after the 2 header bytes)
        for i in range(read_size):
            buffer[start + i] = resp[i + 2]

    def i2c_writeto(self, address, buffer, *, start=0, end=None):
        """Write data from the buffer to an address"""
        self._i2c_write(address, buffer, start, end)

    def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
        """Read data from an address and into the buffer"""
        self._i2c_read(address, buffer, start, end)

    def i2c_writeto_then_readfrom(
        self,
        address,
        out_buffer,
        in_buffer,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None,
    ):
        """Write data from buffer_out to an address and then
        read data from an address and into buffer_in

        The write is issued with the stop flag cleared.
        """
        self._i2c_write(address, out_buffer, out_start, out_end, False)
        self._i2c_read(address, in_buffer, in_start, in_end)

    def i2c_scan(self, *, start=0, end=0x79):
        """Perform an I2C Device Scan.

        Tries a 3-byte write to every address in ``start..end`` (inclusive) and
        returns the list of addresses for which the write succeeded.
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")
        found = []
        for addr in range(start, end + 1):
            # try a write
            try:
                self.i2c_writeto(addr, b"\x00\x00\x00")
            except RuntimeError:  # no reply!
                continue
            # store if success
            found.append(addr)
        return found

    # ----------------------------------------------------------------
    # SPI
    # ----------------------------------------------------------------
    def spi_configure(self, baudrate):
        """Configure SPI.

        :raises RuntimeError: if no port was selected with
            :meth:`spi_set_port` or the device reports an error.
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        resp = self._hid_xfer(
            bytes(
                [
                    self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
                    0x00,  # mode, not yet implemented
                ]
            )
            + baudrate.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("SPI init error.")

    def spi_set_port(self, index):
        """Set SPI port.

        :raises ValueError: if ``index`` is not 0 or 1.
        """
        if index not in (0, 1):
            raise ValueError("SPI index must be 0 or 1.")
        self._spi_index = index

    def spi_write(self, buffer, *, start=0, end=None):
        """SPI write.

        Sends ``buffer[start:end]`` in chunks of at most 61 bytes.
        ``end=None`` means ``len(buffer)``; an empty range sends nothing.

        :raises RuntimeError: if no port is selected or a chunk is rejected.
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        # Only default when end was omitted, so an explicit end=0 is honoured.
        if end is None:
            end = len(buffer)

        write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE

        while (end - start) > 0:
            remain_bytes = end - start
            chunk = min(remain_bytes, self._REPORT_SIZE - 3)
            resp = self._hid_xfer(
                bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("SPI write error")
            start += chunk

    def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
        """SPI readinto.

        Fills ``buffer[start:end]``; ``end=None`` means ``len(buffer)``. The
        whole read must fit in a single response (at most 62 bytes).

        :raises RuntimeError: if no port is selected or the read fails.
        :raises ValueError: if more than 62 bytes are requested.
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        # Only default when end was omitted, so an explicit end=0 is honoured.
        if end is None:
            end = len(buffer)
        read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
        read_size = end - start
        if read_size > self._MAX_READ_SIZE:
            # Fail before talking to the device instead of raising IndexError
            # halfway through copying a response that cannot hold the data.
            raise ValueError("SPI read size must be at most 62 bytes.")

        resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("SPI read error")
        # move into buffer (data starts after the 2 header bytes)
        for i in range(read_size):
            buffer[start + i] = resp[i + 2]

    def spi_write_readinto(
        self,
        buffer_out,
        buffer_in,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None,
    ):
        """SPI write and readinto (not implemented, always raises)."""
        raise NotImplementedError("SPI write_readinto Not implemented")

    # ----------------------------------------------------------------
    # NEOPIXEL
    # ----------------------------------------------------------------
    def neopixel_write(self, gpio, buf):
        """NeoPixel write.

        The write command is sent over HID while the pixel data itself is sent
        over the serial port whose USB VID/PID match the opened device.

        :param gpio: pin object; its ``_pin.id`` is used for initialization.
        :param buf: pixel data to send.
        :raises RuntimeError: if the serial port cannot be found or the device
            reports an error.
        """
        # open serial (data is sent over this)
        if self._serial is None:
            import serial
            import serial.tools.list_ports

            ports = serial.tools.list_ports.comports()
            for port in ports:
                if port.vid == self._vid and port.pid == self._pid:
                    self._serial = serial.Serial(port.device)
                    break
        if self._serial is None:
            raise RuntimeError("Could not find Pico com port.")

        # init
        if not self._neopixel_initialized:
            # deinit any current setup
            # pylint: disable=protected-access
            self._hid_xfer(bytes([self.WS2812B_DEINIT]))
            resp = self._hid_xfer(
                bytes(
                    [
                        self.WS2812B_INIT,
                        gpio._pin.id,
                    ]
                ),
                True,
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("Neopixel init error")
            self._neopixel_initialized = True

        self._serial.reset_output_buffer()

        # write
        # command is done over HID
        remain_bytes = len(buf)
        resp = self._hid_xfer(
            bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            # pylint: disable=no-else-raise
            if resp[2] == 0x01:
                raise RuntimeError(
                    "Neopixel write error : too many pixel for the firmware."
                )
            elif resp[2] == 0x02:
                raise RuntimeError(
                    "Neopixel write error : transfer already in progress."
                )
            else:
                raise RuntimeError("Neopixel write error.")
        # buffer is sent over serial
        self._serial.write(buf)
        # hack (see u2if)
        if len(buf) % 64 == 0:
            self._serial.write([0])
        self._serial.flush()
        # polling loop to wait for write complete?
        time.sleep(0.1)
        # Skip reports until one whose first byte is the write command.
        resp = self._hid.read(64)
        while resp[0] != self.WS2812B_WRITE:
            resp = self._hid.read(64)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("Neopixel write (flush) error.")

    # ----------------------------------------------------------------
    # PWM
    # ----------------------------------------------------------------
    # pylint: disable=unused-argument
    def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
        """Configure PWM.

        Deinitializes the pin, initializes it again and then applies
        ``frequency`` and ``duty_cycle``. ``variable_frequency`` is unused.

        :raises RuntimeError: if the device reports an error.
        """
        self.pwm_deinit(pin)
        resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM init error.")

        self.pwm_set_frequency(pin, frequency)
        self.pwm_set_duty_cycle(pin, duty_cycle)

    def pwm_deinit(self, pin):
        """Deinit PWM."""
        self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))

    def pwm_get_frequency(self, pin):
        """PWM get freq (little-endian 32 bit value at bytes 3-6)."""
        resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM get frequency error.")
        return int.from_bytes(resp[3 : 3 + 4], byteorder="little")

    def pwm_set_frequency(self, pin, frequency):
        """PWM set freq.

        :raises RuntimeError: with a message chosen from the error code in
            byte 3 of the response when the device rejects the frequency.
        """
        resp = self._hid_xfer(
            bytes([self.PWM_SET_FREQ, pin.id])
            + frequency.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            # pylint: disable=no-else-raise
            if resp[3] == 0x01:
                raise RuntimeError("PWM different frequency on same slice.")
            elif resp[3] == 0x02:
                raise RuntimeError("PWM frequency too low.")
            elif resp[3] == 0x03:
                raise RuntimeError("PWM frequency too high.")
            else:
                raise RuntimeError("PWM frequency error.")

    def pwm_get_duty_cycle(self, pin):
        """PWM get duty cycle."""
        resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM get duty cycle error.")
        # NOTE(review): 4 bytes are decoded here although the duty cycle is
        # sent as 2 bytes when set; presumably the upper bytes are zero.
        return int.from_bytes(resp[3 : 3 + 4], byteorder="little")

    def pwm_set_duty_cycle(self, pin, duty_cycle):
        """PWM set duty cycle (sent as a little-endian 16 bit value)."""
        resp = self._hid_xfer(
            bytes([self.PWM_SET_DUTY_U16, pin.id])
            + duty_cycle.to_bytes(2, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM set duty cycle error.")
506
507
# Module-level instance of the helper, created when this module is imported.
rp2040_u2if = RP2040_u2if()