Source code for ekfsm.core.utils

from __future__ import annotations

import logging
from pprint import pformat
from typing import TYPE_CHECKING, Any, List

from schema import Optional, Or, Schema, SchemaError, Use
from termcolor import colored

from ekfsm.devices import CLASS_MAP
from ekfsm.exceptions import ConfigError

if TYPE_CHECKING:
    from ekfsm.devices.generic import Device

    from .components import HWModule


[docs] def import_board(logger: logging.Logger, data, parent=None, abort: bool = False): from .components import HWModule device_type = data.get("device_type") nodecls = CLASS_MAP.get(device_type) if nodecls is None: raise ConfigError(f"Unknown device type: {device_type}") children = data.pop("children", []) if parent is not None and isinstance(parent, HWModule): # ??? pass if provides := data.get("provides"): for p in provides: interfaces = data["provides"][p] for interface in interfaces: if isinstance(interface, str): if attr := getattr(nodecls, interface, None): if not callable(attr) and not abort: raise ConfigError("No such method") elif isinstance(interface, dict): for key, value in interface.items(): if attr := getattr(nodecls, value, None): if not callable(attr) and not abort: raise ConfigError("No such method") else: raise ConfigError("Error in board configuration") node = nodecls(parent=parent, abort=abort, **data) if children is not None: for child in children: try: logger.debug(f"Importing sub device {pformat(child)}") import_board(logger, data=child, parent=node, abort=abort) except Exception as e: if abort: logger.error( f"Failed to import sub device {pformat(child)}: {e}. Aborting." ) raise e else: logger.error( f"Failed to import sub device {pformat(child)}: {e}. Continuing anyway." ) return node
[docs] def provides_validator(x: Any) -> Any: if isinstance(x, str): return x elif isinstance(x, dict) and len(x) == 1: key, value = next(iter(x.items())) if isinstance(key, str) and isinstance(value, str): return x raise SchemaError( "Each provides item must be either a string or a dictionary with one string key/value pair" )
device_schema = Schema({}) _device_structure = Schema( { "device_type": str, "name": str, Optional("addr"): int, Optional("slot_coding_mask"): int, Optional("channel_id"): int, Optional("service_suffix"): str, Optional("keepaliveInterval"): int, Optional("provides"): { Optional(str): [Use(provides_validator)], }, Optional("children"): Or(None, [device_schema]), } ) device_schema._schema = _device_structure module_schema = Schema( { "id": Or(int, str), "name": str, "slot_type": str, Optional("children"): [device_schema], Optional("bus_masters"): { Optional("i2c"): dict, }, } )
[docs] def deserialize_module(logger: logging.Logger, data: dict) -> tuple[str, str, str]: """ docstring """ module_schema.validate(data) id, name, slot_type = (data[key] for key in ["id", "name", "slot_type"]) logger.debug(colored(f"Importing top level module {pformat(name)}", "green")) return id, name, slot_type
[docs] def deserialize_hardware_tree( logger: logging.Logger, data: dict, parent: "HWModule" ) -> List["Device"]: abort = parent.abort module_schema.validate(data) children = data.pop("children", None) if not children: return [] devices = [] for child in children: try: logger.debug( colored(f"Importing top level device {pformat(child)}", "green") ) node = import_board(logger, child, parent=parent, abort=abort) devices.append(node) except Exception as e: logger.error( colored( f"Failed to import top-level device {pformat(child)}: {e}", "red" ) ) logger.error(colored("Aborting." if abort else "Continuing anyway", "red")) if abort: raise return devices