from ekfsm.devices.generic import Device
from ekfsm.devices.io4edge import IO4Edge
from ekfsm.devices.utils import retry
from ekfsm.log import ekfsm_logger
from io4edge_client.colorLED import Client, Pb
# Module-level logger obtained from the package's ekfsm_logger helper, named after this module.
logger = ekfsm_logger(__name__)
# (stray "[docs]" link marker left over from rendered documentation; not part of the module)
class LEDArray(Device):
    """
    Device class for handling a LED array.

    On construction the array derives a service address from the parent's
    ``deviceId`` and a suffix, then creates a color LED ``Client`` for that
    address and keeps it on ``self.client``.
    """

    def __init__(
        self,
        name: str,
        parent: IO4Edge,
        children: list[Device] | None = None,
        abort: bool = False,
        service_suffix: str | None = None,
        *args,
        **kwargs,
    ):
        """
        Set up the LED array and create its client.

        Parameters
        ----------
        name : str
            Name of the device; also used as the service suffix when
            `service_suffix` is not given.
        parent : IO4Edge
            Parent device. Its ``deviceId`` forms the first part of the
            service address.
        children : list[Device] | None
            Forwarded unchanged to ``Device.__init__``.
        abort : bool
            Forwarded unchanged to ``Device.__init__``.
        service_suffix : str | None
            Suffix appended (after a ``-``) to the parent's ``deviceId`` to
            build the service address. ``None`` selects `name`.
        *args, **kwargs
            Forwarded unchanged to ``Device.__init__``.

        Raises
        ------
        Exception
            Whatever ``Client(...)`` raises is logged and re-raised as is.
        """
        init_msg = f"Initializing LEDArray '{name}' with parent device {parent.deviceId}"
        logger.debug(init_msg)

        super().__init__(name, parent, children, abort, *args, **kwargs)
        self.name = name

        # Fall back to the device name when no explicit suffix was supplied.
        if service_suffix is None:
            self.service_suffix = name
            logger.debug(f"Using default service suffix: {name}")
        else:
            self.service_suffix = service_suffix
            logger.debug(f"Using custom service suffix: {service_suffix}")

        self.service_addr = f"{parent.deviceId}-{self.service_suffix}"
        configured_msg = (
            f"LEDArray '{name}' configured with service address: {self.service_addr}"
        )
        logger.info(configured_msg)

        try:
            # The client is constructed with connect=False.
            self.client = Client(self.service_addr, connect=False)
            logger.debug(f"LEDArray client created for service: {self.service_addr}")
        except Exception as exc:
            failure_msg = (
                f"Failed to create LEDArray client for {self.service_addr}: {exc}"
            )
            logger.error(failure_msg)
            raise

    def __repr__(self):
        """Return the device name followed by its service address."""
        text = f"{self.name}; Service Address: {self.service_addr}"
        return text
# (stray "[docs]" link marker left over from rendered documentation; not part of the module)
class ColorLED(Device):
    """
    Device class for handling a color LED.

    A color LED is addressed by a channel id and talks through the client
    object of its parent ``LEDArray``.
    """

    def __init__(
        self,
        name: str,
        parent: "LEDArray",
        children: list[Device] | None = None,
        abort: bool = False,
        channel_id: int = 0,
        *args,
        **kwargs,
    ):
        """
        Set up the color LED on a channel of its parent array.

        Parameters
        ----------
        name : str
            Name of the device.
        parent : LEDArray
            Parent LED array; its ``client`` attribute is reused by this LED.
        children : list[Device] | None
            Forwarded unchanged to ``Device.__init__``.
        abort : bool
            Forwarded unchanged to ``Device.__init__``.
        channel_id : int
            Channel passed to the client on every ``get``/``set`` call.
        *args, **kwargs
            Forwarded unchanged to ``Device.__init__``.
        """
        logger.debug(f"Initializing ColorLED '{name}' on channel {channel_id}")
        super().__init__(name, parent, children, abort, *args, **kwargs)

        self.name = name
        self.channel_id = channel_id
        # No client of its own: share the one owned by the parent array.
        self.client = parent.client

        ready_msg = (
            f"ColorLED '{name}' initialized on channel {channel_id} with parent LEDArray"
        )
        logger.info(ready_msg)

    @retry()
    def describe(self):
        """Placeholder: performs no action and returns ``None``."""
        return None

    @retry()
    def get(self) -> tuple[int, int, int, bool]:
        """
        Get color LED state.

        Returns
        -------
        The value returned by the client for this channel, which is unpacked
        here as ``(red, green, blue, blink)``: current color and blink state.

        Raises
        ------
        RuntimeError
            if the command fails
        TimeoutError
            if the command times out
        """
        logger.info(
            "Getting color LED state for '%s' on channel %s", self.name, self.channel_id
        )
        try:
            state = self.client.get(self.channel_id)
            # Unpacked only for logging; the client's value is returned untouched.
            red, green, blue, blinking = state
            logger.info(
                "ColorLED '%s' state: color=(%s, %s, %s), blink=%s",
                self.name,
                red,
                green,
                blue,
                blinking,
            )
            return state
        except Exception as exc:
            logger.error(
                "Failed to get ColorLED '%s' state on channel %s: %s",
                self.name,
                self.channel_id,
                exc,
            )
            raise

    @retry()
    def set(
        self, color: Pb.Color | Pb.RGBColor | tuple[int, int, int] | str, blink: bool
    ) -> None:
        """
        Set the color of the color LED.

        Parameters
        ----------
        color : Pb.Color | Pb.RGBColor | tuple[int, int, int] | str
            Color to set the LED to, given as Pb.Color, Pb.RGBColor, a tuple of
            (red, green, blue) values, or a string giving a hex value
            (e.g. "#FF0000" for red). Passed to the client unchanged.
        blink : bool
            Whether to blink the LED.

        Raises
        ------
        RuntimeError
            if the command fails
        TimeoutError
            if the command times out
        ValueError
            if the color parameter is invalid or values are out of range
        """
        request_msg = (
            f"Setting ColorLED '{self.name}' on channel {self.channel_id}: color={color}, blink={blink}"
        )
        logger.info(request_msg)
        try:
            self.client.set(self.channel_id, color, blink)
            done_msg = (
                f"ColorLED '{self.name}' successfully set to color={color}, blink={blink}"
            )
            logger.debug(done_msg)
        except Exception as exc:
            failure_msg = (
                f"Failed to set ColorLED '{self.name}' on channel {self.channel_id}: {exc}"
            )
            logger.error(failure_msg)
            raise

    def __repr__(self):
        """Return the device name followed by its channel id."""
        text = f"{self.name}; Channel ID: {self.channel_id}"
        return text