Source code for luna_soc.gateware.core.spiflash.port

#
# This file is part of LUNA.
#
# Copyright (c) 2024 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause

# Based on code from LiteSPI

from amaranth               import Module, Signal, Cat, EnableInserter
from amaranth.lib.fifo      import AsyncFIFO
from amaranth.lib.cdc       import FFSynchronizer
from amaranth.lib           import wiring
from amaranth.lib.wiring    import In, Out, connect

from .utils                 import RoundRobin


#
# Signatures.
#

[docs] class StreamCore2PHY(wiring.Signature): def __init__(self, data_width): self._data_width = data_width members = { "data": Out(data_width), "len": Out(6), "width": Out(4), "mask": Out(8), "valid": Out(1), "ready": In(1), } super().__init__(members) @property def data_width(self): return self._data_width def __eq__(self, other): return isinstance(other, StreamCore2PHY) and self.data_width == other.data_width def __repr__(self): return f"StreamCore2PHY({self.data_width})"
[docs] def create(self, *, path=None, src_loc_at=0): return StreamCore2PHYInterface(self, path=path, src_loc_at=1 + src_loc_at)
[docs] class StreamCore2PHYInterface(wiring.PureInterface): @property def payload(self): """ Joint signal with all data. """ return Cat(self.data, self.len, self.width, self.mask)
[docs] class StreamPHY2Core(wiring.Signature): def __init__(self, data_width): self._data_width = data_width members = { "data": Out(data_width), "valid": Out(1), "ready": In(1), } super().__init__(members) @property def data_width(self): return self._data_width def __eq__(self, other): return isinstance(other, StreamPHY2Core) and self.data_width == other.data_width def __repr__(self): return f"StreamPHY2Core({self.data_width})"
[docs] def create(self, *, path=None, src_loc_at=0): return StreamPHY2CoreInterface(self, path=path, src_loc_at=1 + src_loc_at)
[docs] class StreamPHY2CoreInterface(wiring.PureInterface): @property def payload(self): """ Joint signal with all data. """ return self.data
[docs] class SPIControlPort(wiring.Signature): def __init__(self, data_width): super().__init__({ "source": Out(StreamCore2PHY(data_width)), "sink": In(StreamPHY2Core(data_width)), "cs": Out(1), })
# # SPI control port utilities. #
[docs] class SPIControlPortCrossbar(wiring.Component): """ Merge multiple SPIControlPorts with a round-robin scheduler. """ def __init__(self, *, data_width=32, num_ports=1, domain="sync"): self._domain = domain self._num_ports = num_ports super().__init__(dict( controller=Out(SPIControlPort(data_width)), **{f"slave{i}": In(SPIControlPort(data_width)) for i in range(num_ports)} ))
[docs] def get_port(self, index): return getattr(self, f"slave{index}")
[docs] def elaborate(self, platform): m = Module() grant_update = Signal() m.submodules.rr = rr = EnableInserter(grant_update)(RoundRobin(count=self._num_ports)) m.d.comb += rr.requests.eq(Cat(self.get_port(i).cs for i in range(self._num_ports))) # Multiplexer. with m.Switch(rr.grant): for i in range(self._num_ports): with m.Case(i): connect(m, wiring.flipped(self.get_port(i)), wiring.flipped(self.controller)) m.d.comb += grant_update.eq(~rr.valid | ~rr.requests[i]) return m
[docs] class SPIControlPortCDC(wiring.Component): """ Converts one SPIControlPort between clock domains. """ def __init__(self, *, data_width=32, domain_a="sync", domain_b="sync", depth=4): super().__init__({ "a": In(SPIControlPort(data_width)), "b": Out(SPIControlPort(data_width)), }) self.domain_a = domain_a self.domain_b = domain_b self.depth = depth
[docs] def elaborate(self, platform): m = Module() a, b = self.a, self.b tx_cdc = AsyncFIFO(width=len(a.source.payload), depth=self.depth, w_domain=self.domain_a, r_domain=self.domain_b) rx_cdc = AsyncFIFO(width=len(b.sink.data), depth=self.depth, w_domain=self.domain_b, r_domain=self.domain_a) cs_cdc = FFSynchronizer(a.cs, b.cs, o_domain=self.domain_b) m.submodules.tx_cdc = tx_cdc m.submodules.rx_cdc = rx_cdc m.submodules.cs_cdc = cs_cdc m.d.comb += [ # Wire TX path. tx_cdc.w_data .eq(a.source.payload), tx_cdc.w_en .eq(a.source.valid), a.source.ready .eq(tx_cdc.w_rdy), b.source.payload .eq(tx_cdc.r_data), b.source.valid .eq(tx_cdc.r_rdy), tx_cdc.r_en .eq(b.source.ready), # Wire RX path. rx_cdc.w_data .eq(b.sink.data), rx_cdc.w_en .eq(b.sink.valid), b.sink.ready .eq(rx_cdc.w_rdy), a.sink.data .eq(rx_cdc.r_data), a.sink.valid .eq(rx_cdc.r_rdy), rx_cdc.r_en .eq(a.sink.ready), ] return m