1 """Chip Definition for Pico with u2if firmware"""
2 # https://github.com/execuc/u2if
6 # pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements
7 # pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods
11 """MCP2221 Device Class Definition"""
33 I2C0_WRITE_FROM_UART = 0x84
34 I2C1_INIT = I2C0_INIT + 0x10
35 I2C1_DEINIT = I2C0_DEINIT + 0x10
36 I2C1_WRITE = I2C0_WRITE + 0x10
37 I2C1_READ = I2C0_READ + 0x10
38 I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
45 SPI0_WRITE_FROM_UART = 0x64
46 SPI1_INIT = SPI0_INIT + 0x10
47 SPI1_DEINIT = SPI0_DEINIT + 0x10
48 SPI1_WRITE = SPI0_WRITE + 0x10
49 SPI1_READ = SPI0_READ + 0x10
50 SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10
62 PWM_SET_DUTY_U16 = 0x34
63 PWM_GET_DUTY_U16 = 0x35
64 PWM_SET_DUTY_NS = 0x36
65 PWM_GET_DUTY_NS = 0x37
68 self._hid = hid.device()
69 self._hid.open(Pico_u2if.VID, Pico_u2if.PID)
70 self._i2c_index = None
71 self._spi_index = None
73 self._neopixel_initialized = False
74 self._uart_rx_buffer = None
76 def _hid_xfer(self, report, response=True):
77 """Perform HID Transfer"""
78 # first byte is report ID, which =0
79 # remaining bytes = 64 byte report data
80 # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185
81 self._hid.write(b"\0" + report + b"\0" * (64 - len(report)))
83 # return is 64 byte response report
84 return self._hid.read(64)
87 # ----------------------------------------------------------------
89 # ----------------------------------------------------------------
90 def gpio_init_pin(self, pin_id, direction, pull):
91 """Configure GPIO Pin."""
103 def gpio_set_pin(self, pin_id, value):
104 """Set Current GPIO Pin Value"""
115 def gpio_get_pin(self, pin_id):
116 """Get Current GPIO Pin Value"""
117 resp = self._hid_xfer(
126 return resp[3] != 0x00
128 # ----------------------------------------------------------------
130 # ----------------------------------------------------------------
131 def adc_init_pin(self, pin_id):
132 """Configure ADC Pin."""
142 def adc_get_value(self, pin_id):
143 """Get ADC value for pin."""
144 resp = self._hid_xfer(
153 return int.from_bytes(resp[3 : 3 + 2], byteorder="little")
155 # ----------------------------------------------------------------
157 # ----------------------------------------------------------------
158 def i2c_configure(self, baudrate, pullup=False):
160 if self._i2c_index is None:
161 raise RuntimeError("I2C bus not initialized.")
163 resp = self._hid_xfer(
166 self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT,
167 0x00 if not pullup else 0x01,
170 + baudrate.to_bytes(4, byteorder="little"),
173 if resp[1] != self.RESP_OK:
174 raise RuntimeError("I2C init error.")
def i2c_set_port(self, index):
    """Select the I2C port used by the other I2C methods.

    :param index: port number; only 0 and 1 are accepted.
    :raises ValueError: if ``index`` is neither 0 nor 1.
    """
    if index in (0, 1):
        # Remember the choice; the command opcode is picked from it later.
        self._i2c_index = index
        return
    raise ValueError("I2C index must be 0 or 1.")
182 def _i2c_write(self, address, buffer, start=0, end=None, stop=True):
183 """Write data from the buffer to an address"""
184 if self._i2c_index is None:
185 raise RuntimeError("I2C bus not initialized.")
187 end = end if end else len(buffer)
189 write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE
190 stop_flag = 0x01 if stop else 0x00
192 while (end - start) > 0:
193 remain_bytes = end - start
194 chunk = min(remain_bytes, 64 - 7)
195 resp = self._hid_xfer(
196 bytes([write_cmd, address, stop_flag])
197 + remain_bytes.to_bytes(4, byteorder="little")
198 + buffer[start : (start + chunk)],
201 if resp[1] != self.RESP_OK:
202 raise RuntimeError("I2C write error")
def _i2c_read(self, address, buffer, start=0, end=None):
    """Read bytes from an I2C device into ``buffer[start:end]``.

    The read is issued as one HID command, so the requested length has
    to fit in a single response report.

    :param address: I2C address of the device to read from.
    :param buffer: writable, indexable buffer that receives the data.
    :param start: index of the first buffer element to fill.
    :param end: index one past the last element to fill; ``None`` means
        ``len(buffer)``.
    :raises RuntimeError: if no I2C port has been selected, or if the
        response status is not ``RESP_OK``.
    :raises ValueError: if the requested length is negative or larger
        than one response report can carry.
    """
    # TODO: support chunkified reads
    if self._i2c_index is None:
        raise RuntimeError("I2C bus not initialized.")

    # Only None selects the default; an explicit end=0 is an empty range.
    if end is None:
        end = len(buffer)

    read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ
    stop_flag = 0x01  # always stop
    read_size = end - start

    # The response report is 64 bytes and the payload starts at index 2,
    # so reject anything that cannot come back in one report before the
    # bus is touched.
    max_read = 64 - 2
    if not 0 <= read_size <= max_read:
        raise ValueError(
            "I2C read size must be between 0 and {} bytes.".format(max_read)
        )

    resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True)
    if resp[1] != self.RESP_OK:
        raise RuntimeError("I2C read error")

    # Copy element by element so any indexable buffer type works.
    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
def i2c_writeto(self, address, buffer, *, start=0, end=None):
    """Write the ``start``/``end`` range of ``buffer`` to ``address``.

    Public entry point that hands the request to ``_i2c_write`` without
    passing a stop argument, so that helper's default applies.
    """
    self._i2c_write(address, buffer, start=start, end=end)
def i2c_readfrom_into(self, address, buffer, *, start=0, end=None):
    """Read from ``address`` into the ``start``/``end`` range of ``buffer``.

    Public entry point that forwards its arguments to ``_i2c_read``.
    """
    self._i2c_read(address, buffer, start=start, end=end)
232 def i2c_writeto_then_readfrom(
243 """Write data from buffer_out to an address and then
244 read data from an address and into buffer_in
246 self._i2c_write(address, out_buffer, out_start, out_end, False)
247 self._i2c_read(address, in_buffer, in_start, in_end)
249 def i2c_scan(self, *, start=0, end=0x79):
250 """Perform an I2C Device Scan"""
251 if self._i2c_index is None:
252 raise RuntimeError("I2C bus not initialized.")
254 for addr in range(start, end + 1):
257 self.i2c_writeto(addr, b"\x00\x00\x00")
258 except RuntimeError: # no reply!
264 # ----------------------------------------------------------------
266 # ----------------------------------------------------------------
267 def spi_configure(self, baudrate):
269 if self._spi_index is None:
270 raise RuntimeError("SPI bus not initialized.")
272 resp = self._hid_xfer(
275 self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT,
276 0x00, # mode, not yet implemented
279 + baudrate.to_bytes(4, byteorder="little"),
282 if resp[1] != self.RESP_OK:
283 raise RuntimeError("SPI init error.")
def spi_set_port(self, index):
    """Select the SPI port used by the other SPI methods.

    :param index: port number; only 0 and 1 are accepted.
    :raises ValueError: if ``index`` is neither 0 nor 1.
    """
    if index in (0, 1):
        # Remember the choice; the command opcode is picked from it later.
        self._spi_index = index
        return
    raise ValueError("SPI index must be 0 or 1.")
291 def spi_write(self, buffer, *, start=0, end=None):
293 if self._spi_index is None:
294 raise RuntimeError("SPI bus not initialized.")
296 end = end if end else len(buffer)
298 write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE
300 while (end - start) > 0:
301 remain_bytes = end - start
302 chunk = min(remain_bytes, 64 - 3)
303 resp = self._hid_xfer(
304 bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True
306 if resp[1] != self.RESP_OK:
307 raise RuntimeError("SPI write error")
def spi_readinto(self, buffer, *, start=0, end=None, write_value=0):
    """Read from the selected SPI port into ``buffer[start:end]``.

    The read is issued as one HID command, so the requested length has
    to fit in a single response report.

    :param buffer: writable, indexable buffer that receives the data.
    :param start: index of the first buffer element to fill.
    :param end: index one past the last element to fill; ``None`` means
        ``len(buffer)``.
    :param write_value: byte value sent along in the read command
        (presumably what is clocked out while reading; confirm against
        the firmware).
    :raises RuntimeError: if no SPI port has been selected, or if the
        response status is not ``RESP_OK``.
    :raises ValueError: if the requested length is negative or larger
        than one response report can carry.
    """
    if self._spi_index is None:
        raise RuntimeError("SPI bus not initialized.")

    # Only None selects the default; an explicit end=0 is an empty range.
    if end is None:
        end = len(buffer)
    read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ
    read_size = end - start

    # The response report is 64 bytes and the payload starts at index 2,
    # so reject anything that cannot come back in one report before the
    # bus is touched.
    max_read = 64 - 2
    if not 0 <= read_size <= max_read:
        raise ValueError(
            "SPI read size must be between 0 and {} bytes.".format(max_read)
        )

    resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True)
    if resp[1] != self.RESP_OK:
        raise RuntimeError("SPI read error")

    # Copy element by element so any indexable buffer type works.
    for i in range(read_size):
        buffer[start + i] = resp[i + 2]
326 def spi_write_readinto(
336 """SPI write and readinto."""
337 raise NotImplementedError("SPI write_readinto Not implemented")
339 # ----------------------------------------------------------------
341 # ----------------------------------------------------------------
342 def neopixel_write(self, gpio, buf):
343 """NeoPixel write."""
344 # open serial (data is sent over this)
345 if self._serial is None:
347 import serial.tools.list_ports
349 ports = serial.tools.list_ports.comports()
351 if port.vid == self.VID and port.pid == self.PID:
352 self._serial = serial.Serial(port.device)
354 if self._serial is None:
355 raise RuntimeError("Could not find Pico com port.")
358 if not self._neopixel_initialized:
359 # deinit any current setup
360 # pylint: disable=protected-access
361 self._hid_xfer(bytes([self.WS2812B_DEINIT]))
362 resp = self._hid_xfer(
371 if resp[1] != self.RESP_OK:
372 raise RuntimeError("Neopixel init error")
373 self._neopixel_initialized = True
376 # command is done over HID
377 remain_bytes = len(buf)
378 resp = self._hid_xfer(
379 bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"),
382 if resp[1] != self.RESP_OK:
383 # pylint: disable=no-else-raise
386 "Neopixel write error : too many pixel for the firmware."
388 elif resp[2] == 0x02:
390 "Neopixel write error : transfer already in progress."
393 raise RuntimeError("Neopixel write error")
394 # buffer is sent over serial
395 self._serial.write(buf)
398 # ----------------------------------------------------------------
400 # ----------------------------------------------------------------
401 # pylint: disable=unused-argument
402 def pwm_configure(self, pin, frequency=500, duty_cycle=0, variable_frequency=False):
405 resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin.id]), True)
406 if resp[1] != self.RESP_OK:
407 raise RuntimeError("PWM init error.")
409 self.pwm_set_frequency(pin, frequency)
410 self.pwm_set_duty_cycle(pin, duty_cycle)
def pwm_deinit(self, pin):
    """Send the PWM de-initialise command for ``pin``.

    The report holds the command code followed by ``pin.id``. Whatever
    the transfer returns is ignored.
    """
    report = bytes((self.PWM_DEINIT_PIN, pin.id))
    self._hid_xfer(report)
def pwm_get_frequency(self, pin):
    """Query the PWM frequency reported for ``pin``.

    :param pin: object whose ``id`` attribute identifies the pin.
    :returns: integer decoded little-endian from response bytes 3..6.
    :raises RuntimeError: if the response status is not ``RESP_OK``.
    """
    request = bytes((self.PWM_GET_FREQ, pin.id))
    reply = self._hid_xfer(request, True)
    if reply[1] == self.RESP_OK:
        return int.from_bytes(reply[3:7], byteorder="little")
    raise RuntimeError("PWM get frequency error.")
423 def pwm_set_frequency(self, pin, frequency):
425 resp = self._hid_xfer(
426 bytes([self.PWM_SET_FREQ, pin.id])
427 + frequency.to_bytes(4, byteorder="little"),
430 if resp[1] != self.RESP_OK:
431 # pylint: disable=no-else-raise
433 raise RuntimeError("PWM different frequency on same slice.")
434 elif resp[3] == 0x02:
435 raise RuntimeError("PWM frequency too low.")
436 elif resp[3] == 0x03:
437 raise RuntimeError("PWM frequency too high.")
439 raise RuntimeError("PWM frequency error.")
def pwm_get_duty_cycle(self, pin):
    """Query the PWM duty cycle reported for ``pin``.

    :param pin: object whose ``id`` attribute identifies the pin.
    :returns: integer decoded little-endian from response bytes 3..6.
    :raises RuntimeError: if the response status is not ``RESP_OK``.
    """
    request = bytes((self.PWM_GET_DUTY_U16, pin.id))
    reply = self._hid_xfer(request, True)
    if reply[1] == self.RESP_OK:
        return int.from_bytes(reply[3:7], byteorder="little")
    raise RuntimeError("PWM get duty cycle error.")
448 def pwm_set_duty_cycle(self, pin, duty_cycle):
449 """PWM set duty cycle."""
450 resp = self._hid_xfer(
451 bytes([self.PWM_SET_DUTY_U16, pin.id])
452 + duty_cycle.to_bytes(2, byteorder="little"),
455 if resp[1] != self.RESP_OK:
456 raise RuntimeError("PWM set duty cycle error.")
# Pico_u2if instance created when this statement runs.
# NOTE(review): the initializer fragment earlier in this file opens the HID
# device (self._hid.open), so this line presumably requires the board to be
# attached - confirm.
459 pico_u2if = Pico_u2if()