Source code for ekfsm.devices.io4edge

from typing import Callable, List
from ekfsm.core.components import HWModule
from ekfsm.devices.generic import Device
from ekfsm.log import ekfsm_logger
import io4edge_client.core.coreclient as CClient
from io4edge_client.binaryiotypeb import Client

from re import sub

logger = ekfsm_logger(__name__)


class IO4Edge(Device):
    """
    Device class for handling IO4Edge devices.

    On construction the device ID is derived from the hardware module's board
    type and the slot's ``slot_coding`` attribute, and an io4edge core client
    is created for that ID (with ``connect=False``).

    See https://docs.ci4rail.com/user-docs/io4edge/ for more information.
    """

    def __init__(
        self,
        name: str,
        parent: HWModule | None = None,
        children: List[Device] | None = None,
        abort: bool = False,
        *args,
        **kwargs,
    ):
        """
        Initialize the IO4Edge device and create its core client.

        Parameters
        ----------
        name
            Name of the device; forwarded to the base class.
        parent
            Forwarded to the base class.
        children
            Forwarded to the base class.
        abort
            Forwarded to the base class.

        Raises
        ------
        ValueError
            If the slot attributes are missing or carry no ``slot_coding``, or
            if the board type does not consist of a leading token followed by
            a module name.
        """
        logger.debug("Initializing IO4Edge device '%s'", name)
        super().__init__(name, parent, children, abort, *args, **kwargs)

        slot = self.hw_module.slot
        attr = slot.attributes

        # A single getattr with a default covers all three failure modes:
        # attr is None, attr lacks 'slot_coding', or 'slot_coding' is None.
        if getattr(attr, "slot_coding", None) is None:
            logger.error(
                "Slot attributes for %s are not set or do not contain 'slot_coding'",
                slot.name,
            )
            raise ValueError(
                f"Slot attributes for {slot.name} are not set or do not contain 'slot_coding'."
            )

        geoaddr = int(attr.slot_coding)
        self._geoaddr = geoaddr
        logger.debug("IO4Edge '%s' geo address: %s", name, geoaddr)

        # Drop everything from the first '-' onwards, then split off the
        # leading whitespace-separated token; the remainder is the module name.
        board_type = self.hw_module.board_type
        parts = sub(r"-.*$", "", board_type).split(maxsplit=1)
        if len(parts) != 2:
            # Fail with a descriptive message instead of an opaque
            # "not enough values to unpack" error from tuple unpacking.
            logger.error(
                "Cannot derive module name for IO4Edge '%s' from board type '%s'",
                name,
                board_type,
            )
            raise ValueError(
                f"Board type '{board_type}' of IO4Edge '{name}' does not have "
                "the form '<prefix> <module>[-<suffix>]'."
            )
        module_name = parts[1]
        self._module_name = module_name
        logger.debug("IO4Edge '%s' module name: %s", name, module_name)

        try:
            self.client = CClient.new_core_client(self.deviceId, connect=False)
            logger.info(
                "IO4Edge '%s' initialized with device ID: %s", name, self.deviceId
            )
        except Exception as e:
            logger.error("Failed to create IO4Edge core client for '%s': %s", name, e)
            raise

    @property
    def deviceId(self) -> str:
        """
        Returns the device ID for the IO4Edge device.

        The device ID is a combination of the module name and the geo address,
        formatted as ``<module name>-geo_addr<NN>`` with the geo address
        zero-padded to at least two digits.
        """
        return f"{self._module_name}-geo_addr{self._geoaddr:02d}"

    def identify_firmware(self) -> tuple[str, str]:
        """
        Identify the firmware on the IO4Edge device.

        Returns
        -------
            A tuple containing the firmware title and version.
        """
        response = self.client.identify_firmware()
        return (
            response.title,
            response.version,
        )

    def load_firmware(
        self, cfg: bytes, progress_callback: Callable[[float], None] | None = None
    ) -> None:
        """
        Load firmware onto the IO4Edge device.

        Parameters
        ----------
        cfg
            Firmware configuration bytes.
        progress_callback
            Optional callback for progress updates.
        """
        self.client.load_firmware(cfg, progress_callback)

    def restart(self) -> None:
        """
        Restart the IO4Edge device.

        Important
        ---------
        This will disconnect the client from the device.
        """
        self.client.restart()

    def load_parameter(self, name: str, value: str) -> None:
        """
        Set a persistent parameter on the IO4Edge device.

        Parameters
        ----------
        name
            The name of the parameter to load.
        value
            The value to set for the parameter.
        """
        self.client.set_persistent_parameter(name, value)

    def get_parameter(self, name: str) -> str:
        """
        Get a persistent parameter value from the IO4Edge device.

        Parameters
        ----------
        name
            The name of the parameter to read.

        Returns
        -------
            The value of the requested parameter.
        """
        return self.client.get_persistent_parameter(name)

    def __repr__(self):
        return f"{self.name}; DeviceId: {self.deviceId}"
class GPIOArray(Device):
    """
    Device class for handling an io4edge GPIO array.

    The service address is built as ``<parent device ID>-<service suffix>``;
    when no suffix is given, the device name is used as the suffix. The
    binary-IO client is created with ``connect=False``.
    """

    def __init__(
        self,
        name: str,
        parent: IO4Edge,
        children: list[Device] | None = None,
        abort: bool = False,
        service_suffix: str | None = None,
        keepaliveInterval: int = 10000,
        *args,
        **kwargs,
    ):
        """
        Initialize the GPIO array and create its io4edge client.

        Parameters
        ----------
        name
            Name of the device; also the default service suffix.
        parent
            The IO4Edge device whose device ID prefixes the service address.
        children
            Forwarded to the base class.
        abort
            Forwarded to the base class.
        service_suffix
            Optional suffix overriding ``name`` in the service address.
        keepaliveInterval
            Used to compute the client command timeout as
            ``int(keepaliveInterval / 1000 + 5)`` (presumably milliseconds
            in, seconds out -- confirm against the client API).
        """
        parent_id = parent.deviceId
        logger.debug(
            "Initializing GPIOArray '%s' with parent device %s", name, parent_id
        )

        super().__init__(name, parent, children, abort, *args, **kwargs)
        self.name = name

        # Fall back to the device name when no explicit suffix was supplied.
        if service_suffix is None:
            self.service_suffix = name
            logger.debug("Using default service suffix: %s", name)
        else:
            self.service_suffix = service_suffix
            logger.debug("Using custom service suffix: %s", service_suffix)

        self.deviceId = parent.deviceId
        self.service_addr = f"{self.deviceId}-{self.service_suffix}"

        scaled_interval = keepaliveInterval / 1000
        self.timeout = int(scaled_interval + 5)

        logger.info(
            "GPIOArray '%s' configured with service address: %s",
            name,
            self.service_addr,
        )

        try:
            self.client = Client(
                self.service_addr,
                command_timeout=self.timeout,
                connect=False,
            )
            logger.debug("IO4Edge client created for service: %s", self.service_addr)
        except Exception as e:
            # Log with the service address for context, then propagate unchanged.
            logger.error(
                "Failed to create IO4Edge client for %s: %s", self.service_addr, e
            )
            raise

    def __repr__(self):
        return f"{self.name}; Service Address: {self.service_addr}"