from collections.abc import MutableMapping
from pathlib import Path
from typing import Callable, List, Union
from more_itertools import first_true
from ekfsm.exceptions import ConversionError, DriverError, SysFSError
SYSFS_ROOT = Path("/sys")
[docs]
def sysfs_root() -> Path:
    """
    Return the path currently configured as the sysfs root.

    Returns
    -------
    :class:`~pathlib.Path`
        The value of the module-level ``SYSFS_ROOT`` at the time of the call.
    """
    return SYSFS_ROOT
[docs]
def set_sysfs_root(path: Path) -> None:
    """
    Rebind the module-level ``SYSFS_ROOT`` to *path*.

    Parameters
    ----------
    path: :class:`~pathlib.Path`
        The new value stored in ``SYSFS_ROOT``.
    """
    # ``global`` is required so the assignment rebinds the module constant
    # instead of creating a function-local name.
    global SYSFS_ROOT
    SYSFS_ROOT = path
[docs]
def file_is_sysfs_attr(path: Path) -> bool:
    """
    Tell whether *path* is a regular file with no execute permission bit set.

    Parameters
    ----------
    path: :class:`~pathlib.Path`
        The filesystem entry to inspect.

    Returns
    -------
    bool
        True if *path* is a file and none of the user/group/other execute
        bits (``0o111``) are set in its mode, False otherwise.
    """
    if not path.is_file():
        return False
    exec_bits = path.stat().st_mode & 0o111
    return not exec_bits
[docs]
class SysFSAttribute:
    """
    A SysFSAttribute is a singular sysfs attribute located somewhere in */sys*.

    Parameters
    ----------
    path: :class:`~pathlib.Path`
        Path to the underlying file for the SysFSAttribute instance.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist or is not a file.
    """

    def __init__(self, path: Path):
        # is_file() is already False for a missing path, so a single check
        # covers both "does not exist" and "is not a file".
        if not path.is_file():
            raise FileNotFoundError("Invalid sysfs attribute path")
        self.path = path
        self.name: str = path.name

    def read_utf8(self) -> str:
        """
        Read the whole attribute file and decode it as UTF-8.

        Returns
        -------
        str
            The file content, unmodified (no stripping).

        Raises
        ------
        SysFSError
            If reading the file fails with an :class:`OSError`.
        """
        try:
            # Decode explicitly as UTF-8 instead of relying on the locale's
            # preferred encoding.
            return self.path.read_text(encoding="utf-8")
        except OSError as e:
            raise SysFSError("Error accessing SysFS attribute") from e

    def read_bytes(self) -> bytes:
        """
        Read the whole attribute file as raw bytes.

        Returns
        -------
        bytes
            The file content.

        Raises
        ------
        SysFSError
            If reading the file fails with an :class:`OSError`.
        """
        try:
            return self.path.read_bytes()
        except OSError as e:
            raise SysFSError("Error accessing SysFS attribute") from e

    # FIXME: This cannot work due to sysfs attributes not supporting seek().
    def write(self, data: Union[str, bytes, None], offset: int = 0) -> None:
        """
        Write *data* to the attribute file, starting at *offset*.

        Nothing is written (and no error is raised) when *data* is None or
        when the file does not pass :meth:`is_sysfs_attr`.

        Parameters
        ----------
        data: str | bytes | None
            The payload; ``str`` is written in text mode (UTF-8), anything
            else in binary mode.
        offset: int
            Position passed to ``seek()`` before writing, defaults to 0.

        Raises
        ------
        SysFSError
            If opening, seeking or writing fails with an :class:`OSError`.
        """
        if not self.is_sysfs_attr() or data is None:
            return

        as_text = isinstance(data, str)
        try:
            with open(
                self.path,
                "r+" if as_text else "rb+",
                # An encoding may only be given in text mode.
                encoding="utf-8" if as_text else None,
            ) as f:
                f.seek(offset)
                f.write(data)
        except OSError as e:
            raise SysFSError("Error accessing SysFS attribute") from e

    def is_sysfs_attr(self) -> bool:
        """
        Return the result of :func:`file_is_sysfs_attr` for this attribute's path.
        """
        return file_is_sysfs_attr(self.path)

    def __repr__(self):
        return f"SysFSAttribute: {self.name}"
[docs]
def list_sysfs_attributes(path: Path) -> List[SysFSAttribute]:
    """
    Collect the sysfs attributes located directly inside a directory.

    Parameters
    ----------
    path: :class:`~pathlib.Path`
        The directory to scan (not recursive).

    Returns
    -------
    List[SysFSAttribute]
        One SysFSAttribute per directory entry accepted by
        :func:`file_is_sysfs_attr`, in the order ``iterdir()`` yields them.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist or is not a directory.
    """
    if not (path.exists() and path.is_dir()):
        raise FileNotFoundError(f"Invalid sysfs directory: {path}")

    found = []
    for entry in path.iterdir():
        if file_is_sysfs_attr(entry):
            found.append(SysFSAttribute(entry))
    return found
[docs]
class SysfsDevice(MutableMapping):
def __init__(
self,
base_dir: Path,
driver_required=True,
find_driver: Callable | None = None,
):
self.path: Path = base_dir
self.driver_required = driver_required
try:
self.driver = self.get_driver()
except Exception:
self.driver = None
if driver_required:
raise DriverError(f"No driver found for device at {base_dir}")
try:
self.attributes: List[SysFSAttribute] = list_sysfs_attributes(self.path)
except FileNotFoundError as e:
raise SysFSError(f"SysFS entry for {base_dir} does not exist") from e
def __getitem__(self, key):
if (
attr := first_true(self.attributes, pred=lambda a: a.name == key)
) is not None:
return attr
raise KeyError(f"'{key}' is not a valid sysfs attribute in {self.path}")
def __setitem__(self, key, value):
self[key].write(value)
def __delitem__(self, key):
del self.attributes[key]
def __iter__(self):
return iter(self.attributes)
def __len__(self):
return len(self.attributes)
[docs]
def pre(self) -> None:
pass
[docs]
def post(self) -> None:
pass
[docs]
def write_attr(self, attr: str, data: str | bytes) -> None:
next(x for x in self.attributes if x.name == attr).write(data)
[docs]
def write_attr_bytes(self, attr: str, data: str) -> None:
# TODO: This
pass
[docs]
def read_attr_utf8(self, attr: str) -> str:
return next(x for x in self.attributes if x.name == attr).read_utf8()
[docs]
def read_float(self, attr: str) -> float:
"""
Read a sysfs attribute as a floating-point number
Parameters
----------
attr: str
The sysfs attribute to read
Returns
-------
float
The sysfs attribute as a floating-point number
Raises
------
SysFSError
If the sysfs attribute does not exist
ConversionError
If the sysfs attribute could not be converted to a floating-point number
"""
try:
value = self.read_attr_utf8(attr)
return float(value)
except StopIteration as e:
raise SysFSError(f"'{attr}' sysfs attribute does not exist") from e
except SysFSError:
raise
except ValueError as e:
raise ConversionError(
"Failed to convert sysfs value to floating-point value"
) from e
[docs]
def read_int(self, attr) -> int:
"""
Read a sysfs attribute stored as a string as an integer
Parameters
----------
attr: str
The sysfs attribute to read
Returns
-------
int
The sysfs attribute as an integer
Raises
------
SysFSError
If the sysfs attribute does not exist
ConversionError
If the sysfs attribute could not be converted to an integer
"""
try:
value = self.read_attr_utf8(attr).strip()
return int(value)
except StopIteration as e:
raise SysFSError(f"'{attr}' sysfs attribute does not exist") from e
except SysFSError:
raise
except ValueError as e:
raise ConversionError("Failed to convert sysfs value to int") from e
[docs]
def read_hex(self, attr) -> int:
"""
Read a sysfs attribute stored as a hexadecimal integer as integer
Parameters
----------
attr: str
The sysfs attribute to read
Returns
-------
int
The sysfs attribute as a hexadecimal integer
Raises
------
SysFSError
If the sysfs attribute does not exist
ConversionError
If the sysfs attribute could not be converted to a hexadecimal integer
"""
try:
value = self.read_attr_utf8(attr).strip()
return int(value, 16)
except StopIteration as e:
raise SysFSError(f"'{attr}' sysfs attribute does not exist") from e
except SysFSError:
raise
except ValueError as e:
raise ConversionError("Failed to convert sysfs value to hex int") from e
[docs]
def read_utf8(self, attr, strip=True) -> str:
"""
Read a sysfs attribute as a UTF-8 encoded string
Parameters
----------
attr: str
The sysfs attribute to read
strip: bool
Strip whitespace, defaults to true
Returns
-------
str
The sysfs attribute as a UTF-8 encoded string
Raises
------
SysFSError
If the sysfs attribute does not exist
"""
try:
value = self.read_attr_utf8(attr)
if strip:
value = value.strip()
return value
except StopIteration as e:
raise SysFSError(f"'{attr}' sysfs attribute does not exist") from e
except SysFSError:
raise
[docs]
def read_attr_bytes(self, attr: str) -> bytes:
return next(x for x in self.attributes if x.name == attr).read_bytes()
[docs]
def read_bytes(self, attr) -> bytes:
try:
value = self.read_attr_bytes(attr)
return value
except StopIteration as e:
raise SysFSError(f"'{attr}' sysfs attribute does not exist") from e
except SysFSError:
raise
[docs]
def extend_attributes(self, attributes: List[SysFSAttribute]):
self.attributes.extend(attributes)
[docs]
def get_driver(self) -> str | None:
path = self.path
if self.path.joinpath("device").exists():
path = self.path.joinpath("device")
elif not path.joinpath("driver").exists():
raise DriverError("Failed to retrieve driver info")
return path.joinpath("driver").readlink().name