from typing import TYPE_CHECKING, List
from ekfsm.devices.generic import Device
from ekfsm.devices.utils import retry
from ekfsm.log import ekfsm_logger
from io4edge_client.ssm import Client, Pb
if TYPE_CHECKING:
from ekfsm.devices.io4edge import IO4Edge
logger = ekfsm_logger(__name__)
[docs]
class SSM(Device):
"""
Device class for handling System State Manager (SSM) functionality.
The SSM manages system state transitions and provides watchdog functionality
for io4edge devices.
"""
def __init__(
self,
name: str,
parent: "IO4Edge",
children: List[Device] | None = None,
abort: bool = False,
service_suffix: str | None = None,
keepaliveInterval: int = 10000,
*args,
**kwargs,
):
logger.debug(
"Initializing SSM '%s' with parent device %s", name, parent.deviceId
)
super().__init__(name, parent, children, abort, *args, **kwargs)
self.name = name
if service_suffix is not None:
self.service_suffix = service_suffix
logger.debug("Using custom service suffix: %s", service_suffix)
else:
self.service_suffix = name
logger.debug("Using default service suffix: %s", name)
self.deviceId = parent.deviceId
self.service_addr = f"{self.deviceId}-{self.service_suffix}"
self.timeout = int(keepaliveInterval / 1000 + 5)
logger.info(
"SSM '%s' configured with service address: %s",
name,
self.service_addr,
)
try:
self.client = Client(
self.service_addr, command_timeout=self.timeout, connect=False
)
logger.debug("SSM client created for service: %s", self.service_addr)
except Exception as e:
logger.error("Failed to create SSM client for %s: %s", self.service_addr, e)
raise
[docs]
@retry()
def describe(self) -> Pb.ConfigurationDescribeResponse:
"""
Get the description from the System State Manager functionblock.
Returns
-------
Pb.ConfigurationDescribeResponse
Description from the SSM functionblock
Raises
------
RuntimeError
if the command fails
TimeoutError
if the command times out
ConnectionError
if not connected to the device
"""
logger.info("Getting SSM description for '%s'", self.name)
try:
result = self.client.describe()
logger.debug("SSM '%s' description retrieved successfully", self.name)
return result
except Exception as e:
logger.error("Failed to get SSM '%s' description: %s", self.name, e)
raise
[docs]
@retry()
def kick(self) -> None:
"""
Kick the System State Manager to prevent a timeout.
This method should be called periodically to indicate that the system
is still alive and prevent the watchdog from triggering.
Raises
------
RuntimeError
if the command fails
TimeoutError
if the command times out
ConnectionError
if not connected to the device
"""
logger.debug("Kicking SSM watchdog for '%s'", self.name)
try:
self.client.kick()
logger.debug("SSM '%s' watchdog kicked successfully", self.name)
except Exception as e:
logger.error("Failed to kick SSM '%s' watchdog: %s", self.name, e)
raise
[docs]
@retry()
def state(self) -> Pb.SystemState:
"""
Get the current system state from the SSM.
Returns
-------
Pb.SystemState
Current system state (OFF, ON, ERROR, etc.)
Raises
------
RuntimeError
if the command fails or unhandled response
TimeoutError
if the command times out
ConnectionError
if not connected to the device
"""
logger.info("Getting system state for SSM '%s'", self.name)
try:
state = self.client.state
logger.info("SSM '%s' current state: %s", self.name, state)
return state
except Exception as e:
logger.error("Failed to get SSM '%s' state: %s", self.name, e)
raise
[docs]
@retry()
def error(self, message: str) -> None:
"""
Set the system to error state with the given message.
Parameters
----------
message : str
Error message to log
Raises
------
RuntimeError
if command fails or unhandled response
TimeoutError
if the command times out
ConnectionError
if not connected to the device
InvalidStateError
if operation attempted in an invalid state
UnknownError
if unknown error occurs while setting error state
"""
logger.warning("Setting SSM '%s' to error state: %s", self.name, message)
try:
self.client.error(message)
logger.info("SSM '%s' error state set successfully", self.name)
except Exception as e:
logger.error("Failed to set SSM '%s' error state: %s", self.name, e)
raise
[docs]
@retry()
def resolve(self, message: str) -> None:
"""
Resolve the current error state.
Parameters
----------
message : str
Resolution message
Raises
------
RuntimeError
if command fails or unhandled response
TimeoutError
if the command times out
ConnectionError
if not connected to the device
InvalidStateError
if operation attempted in an invalid state
UnknownError
if unknown error occurs while resolving error
"""
logger.info("Resolving SSM '%s' error state: %s", self.name, message)
try:
self.client.resolve(message)
logger.info("SSM '%s' error resolved successfully", self.name)
except Exception as e:
logger.error("Failed to resolve SSM '%s' error: %s", self.name, e)
raise
[docs]
@retry()
def fatal(self, message: str) -> None:
"""
Signal a fatal error to the SSM.
Parameters
----------
message : str
Fatal error message
Raises
------
RuntimeError
if command fails or unhandled response
TimeoutError
if the command times out
ConnectionError
if not connected to the device
InvalidStateError
if operation attempted in an invalid state
UnknownError
if unknown error occurs signaling fatal error
"""
logger.critical("Setting SSM '%s' fatal error: %s", self.name, message)
try:
self.client.fatal(message)
logger.info("SSM '%s' fatal error set successfully", self.name)
except Exception as e:
logger.error("Failed to set SSM '%s' fatal error: %s", self.name, e)
raise
[docs]
@retry()
def shutdown(self) -> None:
"""
Indicate system shutdown to the SSM.
Raises
------
RuntimeError
if command fails or unhandled response
TimeoutError
if the command times out
ConnectionError
if not connected to the device
InvalidStateError
if operation attempted in an invalid state
UnknownError
if unknown error occurs while signaling shutdown
"""
logger.info("Signaling shutdown to SSM '%s'", self.name)
try:
self.client.shutdown()
logger.info("SSM '%s' shutdown signal sent successfully", self.name)
except Exception as e:
logger.error("Failed to signal shutdown to SSM '%s': %s", self.name, e)
raise
[docs]
@retry()
def on(self) -> None:
"""
Signal ON state to the SSM.
Raises
------
RuntimeError
if command fails or unhandled response
TimeoutError
if the command times out
ConnectionError
if not connected to the device
InvalidStateError
if operation attempted in an invalid state
UnknownError
if unknown error occurs while turning on
"""
logger.info("Turning on SSM '%s'", self.name)
try:
self.client.on()
logger.info("SSM '%s' turned on successfully", self.name)
except Exception as e:
logger.error("Failed to turn on SSM '%s': %s", self.name, e)
raise
[docs]
@retry()
def reboot(self) -> None:
"""
Signal a reboot to the SSM.
Raises
------
RuntimeError
if command fails or unhandled response
TimeoutError
if the command times out
ConnectionError
if not connected to the device
InvalidStateError
if operation attempted in an invalid state
UnknownError
if unknown error occurs while signaling reboot
"""
logger.info("Signaling reboot to SSM '%s'", self.name)
try:
self.client.reboot()
logger.info("SSM '%s' reboot signal sent successfully", self.name)
except Exception as e:
logger.error("Failed to signal reboot to SSM '%s': %s", self.name, e)
raise
def __repr__(self):
return f"SSM({self.name}; Service Address: {self.service_addr})"