Source code for ekfsm.devices.buttons
import threading
from typing import Callable, List
from ekfsm.devices.utils import retry
from io4edge_client.binaryiotypeb import Pb
from ekfsm.devices.generic import Device
from ekfsm.devices.io4edge import GPIOArray
from ekfsm.log import ekfsm_logger
import io4edge_client.functionblock as fb
logger = ekfsm_logger(__name__)
[docs]
class Button(Device):
"""
Device class for handling a single button as part on array.
"""
def __init__(
self,
name: str,
parent: Device,
children: List[Device] | None = None,
abort: bool = False,
channel_id: int = 0,
*args,
**kwargs,
):
logger.debug(f"Initializing Button '{name}' on channel {channel_id}")
super().__init__(name, parent, children, abort, *args, **kwargs)
self.channel_id = channel_id
logger.debug(f"Button '{name}' assigned to channel {channel_id}")
self._handler: Callable | None = None
logger.info(f"Button '{name}' initialized on channel {channel_id}")
@property
def handler(self):
"""
Handle button events with a callback function.
"""
return self._handler
@handler.setter
def handler(self, func: Callable | None, *args, **kwargs):
"""
Handle button events with a callback function.
Parameters
----------
func : Callable | None
The function to call on button events. If None, no function is called.
"""
if callable(func):
self._handler = func
logger.info(
f"Handler set for button '{self.name}' on channel {self.channel_id}"
)
logger.debug(
f"Handler function: {func.__name__ if hasattr(func, '__name__') else str(func)}"
)
else:
self._handler = None
logger.debug(
f"Handler cleared for button '{self.name}' on channel {self.channel_id}"
)
def __repr__(self):
return f"{self.name}; Channel ID: {self.channel_id}"
[docs]
class ButtonArray(Device):
"""
Device class for handling an io4edge gpio based button array.
To read button events, call the `read` method in a separate thread.
Note
----
Button handlers are called in the context of the `read` method's thread and need to be set in the Button instances.
"""
def __init__(
self,
name: str,
parent: GPIOArray,
children: List[Device] | None = None,
abort: bool = False,
keepaliveInterval: int = 10000,
*args,
**kwargs,
):
logger.debug(
f"Initializing ButtonArray '{name}' with parent device {parent.deviceId}"
)
super().__init__(name, parent, children, abort, *args, **kwargs)
self.name = name
self.service_addr = parent.service_addr
self.client = parent.client
logger.info(
f"ButtonArray '{name}' configured with service address: {self.service_addr}"
)
self.subscriptionType = Pb.SubscriptionType.BINARYIOTYPEB_ON_RISING_EDGE
self.stream_cfg = fb.Pb.StreamControlStart(
bucketSamples=1, # 1 sample per bucket, also ein event pro bucket
keepaliveInterval=keepaliveInterval,
bufferedSamples=2, # 2 samples werden gepuffert
low_latency_mode=True, # schickt soweit moeglich sofort die Events
)
logger.debug(
"Stream configuration initialized with rising edge subscription and low latency mode"
)
# Log button children count
button_count = sum(1 for child in (children or []) if isinstance(child, Button))
logger.info(f"ButtonArray '{name}' initialized with {button_count} button(s)")
[docs]
def read(self, stop_event: threading.Event | None = None, timeout: float = 1):
"""
Read all button events and dispatch to handlers.
Parameters
----------
stop_event : threading.Event, optional
Event to signal stopping the reading loop. If None, the loop will run indefinitely.
timeout : float, optional
Timeout for reading from the stream in seconds. Default is 0.1 seconds.
Note
----
This method blocks and should be run in a separate thread.
"""
button_channels = [
button for button in self.children if isinstance(button, Button)
]
if not button_channels:
logger.warning(
f"No button children found in ButtonArray '{self.name}', read operation will have no effect"
)
return
logger.info(
f"Starting button event reading for {len(button_channels)} buttons on '{self.name}'"
)
logger.debug(
f"Read timeout: {timeout}s, stop_event provided: {stop_event is not None}"
)
# Prepare subscription channels
subscribe_channels = tuple(
Pb.SubscribeChannel(
channel=button.channel_id,
subscriptionType=self.subscriptionType,
)
for button in button_channels
)
channel_ids = [button.channel_id for button in button_channels]
try:
self._button_event_handling(
stop_event,
timeout,
subscribe_channels,
button_channels,
channel_ids,
)
except Exception as e:
logger.error(
f"Failed to establish connection or start stream for ButtonArray '{self.name}': {e}"
)
raise
@retry()
def _button_event_handling(
self,
stop_event: threading.Event | None,
timeout: float,
subscribe_channels: tuple,
button_channels: list,
channel_ids: list,
):
with self.client as client:
logger.debug(f"IO4Edge client connected to service: {self.service_addr}")
logger.debug(
f"Subscribing to {len(subscribe_channels)} button channels: {channel_ids}"
)
client.start_stream(
Pb.StreamControlStart(subscribeChannel=subscribe_channels),
self.stream_cfg,
)
logger.info(f"Button event stream started for ButtonArray '{self.name}'")
event_count = 0
try:
while not (stop_event and stop_event.is_set()):
try:
_, samples = client.read_stream(timeout=timeout)
for sample in samples.samples:
for button in button_channels:
pressed = bool(sample.inputs & (1 << button.channel_id))
if pressed:
event_count += 1
button_name = getattr(button, "name", "unnamed")
logger.debug(
f"Button press on channel {button.channel_id} ({button_name})"
)
if button.handler:
try:
logger.debug(
f"Calling handler for button on channel {button.channel_id}"
)
button.handler()
except Exception as e:
logger.error(
f"Error in button handler for channel {button.channel_id}: {e}"
)
else:
logger.debug(
f"No handler set for button on channel {button.channel_id}"
)
except TimeoutError:
# Timeout is expected during normal operation
continue
except Exception as e:
logger.error(f"Error reading button events from stream: {e}")
break
except KeyboardInterrupt:
logger.info(f"Button reading interrupted for ButtonArray '{self.name}'")
finally:
logger.info(
f"Button event reading stopped for '{self.name}' after processing {event_count} events"
)
if stop_event:
stop_event.clear()
logger.debug("Stop event cleared")
def __repr__(self):
return f"{self.name}; Service Address: {self.service_addr}"