]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py
Merge pull request #457 from timonsku/patch-1
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / pico_u2if / pico_u2if.py
1 """Chip Definition for Pico with u2if firmware"""
2 # https://github.com/execuc/u2if
3
4 import os
5 import time
6 import hid
7
8 # Use to set delay between reset and device reopen. if negative, don't reset at all
9 PICO_U2IF_RESET_DELAY = float(os.environ.get("PICO_U2IF_RESET_DELAY", 1))
10
11 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
12 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
13
14
15 class Pico_u2if:
16     """MCP2221 Device Class Definition"""
17
18     VID = 0xCAFE
19     PID = 0x4005
20
21     # MISC
22     RESP_OK = 0x01
23     SYS_RESET = 0x10
24
25     # GPIO
26     GPIO_INIT_PIN = 0x20
27     GPIO_SET_VALUE = 0x21
28     GPIO_GET_VALUE = 0x22
29
30     # ADC
31     ADC_INIT_PIN = 0x40
32     ADC_GET_VALUE = 0x41
33
34     # I2C
35     I2C0_INIT = 0x80
36     I2C0_DEINIT = 0x81
37     I2C0_WRITE = 0x82
38     I2C0_READ = 0x83
39     I2C0_WRITE_FROM_UART = 0x84
40     I2C1_INIT = I2C0_INIT + 0x10
41     I2C1_DEINIT = I2C0_DEINIT + 0x10
42     I2C1_WRITE = I2C0_WRITE + 0x10
43     I2C1_READ = I2C0_READ + 0x10
44     I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
45
46     # SPI
47     SPI0_INIT = 0x60
48     SPI0_DEINIT = 0x61
49     SPI0_WRITE = 0x62
50     SPI0_READ = 0x63
51     SPI0_WRITE_FROM_UART = 0x64
52     SPI1_INIT = SPI0_INIT + 0x10
53     SPI1_DEINIT = SPI0_DEINIT + 0x10
54     SPI1_WRITE = SPI0_WRITE + 0x10
55     SPI1_READ = SPI0_READ + 0x10
56     SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
57
58     # WS2812B (LED)
59     WS2812B_INIT = 0xA0
60     WS2812B_DEINIT = 0xA1
61     WS2812B_WRITE = 0xA2
62
63     # PWM
64     PWM_INIT_PIN = 0x30
65     PWM_DEINIT_PIN = 0x31
66     PWM_SET_FREQ = 0x32
67     PWM_GET_FREQ = 0x33
68     PWM_SET_DUTY_U16 = 0x34
69     PWM_GET_DUTY_U16 = 0x35
70     PWM_SET_DUTY_NS = 0x36
71     PWM_GET_DUTY_NS = 0x37
72
73     def __init__(self):
74         self._hid = hid.device()
75         self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
76         if PICO_U2IF_RESET_DELAY >= 0:
77             self._reset()
78         self._i2c_index = None
79         self._spi_index = None
80         self._serial = None
81         self._neopixel_initialized = False
82         self._uart_rx_buffer = None
83
84     def _hid_xfer(self, report, response=True):
85         """Perform HID Transfer"""
86         # first byte is report ID, which =0
87         # remaing bytes = 64 byte report data
88         # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
89         self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
90         if response:
91             # return is 64 byte response report
92             return self._hid.read(64)
93         return None
94
95     def _reset(self):
96         self._hid_xfer(bytes([self.SYS_RESET]), False)
97         time.sleep(PICO_U2IF_RESET_DELAY)
98         start = time.monotonic()
99         while time.monotonic() - start < 5:
100             try:
101                 self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
102             except OSError:
103                 time.sleep(0.1)
104                 continue
105             return
106         raise OSError("Pico open error.")
107
108     # ----------------------------------------------------------------
109     # GPIO
110     # ----------------------------------------------------------------
111     def gpio_init_pin(self, pin_id, direction, pull):
112         """Configure GPIO Pin."""
113         self._hid_xfer(
114             bytes(
115                 [
116                     self.GPIO_INIT_PIN,
117                     pin_id,
118                     direction,
119                     pull,
120                 ]
121             )
122         )
123
124     def gpio_set_pin(self, pin_id, value):
125         """Set Current GPIO Pin Value"""
126         self._hid_xfer(
127             bytes(
128                 [
129                     self.GPIO_SET_VALUE,
130                     pin_id,
131                     int(value),
132                 ]
133             )
134         )
135
136     def gpio_get_pin(self, pin_id):
137         """Get Current GPIO Pin Value"""
138         resp = self._hid_xfer(
139             bytes(
140                 [
141                     self.GPIO_GET_VALUE,
142                     pin_id,
143                 ]
144             ),
145             True,
146         )
147         return resp[3] != 0x00
148
149     # ----------------------------------------------------------------
150     # ADC
151     # ----------------------------------------------------------------
152     def adc_init_pin(self, pin_id):
153         """Configure ADC Pin."""
154         self._hid_xfer(
155             bytes(
156                 [
157                     self.ADC_INIT_PIN,
158                     pin_id,
159                 ]
160             )
161         )
162
163     def adc_get_value(self, pin_id):
164         """Get ADC value for pin."""
165         resp = self._hid_xfer(
166             bytes(
167                 [
168                     self.ADC_GET_VALUE,
169                     pin_id,
170                 ]
171             ),
172             True,
173         )
174         return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
175
176     # ----------------------------------------------------------------
177     # I2C
178     # ----------------------------------------------------------------
179     def i2c_configure(self, baudrate, pullup=False):
180         """Configure I2C."""
181         if self._i2c_index is None:
182             raise RuntimeError("I2C bus not initialized.")
183
184         resp = self._hid_xfer(
185             bytes(
186                 [
187                     self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
188                     0x00 if not pullup else 0x01,
189                 ]
190             )
191             + baudrate.to_bytes(4, byteorder="little"),
192             True,
193         )
194         if resp[1] != self.RESP_OK:
195             raise RuntimeError("I2C init error.")
196
197     def i2c_set_port(self, index):
198         """Set I2C port."""
199         if index not in (0, 1):
200             raise ValueError("I2C index must be 0 or 1.")
201         self._i2c_index = index
202
203     def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
204         """Write data from the buffer to an address"""
205         if self._i2c_index is None:
206             raise RuntimeError("I2C bus not initialized.")
207
208         end = end if end else len(buffer)
209
210         write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
211         stop_flag = 0x01 if stop else 0x00
212
213         while (end - start) > 0:
214             remain_bytes = end - start
215             chunk = min(remain_bytes, 64 - 7)
216             resp = self._hid_xfer(
217                 bytes([write_cmd, address, stop_flag])
218                 + remain_bytes.to_bytes(4, byteorder="little")
219                 + buffer[start : (start + chunk)],
220                 True,
221             )
222             if resp[1] != self.RESP_OK:
223                 raise RuntimeError("I2C write error")
224             start += chunk
225
226     def _i2c_read(self, address, buffer, start=0, end=None):
227         """Read data from an address and into the buffer"""
228         # TODO: support chunkified reads
229         if self._i2c_index is None:
230             raise RuntimeError("I2C bus not initialized.")
231
232         end = end if end else len(buffer)
233
234         read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
235         stop_flag = 0x01  # always stop
236         read_size = end - start
237
238         resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
239         if resp[1] != self.RESP_OK:
240             raise RuntimeError("I2C write error")
241         # move into buffer
242         for i in range(read_size):
243             buffer[start + i] = resp[i + 2]
244
245     def i2c_writeto(self, address, buffer, *, start=0, end=None):
246         """Write data from the buffer to an address"""
247         self._i2c_write(address, buffer, start, end)
248
249     def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
250         """Read data from an address and into the buffer"""
251         self._i2c_read(address, buffer, start, end)
252
253     def i2c_writeto_then_readfrom(
254         self,
255         address,
256         out_buffer,
257         in_buffer,
258         *,
259         out_start=0,
260         out_end=None,
261         in_start=0,
262         in_end=None
263     ):
264         """Write data from buffer_out to an address and then
265         read data from an address and into buffer_in
266         """
267         self._i2c_write(address, out_buffer, out_start, out_end, False)
268         self._i2c_read(address, in_buffer, in_start, in_end)
269
270     def i2c_scan(self, *, start=0, end=0x79):
271         """Perform an I2C Device Scan"""
272         if self._i2c_index is None:
273             raise RuntimeError("I2C bus not initialized.")
274         found = []
275         for addr in range(start, end + 1):
276             # try a write
277             try:
278                 self.i2c_writeto(addr, b"\x00\x00\x00")
279             except RuntimeError:  # no reply!
280                 continue
281             # store if success
282             found.append(addr)
283         return found
284
285     # ----------------------------------------------------------------
286     # SPI
287     # ----------------------------------------------------------------
288     def spi_configure(self, baudrate):
289         """Configure SPI."""
290         if self._spi_index is None:
291             raise RuntimeError("SPI bus not initialized.")
292
293         resp = self._hid_xfer(
294             bytes(
295                 [
296                     self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
297                     0x00,  # mode, not yet implemented
298                 ]
299             )
300             + baudrate.to_bytes(4, byteorder="little"),
301             True,
302         )
303         if resp[1] != self.RESP_OK:
304             raise RuntimeError("SPI init error.")
305
306     def spi_set_port(self, index):
307         """Set SPI port."""
308         if index not in (0, 1):
309             raise ValueError("SPI index must be 0 or 1.")
310         self._spi_index = index
311
312     def spi_write(self, buffer, *, start=0, end=None):
313         """SPI write."""
314         if self._spi_index is None:
315             raise RuntimeError("SPI bus not initialized.")
316
317         end = end if end else len(buffer)
318
319         write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
320
321         while (end - start) > 0:
322             remain_bytes = end - start
323             chunk = min(remain_bytes, 64 - 3)
324             resp = self._hid_xfer(
325                 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
326             )
327             if resp[1] != self.RESP_OK:
328                 raise RuntimeError("SPI write error")
329             start += chunk
330
331     def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
332         """SPI readinto."""
333         if self._spi_index is None:
334             raise RuntimeError("SPI bus not initialized.")
335
336         end = end if end else len(buffer)
337         read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
338         read_size = end - start
339
340         resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
341         if resp[1] != self.RESP_OK:
342             raise RuntimeError("SPI write error")
343         # move into buffer
344         for i in range(read_size):
345             buffer[start + i] = resp[i + 2]
346
347     def spi_write_readinto(
348         self,
349         buffer_out,
350         buffer_in,
351         *,
352         out_start=0,
353         out_end=None,
354         in_start=0,
355         in_end=None
356     ):
357         """SPI write and readinto."""
358         raise NotImplementedError("SPI write_readinto Not implemented")
359
360     # ----------------------------------------------------------------
361     # NEOPIXEL
362     # ----------------------------------------------------------------
363     def neopixel_write(self, gpio, buf):
364         """NeoPixel write."""
365         # open serial (data is sent over this)
366         if self._serial is None:
367             import serial
368             import serial.tools.list_ports
369
370             ports = serial.tools.list_ports.comports()
371             for port in ports:
372                 if port.vid == self.VID and port.pid == self.PID:
373                     self._serial = serial.Serial(port.device)
374                     break
375         if self._serial is None:
376             raise RuntimeError("Could not find Pico com port.")
377
378         # init
379         if not self._neopixel_initialized:
380             # deinit any current setup
381             # pylint: disable=protected-access
382             self._hid_xfer(bytes([self.WS2812B_DEINIT]))
383             resp = self._hid_xfer(
384                 bytes(
385                     [
386                         self.WS2812B_INIT,
387                         gpio._pin.id,
388                     ]
389                 ),
390                 True,
391             )
392             if resp[1] != self.RESP_OK:
393                 raise RuntimeError("Neopixel init error")
394             self._neopixel_initialized = True
395
396         self._serial.reset_output_buffer()
397
398         # write
399         # command is done over HID
400         remain_bytes = len(buf)
401         resp = self._hid_xfer(
402             bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
403             True,
404         )
405         if resp[1] != self.RESP_OK:
406             # pylint: disable=no-else-raise
407             if resp[2] == 0x01:
408                 raise RuntimeError(
409                     "Neopixel write error : too many pixel for the firmware."
410                 )
411             elif resp[2] == 0x02:
412                 print(resp[0:10])
413                 raise RuntimeError(
414                     "Neopixel write error : transfer already in progress."
415                 )
416             else:
417                 raise RuntimeError("Neopixel write error.")
418         # buffer is sent over serial
419         self._serial.write(buf)
420         # hack (see u2if)
421         if len(buf) % 64 == 0:
422             self._serial.write([0])
423         self._serial.flush()
424         # polling loop to wait for write complete?
425         resp = self._hid.read(64)
426         while resp[0] != self.WS2812B_WRITE:
427             resp = self._hid.read(64)
428         if resp[1] != self.RESP_OK:
429             raise RuntimeError("Neopixel write (flush) error.")
430
431     # ----------------------------------------------------------------
432     # PWM
433     # ----------------------------------------------------------------
434     # pylint: disable=unused-argument
435     def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
436         """Configure PWM."""
437         self.pwm_deinit(pin)
438         resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
439         if resp[1] != self.RESP_OK:
440             raise RuntimeError("PWM init error.")
441
442         self.pwm_set_frequency(pin, frequency)
443         self.pwm_set_duty_cycle(pin, duty_cycle)
444
445     def pwm_deinit(self, pin):
446         """Deinit PWM."""
447         self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
448
449     def pwm_get_frequency(self, pin):
450         """PWM get freq."""
451         resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
452         if resp[1] != self.RESP_OK:
453             raise RuntimeError("PWM get frequency error.")
454         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
455
456     def pwm_set_frequency(self, pin, frequency):
457         """PWM set freq."""
458         resp = self._hid_xfer(
459             bytes([self.PWM_SET_FREQ, pin.id])
460             + frequency.to_bytes(4, byteorder="little"),
461             True,
462         )
463         if resp[1] != self.RESP_OK:
464             # pylint: disable=no-else-raise
465             if resp[3] == 0x01:
466                 raise RuntimeError("PWM different frequency on same slice.")
467             elif resp[3] == 0x02:
468                 raise RuntimeError("PWM frequency too low.")
469             elif resp[3] == 0x03:
470                 raise RuntimeError("PWM frequency too high.")
471             else:
472                 raise RuntimeError("PWM frequency error.")
473
474     def pwm_get_duty_cycle(self, pin):
475         """PWM get duty cycle."""
476         resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
477         if resp[1] != self.RESP_OK:
478             raise RuntimeError("PWM get duty cycle error.")
479         return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
480
481     def pwm_set_duty_cycle(self, pin, duty_cycle):
482         """PWM set duty cycle."""
483         resp = self._hid_xfer(
484             bytes([self.PWM_SET_DUTY_U16, pin.id])
485             + duty_cycle.to_bytes(2, byteorder="little"),
486             True,
487         )
488         if resp[1] != self.RESP_OK:
489             raise RuntimeError("PWM set duty cycle error.")
490
491
492 pico_u2if = Pico_u2if()