]> Repositories - hackapet/Adafruit_Blinka.git/blob - src/adafruit_blinka/microcontroller/pico_u2if/pico_u2if.py
adc fix, lint tweak
[hackapet/Adafruit_Blinka.git] / src / adafruit_blinka / microcontroller / pico_u2if / pico_u2if.py
1 """Chip Definition for Pico with u2if firmware"""
2 # https://github.com/execuc/u2if
3
4 import hid
5
6 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
7 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
8
9
class Pico_u2if:
    """Pico with u2if firmware Device Class Definition

    Host-side driver for a Pico running the u2if firmware. Every command is
    sent as one 64 byte HID report whose first byte is the command code, and
    (when requested) answered by one 64 byte response report. NeoPixel pixel
    data is the one exception: the command goes over HID, the data itself is
    streamed over the board's serial port.
    """

    VID = 0xCAFE
    PID = 0x4005

    # Size in bytes of one HID report (in either direction).
    _REPORT_SIZE = 64
    # Read responses carry their data from index 2 onwards, so a single
    # response can hold at most this many data bytes.
    _MAX_READ_SIZE = _REPORT_SIZE - 2

    # MISC
    RESP_OK = 0x01

    # GPIO
    GPIO_INIT_PIN = 0x20
    GPIO_SET_VALUE = 0x21
    GPIO_GET_VALUE = 0x22

    # ADC
    ADC_INIT_PIN = 0x40
    ADC_GET_VALUE = 0x41

    # I2C
    I2C0_INIT = 0x80
    I2C0_DEINIT = 0x81
    I2C0_WRITE = 0x82
    I2C0_READ = 0x83
    I2C0_WRITE_FROM_UART = 0x84
    I2C1_INIT = I2C0_INIT + 0x10
    I2C1_DEINIT = I2C0_DEINIT + 0x10
    I2C1_WRITE = I2C0_WRITE + 0x10
    I2C1_READ = I2C0_READ + 0x10
    I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10

    # SPI
    SPI0_INIT = 0x60
    SPI0_DEINIT = 0x61
    SPI0_WRITE = 0x62
    SPI0_READ = 0x63
    SPI0_WRITE_FROM_UART = 0x64
    SPI1_INIT = SPI0_INIT + 0x10
    SPI1_DEINIT = SPI0_DEINIT + 0x10
    SPI1_WRITE = SPI0_WRITE + 0x10
    SPI1_READ = SPI0_READ + 0x10
    SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10

    # WS2812B (LED)
    WS2812B_INIT = 0xA0
    WS2812B_DEINIT = 0xA1
    WS2812B_WRITE = 0xA2

    # PWM
    PWM_INIT_PIN = 0x30
    PWM_DEINIT_PIN = 0x31
    PWM_SET_FREQ = 0x32
    PWM_GET_FREQ = 0x33
    PWM_SET_DUTY_U16 = 0x34
    PWM_GET_DUTY_U16 = 0x35
    PWM_SET_DUTY_NS = 0x36
    PWM_GET_DUTY_NS = 0x37

    def __init__(self):
        """Open the HID device identified by VID/PID and reset driver state."""
        self._hid = hid.device()
        self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
        # Bus selection is deferred until i2c_set_port / spi_set_port.
        self._i2c_index = None
        self._spi_index = None
        # Serial port used for NeoPixel data; opened lazily on first write.
        self._serial = None
        self._neopixel_initialized = False
        self._uart_rx_buffer = None

    def _hid_xfer(self, report, response=True):
        """Perform HID Transfer

        :param report: command bytes (at most 64); zero padded to 64 bytes
        :param response: when True, read and return the response report
        :return: the 64 byte response report, or None if not requested
        :raises ValueError: if ``report`` is longer than 64 bytes
        """
        if len(report) > self._REPORT_SIZE:
            # Negative padding would silently collapse to b"" and an
            # oversize report would be handed to the HID layer.
            raise ValueError("HID report must not exceed 64 bytes.")
        # first byte is report ID, which =0
        # remaining bytes = 64 byte report data
        # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
        padding = b"\0" * (self._REPORT_SIZE - len(report))
        self._hid.write(b"\0" + report + padding)
        if response:
            # return is 64 byte response report
            return self._hid.read(self._REPORT_SIZE)
        return None

    # ----------------------------------------------------------------
    # GPIO
    # ----------------------------------------------------------------
    def gpio_init_pin(self, pin_id, direction, pull):
        """Configure GPIO Pin.

        Sends the pin id, direction and pull values as single bytes. The
        response report is read but not inspected.
        """
        self._hid_xfer(
            bytes(
                [
                    self.GPIO_INIT_PIN,
                    pin_id,
                    direction,
                    pull,
                ]
            )
        )

    def gpio_set_pin(self, pin_id, value):
        """Set Current GPIO Pin Value

        ``value`` is converted with ``int()`` before being sent. The
        response report is read but not inspected.
        """
        self._hid_xfer(
            bytes(
                [
                    self.GPIO_SET_VALUE,
                    pin_id,
                    int(value),
                ]
            )
        )

    def gpio_get_pin(self, pin_id):
        """Get Current GPIO Pin Value

        :return: True if byte 3 of the response is non-zero, else False
        """
        resp = self._hid_xfer(
            bytes(
                [
                    self.GPIO_GET_VALUE,
                    pin_id,
                ]
            ),
            True,
        )
        return resp[3] != 0x00

    # ----------------------------------------------------------------
    # ADC
    # ----------------------------------------------------------------
    def adc_init_pin(self, pin_id):
        """Configure ADC Pin.

        The response report is read but not inspected.
        """
        self._hid_xfer(
            bytes(
                [
                    self.ADC_INIT_PIN,
                    pin_id,
                ]
            )
        )

    def adc_get_value(self, pin_id):
        """Get ADC value for pin.

        :return: response bytes 3..4 decoded as a little-endian integer
        """
        resp = self._hid_xfer(
            bytes(
                [
                    self.ADC_GET_VALUE,
                    pin_id,
                ]
            ),
            True,
        )
        return int.from_bytes(resp[3 : 3 + 2], byteorder="little")

    # ----------------------------------------------------------------
    # I2C
    # ----------------------------------------------------------------
    def i2c_configure(self, baudrate, pullup=False):
        """Configure I2C.

        :param baudrate: bus speed, sent as a 4 byte little-endian integer
        :param pullup: sent as flag byte 0x01 when true, 0x00 otherwise
        :raises RuntimeError: if no port was selected with ``i2c_set_port``
            or the device does not answer with RESP_OK
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        resp = self._hid_xfer(
            bytes(
                [
                    self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
                    0x00 if not pullup else 0x01,
                ]
            )
            + baudrate.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("I2C init error.")

    def i2c_set_port(self, index):
        """Set I2C port.

        :raises ValueError: if ``index`` is not 0 or 1
        """
        if index not in (0, 1):
            raise ValueError("I2C index must be 0 or 1.")
        self._i2c_index = index

    def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
        """Write data from the buffer to an address

        ``buffer[start:end]`` is sent in chunks that fit one HID report.
        ``end=None`` means "to the end of the buffer"; an explicit ``end=0``
        is an empty range and sends nothing.

        :raises RuntimeError: if no port is selected or a chunk is rejected
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        # Only None selects the default; 0 is a legitimate (empty) end.
        if end is None:
            end = len(buffer)

        write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
        stop_flag = 0x01 if stop else 0x00
        # Header is 7 bytes: command, address, stop flag, 4 byte length.
        max_chunk = self._REPORT_SIZE - 7

        while (end - start) > 0:
            remain_bytes = end - start
            chunk = min(remain_bytes, max_chunk)
            resp = self._hid_xfer(
                bytes([write_cmd, address, stop_flag])
                + remain_bytes.to_bytes(4, byteorder="little")
                # bytes() lets any sequence of ints be used as the buffer
                + bytes(buffer[start : (start + chunk)]),
                True,
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("I2C write error")
            start += chunk

    def _i2c_read(self, address, buffer, start=0, end=None):
        """Read data from an address and into the buffer

        Fills ``buffer[start:end]`` from a single HID response, so at most
        62 bytes can be read per call. ``end=None`` means "to the end of the
        buffer".

        :raises RuntimeError: if no port is selected or the read is rejected
        :raises ValueError: if the requested size is negative or above 62
        """
        # TODO: support chunkified reads
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")

        # Only None selects the default; 0 is a legitimate (empty) end.
        if end is None:
            end = len(buffer)

        read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
        stop_flag = 0x01  # always stop
        read_size = end - start
        # Fail before touching the bus instead of part-way through the copy.
        if not 0 <= read_size <= self._MAX_READ_SIZE:
            raise ValueError("I2C read size must be between 0 and 62 bytes.")

        resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("I2C read error")
        # move into buffer
        for i in range(read_size):
            buffer[start + i] = resp[i + 2]

    def i2c_writeto(self, address, buffer, *, start=0, end=None):
        """Write data from the buffer to an address"""
        self._i2c_write(address, buffer, start, end)

    def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
        """Read data from an address and into the buffer"""
        self._i2c_read(address, buffer, start, end)

    def i2c_writeto_then_readfrom(
        self,
        address,
        out_buffer,
        in_buffer,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None
    ):
        """Write data from buffer_out to an address and then
        read data from an address and into buffer_in

        The write is issued with the stop flag cleared; the read that
        follows always sets it.
        """
        self._i2c_write(address, out_buffer, out_start, out_end, False)
        self._i2c_read(address, in_buffer, in_start, in_end)

    def i2c_scan(self, *, start=0, end=0x79):
        """Perform an I2C Device Scan

        Attempts a three byte write of zeros to every address in
        ``start..end`` (inclusive) and collects those that do not fail.

        :return: list of addresses whose write succeeded
        :raises RuntimeError: if no port was selected with ``i2c_set_port``
        """
        if self._i2c_index is None:
            raise RuntimeError("I2C bus not initialized.")
        found = []
        for addr in range(start, end + 1):
            # try a write
            try:
                self.i2c_writeto(addr, b"\x00\x00\x00")
            except RuntimeError:  # no reply!
                continue
            # store if success
            found.append(addr)
        return found

    # ----------------------------------------------------------------
    # SPI
    # ----------------------------------------------------------------
    def spi_configure(self, baudrate):
        """Configure SPI.

        :param baudrate: bus speed, sent as a 4 byte little-endian integer
        :raises RuntimeError: if no port was selected with ``spi_set_port``
            or the device does not answer with RESP_OK
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        resp = self._hid_xfer(
            bytes(
                [
                    self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
                    0x00,  # mode, not yet implemented
                ]
            )
            + baudrate.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("SPI init error.")

    def spi_set_port(self, index):
        """Set SPI port.

        :raises ValueError: if ``index`` is not 0 or 1
        """
        if index not in (0, 1):
            raise ValueError("SPI index must be 0 or 1.")
        self._spi_index = index

    def spi_write(self, buffer, *, start=0, end=None):
        """SPI write.

        ``buffer[start:end]`` is sent in chunks that fit one HID report.
        ``end=None`` means "to the end of the buffer"; an explicit ``end=0``
        is an empty range and sends nothing.

        :raises RuntimeError: if no port is selected or a chunk is rejected
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        # Only None selects the default; 0 is a legitimate (empty) end.
        if end is None:
            end = len(buffer)

        write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
        max_chunk = self._REPORT_SIZE - 3

        while (end - start) > 0:
            remain_bytes = end - start
            chunk = min(remain_bytes, max_chunk)
            resp = self._hid_xfer(
                # bytes() lets any sequence of ints be used as the buffer
                bytes([write_cmd, chunk]) + bytes(buffer[start : (start + chunk)]),
                True,
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("SPI write error")
            start += chunk

    def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
        """SPI readinto.

        Fills ``buffer[start:end]`` from a single HID response, so at most
        62 bytes can be read per call. ``write_value`` is passed to the
        device as a single byte alongside the read size.

        :raises RuntimeError: if no port is selected or the read is rejected
        :raises ValueError: if the requested size is negative or above 62
        """
        if self._spi_index is None:
            raise RuntimeError("SPI bus not initialized.")

        # Only None selects the default; 0 is a legitimate (empty) end.
        if end is None:
            end = len(buffer)
        read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
        read_size = end - start
        # Fail before touching the bus instead of part-way through the copy.
        if not 0 <= read_size <= self._MAX_READ_SIZE:
            raise ValueError("SPI read size must be between 0 and 62 bytes.")

        resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("SPI read error")
        # move into buffer
        for i in range(read_size):
            buffer[start + i] = resp[i + 2]

    def spi_write_readinto(
        self,
        buffer_out,
        buffer_in,
        *,
        out_start=0,
        out_end=None,
        in_start=0,
        in_end=None
    ):
        """SPI write and readinto.

        :raises NotImplementedError: always; not implemented
        """
        raise NotImplementedError("SPI write_readinto Not implemented")

    # ----------------------------------------------------------------
    # NEOPIXEL
    # ----------------------------------------------------------------
    def neopixel_write(self, gpio, buf):
        """NeoPixel write.

        On first use, finds and opens the serial port whose VID/PID match
        this device and initialises the WS2812B driver on ``gpio``. Each
        call then announces the data length over HID and streams ``buf``
        over the serial port.

        :raises RuntimeError: if no matching serial port is found, or the
            device rejects the init or write command
        """
        # open serial (data is sent over this)
        if self._serial is None:
            import serial
            import serial.tools.list_ports

            ports = serial.tools.list_ports.comports()
            for port in ports:
                if port.vid == self.VID and port.pid == self.PID:
                    self._serial = serial.Serial(port.device)
                    break
        if self._serial is None:
            raise RuntimeError("Could not find Pico com port.")

        # init
        if not self._neopixel_initialized:
            # deinit any current setup
            # pylint: disable=protected-access
            self._hid_xfer(bytes([self.WS2812B_DEINIT]))
            resp = self._hid_xfer(
                bytes(
                    [
                        self.WS2812B_INIT,
                        gpio._pin.id,
                    ]
                ),
                True,
            )
            if resp[1] != self.RESP_OK:
                raise RuntimeError("Neopixel init error")
            self._neopixel_initialized = True

        # write
        # command is done over HID
        remain_bytes = len(buf)
        resp = self._hid_xfer(
            bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            # byte 2 of the response selects the specific failure message
            # pylint: disable=no-else-raise
            if resp[2] == 0x01:
                raise RuntimeError(
                    "Neopixel write error : too many pixel for the firmware."
                )
            elif resp[2] == 0x02:
                raise RuntimeError(
                    "Neopixel write error : transfer already in progress."
                )
            else:
                raise RuntimeError("Neopixel write error")
        # buffer is sent over serial
        self._serial.write(buf)
        self._serial.flush()

    # ----------------------------------------------------------------
    # PWM
    # ----------------------------------------------------------------
    # pylint: disable=unused-argument
    def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
        """Configure PWM.

        Deinitialises the pin, initialises it again, then applies
        ``frequency`` and ``duty_cycle``. ``variable_frequency`` is accepted
        but not used.

        :raises RuntimeError: if the init, frequency or duty cycle command
            is rejected
        """
        self.pwm_deinit(pin)
        resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM init error.")

        self.pwm_set_frequency(pin, frequency)
        self.pwm_set_duty_cycle(pin, duty_cycle)

    def pwm_deinit(self, pin):
        """Deinit PWM.

        The response report is read but not inspected.
        """
        self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))

    def pwm_get_frequency(self, pin):
        """PWM get freq.

        :return: response bytes 3..6 decoded as a little-endian integer
        :raises RuntimeError: if the device does not answer with RESP_OK
        """
        resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM get frequency error.")
        return int.from_bytes(resp[3 : 3 + 4], byteorder="little")

    def pwm_set_frequency(self, pin, frequency):
        """PWM set freq.

        :param frequency: sent as a 4 byte little-endian integer
        :raises RuntimeError: if the device rejects the frequency; byte 3 of
            the response selects the specific message
        """
        resp = self._hid_xfer(
            bytes([self.PWM_SET_FREQ, pin.id])
            + frequency.to_bytes(4, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            # pylint: disable=no-else-raise
            if resp[3] == 0x01:
                raise RuntimeError("PWM different frequency on same slice.")
            elif resp[3] == 0x02:
                raise RuntimeError("PWM frequency too low.")
            elif resp[3] == 0x03:
                raise RuntimeError("PWM frequency too high.")
            else:
                raise RuntimeError("PWM frequency error.")

    def pwm_get_duty_cycle(self, pin):
        """PWM get duty cycle.

        :return: response bytes 3..6 decoded as a little-endian integer
        :raises RuntimeError: if the device does not answer with RESP_OK
        """
        resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM get duty cycle error.")
        # NOTE(review): 4 bytes are decoded although the command is named
        # *_U16 and the setter sends 2 bytes - confirm against the firmware.
        return int.from_bytes(resp[3 : 3 + 4], byteorder="little")

    def pwm_set_duty_cycle(self, pin, duty_cycle):
        """PWM set duty cycle.

        :param duty_cycle: sent as a 2 byte little-endian integer
        :raises RuntimeError: if the device does not answer with RESP_OK
        """
        resp = self._hid_xfer(
            bytes([self.PWM_SET_DUTY_U16, pin.id])
            + duty_cycle.to_bytes(2, byteorder="little"),
            True,
        )
        if resp[1] != self.RESP_OK:
            raise RuntimeError("PWM set duty cycle error.")
458
# Shared module-level instance. Constructing it runs Pico_u2if.__init__, which
# opens the HID device, so this happens as soon as the module is imported.
459 pico_u2if = Pico_u2if()