]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/rp2040_u2if/rp2040_u2if.py
qt2040 trinkey updates
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / rp2040_u2if / rp2040_u2if.py
1 """Helper class for use with RP2040 running u2if firmware"""
2 # https://github.com/execuc/u2if
3
4 import os
5 import time
6 import hid
7
8 # Use to set delay between reset and device reopen. if negative, don't reset at all
9 RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1))
10
11 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
12 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
13
14
15 class RP2040_u2if:
16     """Helper class for use with RP2040 running u2if firmware"""
17
18     # MISC
19     RESP_OK = 0x01
20     SYS_RESET = 0x10
21
22     # GPIO
23     GPIO_INIT_PIN = 0x20
24     GPIO_SET_VALUE = 0x21
25     GPIO_GET_VALUE = 0x22
26
27     # ADC
28     ADC_INIT_PIN = 0x40
29     ADC_GET_VALUE = 0x41
30
31     # I2C
32     I2C0_INIT = 0x80
33     I2C0_DEINIT = 0x81
34     I2C0_WRITE = 0x82
35     I2C0_READ = 0x83
36     I2C0_WRITE_FROM_UART = 0x84
37     I2C1_INIT = I2C0_INIT + 0x10
38     I2C1_DEINIT = I2C0_DEINIT + 0x10
39     I2C1_WRITE = I2C0_WRITE + 0x10
40     I2C1_READ = I2C0_READ + 0x10
41     I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
42
43     # SPI
44     SPI0_INIT = 0x60
45     SPI0_DEINIT = 0x61
46     SPI0_WRITE = 0x62
47     SPI0_READ = 0x63
48     SPI0_WRITE_FROM_UART = 0x64
49     SPI1_INIT = SPI0_INIT + 0x10
50     SPI1_DEINIT = SPI0_DEINIT + 0x10
51     SPI1_WRITE = SPI0_WRITE + 0x10
52     SPI1_READ = SPI0_READ + 0x10
53     SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
54
55     # WS2812B (LED)
56     WS2812B_INIT = 0xA0
57     WS2812B_DEINIT = 0xA1
58     WS2812B_WRITE = 0xA2
59
60     # PWM
61     PWM_INIT_PIN = 0x30
62     PWM_DEINIT_PIN = 0x31
63     PWM_SET_FREQ = 0x32
64     PWM_GET_FREQ = 0x33
65     PWM_SET_DUTY_U16 = 0x34
66     PWM_GET_DUTY_U16 = 0x35
67     PWM_SET_DUTY_NS = 0x36
68     PWM_GET_DUTY_NS = 0x37
69
70     def __init__(self):
71         self._opened = False
72         self._i2c_index = None
73         self._spi_index = None
74         self._serial = None
75         self._neopixel_initialized = False
76         # self._vid = vid
77         # self._pid = pid
78         # self._hid = hid.device()
79         # self._hid.open(self._vid, self._pid)
80         # if RP2040_U2IF_RESET_DELAY >= 0:
81         #     self._reset()
82         # self._i2c_index = None
83         # self._spi_index = None
84         # self._serial = None
85         # self._neopixel_initialized = False
86         # self._uart_rx_buffer = None
87
88     def _hid_xfer(self, report, response=True):
89         """Perform HID Transfer"""
90         # first byte is report ID, which =0
91         # remaing bytes = 64 byte report data
92         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
93         self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
94         if response:
95             # return is 64 byte response report
96             return self._hid.read(64)
97         return None
98
99     def _reset(self):
100         self._hid_xfer(bytes([self.SYS_RESET]), False)
101         time.sleep(RP2040_U2IF_RESET_DELAY)
102         start = time.monotonic()
103         while time.monotonic() - start < 5:
104             try:
105                 self._hid.open(self._vid, self._pid)
106             except OSError:
107                 time.sleep(0.1)
108                 continue
109             return
110         raise OSError("RP2040 u2if open error.")
111
112     # ----------------------------------------------------------------
113     # MISC
114     # ----------------------------------------------------------------
115     def open(self, vid, pid):
116         if self._opened:
117             return
118         self._vid = vid
119         self._pid = pid
120         self._hid = hid.device()
121         self._hid.open(self._vid, self._pid)
122         if RP2040_U2IF_RESET_DELAY >= 0:
123             self._reset()
124         self._opened = True
125
126     # ----------------------------------------------------------------
127     # GPIO
128     # ----------------------------------------------------------------
129     def gpio_init_pin(self, pin_id, direction, pull):
130         """Configure GPIO Pin."""
131         self._hid_xfer(
132             bytes(
133                 [
134                     self.GPIO_INIT_PIN,
135                     pin_id,
136                     direction,
137                     pull,
138                 ]
139             )
140         )
141
142     def gpio_set_pin(self, pin_id, value):
143         """Set Current GPIO Pin Value"""
144         self._hid_xfer(
145             bytes(
146                 [
147                     self.GPIO_SET_VALUE,
148                     pin_id,
149                     int(value),
150                 ]
151             )
152         )
153
154     def gpio_get_pin(self, pin_id):
155         """Get Current GPIO Pin Value"""
156         resp = self._hid_xfer(
157             bytes(
158                 [
159                     self.GPIO_GET_VALUE,
160                     pin_id,
161                 ]
162             ),
163             True,
164         )
165         return resp[3] != 0x00
166
167     # ----------------------------------------------------------------
168     # ADC
169     # ----------------------------------------------------------------
170     def adc_init_pin(self, pin_id):
171         """Configure ADC Pin."""
172         self._hid_xfer(
173             bytes(
174                 [
175                     self.ADC_INIT_PIN,
176                     pin_id,
177                 ]
178             )
179         )
180
181     def adc_get_value(self, pin_id):
182         """Get ADC value for pin."""
183         resp = self._hid_xfer(
184             bytes(
185                 [
186                     self.ADC_GET_VALUE,
187                     pin_id,
188                 ]
189             ),
190             True,
191         )
192         return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
193
194     # ----------------------------------------------------------------
195     # I2C
196     # ----------------------------------------------------------------
197     def i2c_configure(self, baudrate, pullup=False):
198         """Configure I2C."""
199         if self._i2c_index is None:
200             raise RuntimeError("I2C bus not initialized.")
201
202         resp = self._hid_xfer(
203             bytes(
204                 [
205                     self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
206                     0x00 if not pullup else 0x01,
207                 ]
208             )
209             + baudrate.to_bytes(4, byteorder="little"),
210             True,
211         )
212         if resp[1] != self.RESP_OK:
213             raise RuntimeError("I2C init error.")
214
215     def i2c_set_port(self, index):
216         """Set I2C port."""
217         if index not in (0, 1):
218             raise ValueError("I2C index must be 0 or 1.")
219         self._i2c_index = index
220
221     def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
222         """Write data from the buffer to an address"""
223         if self._i2c_index is None:
224             raise RuntimeError("I2C bus not initialized.")
225
226         end = end if end else len(buffer)
227
228         write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
229         stop_flag = 0x01 if stop else 0x00
230
231         while (end - start) > 0:
232             remain_bytes = end - start
233             chunk = min(remain_bytes, 64 - 7)
234             resp = self._hid_xfer(
235                 bytes([write_cmd, address, stop_flag])
236                 + remain_bytes.to_bytes(4, byteorder="little")
237                 + buffer[start : (start + chunk)],
238                 True,
239             )
240             if resp[1] != self.RESP_OK:
241                 raise RuntimeError("I2C write error")
242             start += chunk
243
244     def _i2c_read(self, address, buffer, start=0, end=None):
245         """Read data from an address and into the buffer"""
246         # TODO: support chunkified reads
247         if self._i2c_index is None:
248             raise RuntimeError("I2C bus not initialized.")
249
250         end = end if end else len(buffer)
251
252         read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
253         stop_flag = 0x01  # always stop
254         read_size = end - start
255
256         resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
257         if resp[1] != self.RESP_OK:
258             raise RuntimeError("I2C write error")
259         # move into buffer
260         for i in range(read_size):
261             buffer[start + i] = resp[i + 2]
262
263     def i2c_writeto(self, address, buffer, *, start=0, end=None):
264         """Write data from the buffer to an address"""
265         self._i2c_write(address, buffer, start, end)
266
267     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
268         """Read data from an address and into the buffer"""
269         self._i2c_read(address, buffer, start, end)
270
271     def i2c_writeto_then_readfrom(
272         self,
273         address,
274         out_buffer,
275         in_buffer,
276         *,
277         out_start=0,
278         out_end=None,
279         in_start=0,
280         in_end=None
281     ):
282         """Write data from buffer_out to an address and then
283         read data from an address and into buffer_in
284         """
285         self._i2c_write(address, out_buffer, out_start, out_end, False)
286         self._i2c_read(address, in_buffer, in_start, in_end)
287
288     def i2c_scan(self, *, start=0, end=0x79):
289         """Perform an I2C Device Scan"""
290         if self._i2c_index is None:
291             raise RuntimeError("I2C bus not initialized.")
292         found = []
293         for addr in range(start, end + 1):
294             # try a write
295             try:
296                 self.i2c_writeto(addr, b"\x00\x00\x00")
297             except RuntimeError:  # no reply!
298                 continue
299             # store if success
300             found.append(addr)
301         return found
302
303     # ----------------------------------------------------------------
304     # SPI
305     # ----------------------------------------------------------------
306     def spi_configure(self, baudrate):
307         """Configure SPI."""
308         if self._spi_index is None:
309             raise RuntimeError("SPI bus not initialized.")
310
311         resp = self._hid_xfer(
312             bytes(
313                 [
314                     self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
315                     0x00,  # mode, not yet implemented
316                 ]
317             )
318             + baudrate.to_bytes(4, byteorder="little"),
319             True,
320         )
321         if resp[1] != self.RESP_OK:
322             raise RuntimeError("SPI init error.")
323
324     def spi_set_port(self, index):
325         """Set SPI port."""
326         if index not in (0, 1):
327             raise ValueError("SPI index must be 0 or 1.")
328         self._spi_index = index
329
330     def spi_write(self, buffer, *, start=0, end=None):
331         """SPI write."""
332         if self._spi_index is None:
333             raise RuntimeError("SPI bus not initialized.")
334
335         end = end if end else len(buffer)
336
337         write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
338
339         while (end - start) > 0:
340             remain_bytes = end - start
341             chunk = min(remain_bytes, 64 - 3)
342             resp = self._hid_xfer(
343                 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
344             )
345             if resp[1] != self.RESP_OK:
346                 raise RuntimeError("SPI write error")
347             start += chunk
348
349     def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
350         """SPI readinto."""
351         if self._spi_index is None:
352             raise RuntimeError("SPI bus not initialized.")
353
354         end = end if end else len(buffer)
355         read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
356         read_size = end - start
357
358         resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
359         if resp[1] != self.RESP_OK:
360             raise RuntimeError("SPI write error")
361         # move into buffer
362         for i in range(read_size):
363             buffer[start + i] = resp[i + 2]
364
365     def spi_write_readinto(
366         self,
367         buffer_out,
368         buffer_in,
369         *,
370         out_start=0,
371         out_end=None,
372         in_start=0,
373         in_end=None
374     ):
375         """SPI write and readinto."""
376         raise NotImplementedError("SPI write_readinto Not implemented")
377
378     # ----------------------------------------------------------------
379     # NEOPIXEL
380     # ----------------------------------------------------------------
381     def neopixel_write(self, gpio, buf):
382         """NeoPixel write."""
383         # open serial (data is sent over this)
384         if self._serial is None:
385             import serial
386             import serial.tools.list_ports
387
388             ports = serial.tools.list_ports.comports()
389             for port in ports:
390                 if port.vid == self._vid and port.pid == self._pid:
391                     self._serial = serial.Serial(port.device)
392                     break
393         if self._serial is None:
394             raise RuntimeError("Could not find Pico com port.")
395
396         # init
397         if not self._neopixel_initialized:
398             # deinit any current setup
399             # pylint: disable=protected-access
400             self._hid_xfer(bytes([self.WS2812B_DEINIT]))
401             resp = self._hid_xfer(
402                 bytes(
403                     [
404                         self.WS2812B_INIT,
405                         gpio._pin.id,
406                     ]
407                 ),
408                 True,
409             )
410             if resp[1] != self.RESP_OK:
411                 raise RuntimeError("Neopixel init error")
412             self._neopixel_initialized = True
413
414         self._serial.reset_output_buffer()
415
416         # write
417         # command is done over HID
418         remain_bytes = len(buf)
419         resp = self._hid_xfer(
420             bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
421             True,
422         )
423         if resp[1] != self.RESP_OK:
424             # pylint: disable=no-else-raise
425             if resp[2] == 0x01:
426                 raise RuntimeError(
427                     "Neopixel write error : too many pixel for the firmware."
428                 )
429             elif resp[2] == 0x02:
430                 raise RuntimeError(
431                     "Neopixel write error : transfer already in progress."
432                 )
433             else:
434                 raise RuntimeError("Neopixel write error.")
435         # buffer is sent over serial
436         self._serial.write(buf)
437         # hack (see u2if)
438         if len(buf) % 64 == 0:
439             self._serial.write([0])
440         self._serial.flush()
441         # polling loop to wait for write complete?
442         time.sleep(0.1)
443         resp = self._hid.read(64)
444         while resp[0] != self.WS2812B_WRITE:
445             resp = self._hid.read(64)
446         if resp[1] != self.RESP_OK:
447             raise RuntimeError("Neopixel write (flush) error.")
448
449     # ----------------------------------------------------------------
450     # PWM
451     # ----------------------------------------------------------------
452     # pylint: disable=unused-argument
453     def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
454         """Configure PWM."""
455         self.pwm_deinit(pin)
456         resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
457         if resp[1] != self.RESP_OK:
458             raise RuntimeError("PWM init error.")
459
460         self.pwm_set_frequency(pin, frequency)
461         self.pwm_set_duty_cycle(pin, duty_cycle)
462
463     def pwm_deinit(self, pin):
464         """Deinit PWM."""
465         self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
466
467     def pwm_get_frequency(self, pin):
468         """PWM get freq."""
469         resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
470         if resp[1] != self.RESP_OK:
471             raise RuntimeError("PWM get frequency error.")
472         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
473
474     def pwm_set_frequency(self, pin, frequency):
475         """PWM set freq."""
476         resp = self._hid_xfer(
477             bytes([self.PWM_SET_FREQ, pin.id])
478             + frequency.to_bytes(4, byteorder="little"),
479             True,
480         )
481         if resp[1] != self.RESP_OK:
482             # pylint: disable=no-else-raise
483             if resp[3] == 0x01:
484                 raise RuntimeError("PWM different frequency on same slice.")
485             elif resp[3] == 0x02:
486                 raise RuntimeError("PWM frequency too low.")
487             elif resp[3] == 0x03:
488                 raise RuntimeError("PWM frequency too high.")
489             else:
490                 raise RuntimeError("PWM frequency error.")
491
492     def pwm_get_duty_cycle(self, pin):
493         """PWM get duty cycle."""
494         resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
495         if resp[1] != self.RESP_OK:
496             raise RuntimeError("PWM get duty cycle error.")
497         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
498
499     def pwm_set_duty_cycle(self, pin, duty_cycle):
500         """PWM set duty cycle."""
501         resp = self._hid_xfer(
502             bytes([self.PWM_SET_DUTY_U16, pin.id])
503             + duty_cycle.to_bytes(2, byteorder="little"),
504             True,
505         )
506         if resp[1] != self.RESP_OK:
507             raise RuntimeError("PWM set duty cycle error.")
508
509 rp2040_u2if = RP2040_u2if()