Source code for ekfsm.devices.thermal_humidity

from ekfsm.devices.generic import Device
from ekfsm.devices.io4edge import IO4Edge
from ekfsm.devices.utils import retry
from ekfsm.log import ekfsm_logger
from io4edge_client.analogintypeb import Client

logger = ekfsm_logger(__name__)


class ThermalHumidity(Device):
    """
    Device class for handling a thermal humidity sensor.

    The sensor is accessed through an io4edge analog-input client whose
    service address is built from the parent's ``deviceId`` and a service
    suffix (``"<deviceId>-<suffix>"``).
    """

    def __init__(
        self,
        name: str,
        parent: IO4Edge,
        children: list[Device] | None = None,
        abort: bool = False,
        service_suffix: str | None = None,
        *args,
        **kwargs,
    ) -> None:
        """
        Set up the sensor and create its io4edge client.

        Parameters
        ----------
        name
            Name of the sensor; also used as the service suffix when
            `service_suffix` is not given.
        parent
            The io4edge device this sensor belongs to. Its ``deviceId`` is
            the first part of the service address.
        children
            Forwarded unchanged to the `Device` constructor.
        abort
            Forwarded unchanged to the `Device` constructor.
        service_suffix
            Optional suffix for the service address. When ``None``, `name`
            is used instead.
        *args, **kwargs
            Forwarded unchanged to the `Device` constructor.

        Raises
        ------
        Exception
            Any exception raised while creating the client is logged and
            re-raised unchanged.
        """
        logger.debug(
            f"Initializing ThermalHumidity sensor '{name}' with parent device {parent.deviceId}"
        )

        super().__init__(name, parent, children, abort, *args, **kwargs)

        self.name = name

        # An explicit suffix wins; otherwise fall back to the sensor name.
        if service_suffix is not None:
            self.service_suffix = service_suffix
            logger.debug(f"Using custom service suffix: {service_suffix}")
        else:
            self.service_suffix = name
            logger.debug(f"Using default service suffix: {name}")

        self.service_addr = f"{parent.deviceId}-{self.service_suffix}"
        logger.info(
            f"ThermalHumidity '{name}' configured with service address: {self.service_addr}"
        )

        try:
            # connect=False is passed to the client, so presumably no
            # connection is opened at construction time (confirm against
            # the io4edge client documentation).
            self.client = Client(self.service_addr, connect=False)
            logger.debug(
                f"ThermalHumidity client created for service: {self.service_addr}"
            )
        except Exception as e:
            logger.error(
                f"Failed to create ThermalHumidity client for {self.service_addr}: {e}"
            )
            raise

    @retry()
    def temperature(self) -> float:
        """
        Get the temperature in Celsius.

        Returns
        -------
        temperature
            first element of the value reported by the client

        Raises
        ------
        RuntimeError
            if the command fails
        TimeoutError
            if the command times out
        """
        logger.info(f"Reading temperature from ThermalHumidity sensor '{self.name}'")
        try:
            # Index 0 of the client's value is the temperature reading.
            temp = self.client.value()[0]
            logger.info(f"ThermalHumidity '{self.name}' temperature: {temp}°C")
            return temp
        except Exception as e:
            # Log for diagnostics, then let the original exception propagate.
            logger.error(
                f"Failed to read temperature from ThermalHumidity '{self.name}': {e}"
            )
            raise

    @retry()
    def humidity(self) -> float:
        """
        Get the relative humidity in percent.

        Returns
        -------
        humidity
            relative humidity (second element of the value reported by the
            client)

        Raises
        ------
        RuntimeError
            if the command fails
        TimeoutError
            if the command times out
        """
        logger.info(f"Reading humidity from ThermalHumidity sensor '{self.name}'")
        try:
            # Index 1 of the client's value is the humidity reading.
            humidity = self.client.value()[1]
            logger.info(f"ThermalHumidity '{self.name}' humidity: {humidity}%")
            return humidity
        except Exception as e:
            # Log for diagnostics, then let the original exception propagate.
            logger.error(
                f"Failed to read humidity from ThermalHumidity '{self.name}': {e}"
            )
            raise

    def __repr__(self) -> str:
        """Return the sensor name together with its service address."""
        return f"{self.name}; Service Address: {self.service_addr}"