Source code for luna_soc.gateware.core.usb2.ep_control

#
# This file is part of LUNA.
#
# Copyright (c) 2020-2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause

""" Implementation of a Triple-FIFO endpoint manager.

Equivalent (but not binary-compatbile) implementation of ValentyUSB's ``eptri``.

For an example, see ``examples/usb/eptri`` or TinyUSB's ``luna/dcd_eptri.c``.
"""

from typing                           import Annotated

from amaranth                         import *
from amaranth.hdl.xfrm                import ResetInserter, DomainRenamer
from amaranth.lib                     import wiring
from amaranth.lib.fifo                import SyncFIFOBuffered
from amaranth.lib.wiring              import In, Out, connect, flipped

from amaranth_soc                     import csr, event

from luna.gateware.usb.usb2.endpoint  import EndpointInterface


[docs] class Peripheral(wiring.Component): """ Setup component of our `eptri`-equivalent interface. Implements the USB Setup FIFO, which handles SETUP packets on any endpoint. This interface is similar to an :class:`OutFIFOInterface`, but always ACKs packets, and does not allow for any flow control; as a USB device must always be ready to accept control packets. [USB2.0: 8.6.1] Attributes ---------- interface: EndpointInterface Our primary interface to the core USB device hardware. """
[docs] class Control(csr.Register, access="w"): """ Control register address: Controls the current device's USB address. Should be written after a SET_ADDRESS request is received. Automatically resets back to zero on a USB reset. """ address : csr.Field(csr.action.W, unsigned(8))
[docs] class Status(csr.Register, access="r"): """ Status register address: Holds the current device's active USB address. epno: The endpoint number associated with the most recently captured SETUP packet. have: `1` iff data is available in the FIFO. """ address : csr.Field(csr.action.R, unsigned(8)) epno : csr.Field(csr.action.R, unsigned(4)) have : csr.Field(csr.action.R, unsigned(1)) _0 : csr.Field(csr.action.ResRAW0, unsigned(3))
[docs] class Reset(csr.Register, access="w"): """ Reset register fifo: Local reset control for the SETUP handler; writing a '1' to this register clears the handler state. """ fifo : csr.Field(csr.action.W, unsigned(1)) _0 : csr.Field(csr.action.ResRAW0, unsigned(7))
[docs] class Data(csr.Register, access="r"): """ Data register A FIFO that returns the bytes from the most recently captured SETUP packet. Reading a byte from this register advances the FIFO. The first eight bytes read from this contain the core SETUP packet. """ byte : csr.Field(csr.action.R, unsigned(8))
def __init__(self): # I/O port FIXME ambiguity - private or signature ? self.interface = EndpointInterface() # registers regs = csr.Builder(addr_width=4, data_width=8) self._control = regs.add("control", self.Control()) self._status = regs.add("status", self.Status()) self._reset = regs.add("reset", self.Reset()) self._data = regs.add("data", self.Data()) self._bridge = csr.Bridge(regs.as_memory_map()) # events from typing import Annotated EventSource = Annotated[event.Source, "Interrupt that triggers when a new SETUP packet is ready to be read."] self._setup_received = EventSource(trigger="rise", path=("setup_received",)) event_map = event.EventMap() event_map.add(self._setup_received) self._events = csr.event.EventMonitor(event_map, data_width=8) # csr decoder self._decoder = csr.Decoder(addr_width=5, data_width=8) self._decoder.add(self._bridge.bus) self._decoder.add(self._events.bus, name="ev") super().__init__({ "bus": Out(self._decoder.bus.signature), "irq": Out(unsigned(1)), }) self.bus.memory_map = self._decoder.bus.memory_map
[docs] def elaborate(self, platform): m = Module() m.submodules += [self._bridge, self._events, self._decoder] # connect bus connect(m, flipped(self.bus), self._decoder.bus) # Shortcuts to our components. interface = self.interface token = self.interface.tokenizer rx = self.interface.rx handshakes_out = self.interface.handshakes_out # Logic condition for getting a new setup packet. new_setup = token.new_token & token.is_setup reset_requested = self._reset.f.fifo.w_stb & self._reset.f.fifo.w_data clear_fifo = new_setup | reset_requested # # Core FIFO. # m.submodules.fifo = fifo = ResetInserter(clear_fifo)(SyncFIFOBuffered(width=8, depth=8)) m.d.comb += [ # We'll write to the active FIFO whenever the last received token is a SETUP # token, and we have incoming data; and we'll always write the data received fifo.w_en .eq(token.is_setup & rx.valid & rx.next), fifo.w_data .eq(rx.payload), # We'll advance the FIFO whenever our CPU reads from the data CSR; # and we'll always read our data from the FIFO. fifo.r_en .eq(self._data.f.byte.r_stb), self._data.f.byte.r_data .eq(fifo.r_data), # Pass the FIFO status on to our CPU. self._status.f.have.r_data .eq(fifo.r_rdy), # Always acknowledge SETUP packets as they arrive. handshakes_out.ack .eq(token.is_setup & interface.rx_ready_for_response), # Trigger a SETUP event as we ACK the setup packet, since that's also the point # where we know we're done receiving data. self._setup_received.i .eq(handshakes_out.ack) ] # control registers with m.If(self._control.f.address.w_stb): m.d.comb += [ interface.address_changed .eq(1), interface.new_address .eq(self._control.f.address.w_data), ] # status registers m.d.comb += self._status.f.address.r_data.eq(interface.active_address) with m.If(token.new_token & token.is_setup): m.d.usb += self._status.f.epno.r_data.eq(token.endpoint) # connect events to irq line m.d.comb += self.irq.eq(self._events.src.i) return DomainRenamer({"sync": "usb"})(m)