Source code for ekfsm.devices.leds

from ekfsm.devices.generic import Device
from ekfsm.devices.io4edge import IO4Edge
from ekfsm.devices.utils import retry
from ekfsm.log import ekfsm_logger
from io4edge_client.colorLED import Client, Pb

logger = ekfsm_logger(__name__)


class LEDArray(Device):
    """
    Device representing an array of LEDs reachable through one service.

    On construction the instance derives a service address from the parent's
    ``deviceId`` and a service suffix, then creates a ``Client`` for that
    address (with ``connect=False``) and stores it as ``self.client``.
    """

    def __init__(
        self,
        name: str,
        parent: IO4Edge,
        children: list[Device] | None = None,
        abort: bool = False,
        service_suffix: str | None = None,
        *args,
        **kwargs,
    ):
        """
        Set up the LED array and create its client.

        Parameters
        ----------
        name
            Name of this device; also used as the service suffix when
            ``service_suffix`` is not given.
        parent
            Parent device whose ``deviceId`` forms the first part of the
            service address.
        children
            Child devices, forwarded to the base class.
        abort
            Forwarded to the base class.
        service_suffix
            Optional suffix that replaces ``name`` in the service address.

        Raises
        ------
        Exception
            Whatever ``Client(...)`` raises is logged and re-raised unchanged.
        """
        startup_msg = (
            f"Initializing LEDArray '{name}' with parent device {parent.deviceId}"
        )
        logger.debug(startup_msg)

        super().__init__(name, parent, children, abort, *args, **kwargs)
        self.name = name

        # Without an explicit suffix, the device name doubles as the suffix.
        if service_suffix is None:
            self.service_suffix = name
            logger.debug(f"Using default service suffix: {name}")
        else:
            self.service_suffix = service_suffix
            logger.debug(f"Using custom service suffix: {service_suffix}")

        self.service_addr = f"{parent.deviceId}-{self.service_suffix}"
        configured_msg = (
            f"LEDArray '{name}' configured with service address: {self.service_addr}"
        )
        logger.info(configured_msg)

        try:
            self.client = Client(self.service_addr, connect=False)
            logger.debug(f"LEDArray client created for service: {self.service_addr}")
        except Exception as exc:
            failure_msg = (
                f"Failed to create LEDArray client for {self.service_addr}: {exc}"
            )
            logger.error(failure_msg)
            raise

    def __repr__(self):
        """Return the device name followed by its service address."""
        return f"{self.name}; Service Address: {self.service_addr}"
class ColorLED(Device):
    """
    Device representing a single color LED on one channel of an LED array.

    The instance reuses the ``client`` object of its parent ``LEDArray`` and
    addresses its LED through ``channel_id``.
    """

    def __init__(
        self,
        name: str,
        parent: "LEDArray",
        children: list[Device] | None = None,
        abort: bool = False,
        channel_id: int = 0,
        *args,
        **kwargs,
    ):
        """
        Set up the color LED and bind it to its parent's client.

        Parameters
        ----------
        name
            Name of this device.
        parent
            LED array whose ``client`` attribute is reused by this LED.
        children
            Child devices, forwarded to the base class.
        abort
            Forwarded to the base class.
        channel_id
            Channel passed to the client on every ``get``/``set`` call.
        """
        startup_msg = f"Initializing ColorLED '{name}' on channel {channel_id}"
        logger.debug(startup_msg)

        super().__init__(name, parent, children, abort, *args, **kwargs)

        self.name = name
        self.channel_id = channel_id
        # Share the parent's client rather than creating a new one.
        self.client = parent.client

        ready_msg = (
            f"ColorLED '{name}' initialized on channel {channel_id} with parent LEDArray"
        )
        logger.info(ready_msg)

    @retry()
    def describe(self):
        """Placeholder; currently performs no action and returns ``None``."""

    @retry()
    def get(self) -> tuple[int, int, int, bool]:
        """
        Read the current state of the color LED.

        Returns
        -------
            The value returned by the client for this channel, which is
            unpacked here as (red, green, blue, blink) for logging.

        Raises
        ------
        RuntimeError
            if the command fails
        TimeoutError
            if the command times out
        """
        logger.info(
            "Getting color LED state for '%s' on channel %s",
            self.name,
            self.channel_id,
        )
        try:
            state = self.client.get(self.channel_id)
            # Unpacked only for the log line; the client's object is returned as-is.
            red, green, blue, blinking = state
            logger.info(
                "ColorLED '%s' state: color=(%s, %s, %s), blink=%s",
                self.name,
                red,
                green,
                blue,
                blinking,
            )
            return state
        except Exception as exc:
            logger.error(
                "Failed to get ColorLED '%s' state on channel %s: %s",
                self.name,
                self.channel_id,
                exc,
            )
            raise

    @retry()
    def set(
        self, color: Pb.Color | Pb.RGBColor | tuple[int, int, int] | str, blink: bool
    ) -> None:
        """
        Apply a color and blink setting to the color LED.

        Parameters
        ----------
        color : Pb.Color | Pb.RGBColor | tuple[int, int, int] | str
            Target color, given as Pb.Color, Pb.RGBColor, a tuple of
            (red, green, blue) values, or a hex string (e.g. "#FF0000" for red).
        blink : bool
            Whether the LED should blink.

        Raises
        ------
        RuntimeError
            if the command fails
        TimeoutError
            if the command times out
        ValueError
            if the color parameter is invalid or values are out of range
        """
        request_msg = f"Setting ColorLED '{self.name}' on channel {self.channel_id}: color={color}, blink={blink}"
        logger.info(request_msg)
        try:
            self.client.set(self.channel_id, color, blink)
            done_msg = (
                f"ColorLED '{self.name}' successfully set to color={color}, blink={blink}"
            )
            logger.debug(done_msg)
        except Exception as exc:
            failure_msg = (
                f"Failed to set ColorLED '{self.name}' on channel {self.channel_id}: {exc}"
            )
            logger.error(failure_msg)
            raise

    def __repr__(self):
        """Return the device name followed by its channel id."""
        return f"{self.name}; Channel ID: {self.channel_id}"