1 """Chip Definition for Pico with u2if firmware"""
2 # https://github.com/execuc/u2if
8 # Use to set delay between reset and device reopen. if negative, don't reset at all
9 PICO_U2IF_RESET_DELAY = float(os.environ.get("PICO_U2IF_RESET_DELAY", 1))
11 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
12 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
16 """MCP2221 Device Class Definition"""
39 I2C0_WRITE_FROM_UART = 0x84
40 I2C1_INIT = I2C0_INIT + 0x10
41 I2C1_DEINIT = I2C0_DEINIT + 0x10
42 I2C1_WRITE = I2C0_WRITE + 0x10
43 I2C1_READ = I2C0_READ + 0x10
44 I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
51 SPI0_WRITE_FROM_UART = 0x64
52 SPI1_INIT = SPI0_INIT + 0x10
53 SPI1_DEINIT = SPI0_DEINIT + 0x10
54 SPI1_WRITE = SPI0_WRITE + 0x10
55 SPI1_READ = SPI0_READ + 0x10
56 SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
68 PWM_SET_DUTY_U16 = 0x34
69 PWM_GET_DUTY_U16 = 0x35
70 PWM_SET_DUTY_NS = 0x36
71 PWM_GET_DUTY_NS = 0x37
74 self._hid = hid.device()
75 self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
76 if PICO_U2IF_RESET_DELAY >= 0:
78 self._i2c_index = None
79 self._spi_index = None
81 self._neopixel_initialized = False
82 self._uart_rx_buffer = None
84 def _hid_xfer(self, report, response=True):
85 """Perform HID Transfer"""
86 # first byte is report ID, which =0
87 # remaing bytes = 64 byte report data
88 # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
89 self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
91 # return is 64 byte response report
92 return self._hid.read(64)
96 self._hid_xfer(bytes([self.SYS_RESET]), False)
97 time.sleep(PICO_U2IF_RESET_DELAY)
98 start = time.monotonic()
99 while time.monotonic() - start < 5:
101 self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
106 raise OSError("Pico open error.")
108 # ----------------------------------------------------------------
110 # ----------------------------------------------------------------
111 def gpio_init_pin(self, pin_id, direction, pull):
112 """Configure GPIO Pin."""
124 def gpio_set_pin(self, pin_id, value):
125 """Set Current GPIO Pin Value"""
136 def gpio_get_pin(self, pin_id):
137 """Get Current GPIO Pin Value"""
138 resp = self._hid_xfer(
147 return resp[3] != 0x00
149 # ----------------------------------------------------------------
151 # ----------------------------------------------------------------
152 def adc_init_pin(self, pin_id):
153 """Configure ADC Pin."""
163 def adc_get_value(self, pin_id):
164 """Get ADC value for pin."""
165 resp = self._hid_xfer(
174 return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
176 # ----------------------------------------------------------------
178 # ----------------------------------------------------------------
179 def i2c_configure(self, baudrate, pullup=False):
181 if self._i2c_index is None:
182 raise RuntimeError("I2C bus not initialized.")
184 resp = self._hid_xfer(
187 self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
188 0x00 if not pullup else 0x01,
191 + baudrate.to_bytes(4, byteorder="little"),
194 if resp[1] != self.RESP_OK:
195 raise RuntimeError("I2C init error.")
197 def i2c_set_port(self, index):
199 if index not in (0, 1):
200 raise ValueError("I2C index must be 0 or 1.")
201 self._i2c_index = index
203 def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
204 """Write data from the buffer to an address"""
205 if self._i2c_index is None:
206 raise RuntimeError("I2C bus not initialized.")
208 end = end if end else len(buffer)
210 write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
211 stop_flag = 0x01 if stop else 0x00
213 while (end - start) > 0:
214 remain_bytes = end - start
215 chunk = min(remain_bytes, 64 - 7)
216 resp = self._hid_xfer(
217 bytes([write_cmd, address, stop_flag])
218 + remain_bytes.to_bytes(4, byteorder="little")
219 + buffer[start : (start + chunk)],
222 if resp[1] != self.RESP_OK:
223 raise RuntimeError("I2C write error")
226 def _i2c_read(self, address, buffer, start=0, end=None):
227 """Read data from an address and into the buffer"""
228 # TODO: support chunkified reads
229 if self._i2c_index is None:
230 raise RuntimeError("I2C bus not initialized.")
232 end = end if end else len(buffer)
234 read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
235 stop_flag = 0x01 # always stop
236 read_size = end - start
238 resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
239 if resp[1] != self.RESP_OK:
240 raise RuntimeError("I2C write error")
242 for i in range(read_size):
243 buffer[start + i] = resp[i + 2]
245 def i2c_writeto(self, address, buffer, *, start=0, end=None):
246 """Write data from the buffer to an address"""
247 self._i2c_write(address, buffer, start, end)
249 def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
250 """Read data from an address and into the buffer"""
251 self._i2c_read(address, buffer, start, end)
253 def i2c_writeto_then_readfrom(
264 """Write data from buffer_out to an address and then
265 read data from an address and into buffer_in
267 self._i2c_write(address, out_buffer, out_start, out_end, False)
268 self._i2c_read(address, in_buffer, in_start, in_end)
270 def i2c_scan(self, *, start=0, end=0x79):
271 """Perform an I2C Device Scan"""
272 if self._i2c_index is None:
273 raise RuntimeError("I2C bus not initialized.")
275 for addr in range(start, end + 1):
278 self.i2c_writeto(addr, b"\x00\x00\x00")
279 except RuntimeError: # no reply!
285 # ----------------------------------------------------------------
287 # ----------------------------------------------------------------
288 def spi_configure(self, baudrate):
290 if self._spi_index is None:
291 raise RuntimeError("SPI bus not initialized.")
293 resp = self._hid_xfer(
296 self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
297 0x00, # mode, not yet implemented
300 + baudrate.to_bytes(4, byteorder="little"),
303 if resp[1] != self.RESP_OK:
304 raise RuntimeError("SPI init error.")
306 def spi_set_port(self, index):
308 if index not in (0, 1):
309 raise ValueError("SPI index must be 0 or 1.")
310 self._spi_index = index
312 def spi_write(self, buffer, *, start=0, end=None):
314 if self._spi_index is None:
315 raise RuntimeError("SPI bus not initialized.")
317 end = end if end else len(buffer)
319 write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
321 while (end - start) > 0:
322 remain_bytes = end - start
323 chunk = min(remain_bytes, 64 - 3)
324 resp = self._hid_xfer(
325 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
327 if resp[1] != self.RESP_OK:
328 raise RuntimeError("SPI write error")
331 def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
333 if self._spi_index is None:
334 raise RuntimeError("SPI bus not initialized.")
336 end = end if end else len(buffer)
337 read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
338 read_size = end - start
340 resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
341 if resp[1] != self.RESP_OK:
342 raise RuntimeError("SPI write error")
344 for i in range(read_size):
345 buffer[start + i] = resp[i + 2]
347 def spi_write_readinto(
357 """SPI write and readinto."""
358 raise NotImplementedError("SPI write_readinto Not implemented")
360 # ----------------------------------------------------------------
362 # ----------------------------------------------------------------
363 def neopixel_write(self, gpio, buf):
364 """NeoPixel write."""
365 # open serial (data is sent over this)
366 if self._serial is None:
368 import serial.tools.list_ports
370 ports = serial.tools.list_ports.comports()
372 if port.vid == self.VID and port.pid == self.PID:
373 self._serial = serial.Serial(port.device)
375 if self._serial is None:
376 raise RuntimeError("Could not find Pico com port.")
379 if not self._neopixel_initialized:
380 # deinit any current setup
381 # pylint: disable=protected-access
382 self._hid_xfer(bytes([self.WS2812B_DEINIT]))
383 resp = self._hid_xfer(
392 if resp[1] != self.RESP_OK:
393 raise RuntimeError("Neopixel init error")
394 self._neopixel_initialized = True
397 # command is done over HID
398 remain_bytes = len(buf)
399 resp = self._hid_xfer(
400 bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
403 if resp[1] != self.RESP_OK:
404 # pylint: disable=no-else-raise
407 "Neopixel write error : too many pixel for the firmware."
409 elif resp[2] == 0x02:
411 "Neopixel write error : transfer already in progress."
414 raise RuntimeError("Neopixel write error")
415 # buffer is sent over serial
416 self._serial.write(buf)
419 # ----------------------------------------------------------------
421 # ----------------------------------------------------------------
422 # pylint: disable=unused-argument
423 def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
426 resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
427 if resp[1] != self.RESP_OK:
428 raise RuntimeError("PWM init error.")
430 self.pwm_set_frequency(pin, frequency)
431 self.pwm_set_duty_cycle(pin, duty_cycle)
433 def pwm_deinit(self, pin):
435 self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin.id]))
437 def pwm_get_frequency(self, pin):
439 resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin.id]), True)
440 if resp[1] != self.RESP_OK:
441 raise RuntimeError("PWM get frequency error.")
442 return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
444 def pwm_set_frequency(self, pin, frequency):
446 resp = self._hid_xfer(
447 bytes([self.PWM_SET_FREQ, pin.id])
448 + frequency.to_bytes(4, byteorder="little"),
451 if resp[1] != self.RESP_OK:
452 # pylint: disable=no-else-raise
454 raise RuntimeError("PWM different frequency on same slice.")
455 elif resp[3] == 0x02:
456 raise RuntimeError("PWM frequency too low.")
457 elif resp[3] == 0x03:
458 raise RuntimeError("PWM frequency too high.")
460 raise RuntimeError("PWM frequency error.")
462 def pwm_get_duty_cycle(self, pin):
463 """PWM get duty cycle."""
464 resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin.id]), True)
465 if resp[1] != self.RESP_OK:
466 raise RuntimeError("PWM get duty cycle error.")
467 return int.from_bytes(resp[3 : 3 + 4], byteorder="little")
469 def pwm_set_duty_cycle(self, pin, duty_cycle):
470 """PWM set duty cycle."""
471 resp = self._hid_xfer(
472 bytes([self.PWM_SET_DUTY_U16, pin.id])
473 + duty_cycle.to_bytes(2, byteorder="little"),
476 if resp[1] != self.RESP_OK:
477 raise RuntimeError("PWM set duty cycle error.")
480 pico_u2if = Pico_u2if()