1 """Chip Definition for Pico with u2if firmware"""
2 # https://github.com/execuc/u2if
6 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
7 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
"""Pico u2if Device Class Definition"""
33 I2C0_WRITE_FROM_UART = 0x84
34 I2C1_INIT = I2C0_INIT + 0x10
35 I2C1_DEINIT = I2C0_DEINIT + 0x10
36 I2C1_WRITE = I2C0_WRITE + 0x10
37 I2C1_READ = I2C0_READ + 0x10
38 I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
45 SPI0_WRITE_FROM_UART = 0x64
46 SPI1_INIT = SPI0_INIT + 0x10
47 SPI1_DEINIT = SPI0_DEINIT + 0x10
48 SPI1_WRITE = SPI0_WRITE + 0x10
49 SPI1_READ = SPI0_READ + 0x10
50 SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
62 PWM_SET_DUTY_U16 = 0x34
63 PWM_GET_DUTY_U16 = 0x35
64 PWM_SET_DUTY_NS = 0x36
65 PWM_GET_DUTY_NS = 0x37
74 self._hid = hid.device()
75 self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
76 self._i2c_index = None
77 self._spi_index = None
79 self._neopixel_initialized = False
80 self._uart_rx_buffer = None
def _hid_xfer(self, report, response=True):
    """Perform HID Transfer.

    Writes ``report`` to the HID device as one report: a leading report ID
    byte of 0 followed by the payload, zero-padded on the right up to
    64 bytes.

    :param report: bytes payload of the command to send.
    :param response: when true (default) read and return the reply; when
        false, return ``None`` without reading from the device.
    :return: the result of reading up to 64 bytes from the HID device, or
        ``None`` when ``response`` is false.
    """
    # first byte is report ID, which =0
    # remaining bytes = 64 byte report data
    # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
    self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
    # Honor the caller's choice: the flag used to be accepted but ignored.
    if not response:
        return None
    # return is 64 byte response report
    return self._hid.read(64)
93 # ----------------------------------------------------------------
95 # ----------------------------------------------------------------
96 def gpio_init_pin(self, pin_id, direction, pull):
97 """Configure GPIO Pin."""
109 def gpio_set_pin(self, pin_id, value):
110 """Set Current GPIO Pin Value"""
121 def gpio_get_pin(self, pin_id):
122 """Get Current GPIO Pin Value"""
123 resp = self._hid_xfer(
132 return resp[3] != 0x00
134 # ----------------------------------------------------------------
136 # ----------------------------------------------------------------
137 def adc_init_pin(self, pin_id):
138 """Configure ADC Pin."""
148 def adc_get_value(self, pin_id):
149 """Get ADC value for pin."""
150 resp = self._hid_xfer(
159 return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
161 # ----------------------------------------------------------------
163 # ----------------------------------------------------------------
164 def i2c_configure(self, baudrate, pullup=False):
166 if self._i2c_index is None:
167 raise RuntimeError("I2C bus not initialized.")
169 resp = self._hid_xfer(
172 self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
173 0x00 if not pullup else 0x01,
176 + baudrate.to_bytes(4, byteorder="little"),
179 if resp[1] != self.RESP_OK:
180 raise RuntimeError("I2C init error.")
def i2c_set_port(self, index):
    """Select the I2C port used by the other I2C methods.

    :param index: port number, either 0 or 1.
    :raises ValueError: if ``index`` is neither 0 nor 1.
    """
    port_is_valid = index in (0, 1)
    if port_is_valid:
        # remembered so later calls can pick the I2C0_* or I2C1_* commands
        self._i2c_index = index
        return
    raise ValueError("I2C index must be 0 or 1.")
188 def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
189 """Write data from the buffer to an address"""
190 if self._i2c_index is None:
191 raise RuntimeError("I2C bus not initialized.")
193 end = end if end else len(buffer)
195 write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
196 stop_flag = 0x01 if stop else 0x00
198 while (end - start) > 0:
199 remain_bytes = end - start
200 chunk = min(remain_bytes, 64 - 7)
201 resp = self._hid_xfer(
202 bytes([write_cmd, address, stop_flag])
203 + remain_bytes.to_bytes(4, byteorder="little")
204 + buffer[start : (start + chunk)],
207 if resp[1] != self.RESP_OK:
208 raise RuntimeError("I2C write error")
def _i2c_read(self, address, buffer, start=0, end=None):
    """Read data from an address and into the buffer.

    Sends a single read command for the selected I2C port and copies the
    reply payload (taken from offset 2 of the response) into
    ``buffer[start:end]``.

    :param address: device address placed in the read command.
    :param buffer: writable sequence that receives the data.
    :param start: index of the first buffer slot to fill.
    :param end: index one past the last slot to fill; ``None`` means
        ``len(buffer)``.
    :raises RuntimeError: if no I2C port has been selected, or the
        response status is not ``RESP_OK``.
    """
    # TODO: support chunkified reads
    if self._i2c_index is None:
        raise RuntimeError("I2C bus not initialized.")

    # Test for None explicitly so that end=0 means "read nothing" rather
    # than silently expanding to the whole buffer.
    if end is None:
        end = len(buffer)

    read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
    stop_flag = 0x01  # always stop
    read_size = end - start

    resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
    if resp[1] != self.RESP_OK:
        # this is the read path; report it as such
        raise RuntimeError("I2C read error")

    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
def i2c_writeto(self, address, buffer, *, start=0, end=None):
    """Write ``buffer[start:end]`` to the device at ``address``.

    Public wrapper around ``_i2c_write``; the bounds are forwarded
    unchanged and the ``stop`` argument is left at its default.
    """
    self._i2c_write(address, buffer, start=start, end=end)
def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
    """Fill ``buffer[start:end]`` with data read from ``address``.

    Public wrapper around ``_i2c_read``; the bounds are forwarded
    unchanged.
    """
    self._i2c_read(address, buffer, start=start, end=end)
238 def i2c_writeto_then_readfrom(
249 """Write data from buffer_out to an address and then
250 read data from an address and into buffer_in
252 self._i2c_write(address, out_buffer, out_start, out_end, False)
253 self._i2c_read(address, in_buffer, in_start, in_end)
255 def i2c_scan(self, *, start=0, end=0x79):
256 """Perform an I2C Device Scan"""
257 if self._i2c_index is None:
258 raise RuntimeError("I2C bus not initialized.")
260 for addr in range(start, end + 1):
263 self.i2c_writeto(addr, b"\x00\x00\x00")
264 except RuntimeError: # no reply!
270 # ----------------------------------------------------------------
272 # ----------------------------------------------------------------
273 def spi_configure(self, baudrate):
275 if self._spi_index is None:
276 raise RuntimeError("SPI bus not initialized.")
278 resp = self._hid_xfer(
281 self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
282 0x00, # mode, not yet implemented
285 + baudrate.to_bytes(4, byteorder="little"),
288 if resp[1] != self.RESP_OK:
289 raise RuntimeError("SPI init error.")
def spi_set_port(self, index):
    """Select the SPI port used by the other SPI methods.

    :param index: port number, either 0 or 1.
    :raises ValueError: if ``index`` is neither 0 nor 1.
    """
    allowed_ports = (0, 1)
    if not index in allowed_ports:
        raise ValueError("SPI index must be 0 or 1.")
    # remembered so later calls can pick the SPI0_* or SPI1_* commands
    self._spi_index = index
297 def spi_write(self, buffer, *, start=0, end=None):
299 if self._spi_index is None:
300 raise RuntimeError("SPI bus not initialized.")
302 end = end if end else len(buffer)
304 write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
306 while (end - start) > 0:
307 remain_bytes = end - start
308 chunk = min(remain_bytes, 64 - 3)
309 resp = self._hid_xfer(
310 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
312 if resp[1] != self.RESP_OK:
313 raise RuntimeError("SPI write error")
def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
    """Read from the selected SPI port into ``buffer[start:end]``.

    Sends a single read command carrying ``write_value`` and the number of
    bytes wanted, then copies the reply payload (taken from offset 2 of
    the response) into the buffer.

    :param buffer: writable sequence that receives the data.
    :param start: index of the first buffer slot to fill.
    :param end: index one past the last slot to fill; ``None`` means
        ``len(buffer)``.
    :param write_value: byte value placed in the read command.
    :raises RuntimeError: if no SPI port has been selected, or the
        response status is not ``RESP_OK``.
    """
    if self._spi_index is None:
        raise RuntimeError("SPI bus not initialized.")

    # Test for None explicitly so that end=0 means "read nothing" rather
    # than silently expanding to the whole buffer.
    if end is None:
        end = len(buffer)
    read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
    read_size = end - start

    resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
    if resp[1] != self.RESP_OK:
        # this is the read path; report it as such
        raise RuntimeError("SPI read error")

    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
332 def spi_write_readinto(
342 """SPI write and readinto."""
343 raise NotImplementedError("SPI write_readinto Not implemented")
345 # ----------------------------------------------------------------
347 # ----------------------------------------------------------------
348 def neopixel_write(self, gpio, buf):
349 """NeoPixel write."""
350 # open serial (data is sent over this)
351 if self._serial is None:
353 import serial.tools.list_ports
355 ports = serial.tools.list_ports.comports()
357 if port.vid == self.VID and port.pid == self.PID:
358 self._serial = serial.Serial(port.device)
360 if self._serial is None:
361 raise RuntimeError("Could not find Pico com port.")
364 if not self._neopixel_initialized:
365 # deinit any current setup
366 # pylint: disable=protected-access
367 self._hid_xfer(bytes([self.WS2812B_DEINIT]))
368 resp = self._hid_xfer(
377 if resp[1] != self.RESP_OK:
378 raise RuntimeError("Neopixel init error")
379 self._neopixel_initialized = True
382 # command is done over HID
383 remain_bytes = len(buf)
384 resp = self._hid_xfer(
385 bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
388 if resp[1] != self.RESP_OK:
389 # pylint: disable=no-else-raise
392 "Neopixel write error : too many pixel for the firmware."
394 elif resp[2] == 0x02:
396 "Neopixel write error : transfer already in progress."
399 raise RuntimeError("Neopixel write error")
400 # buffer is sent over serial
401 self._serial.write(buf)
404 # ----------------------------------------------------------------
406 # ----------------------------------------------------------------
407 # pylint: disable=unused-argument
408 def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
411 resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
412 if resp[1] != self.RESP_OK:
413 raise RuntimeError("PWM init error.")
415 self.pwm_set_frequency(pin, frequency)
416 self.pwm_set_duty_cycle(pin, duty_cycle)
def pwm_deinit(self, pin):
    """Send the PWM de-init command for ``pin``.

    The command carries ``pin.id``; the device reply is not inspected.
    """
    deinit_report = bytes([self.PWM_DEINIT_PIN, pin.id])
    self._hid_xfer(deinit_report)
def pwm_get_frequency(self, pin):
    """Query the device for the PWM frequency of ``pin``.

    :return: the four response bytes starting at offset 3, decoded as a
        little-endian integer.
    :raises RuntimeError: if the response status is not ``RESP_OK``.
    """
    query = bytes([self.PWM_GET_FREQ, pin.id])
    reply = self._hid_xfer(query, True)
    if reply[1] == self.RESP_OK:
        return int.from_bytes(reply[3:7], byteorder="little")
    raise RuntimeError("PWM get frequency error.")
429 def pwm_set_frequency(self, pin, frequency):
431 resp = self._hid_xfer(
432 bytes([self.PWM_SET_FREQ, pin.id])
433 + frequency.to_bytes(4, byteorder="little"),
436 if resp[1] != self.RESP_OK:
437 # pylint: disable=no-else-raise
439 raise RuntimeError("PWM different frequency on same slice.")
440 elif resp[3] == 0x02:
441 raise RuntimeError("PWM frequency too low.")
442 elif resp[3] == 0x03:
443 raise RuntimeError("PWM frequency too high.")
445 raise RuntimeError("PWM frequency error.")
def pwm_get_duty_cycle(self, pin):
    """Query the device for the PWM duty cycle of ``pin``.

    :return: the four response bytes starting at offset 3, decoded as a
        little-endian integer.
    :raises RuntimeError: if the response status is not ``RESP_OK``.
    """
    query = bytes([self.PWM_GET_DUTY_U16, pin.id])
    reply = self._hid_xfer(query, True)
    status = reply[1]
    if status != self.RESP_OK:
        raise RuntimeError("PWM get duty cycle error.")
    duty_bytes = reply[3:7]
    return int.from_bytes(duty_bytes, byteorder="little")
454 def pwm_set_duty_cycle(self, pin, duty_cycle):
455 """PWM set duty cycle."""
456 resp = self._hid_xfer(
457 bytes([self.PWM_SET_DUTY_U16, pin.id])
458 + duty_cycle.to_bytes(2, byteorder="little"),
461 if resp[1] != self.RESP_OK:
462 raise RuntimeError("PWM set duty cycle error.")
465 pico_u2if = Pico_u2if()