]> Repositories - Adafruit_Blinka-hackapet.git/blob - src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py
f5f3945ba3403ffca9fe23a13702a7f700fd957b
[Adafruit_Blinka-hackapet.git] / src / adafruit_blinka / microcontroller / pico_u2if / pico_u2if.py
1 """Chip Definition for Pico with u2if firmware"""
2 # https://github.com/execuc/u2if
3
4 import time
5 import hid
6
7 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
8 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
9
10
11 class Pico_u2if:
12     """MCP2221 Device Class Definition"""
13
14     VID = 0xCAFE
15     PID = 0x4005
16
17     # MISC
18     RESP_OK = 0x01
19     SYS_RESET = 0x10
20
21     # GPIO
22     GPIO_INIT_PIN = 0x20
23     GPIO_SET_VALUE = 0x21
24     GPIO_GET_VALUE = 0x22
25
26     # ADC
27     ADC_INIT_PIN = 0x40
28     ADC_GET_VALUE = 0x41
29
30     # I2C
31     I2C0_INIT = 0x80
32     I2C0_DEINIT = 0x81
33     I2C0_WRITE = 0x82
34     I2C0_READ = 0x83
35     I2C0_WRITE_FROM_UART = 0x84
36     I2C1_INIT = I2C0_INIT + 0x10
37     I2C1_DEINIT = I2C0_DEINIT + 0x10
38     I2C1_WRITE = I2C0_WRITE + 0x10
39     I2C1_READ = I2C0_READ + 0x10
40     I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
41
42     # SPI
43     SPI0_INIT = 0x60
44     SPI0_DEINIT = 0x61
45     SPI0_WRITE = 0x62
46     SPI0_READ = 0x63
47     SPI0_WRITE_FROM_UART = 0x64
48     SPI1_INIT = SPI0_INIT + 0x10
49     SPI1_DEINIT = SPI0_DEINIT + 0x10
50     SPI1_WRITE = SPI0_WRITE + 0x10
51     SPI1_READ = SPI0_READ + 0x10
52     SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
53
54     # WS2812B (LED)
55     WS2812B_INIT = 0xA0
56     WS2812B_DEINIT = 0xA1
57     WS2812B_WRITE = 0xA2
58
59     # PWM
60     PWM_INIT_PIN = 0x30
61     PWM_DEINIT_PIN = 0x31
62     PWM_SET_FREQ = 0x32
63     PWM_GET_FREQ = 0x33
64     PWM_SET_DUTY_U16 = 0x34
65     PWM_GET_DUTY_U16 = 0x35
66     PWM_SET_DUTY_NS = 0x36
67     PWM_GET_DUTY_NS = 0x37
68
69     def __init__(self):
70         self._i2c_index = None
71         self._spi_index = None
72         self._serial = None
73         self._neopixel_initialized = False
74         self._uart_rx_buffer = None
75         self._reset()
76
77     def _hid_xfer(self, report, response=True):
78         """Perform HID Transfer"""
79         # first byte is report ID, which =0
80         # remaing bytes = 64 byte report data
81         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
82         self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
83         if response:
84             # return is 64 byte response report
85             return self._hid.read(64)
86         return None
87
88     def _reset(self):
89         # get a HID device
90         self._hid = hid.device()
91         # open and reset
92         self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
93         resp = self._hid_xfer(bytes([self.SYS_RESET]), True)
94         if resp[1] != self.RESP_OK:
95             raise RuntimeError("Reset error.")
96         # reopen
97         max_retry = 10
98         retries = 0
99         while True:
100             try:
101                 self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
102                 return True
103             except OSError:
104                 if retries >= max_retry:
105                     break
106                 time.sleep(0.1)
107                 retries += 1
108         return False
109
110     # ----------------------------------------------------------------
111     # GPIO
112     # ----------------------------------------------------------------
113     def gpio_init_pin(self, pin_id, direction, pull):
114         """Configure GPIO Pin."""
115         self._hid_xfer(
116             bytes(
117                 [
118                     self.GPIO_INIT_PIN,
119                     pin_id,
120                     direction,
121                     pull,
122                 ]
123             )
124         )
125
126     def gpio_set_pin(self, pin_id, value):
127         """Set Current GPIO Pin Value"""
128         self._hid_xfer(
129             bytes(
130                 [
131                     self.GPIO_SET_VALUE,
132                     pin_id,
133                     int(value),
134                 ]
135             )
136         )
137
138     def gpio_get_pin(self, pin_id):
139         """Get Current GPIO Pin Value"""
140         resp = self._hid_xfer(
141             bytes(
142                 [
143                     self.GPIO_GET_VALUE,
144                     pin_id,
145                 ]
146             ),
147             True,
148         )
149         return resp[3] != 0x00
150
151     # ----------------------------------------------------------------
152     # ADC
153     # ----------------------------------------------------------------
154     def adc_init_pin(self, pin_id):
155         """Configure ADC Pin."""
156         self._hid_xfer(
157             bytes(
158                 [
159                     self.ADC_INIT_PIN,
160                     pin_id,
161                 ]
162             )
163         )
164
165     def adc_get_value(self, pin_id):
166         """Get ADC value for pin."""
167         resp = self._hid_xfer(
168             bytes(
169                 [
170                     self.ADC_GET_VALUE,
171                     pin_id,
172                 ]
173             ),
174             True,
175         )
176         return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
177
178     # ----------------------------------------------------------------
179     # I2C
180     # ----------------------------------------------------------------
181     def i2c_configure(self, baudrate, pullup=False):
182         """Configure I2C."""
183         if self._i2c_index is None:
184             raise RuntimeError("I2C bus not initialized.")
185
186         resp = self._hid_xfer(
187             bytes(
188                 [
189                     self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
190                     0x00 if not pullup else 0x01,
191                 ]
192             )
193             + baudrate.to_bytes(4, byteorder="little"),
194             True,
195         )
196         if resp[1] != self.RESP_OK:
197             raise RuntimeError("I2C init error.")
198
199     def i2c_set_port(self, index):
200         """Set I2C port."""
201         if index not in (0, 1):
202             raise ValueError("I2C index must be 0 or 1.")
203         self._i2c_index = index
204
205     def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
206         """Write data from the buffer to an address"""
207         if self._i2c_index is None:
208             raise RuntimeError("I2C bus not initialized.")
209
210         end = end if end else len(buffer)
211
212         write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
213         stop_flag = 0x01 if stop else 0x00
214
215         while (end - start) > 0:
216             remain_bytes = end - start
217             chunk = min(remain_bytes, 64 - 7)
218             resp = self._hid_xfer(
219                 bytes([write_cmd, address, stop_flag])
220                 + remain_bytes.to_bytes(4, byteorder="little")
221                 + buffer[start : (start + chunk)],
222                 True,
223             )
224             if resp[1] != self.RESP_OK:
225                 raise RuntimeError("I2C write error")
226             start += chunk
227
228     def _i2c_read(self, address, buffer, start=0, end=None):
229         """Read data from an address and into the buffer"""
230         # TODO: support chunkified reads
231         if self._i2c_index is None:
232             raise RuntimeError("I2C bus not initialized.")
233
234         end = end if end else len(buffer)
235
236         read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
237         stop_flag = 0x01  # always stop
238         read_size = end - start
239
240         resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
241         if resp[1] != self.RESP_OK:
242             raise RuntimeError("I2C write error")
243         # move into buffer
244         for i in range(read_size):
245             buffer[start + i] = resp[i + 2]
246
247     def i2c_writeto(self, address, buffer, *, start=0, end=None):
248         """Write data from the buffer to an address"""
249         self._i2c_write(address, buffer, start, end)
250
251     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
252         """Read data from an address and into the buffer"""
253         self._i2c_read(address, buffer, start, end)
254
255     def i2c_writeto_then_readfrom(
256         self,
257         address,
258         out_buffer,
259         in_buffer,
260         *,
261         out_start=0,
262         out_end=None,
263         in_start=0,
264         in_end=None
265     ):
266         """Write data from buffer_out to an address and then
267         read data from an address and into buffer_in
268         """
269         self._i2c_write(address, out_buffer, out_start, out_end, False)
270         self._i2c_read(address, in_buffer, in_start, in_end)
271
272     def i2c_scan(self, *, start=0, end=0x79):
273         """Perform an I2C Device Scan"""
274         if self._i2c_index is None:
275             raise RuntimeError("I2C bus not initialized.")
276         found = []
277         for addr in range(start, end + 1):
278             # try a write
279             try:
280                 self.i2c_writeto(addr, b"\x00\x00\x00")
281             except RuntimeError:  # no reply!
282                 continue
283             # store if success
284             found.append(addr)
285         return found
286
287     # ----------------------------------------------------------------
288     # SPI
289     # ----------------------------------------------------------------
290     def spi_configure(self, baudrate):
291         """Configure SPI."""
292         if self._spi_index is None:
293             raise RuntimeError("SPI bus not initialized.")
294
295         resp = self._hid_xfer(
296             bytes(
297                 [
298                     self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
299                     0x00,  # mode, not yet implemented
300                 ]
301             )
302             + baudrate.to_bytes(4, byteorder="little"),
303             True,
304         )
305         if resp[1] != self.RESP_OK:
306             raise RuntimeError("SPI init error.")
307
308     def spi_set_port(self, index):
309         """Set SPI port."""
310         if index not in (0, 1):
311             raise ValueError("SPI index must be 0 or 1.")
312         self._spi_index = index
313
314     def spi_write(self, buffer, *, start=0, end=None):
315         """SPI write."""
316         if self._spi_index is None:
317             raise RuntimeError("SPI bus not initialized.")
318
319         end = end if end else len(buffer)
320
321         write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
322
323         while (end - start) > 0:
324             remain_bytes = end - start
325             chunk = min(remain_bytes, 64 - 3)
326             resp = self._hid_xfer(
327                 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
328             )
329             if resp[1] != self.RESP_OK:
330                 raise RuntimeError("SPI write error")
331             start += chunk
332
333     def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
334         """SPI readinto."""
335         if self._spi_index is None:
336             raise RuntimeError("SPI bus not initialized.")
337
338         end = end if end else len(buffer)
339         read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
340         read_size = end - start
341
342         resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
343         if resp[1] != self.RESP_OK:
344             raise RuntimeError("SPI write error")
345         # move into buffer
346         for i in range(read_size):
347             buffer[start + i] = resp[i + 2]
348
349     def spi_write_readinto(
350         self,
351         buffer_out,
352         buffer_in,
353         *,
354         out_start=0,
355         out_end=None,
356         in_start=0,
357         in_end=None
358     ):
359         """SPI write and readinto."""
360         raise NotImplementedError("SPI write_readinto Not implemented")
361
362     # ----------------------------------------------------------------
363     # NEOPIXEL
364     # ----------------------------------------------------------------
365     def neopixel_write(self, gpio, buf):
366         """NeoPixel write."""
367         # open serial (data is sent over this)
368         if self._serial is None:
369             import serial
370             import serial.tools.list_ports
371
372             ports = serial.tools.list_ports.comports()
373             for port in ports:
374                 if port.vid == self.VID and port.pid == self.PID:
375                     self._serial = serial.Serial(port.device)
376                     break
377         if self._serial is None:
378             raise RuntimeError("Could not find Pico com port.")
379
380         # init
381         if not self._neopixel_initialized:
382             # deinit any current setup
383             # pylint: disable=protected-access
384             self._hid_xfer(bytes([self.WS2812B_DEINIT]))
385             resp = self._hid_xfer(
386                 bytes(
387                     [
388                         self.WS2812B_INIT,
389                         gpio._pin.id,
390                     ]
391                 ),
392                 True,
393             )
394             if resp[1] != self.RESP_OK:
395                 raise RuntimeError("Neopixel init error")
396             self._neopixel_initialized = True
397
398         # write
399         # command is done over HID
400         remain_bytes = len(buf)
401         resp = self._hid_xfer(
402             bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
403             True,
404         )
405         if resp[1] != self.RESP_OK:
406             # pylint: disable=no-else-raise
407             if resp[2] == 0x01:
408                 raise RuntimeError(
409                     "Neopixel write error : too many pixel for the firmware."
410                 )
411             elif resp[2] == 0x02:
412                 raise RuntimeError(
413                     "Neopixel write error : transfer already in progress."
414                 )
415             else:
416                 raise RuntimeError("Neopixel write error")
417         # buffer is sent over serial
418         self._serial.write(buf)
419         self._serial.flush()
420
421     # ----------------------------------------------------------------
422     # PWM
423     # ----------------------------------------------------------------
424     # pylint: disable=unused-argument
425     def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
426         """Configure PWM."""
427         self.pwm_deinit(pin)
428         resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
429         if resp[1] != self.RESP_OK:
430             raise RuntimeError("PWM init error.")
431
432         self.pwm_set_frequency(pin, frequency)
433         self.pwm_set_duty_cycle(pin, duty_cycle)
434
435     def pwm_deinit(self, pin):
436         """Deinit PWM."""
437         self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
438
439     def pwm_get_frequency(self, pin):
440         """PWM get freq."""
441         resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
442         if resp[1] != self.RESP_OK:
443             raise RuntimeError("PWM get frequency error.")
444         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
445
446     def pwm_set_frequency(self, pin, frequency):
447         """PWM set freq."""
448         resp = self._hid_xfer(
449             bytes([self.PWM_SET_FREQ, pin.id])
450             + frequency.to_bytes(4, byteorder="little"),
451             True,
452         )
453         if resp[1] != self.RESP_OK:
454             # pylint: disable=no-else-raise
455             if resp[3] == 0x01:
456                 raise RuntimeError("PWM different frequency on same slice.")
457             elif resp[3] == 0x02:
458                 raise RuntimeError("PWM frequency too low.")
459             elif resp[3] == 0x03:
460                 raise RuntimeError("PWM frequency too high.")
461             else:
462                 raise RuntimeError("PWM frequency error.")
463
464     def pwm_get_duty_cycle(self, pin):
465         """PWM get duty cycle."""
466         resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
467         if resp[1] != self.RESP_OK:
468             raise RuntimeError("PWM get duty cycle error.")
469         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
470
471     def pwm_set_duty_cycle(self, pin, duty_cycle):
472         """PWM set duty cycle."""
473         resp = self._hid_xfer(
474             bytes([self.PWM_SET_DUTY_U16, pin.id])
475             + duty_cycle.to_bytes(2, byteorder="little"),
476             True,
477         )
478         if resp[1] != self.RESP_OK:
479             raise RuntimeError("PWM set duty cycle error.")
480
481
482 pico_u2if = Pico_u2if()