Source code for ekfsm.devices.generic

from pathlib import Path
from typing import TYPE_CHECKING, Callable, List

from munch import Munch

from ekfsm.core.components import SysTree
from ekfsm.core.sysfs import SysfsDevice, sysfs_root
from ekfsm.exceptions import ConfigError, SysFSError, UnsupportedModeError

if TYPE_CHECKING:
    from ekfsm.core.components import HWModule


class Device(SysTree):
    """
    A generic device.

    A device is a node of the system tree. It stores the keyword arguments
    it was created with in ``device_args``, may have a sysfs device attached
    (``sysfs_device``) and exposes the callables configured under the
    ``provides`` keyword argument through the ``provides`` attribute.
    """

    def __init__(
        self,
        name: str,
        parent: SysTree | None = None,
        children: List["Device"] | None = None,
        abort: bool = False,
        *args,
        **kwargs,
    ):
        """
        Create the device and resolve its ``provides`` configuration.

        Parameters
        ----------
        name
            Name of the device, passed on to :class:`SysTree`.
        parent
            Parent node of the device in the tree, if any.
        children
            Child devices; only assigned when non-empty.
        abort
            Passed on to :class:`SysTree`. Also controls whether a
            ``provides`` entry that names a missing attribute raises
            (``True``) or is skipped (``False``).
        *args
            Accepted for call compatibility; not used by this class.
        **kwargs
            Stored as ``device_args``. The ``provides`` entry, if present,
            describes the interfaces this device offers.
        """
        super().__init__(name, abort=abort)
        self.parent = parent
        self.device_args = kwargs

        if children:
            self.children = children

        # Needs to be set during init because root will be changed after tree is complete
        self.hw_module = self.root  # pyright: ignore[reportAttributeAccessIssue]

        # I2C initialization: keep a sysfs device that is already present on
        # the instance, otherwise default the attribute to None.
        if not hasattr(self, "sysfs_device") or self.sysfs_device is None:
            self.sysfs_device: SysfsDevice | None = None

        # Post-initialization steps
        self._provides_attrs = kwargs.get("provides", {})
        self.provides = self.__post_init__(Munch(self._provides_attrs), abort)

    def __post_init__(self, provides: Munch, abort) -> Munch:
        """
        Resolve a ``provides`` description into bound attributes of the device.

        Each value of `provides` is handled according to its type:

        * ``dict``: resolved recursively.
        * ``list`` or ``str``: replaced by a :class:`Munch` that maps an
          interface name to ``getattr(self, <attribute name>)``. A plain
          string entry is used as both interface and attribute name; a
          mapping entry contributes its first item as
          ``{interface name: attribute name}``.
        * anything else: left unchanged.

        Parameters
        ----------
        provides
            The description to resolve. It is updated in place and returned.
        abort
            If true, an attribute that does not exist on the device raises
            ``NotImplementedError``; otherwise the entry is skipped.

        Returns
        -------
        Munch
            The resolved `provides` object.

        Raises
        ------
        ConfigError
            If a mapping entry is empty or names no function.
        NotImplementedError
            If `abort` is true and a referenced attribute does not exist.
        """
        for key, fields in provides.items():
            if isinstance(fields, dict):
                provides[key] = self.__post_init__(Munch(fields), abort)
            elif isinstance(fields, (list, str)):
                provides[key] = Munch()
                if isinstance(fields, str):
                    fields = [fields]

                # Walk the entries last-to-first without consuming the list:
                # it is shared with the caller's configuration (Munch() only
                # makes a shallow copy) and must not be emptied.
                for interface in reversed(fields):
                    # TODO: Check if this is superfluous after schema validation
                    if isinstance(interface, dict):
                        # Only the first item of the mapping is considered.
                        name, func = next(iter(interface.items()), (None, None))
                        if func is None:
                            raise ConfigError(
                                f"{self.name}: No function given for interface {name}."
                            )

                        if not hasattr(self, func):
                            if abort:
                                raise NotImplementedError(
                                    f"{self.name}: Function {func} for interface {name} not implemented."
                                )
                            continue

                        provides[key].update({name: getattr(self, func)})
                    else:
                        if not hasattr(self, interface):
                            if abort:
                                raise NotImplementedError(
                                    f"{self.name}: Function {interface} for provider {key} not implemented."
                                )
                            continue

                        provides[key].update({interface: getattr(self, interface)})

        return provides

    @property
    def sysfs(self):
        """
        Access sysfs device for device

        Returns
        -------
        :class:`~ekfsm.core.sysfs.SysfsDevice`
            Sysfs device for device

        Raises
        ------
        SysFSError
            If the SysFSDevice does not exist
        """
        if self.sysfs_device is None:
            raise SysFSError(f"No sysfs device attached to device {self.name}")

        return self.sysfs_device

    def read_attr(self, attr: str, mode: str = "utf", strip: bool = True):
        """
        Read a sysfs attribute in the requested representation.

        Parameters
        ----------
        attr
            The sysfs attribute to read.
        mode
            One of ``"utf"``, ``"bytes"``, ``"float"``, ``"int"`` or ``"hex"``;
            selects the reader of the sysfs device that is called.
        strip
            Passed to the UTF-8 reader; ignored by all other modes.

        Returns
        -------
        The value returned by the selected reader.

        Raises
        ------
        UnsupportedModeError
            If `mode` is not one of the supported values.
        SysFSError
            If no sysfs device is attached to the device.
        """
        if mode == "utf":
            return self.sysfs.read_utf8(attr, strip)
        elif mode == "bytes":
            return self.sysfs.read_bytes(attr)
        elif mode == "float":
            return self.sysfs.read_float(attr)
        elif mode == "int":
            return self.sysfs.read_int(attr)
        elif mode == "hex":
            return self.sysfs.read_hex(attr)
        else:
            raise UnsupportedModeError(f"Mode {mode} is not supported")

    def read_attr_or_default(
        self, attr: str, mode: str = "utf", strip: bool = True, default=None
    ):
        """
        Read a sysfs attribute, falling back to `default` if reading fails.

        Parameters
        ----------
        attr
            The sysfs attribute to read.
        mode
            Read mode, see :meth:`read_attr`.
        strip
            Passed on to :meth:`read_attr`.
        default
            Value returned when the read raises any exception other than
            ``UnsupportedModeError``.

        Returns
        -------
        The attribute value, or `default` if it could not be read.

        Raises
        ------
        UnsupportedModeError
            If `mode` is not supported; this is a usage error and is
            deliberately not masked by the default.
        """
        try:
            return self.read_attr(attr, mode, strip)
        except UnsupportedModeError:
            raise
        except Exception:
            # Deliberate best effort: any read failure yields the default.
            return default

    def read_sysfs_bytes(self, attr) -> bytes:
        """
        Read a sysfs attribute as bytes.

        Parameters
        ----------
        attr
            The sysfs attribute to read.

        Returns
        -------
        bytes
            The contents of the sysfs attribute.

        Raises
        ------
        SysFSError
            If `attr` is empty or no sysfs device is attached to the device.
        """
        if not attr:
            raise SysFSError("sysfs attribute name too short")

        return self.sysfs.read_bytes(attr)

    def read_sysfs_attr_bytes(self, attr: str) -> bytes | None:
        """
        Read a sysfs attribute as bytes.

        Parameters
        ----------
        attr
            The sysfs attribute to read.

        Returns
        -------
        content: bytes
            The contents of the sysfs attribute as bytes.
        None:
            If the sysfs device is not set, `attr` is empty or the attribute
            is not listed in the attributes of the sysfs device.
        """
        device = self.sysfs_device
        if device is None or not attr:
            return None

        if any(entry.name == attr for entry in device.attributes):
            return device.read_attr_bytes(attr)

        return None

    def read_sysfs_attr_utf8(self, attr: str) -> str | None:
        """
        Read a sysfs attribute as UTF-8 string.

        Parameters
        ----------
        attr
            The sysfs attribute to read.

        Returns
        -------
        content: str
            The contents of the sysfs attribute as UTF-8 string.
        None:
            If reading fails for any reason, e.g. because the sysfs device
            is not set.
        """
        try:
            return self.sysfs.read_utf8(attr)
        except Exception:
            # Deliberate best effort: every failure is reported as None.
            return None

    def write_sysfs_attr(self, attr: str, data: str | bytes) -> None:
        """
        Write data to a sysfs attribute.

        Nothing is written if no sysfs device is attached or `attr` is empty.

        Parameters
        ----------
        attr
            The sysfs attribute to write to.
        data
            The data to write to the sysfs attribute.
        """
        if self.sysfs_device and attr:
            return self.sysfs_device.write_attr(attr, data)

        return None

    @property
    def hw_module(self) -> "HWModule":
        """
        The HWModule instance that this device belongs to.

        Returns
        -------
        :class:`~ekfsm.core.components.HWModule`
            The HWModule instance that this device belongs to.

        Raises
        ------
        RuntimeError
            If the stored value is not a HWModule instance.
        """
        # Imported here rather than at module level, where the name is only
        # imported under TYPE_CHECKING.
        from ekfsm.core.components import HWModule

        if isinstance(self._hw_module, HWModule):
            return self._hw_module
        else:
            raise RuntimeError("Device is not a child of HWModule")

    @hw_module.setter
    def hw_module(self, hw_module: "HWModule") -> None:
        self._hw_module = hw_module

    def get_i2c_chip_addr(self) -> int:
        """
        Get the I2C chip address of the device.

        The address is taken from the ``addr`` device argument. If the parent
        has no sysfs device and the hardware module is not the master, the
        masked slot coding from the slot attributes is OR-ed into it.

        Returns
        -------
        int
            The I2C chip address.

        Raises
        ------
        ConfigError
            If the device has no parent, no ``addr`` was given, or required
            slot attributes are missing.
        """
        if self.parent is None:
            raise ConfigError(
                f"{self.name}: Device must have a parent to get I2C chip address"
            )

        chip_addr = self.device_args.get("addr")
        if chip_addr is None:
            raise ConfigError(
                f"{self.name}: Chip address not provided in board definition"
            )

        if not hasattr(self.parent, "sysfs_device") or self.parent.sysfs_device is None:
            # our device is the top level device of the slot
            # compute chip address from board yaml and slot attributes
            slot_attributes = self.hw_module.slot.attributes

            if slot_attributes is None:
                raise ConfigError(
                    f"{self.name}: Slot attributes not provided in system configuration"
                )

            if not self.hw_module.is_master:
                # slot coding is only used for non-master devices
                if not hasattr(slot_attributes, "slot_coding"):
                    raise ConfigError(
                        f"{self.name}: Slot coding not provided in slot attributes"
                    )

                slot_coding_mask = 0xFF
                if hasattr(slot_attributes, "slot_coding_mask"):
                    slot_coding_mask = slot_attributes.slot_coding_mask

                chip_addr |= slot_attributes.slot_coding & slot_coding_mask

        return chip_addr

    def get_i2c_sysfs_device(
        self, addr: int, driver_required=True, find_driver: Callable | None = None
    ) -> SysfsDevice:
        """
        Find the sysfs device with I2C address `addr` on the bus of the device.

        The bus directory is the master I2C bus if the parent is a HWModule,
        otherwise the sysfs path of the parent MuxChannel.

        Parameters
        ----------
        addr
            The I2C address to look for.
        driver_required
            Passed on to :class:`~ekfsm.core.sysfs.SysfsDevice`.
        find_driver
            Passed on to :class:`~ekfsm.core.sysfs.SysfsDevice`.

        Returns
        -------
        :class:`~ekfsm.core.sysfs.SysfsDevice`
            The sysfs device found at `addr`.

        Raises
        ------
        ConfigError
            If the device has no parent, or the parent is neither a HWModule
            nor a MuxChannel with a sysfs device.
        FileNotFoundError
            If no device with address `addr` exists on the bus.
        """
        from ekfsm.core.components import HWModule

        parent = self.parent
        if parent is None:
            raise ConfigError(
                f"{self.name}: Device must have a parent to get I2C sysfs device"
            )

        # If parent is a HWModule, we can get the i2c bus from the master device
        # XXX: Does this still hold true after refactoring?
        if isinstance(parent, HWModule):
            i2c_bus_path = self.__master_i2c_bus()
        else:
            # otherwise the parent must be a MuxChannel
            from ekfsm.devices.mux import MuxChannel

            if not isinstance(parent, MuxChannel):
                raise ConfigError(
                    f"{self.name}: Parent must be MuxChannel when not a HWModule"
                )

            if parent.sysfs_device is None:
                raise ConfigError(
                    f"{self.name}: Parent MuxChannel must have a sysfs_device"
                )

            i2c_bus_path = parent.sysfs_device.path

        # search for device with addr
        for entry in i2c_bus_path.iterdir():
            if not entry.is_dir():
                continue
            if (entry / "new_device").exists():  # skip bus entries
                continue
            if not (entry / "name").exists():
                continue

            description_file = entry / "firmware_node" / "description"
            try:
                if description_file.exists():
                    # PRP devices unfortunately do not readily expose the underlying I2C address of the device like
                    # regular I2C devices that follow the `${I2C_BUS}-${ADDR}` pattern. To address this issue, we
                    # initialize the ACPI _STR object for each PRP device with the necessary information, which is
                    # accessible in the `${DEVICE_SYSFS_PATH}/firmware_node/description` file.
                    description = description_file.read_text().strip()
                    entry_addr = int(description.split(" - ")[0], 16)
                else:
                    # For regular non-PRP devices, the address is contained in the directory name (e.g. 2-0018).
                    entry_addr = int(entry.name.split("-")[1], 16)
            except (ValueError, IndexError):
                # The entry does not encode an address in either form; it
                # cannot be the device we are looking for, so keep scanning
                # instead of aborting the whole search.
                continue

            if entry_addr == addr:
                return SysfsDevice(entry, driver_required, find_driver)

        raise FileNotFoundError(
            f"Device with address 0x{addr:x} not found in {i2c_bus_path}"
        )

    @staticmethod
    def __master_i2c_get_config(master: "HWModule") -> dict:
        """
        Return the ``bus_masters.i2c`` section of the configuration of `master`.

        Raises
        ------
        ConfigError
            If the ``bus_masters`` section or its ``i2c`` entry is missing.
        """
        bus_masters = master.config.get("bus_masters")
        if bus_masters is not None and bus_masters.get("i2c") is not None:
            return bus_masters["i2c"]

        raise ConfigError("Master definition incomplete")

    def __master_i2c_bus(self) -> Path:
        """
        Locate the sysfs directory of the master I2C bus for this device.

        Returns
        -------
        Path
            The single ``i2c-*`` directory found below the configured master
            path.

        Raises
        ------
        ConfigError
            If no master board or no matching master entry is configured, or
            if not exactly one ``i2c-*`` directory is found.
        """
        if self.hw_module.is_master:
            # we are the master
            master = self.hw_module
            master_key = "MASTER_LOCAL_DEFAULT"
            override_master_key = self.device_args.get("i2c_master", None)
            if override_master_key is not None:
                master_key = override_master_key
        else:
            # another board is the master
            if self.hw_module.slot.master is None:
                raise ConfigError(
                    f"{self.name}: Master board not found in slot attributes"
                )

            master = self.hw_module.slot.master
            master_key = self.hw_module.slot.slot_type.name

        i2c_masters = self.__master_i2c_get_config(master)

        if i2c_masters.get(master_key) is None:
            raise ConfigError(f"Master I2C bus not found for {master_key}")

        bus_parent = sysfs_root() / Path(i2c_masters[master_key])
        bus_dirs = list(bus_parent.glob("i2c-*"))

        if len(bus_dirs) == 1:
            return bus_dirs[0]
        if len(bus_dirs) > 1:
            raise ConfigError(f"Multiple master I2C buses found for {master_key}")
        raise ConfigError(f"No master I2C bus found for {master_key}")

    def get_i2c_bus_number(self) -> int:
        """
        Get the I2C bus number of the device.

        Works for devices that do not have a sysfs_device attribute.

        The number is parsed from the name of the bus directory: the parent
        directory of the device's own sysfs path or, if the device has no
        sysfs device, the sysfs path of its parent.

        Returns
        -------
        int
            The I2C bus number.

        Raises
        ------
        RuntimeError
            If the device is a MuxChannel, or has no sysfs device and no
            parent with a sysfs device to derive the bus from.
        """
        from ekfsm.devices.mux import MuxChannel

        if isinstance(self, MuxChannel):
            raise RuntimeError(f"{self.name}: MuxChannel does not have a bus number")

        if self.sysfs_device is None:
            if self.parent is None:
                raise RuntimeError(f"{self.name}: Must have a parent to get bus number")

            # The parent may be a node without a sysfs device; report that
            # explicitly instead of failing with an AttributeError.
            parent_device = getattr(self.parent, "sysfs_device", None)
            if parent_device is None:
                raise RuntimeError(
                    f"{self.name}: Parent has no sysfs device to get bus number from"
                )

            parent_path = parent_device.path
        else:
            parent_path = self.sysfs_device.path.parent

        if parent_path.is_symlink():
            parent_path = parent_path.readlink()

        bus_number = parent_path.name.split("-")[1]

        return int(bus_number)

    def __repr__(self) -> str:
        # Falls back to an empty path when no sysfs device is attached.
        sysfs_path = getattr(self.sysfs_device, "path", "")
        return f"{self.name}; Path: {sysfs_path}"