"""
This module contains the classes for observers and the mixin class for objects that can have observers.
"""
from typing import TYPE_CHECKING
from pydistsim.logging import logger
if TYPE_CHECKING:
from pydistsim.algorithm import BaseAlgorithm
from pydistsim.message import Message
from pydistsim.network import Node
from pydistsim.simulation import Simulation
[docs]
class ObservableEvents(str):
    """
    Namespace of the event names that observers can be notified about.

    Every event is a plain string constant exposed as a class attribute. The class
    behaves like an enum, but :class:`StrEnum` from the :mod:`enum` module is not used
    because it does not support inheritance.
    """

    # Sent to an observer by ObserverManagerMixin.add_observers when it is registered.
    added = "added"

    # Event names; the observer classes in this module pick the ones they handle
    # through their `events` attribute.
    step_done = "step_done"
    message_sent = "message_sent"
    sim_state_changed = "sim_state_changed"
    algorithm_started = "algorithm_started"
    algorithm_finished = "algorithm_finished"
    network_changed = "network_changed"
    message_delivered = "message_delivered"
    node_status_changed = "node_status_changed"

    # Every event name above, in declaration order.
    _all = (
        added, step_done, message_sent,
        sim_state_changed, algorithm_started, algorithm_finished,
        network_changed, message_delivered, node_status_changed,
    )
[docs]
class Observer:
    """
    Base class for observers.

    Subclasses must implement the `on_{event_name}` methods for the events they want to listen to.
    If an observer is notified of an event it does not listen to, the notification is discarded
    and a trace-level message is logged.

    To define the events an observer listens to, set the `events` attribute to a list of event names.
    The definitive list of allowed events is the union of the `events` attribute of every class in
    the inheritance chain.
    For more information, refer to the :meth:`__get_allowed_events__` method.

    Example of concrete implementations are:

    - :class:`MetricCollector` class in :mod:`pydistsim.metrics`
    - :class:`QThreadObserver` class in :mod:`pydistsim.gui.simulationgui`
    """

    # Events handled by this class itself; subclasses add their own list.
    events = [ObservableEvents.added]

    @classmethod
    def __get_allowed_events__(cls) -> set[ObservableEvents]:
        """
        Collect the events this observer type listens to.

        The result is the union of the `events` attribute of `cls` and of every
        :class:`Observer` class it inherits from. Ancestors that are not observers
        (e.g. :class:`object` or unrelated mixins) are ignored.

        :return: The set of allowed event names.
        :rtype: set[ObservableEvents]
        """
        allowed_events = set()
        # The MRO lists `cls` and each ancestor exactly once, so no manual
        # recursion over `__bases__` is needed.
        for klass in cls.__mro__:
            if issubclass(klass, Observer):
                allowed_events.update(klass.events)
        return allowed_events

    def notify(self, event: ObservableEvents, *args, **kwargs) -> None:
        """
        Dispatch `event` to the matching `on_{event}` method of this observer.

        Events that are not in the allowed set of this observer type are discarded
        after logging a trace-level message.

        :param event: Name of the event being notified.
        :type event: ObservableEvents
        :param args: Positional arguments forwarded to the handler.
        :param kwargs: Keyword arguments forwarded to the handler.
        :raises AttributeError: If the event is allowed but no `on_{event}` method exists.
        :return: None
        """
        allowed_events = self.__get_allowed_events__()
        if event not in allowed_events:
            logger.trace(
                "Invalid event name '{}' for observer type {}. Valid events are: {}.",
                event,
                self.__class__.__name__,
                # Sorted so the message does not depend on set iteration order.
                ", ".join(sorted(allowed_events)),
            )
            return

        getattr(self, f"on_{event.lower()}")(*args, **kwargs)

    def on_added(self, observable: "ObserverManagerMixin") -> None:
        """
        Called when the observer is added to an observable object.

        :param observable: The observable object to which the observer is added.
        :type observable: ObserverManagerMixin
        :return: None
        """
        ...
[docs]
class AlgorithmObserver(Observer):
    """
    Observer that listens to the `step_done` and `algorithm_started` events.

    Both handlers do nothing by default; override the ones of interest.
    """

    events = [ObservableEvents.step_done, ObservableEvents.algorithm_started]

    def on_step_done(self, algorithm: "BaseAlgorithm") -> None:
        """Handle the `step_done` event. Does nothing by default."""

    def on_algorithm_started(self, algorithm: "BaseAlgorithm") -> None:
        """Handle the `algorithm_started` event. Does nothing by default."""
[docs]
class SimulationObserver(Observer):
    """
    Observer that listens to the `sim_state_changed`, `algorithm_finished` and
    `network_changed` events.

    All handlers do nothing by default; override the ones of interest.
    """

    events = [
        ObservableEvents.sim_state_changed,
        ObservableEvents.algorithm_finished,
        ObservableEvents.network_changed,
    ]

    def on_sim_state_changed(self, simulation: "Simulation") -> None:
        """Handle the `sim_state_changed` event. Does nothing by default."""

    def on_algorithm_finished(self, algorithm: "BaseAlgorithm") -> None:
        """Handle the `algorithm_finished` event. Does nothing by default."""

    def on_network_changed(self, simulation: "Simulation") -> None:
        """Handle the `network_changed` event. Does nothing by default."""
[docs]
class NetworkObserver(Observer):
    """
    Observer for network events.

    The class currently declares no events of its own, so it only contributes
    the events inherited from :class:`Observer`.
    """

    # Intentionally empty: no network-specific events are defined yet.
    events = []
[docs]
class NodeObserver(Observer):
    """
    Observer for node events.

    Listens to the `node_status_changed`, `message_delivered` and `message_sent`
    events. All handlers do nothing by default; override the ones of interest.
    """

    events = [
        ObservableEvents.node_status_changed,
        ObservableEvents.message_delivered,
        ObservableEvents.message_sent,
    ]

    def on_node_status_changed(self, node: "Node", previous_status: str, new_status: str) -> None:
        """
        Handle the `node_status_changed` event. Does nothing by default.

        :param node: The node the event refers to.
        :type node: Node
        :param previous_status: The status passed as the previous one.
        :type previous_status: str
        :param new_status: The status passed as the new one.
        :type new_status: str
        :return: None
        """

    def on_message_delivered(self, message: "Message") -> None:
        """Handle the `message_delivered` event. Does nothing by default."""

    def on_message_sent(self, message: "Message") -> None:
        """Handle the `message_sent` event. Does nothing by default."""
[docs]
class NodeNetworkObserver(NodeObserver, NetworkObserver):
    """
    Observer for node AND network events.

    Combines :class:`NodeObserver` and :class:`NetworkObserver` without adding
    events or handlers of its own.
    """
[docs]
class ObserverManagerMixin:
    """
    Mixin class for objects that can have observers.

    Subclasses must call the `super().__init__()` method in their :meth:`__init__` method or
    add the :attr:`observers` attribute.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the empty observer set and continue the cooperative `__init__` chain.

        :param args: Positional arguments forwarded to the next `__init__` in the MRO.
        :param kwargs: Keyword arguments forwarded to the next `__init__` in the MRO.
        """
        self.observers: set[Observer] = set()
        super().__init__(*args, **kwargs)

    def add_observers(self, *observers: "Observer"):
        """
        Register observers and notify each one with the `added` event.

        :param observers: The observers to register.
        :type observers: Observer
        """
        for observer in observers:
            self.observers.add(observer)
            observer.notify(ObservableEvents.added, self)

    def clear_observers(self):
        """Remove every registered observer by replacing the set with a new empty one."""
        self.observers = set()

    def notify_observers(self, event: "ObservableEvents", *args, **kwargs):
        """
        Notify every registered observer of `event`.

        :param event: Name of the event to notify.
        :type event: ObservableEvents
        :param args: Positional arguments forwarded to each observer.
        :param kwargs: Keyword arguments forwarded to each observer.
        """
        # Iterate over a snapshot: a handler may register further observers on
        # this object, and mutating a set while iterating it raises RuntimeError.
        for observer in tuple(self.observers):
            observer.notify(event, *args, **kwargs)