Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/rp2040_u2if/rp2040_u2if.py
initial working feather
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / rp2040_u2if / rp2040_u2if.py
1 """Helper class for use with RP2040 running u2if firmware"""
2 # https://github.com/execuc/u2if
3
4 import os
5 import time
6 import hid
7
# Delay, in seconds, between resetting the board and reopening the HID device.
# Taken from the RP2040_U2IF_RESET_DELAY environment variable (default 1).
# A negative value means the board is not reset at all when it is opened.
RP2040_U2IF_RESET_DELAY = float(os.getenv("RP2040_U2IF_RESET_DELAY", "1"))
10
11 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
12 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
13
14
class RP2040_u2if:
    """Helper class for use with RP2040 running u2if firmware.

    Commands are exchanged with the board as fixed-size HID reports (see
    ``_hid_xfer``). The class-level constants below are the command codes
    placed in the first byte of each report. When a response is read, this
    class treats byte 1 of the reply as the status (``RESP_OK`` on success).
    """

    # Size in bytes of one HID report payload (both directions).
    _REPORT_SIZE = 64

    # MISC
    RESP_OK = 0x01
    SYS_RESET = 0x10

    # GPIO
    GPIO_INIT_PIN = 0x20
    GPIO_SET_VALUE = 0x21
    GPIO_GET_VALUE = 0x22

    # ADC
    ADC_INIT_PIN = 0x40
    ADC_GET_VALUE = 0x41

    # I2C
    I2C0_INIT = 0x80
    I2C0_DEINIT = 0x81
    I2C0_WRITE = 0x82
    I2C0_READ = 0x83
    I2C0_WRITE_FROM_UART = 0x84
    I2C1_INIT = I2C0_INIT + 0x10
    I2C1_DEINIT = I2C0_DEINIT + 0x10
    I2C1_WRITE = I2C0_WRITE + 0x10
    I2C1_READ = I2C0_READ + 0x10
    I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10

    # SPI
    SPI0_INIT = 0x60
    SPI0_DEINIT = 0x61
    SPI0_WRITE = 0x62
    SPI0_READ = 0x63
    SPI0_WRITE_FROM_UART = 0x64
    SPI1_INIT = SPI0_INIT + 0x10
    SPI1_DEINIT = SPI0_DEINIT + 0x10
    SPI1_WRITE = SPI0_WRITE + 0x10
    SPI1_READ = SPI0_READ + 0x10
    SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10

    # WS2812B (LED)
    WS2812B_INIT = 0xA0
    WS2812B_DEINIT = 0xA1
    WS2812B_WRITE = 0xA2

    # PWM
    PWM_INIT_PIN = 0x30
    PWM_DEINIT_PIN = 0x31
    PWM_SET_FREQ = 0x32
    PWM_GET_FREQ = 0x33
    PWM_SET_DUTY_U16 = 0x34
    PWM_GET_DUTY_U16 = 0x35
    PWM_SET_DUTY_NS = 0x36
    PWM_GET_DUTY_NS = 0x37

    def __init__(self):
        """Create an unopened helper; call ``open(vid, pid)`` before use."""
        # USB ids and the HID handle are filled in by open().
        self._vid = None
        self._pid = None
        self._hid = None
        self._opened = False
        # Selected bus index (0 or 1), or None until *_set_port() is called.
        self._i2c_index = None
        self._spi_index = None
        # Serial port used for NeoPixel data, opened on first neopixel_write().
        self._serial = None
        self._neopixel_initialized = False

    def _hid_xfer(self, report, response=True):
        """Perform HID Transfer.

        :param bytes report: command bytes, zero padded here to 64 bytes.
        :param bool response: when True, read and return the reply report.
        :return: the value of the HID read (64 bytes requested), or None
            when no response was requested.
        """
        # first byte is report ID, which =0
        # remaining bytes = 64 byte report data
        # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
        padding = b"\0" * (self._REPORT_SIZE - len(report))
        self._hid.write(b"\0" + report + padding)
        if response:
            # return is 64 byte response report
            return self._hid.read(self._REPORT_SIZE)
        return None

    def _reset(self):
        """Reset the board and reopen the HID device.

        Sends ``SYS_RESET`` without waiting for a reply, waits
        ``RP2040_U2IF_RESET_DELAY`` seconds, then retries opening the device
        for up to 5 seconds.

        :raises OSError: if the device could not be reopened in time.
        """
        self._hid_xfer(bytes([self.SYS_RESET]), False)
        # Release the handle before reopening it on the same device object.
        self._hid.close()
        time.sleep(RP2040_U2IF_RESET_DELAY)
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            try:
                self._hid.open(self._vid, self._pid)
            except OSError:
                time.sleep(0.1)
                continue
            return
        raise OSError("RP2040 u2if open error.")

    # ----------------------------------------------------------------
    # MISC
    # ----------------------------------------------------------------
    def open(self, vid, pid):
        """Open the HID device identified by ``vid``/``pid``.

        Does nothing if the device was already opened. Unless
        ``RP2040_U2IF_RESET_DELAY`` is negative, the board is reset and
        reopened right after the first open.
        """
        if self._opened:
            return
        self._vid = vid
        self._pid = pid
        self._hid = hid.device()
        self._hid.open(self._vid, self._pid)
        if RP2040_U2IF_RESET_DELAY >= 0:
            self._reset()
        self._opened = True

    # ----------------------------------------------------------------
    # GPIO
    # ----------------------------------------------------------------
    def gpio_init_pin(self, pin_id, direction, pull):
        """Configure GPIO Pin."""
        self._hid_xfer(bytes([self.GPIO_INIT_PIN, pin_id, direction, pull]))

    def gpio_set_pin(self, pin_id, value):
        """Set Current GPIO Pin Value"""
        self._hid_xfer(bytes([self.GPIO_SET_VALUE, pin_id, int(value)]))

    def gpio_get_pin(self, pin_id):
        """Get Current GPIO Pin Value

        :return: True when byte 3 of the response is non-zero.
        """
        resp = self._hid_xfer(bytes([self.GPIO_GET_VALUE, pin_id]), True)
        return resp[3] != 0x00

    # ----------------------------------------------------------------
    # ADC
    # ----------------------------------------------------------------
    def adc_init_pin(self, pin_id):
        """Configure ADC Pin."""
        self._hid_xfer(bytes([self.ADC_INIT_PIN, pin_id]))

    def adc_get_value(self, pin_id):
        """Get ADC value for pin.

        :return: bytes 3-4 of the response decoded as a little-endian int.
        """
        resp = self._hid_xfer(bytes([self.ADC_GET_VALUE, pin_id]), True)
        return int.from_bytes(resp[3 : 3 + 2], byteorder="little")

    # ----------------------------------------------------------------
    # I2C
    # ----------------------------------------------------------------
    def i2c_configure(self, baudrate, pullup=False):
        """Configure I2C.

        :param int baudrate: sent as a 4 byte little-endian value.
        :param bool pullup: sent as 0x01 when true, else 0x00.
        :raises RuntimeError: if no port was selected or the init failed.
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        init_cmd = self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT
        pullup_flag = 0x01 if pullup else 0x00
        resp = self._hid_xfer(
            bytes([init_cmd, pullup_flag])
            + baudrate.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("I2C init error.")

    def i2c_set_port(self, index):
        """Set I2C port."""
        if index not in (0, 1):
            raise ValueError("I2C index must be 0 or 1.")
        self._i2c_index = index

    def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
        """Write data from the buffer to an address

        ``buffer[start:end]`` is sent in as many reports as needed. Each
        report carries the number of bytes still remaining and the stop flag.

        :raises RuntimeError: if no port was selected or a transfer failed.
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        # Only substitute the default when end was omitted: an explicit
        # end=0 is an empty range, not "the whole buffer".
        if end is None:
            end = len(buffer)

        write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
        stop_flag = 0x01 if stop else 0x00
        # 7 header bytes: command, address, stop flag and 4 byte length.
        max_chunk = self._REPORT_SIZE - 7

        while start < end:
            remain_bytes = end - start
            chunk = min(remain_bytes, max_chunk)
            resp = self._hid_xfer(
                bytes([write_cmd, address, stop_flag])
                + remain_bytes.to_bytes(4, byteorder="little")
                + bytes(buffer[start : start + chunk]),
                True,
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("I2C write error")
            start += chunk

    def _i2c_read(self, address, buffer, start=0, end=None):
        """Read data from an address and into the buffer

        The read is a single transfer, so at most 62 bytes (one response
        report minus its two leading bytes) can be read per call.

        :raises ValueError: if more than 62 bytes are requested.
        :raises RuntimeError: if no port was selected or the read failed.
        """
        # TODO: support chunkified reads
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        # An explicit end=0 is an empty range; only None means "to the end".
        if end is None:
            end = len(buffer)

        read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
        stop_flag = 0x01  # always stop
        read_size = end - start

        # Data starts at byte 2 of the 64 byte response; fail before touching
        # the bus rather than with an IndexError after the transfer.
        max_read = self._REPORT_SIZE - 2
        if read_size > max_read:
            raise ValueError(
                "I2C read size must be {} bytes or less.".format(max_read)
            )

        resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("I2C read error")
        # move into buffer
        for i in range(read_size):
            buffer[start + i] = resp[i + 2]

    def i2c_writeto(self, address, buffer, *, start=0, end=None):
        """Write data from the buffer to an address"""
        self._i2c_write(address, buffer, start, end)

    def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
        """Read data from an address and into the buffer"""
        self._i2c_read(address, buffer, start, end)

    def i2c_writeto_then_readfrom(
        self,
        address,
        out_buffer,
        in_buffer,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None
    ):
        """Write data from buffer_out to an address and then
        read data from an address and into buffer_in
        """
        # The write is issued with the stop flag cleared; the read always stops.
        self._i2c_write(address, out_buffer, out_start, out_end, False)
        self._i2c_read(address, in_buffer, in_start, in_end)

    def i2c_scan(self, *, start=0, end=0x79):
        """Perform an I2C Device Scan

        :return: list of addresses in ``start..end`` (inclusive) for which a
            three byte test write did not raise ``RuntimeError``.
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")
        found = []
        for addr in range(start, end + 1):
            # try a write
            try:
                self.i2c_writeto(addr, b"\x00\x00\x00")
            except RuntimeError:  # no reply!
                continue
            # store if success
            found.append(addr)
        return found

    # ----------------------------------------------------------------
    # SPI
    # ----------------------------------------------------------------
    def spi_configure(self, baudrate):
        """Configure SPI.

        :param int baudrate: sent as a 4 byte little-endian value.
        :raises RuntimeError: if no port was selected or the init failed.
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        init_cmd = self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT
        resp = self._hid_xfer(
            bytes([init_cmd, 0x00])  # mode, not yet implemented
            + baudrate.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("SPI init error.")

    def spi_set_port(self, index):
        """Set SPI port."""
        if index not in (0, 1):
            raise ValueError("SPI index must be 0 or 1.")
        self._spi_index = index

    def spi_write(self, buffer, *, start=0, end=None):
        """SPI write.

        ``buffer[start:end]`` is sent in chunks of at most 61 bytes.

        :raises RuntimeError: if no port was selected or a transfer failed.
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        # An explicit end=0 is an empty range; only None means "to the end".
        if end is None:
            end = len(buffer)

        write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
        max_chunk = self._REPORT_SIZE - 3

        while start < end:
            chunk = min(end - start, max_chunk)
            resp = self._hid_xfer(
                bytes([write_cmd, chunk]) + bytes(buffer[start : start + chunk]),
                True,
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("SPI write error")
            start += chunk

    def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
        """SPI readinto.

        Fills ``buffer[start:end]``. Reads longer than 62 bytes (one response
        report minus its two leading bytes) are split into several transfers.
        An empty range performs no transfer.
        NOTE(review): splitting assumes chip select is driven by the caller
        and stays asserted between transfers - confirm against the firmware.

        :raises RuntimeError: if no port was selected or a transfer failed.
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        # An explicit end=0 is an empty range; only None means "to the end".
        if end is None:
            end = len(buffer)

        read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
        max_chunk = self._REPORT_SIZE - 2

        while start < end:
            chunk = min(end - start, max_chunk)
            resp = self._hid_xfer(bytes([read_cmd, write_value, chunk]), True)
            if resp[1] != self.RESP_OK:
                raise RuntimeError("SPI read error")
            # move into buffer
            for i in range(chunk):
                buffer[start + i] = resp[i + 2]
            start += chunk

    def spi_write_readinto(
        self,
        buffer_out,
        buffer_in,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None
    ):
        """SPI write and readinto."""
        raise NotImplementedError("SPI write_readinto Not implemented")

    # ----------------------------------------------------------------
    # NEOPIXEL
    # ----------------------------------------------------------------
    def _neopixel_open_serial(self):
        """Open the serial port whose USB ids match the opened device."""
        if self._serial is None:
            import serial
            import serial.tools.list_ports

            for port in serial.tools.list_ports.comports():
                if port.vid == self._vid and port.pid == self._pid:
                    self._serial = serial.Serial(port.device)
                    break
        if self._serial is None:
            raise RuntimeError("Could not find Pico com port.")

    def _neopixel_init(self, gpio):
        """Deinit any current WS2812B setup, then init it on ``gpio``."""
        # pylint: disable=protected-access
        self._hid_xfer(bytes([self.WS2812B_DEINIT]))
        resp = self._hid_xfer(bytes([self.WS2812B_INIT, gpio._pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("Neopixel init error")
        self._neopixel_initialized = True

    def neopixel_write(self, gpio, buf):
        """NeoPixel write.

        The write command (with the byte count) goes over HID, the pixel
        data itself goes over the serial port, and completion is then
        awaited on HID.

        :param gpio: pin object; ``gpio._pin.id`` is sent on first use.
        :param buf: pixel data written to the serial port as-is.
        :raises RuntimeError: if the serial port is not found, or init,
            write or flush reports an error.
        """
        # open serial (data is sent over this)
        self._neopixel_open_serial()

        # init (only once per instance)
        if not self._neopixel_initialized:
            self._neopixel_init(gpio)

        self._serial.reset_output_buffer()

        # write
        # command is done over HID
        remain_bytes = len(buf)
        resp = self._hid_xfer(
            bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            reasons = {
                0x01: "Neopixel write error : too many pixel for the firmware.",
                0x02: "Neopixel write error : transfer already in progress.",
            }
            raise RuntimeError(reasons.get(resp[2], "Neopixel write error."))
        # buffer is sent over serial
        self._serial.write(buf)
        # hack (see u2if): send one extra byte when the length is a
        # multiple of 64
        if len(buf) % 64 == 0:
            self._serial.write(b"\x00")
        self._serial.flush()
        # polling loop to wait for write complete: skip reports until one
        # starts with the WS2812B_WRITE command code
        time.sleep(0.1)
        resp = self._hid.read(self._REPORT_SIZE)
        while resp[0] != self.WS2812B_WRITE:
            resp = self._hid.read(self._REPORT_SIZE)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("Neopixel write (flush) error.")

    # ----------------------------------------------------------------
    # PWM
    # ----------------------------------------------------------------
    # pylint: disable=unused-argument
    def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
        """Configure PWM.

        Deinits the pin, inits it again, then applies ``frequency`` and
        ``duty_cycle``. ``variable_frequency`` is accepted but unused.
        """
        self.pwm_deinit(pin)
        resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM init error.")

        self.pwm_set_frequency(pin, frequency)
        self.pwm_set_duty_cycle(pin, duty_cycle)

    def pwm_deinit(self, pin):
        """Deinit PWM."""
        self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))

    def pwm_get_frequency(self, pin):
        """PWM get freq.

        :return: bytes 3-6 of the response decoded as a little-endian int.
        """
        resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM get frequency error.")
        return int.from_bytes(resp[3 : 3 + 4], byteorder="little")

    def pwm_set_frequency(self, pin, frequency):
        """PWM set freq.

        :raises RuntimeError: on failure, with a message chosen from byte 3
            of the response.
        """
        resp = self._hid_xfer(
            bytes([self.PWM_SET_FREQ, pin.id])
            + frequency.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            reasons = {
                0x01: "PWM different frequency on same slice.",
                0x02: "PWM frequency too low.",
                0x03: "PWM frequency too high.",
            }
            raise RuntimeError(reasons.get(resp[3], "PWM frequency error."))

    def pwm_get_duty_cycle(self, pin):
        """PWM get duty cycle.

        :return: bytes 3-6 of the response decoded as a little-endian int.
        """
        resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM get duty cycle error.")
        return int.from_bytes(resp[3 : 3 + 4], byteorder="little")

    def pwm_set_duty_cycle(self, pin, duty_cycle):
        """PWM set duty cycle.

        :param int duty_cycle: sent as a 2 byte little-endian value.
        """
        resp = self._hid_xfer(
            bytes([self.PWM_SET_DUTY_U16, pin.id])
            + duty_cycle.to_bytes(2, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM set duty cycle error.")
517
# Module-level instance created at import time. Constructing it performs no
# HID I/O; the device is only opened when its open(vid, pid) method is called.
rp2040_u2if = RP2040_u2if()