"""
Simulation module for the PyDistSim package.
"""
import inspect
from copy import copy, deepcopy
from typing import TYPE_CHECKING, Optional
from pydistsim._exceptions import SimulationException
from pydistsim.algorithm import BaseAlgorithm
from pydistsim.logging import logger
from pydistsim.observers import (
NetworkObserver,
ObservableEvents,
ObserverManagerMixin,
SimulationObserver,
)
if TYPE_CHECKING:
from pydistsim.network import NetworkType
AlgorithmsParam = tuple[type["BaseAlgorithm"] | tuple[type["BaseAlgorithm"], dict]]
[docs]
class Simulation(ObserverManagerMixin):
    """
    Controls the execution of a sequence of algorithms over a single network.

    The simulation keeps track of which algorithm is currently running and at which step
    (``algorithmState``), drives the algorithms step by step and notifies its observers
    about state changes.

    :param network: The network object representing the simulation network.
    :type network: NetworkType
    :param algorithms: The algorithms to be executed on the network.
    :type algorithms: AlgorithmsParam, optional
    :param check_restrictions: Whether to check restrictions during the simulation.
    :type check_restrictions: bool, optional
    :param kwargs: Additional keyword arguments. They are accepted but not used by this constructor.
    """

    def __init__(
        self,
        network: "NetworkType",
        algorithms: AlgorithmsParam | None = None,
        check_restrictions: bool = True,
        **kwargs,
    ):
        super().__init__()
        self._network = network
        self._network.simulation = self
        self._algorithms = ()
        # Always defined, so that ``__deepcopy__`` also works for a simulation created without algorithms.
        self._algorithms_param = ()
        if algorithms is not None:
            self.algorithms = algorithms
        self.algorithmState = {"index": 0, "step": 1, "finished": False}
        self.stepsLeft = 0
        self.check_restrictions = check_restrictions
        logger.info("Simulation {} created successfully.", hex(id(self)))

    def __deepcopy__(self, memo):
        """
        Create an independent copy of the simulation.

        The network is deep-copied, the algorithms are re-instantiated from the original
        ``algorithms`` specification and the execution state is copied over. Observers are
        not carried over to the copy.

        :param memo: Memo dictionary used by :func:`copy.deepcopy`.
        :return: The copied simulation.
        """
        if id(self) in memo:
            return memo[id(self)]

        # New simulation built around a deep copy of the network.
        copy_s = type(self)(deepcopy(self._network, memo))
        memo[id(self)] = copy_s

        # The setter instantiates fresh algorithm objects bound to the copy. It also resets
        # the copy, which is why the execution state is restored only afterwards.
        copy_s.algorithms = self._algorithms_param

        copy_s.algorithmState = deepcopy(self.algorithmState, memo)
        copy_s.stepsLeft = self.stepsLeft
        copy_s.check_restrictions = self.check_restrictions

        copy_s.clear_observers()
        return copy_s

    def run(self, steps=0):
        """
        Run simulation from the current state.

        :param steps: Number of steps to run the simulation.
            If steps = 0, it runs until all algorithms are finished.
            If steps > 0, the simulation is in stepping mode.
            If steps > number of steps to finish the current algorithm, it finishes it.
        :type steps: int
        """
        self.stepsLeft = steps
        for _ in range(len(self.algorithms) * len(self.network)):
            algorithm: Optional["BaseAlgorithm"] = self.get_current_algorithm()
            if not algorithm:
                logger.info(
                    "Simulation has finished. There are no algorithms left to run. "
                    "To run it from the start use sim.reset()."
                )
                self.notify_observers(ObservableEvents.sim_state_changed, self)
                break
            algorithm.add_observers(*self.observers)
            self._run_algorithm(algorithm)
            self.notify_observers(ObservableEvents.sim_state_changed, self)
            # In stepping mode, stop as soon as the requested number of steps is used up.
            if self.stepsLeft <= 0 and steps != 0:
                break

    def run_step(self):
        """
        Run a single step of the simulation.

        This is equivalent to calling sim.run(1).
        """
        self.run(1)

    def _run_algorithm(self, algorithm: BaseAlgorithm):
        """
        Run the given algorithm on the simulation network.

        Update stepsLeft and sim.algorithmState['step'].
        If stepsLeft hits 0 it returns without marking the algorithm as finished.

        :param algorithm: The algorithm to run on the network.
        """
        # The iteration count is capped; if the cap is reached before the algorithm halts,
        # control falls through and the algorithm is marked as finished anyway.
        for _ in range(1000 * len(self.network)):
            # Remember the step number so that "started" and "finished" report the same step.
            step = self.algorithmState["step"]
            logger.debug("[{}] Step {} started", algorithm.name, step)
            algorithm.step(self.check_restrictions, step)
            self.stepsLeft -= 1
            self.algorithmState["step"] += 1
            self.network.increment_node_clocks()
            logger.debug("[{}] Step {} finished", algorithm.name, step)

            if algorithm.is_halted():
                break  # algorithm finished
            if self.stepsLeft == 0:
                return  # stepped execution finished

        self.notify_observers(ObservableEvents.algorithm_finished, algorithm)
        logger.info("[{}] Algorithm finished", algorithm.name)
        self.algorithmState["finished"] = True

    def reset(self):
        """
        Reset the simulation.

        The algorithm state goes back to the first algorithm at step 1, and the network and
        every instantiated algorithm are reset.
        """
        logger.debug("Resetting simulation.")
        self.algorithmState = {"index": 0, "step": 1, "finished": False}
        self._network.reset()
        if self._algorithms:
            for algorithm in self._algorithms:
                algorithm.reset()

    def is_halted(self):
        """
        Check if simulation has come to an end or deadlock,
        i.e. there are no messages to pass and no alarms set.

        A not-started algorithm is considered halted. If there are
        no algorithms left to run, the simulation is also considered halted.

        :return: True if the algorithm is halted, False otherwise.
        :rtype: bool
        """
        algorithm: Optional["BaseAlgorithm"] = self.get_current_algorithm()
        return algorithm is None or algorithm.is_halted()

    @property
    def network(self):
        """
        Get the network associated with the simulation.
        """
        return self._network

    @network.setter
    def network(self, network: "NetworkType"):
        """
        Set the network for the simulation.

        The previous network is detached from this simulation and its observers are cleared.
        The new network is then attached, observers are notified of the change and the
        network observers of this simulation are registered on the new network.

        :param network: The network object to set.
        :type network: NetworkType
        """
        self._network.simulation = None  # remove reference to this simulation in the old network
        self._network.clear_observers()
        self._network = network
        self._network.simulation = self
        self.notify_observers(ObservableEvents.network_changed, self)
        self._copy_observers_to_network()

    def add_observers(self, *observers: "SimulationObserver"):
        """
        Register observers on the simulation and forward the network observers to the network.

        :param observers: The observers to add.
        """
        super().add_observers(*observers)
        self._copy_observers_to_network()

    def _copy_observers_to_network(self):
        """
        Add every observer of this simulation that is a ``NetworkObserver`` to the network.
        """
        self.network.add_observers(*(observer for observer in self.observers if isinstance(observer, NetworkObserver)))

    #### Algorithm relation methods ####

    def get_current_algorithm(self) -> BaseAlgorithm | None:
        """
        Try to return the current algorithm based on the algorithmState.

        Note that this method updates the state: if the current algorithm is marked as
        finished and there is a next one, ``algorithmState`` is advanced to that algorithm.

        :return: The current algorithm, or None if every algorithm has finished.
        :rtype: BaseAlgorithm or None
        :raises SimulationException: If there are no algorithms defined in the simulation.
        """
        if len(self.algorithms) == 0:
            logger.error("There is no algorithm defined in the network.")
            raise SimulationException(SimulationException.ERRORS.ALGORITHM_NOT_FOUND)

        if self.algorithmState["finished"]:
            if len(self.algorithms) > self.algorithmState["index"] + 1:
                self.algorithmState["index"] += 1
                self.algorithmState["step"] = 1
                self.algorithmState["finished"] = False
            else:
                return None

        return self.algorithms[self.algorithmState["index"]]

    @property
    def algorithms(self):
        """
        The instantiated algorithms of the simulation.

        Set algorithms by passing tuple of Algorithm subclasses.

        >>> sim.algorithms = (Algorithm1, Algorithm2,)

        For params pass tuples in form (Algorithm, params) like this

        >>> sim.algorithms = ((Algorithm1, {'param1': value,}), Algorithm2)

        Setting the algorithms resets the simulation.

        :raises SimulationException: When setting, if the value is not a tuple or contains an invalid entry.
        """
        return self._algorithms

    @algorithms.setter
    def algorithms(self, algorithms: AlgorithmsParam):
        self.reset()
        self._algorithms = ()
        if not isinstance(algorithms, tuple):
            raise SimulationException(SimulationException.ERRORS.ALGORITHM)
        for algorithm in algorithms:
            if inspect.isclass(algorithm) and issubclass(algorithm, BaseAlgorithm):
                self._algorithms += (algorithm(self),)
            elif (
                isinstance(algorithm, tuple)
                and len(algorithm) == 2
                # issubclass() raises TypeError for non-classes; guard it so that any
                # malformed entry is reported as a SimulationException instead.
                and inspect.isclass(algorithm[0])
                and issubclass(algorithm[0], BaseAlgorithm)
                and isinstance(algorithm[1], dict)
            ):
                self._algorithms += (algorithm[0](self, **algorithm[1]),)
            else:
                raise SimulationException(SimulationException.ERRORS.ALGORITHM)

        # If everything went ok, keep the original specification for copying
        self._algorithms_param = algorithms

    def get_dic(self):
        """
        Return all simulation data in the form of a dictionary.

        The returned dictionary has two keys:

        * ``"algorithms"``: maps ``"<index> <name>"`` of every algorithm to ``"active"`` for
          the current one and to an empty string for the others.
        * ``"algorithmState"``: the name of the current algorithm (``"No algorithm"`` if none
          is left to run), the current step and whether the algorithm has finished.

        :return: A dictionary containing the simulation data.
        :rtype: dict
        :raises SimulationException: If there are no algorithms defined in the simulation.
        """
        # Resolve the current algorithm once and before anything else: the call may advance
        # algorithmState, and every value below must describe the same state.
        current = self.get_current_algorithm()
        active = self.algorithms[self.algorithmState["index"]]

        algorithms = {
            f"{ind} {alg.name}": ("active" if alg == active else "") for ind, alg in enumerate(self.algorithms)
        }
        return {
            "algorithms": algorithms,
            "algorithmState": {
                "name": current.name if current else "No algorithm",
                "step": self.algorithmState["step"],
                "finished": self.algorithmState["finished"],
            },
        }