1 """SPI Class for Binho Nova"""
 
   2 from adafruit_blinka.microcontroller.nova import Connection
 
   6     """Custom SPI Class for Binho Nova"""
 
   9     BUFFER_PAYLOAD_MAX_LENGTH = 64
 
  10     WHR_PAYLOAD_MAX_LENGTH = 1024
 
  12     def __init__(self, clock):
 
  13         self._nova = Connection.getInstance()
 
  14         self._nova.setNumericalBase(10)
 
  15         self._nova.setOperationMode(0, "SPI")
 
  16         self._nova.setClockSPI(0, clock)
 
  17         self._nova.setModeSPI(0, 0)
 
  18         self._nova.setIOpinMode(0, "DOUT")
 
  19         self._nova.setIOpinMode(1, "DOUT")
 
  20         self._nova.beginSPI(0)
 
  21         self._novaCMDVer = "0"
 
  22         if hasattr(self._nova, "getCommandVer"):
 
  23             response = self._nova.getCommandVer().split(" ")
 
  24             if response[0] != "-NG":
 
  25                 self._novaCMDVer = response[1]
 
  27         # Cpol and Cpha set by mode
 
  35         """Close Nova on delete"""
 
  38     # pylint: disable=too-many-arguments,unused-argument
 
  50         """Initialize the Port"""
 
  51         self._nova.setClockSPI(0, baudrate)
 
  52         self._nova.setModeSPI(0, (polarity << 1) | (phase))
 
  54     # pylint: enable=too-many-arguments,unused-argument
 
  57     def get_received_data(lineOutput):
 
  58         """Return any received data"""
 
  59         return lineOutput.split("RXD ")[1]
 
  63         """Return the current frequency"""
 
  64         return self._nova.getClockSPI(0).split("CLK ")[1]
 
  66     def write(self, buf, start=0, end=None):
 
  67         """Write data from the buffer to SPI"""
 
  68         end = end if end else len(buf)
 
  69         payloadMaxLength = self.BUFFER_PAYLOAD_MAX_LENGTH
 
  70         if int(self._novaCMDVer) >= 1:
 
  71             payloadMaxLength = self.WHR_PAYLOAD_MAX_LENGTH
 
  72         chunks, rest = divmod(end - start, payloadMaxLength)
 
  74         for i in range(chunks):
 
  75             chunk_start = start + i * payloadMaxLength
 
  76             chunk_end = chunk_start + payloadMaxLength
 
  77             if int(self._novaCMDVer) >= 1:
 
  78                 self._nova.writeToReadFromSPI(
 
  79                     0, True, False, chunk_end - chunk_start, buf[chunk_start:chunk_end]
 
  82                 self._nova.clearBuffer(0)
 
  83                 self._nova.writeToBuffer(0, 0, buf[chunk_start:chunk_end])
 
  84                 self._nova.transferBufferSPI(0, chunk_end - chunk_start + 1)
 
  86             if int(self._novaCMDVer) >= 1:
 
  87                 self._nova.writeToReadFromSPI(0, True, False, rest, buf[-1 * rest :])
 
  89                 self._nova.clearBuffer(0)
 
  90                 self._nova.writeToBuffer(0, 0, buf[-1 * rest :])
 
  91                 self._nova.transferBufferSPI(0, rest)
 
  93     def readinto(self, buf, start=0, end=None, write_value=0):
 
  94         """Read data from SPI and into the buffer"""
 
  95         end = end if end else len(buf)
 
  96         if int(self._novaCMDVer) >= 1:
 
  97             chunks, rest = divmod(end - start, self.WHR_PAYLOAD_MAX_LENGTH)
 
  99             for i in range(chunks):
 
 100                 chunk_start = start + i * self.WHR_PAYLOAD_MAX_LENGTH
 
 101                 chunk_end = chunk_start + self.WHR_PAYLOAD_MAX_LENGTH
 
 102                 result = self._nova.writeToReadFromSPI(
 
 103                     0, False, True, chunk_end - chunk_start, write_value
 
 106                     resp = result.split(" ")
 
 108                     # loop over half of resp len as we're reading 2 chars at a time to form a byte
 
 109                     loops = int(len(resp) / 2)
 
 110                     for j in range(loops):
 
 111                         buf[(i * self.WHR_PAYLOAD_MAX_LENGTH) + start + j] = int(
 
 112                             resp[j * 2] + resp[j * 2 + 1], 16
 
 116                         "Received error response from Binho Nova, result = " + result
 
 119                 result = self._nova.writeToReadFromSPI(
 
 120                     0, False, True, rest, write_value
 
 123                     resp = result.split(" ")
 
 126                     # loop over half of resp len as we're reading 2 chars at a time to form a byte
 
 127                     loops = int(len(resp) / 2)
 
 128                     for j in range(loops):
 
 129                         buf[(i * self.WHR_PAYLOAD_MAX_LENGTH) + start + j] = int(
 
 130                             resp[j * 2] + resp[j * 2 + 1], 16
 
 134                         "Received error response from Binho Nova, result = " + result
 
 137             for i in range(start, end):
 
 138                 buf[start + i] = int(
 
 139                     self.get_received_data(self._nova.transferSPI(0, write_value))
 
 142     # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
 
 144         self, buffer_out, buffer_in, out_start=0, out_end=None, in_start=0, in_end=None
 
 146         """Perform a half-duplex write from buffer_out and then
 
 147         read data into buffer_in
 
 149         out_end = out_end if out_end else len(buffer_out)
 
 150         in_end = in_end if in_end else len(buffer_in)
 
 151         readlen = in_end - in_start
 
 152         writelen = out_end - out_start
 
 153         if readlen > writelen:
 
 154             # resize out and pad with 0's
 
 155             tmp = bytearray(buffer_out)
 
 156             tmp.extend([0] * (readlen - len(buffer_out)))
 
 159         if int(self._novaCMDVer) >= 1:
 
 160             chunks, rest = divmod(len(buffer_out), self.WHR_PAYLOAD_MAX_LENGTH)
 
 162             for i in range(chunks):
 
 163                 chunk_start = out_start + i * self.WHR_PAYLOAD_MAX_LENGTH
 
 164                 chunk_end = chunk_start + self.WHR_PAYLOAD_MAX_LENGTH
 
 165                 result = self._nova.writeToReadFromSPI(
 
 169                     chunk_end - chunk_start,
 
 170                     buffer_out[chunk_start:chunk_end],
 
 174                     resp = result.split(" ")
 
 177                     # loop over half of resp len as we're reading 2 chars at a time to form a byte
 
 178                     loops = int(len(resp) / 2)
 
 179                     for j in range(loops):
 
 181                             (i * self.WHR_PAYLOAD_MAX_LENGTH) + in_start + j
 
 182                         ] = int(resp[j * 2] + resp[j * 2 + 1], 16)
 
 185                         "Received error response from Binho Nova, result = " + result
 
 188                 result = self._nova.writeToReadFromSPI(
 
 189                     0, True, True, rest, buffer_out[-1 * rest :]
 
 192                     resp = result.split(" ")
 
 195                     # loop over half of resp len as we're reading 2 chars at a time to form a byte
 
 196                     loops = int(len(resp) / 2)
 
 197                     for j in range(loops):
 
 199                             (i * self.WHR_PAYLOAD_MAX_LENGTH) + in_start + j
 
 200                         ] = int(resp[j * 2] + resp[j * 2 + 1], 16)
 
 203                         "Received error response from Binho Nova, result = " + result
 
 207             for data_out in buffer_out:
 
 209                     self.get_received_data(self._nova.transferSPI(0, data_out))
 
 212                     buffer_in[in_start + i] = data_in
 
 215     # pylint: enable=too-many-arguments,too-many-locals,too-many-branches