Source code for pydistsim.tests.test_algorithm

import unittest

from pydistsim import Node
from pydistsim._exceptions import SimulationException
from pydistsim.algorithm import (
    Actions,
    AlgorithmException,
    NetworkAlgorithm,
    NodeAlgorithm,
    StatusValues,
)
from pydistsim.message import Message
from pydistsim.network import NetworkException, NetworkGenerator
from pydistsim.simulation import Simulation
from pydistsim.utils.helpers import first
from pydistsim.utils.testing import PyDistSimTestCase


[docs] def set_algorithms(sim, algorithms): sim.algorithms = algorithms
[docs] class SomeNodeAlgorithm(NodeAlgorithm): required_params = ("rp1", "rp2", "rp3") default_params = {"dp1": "dp1_value", "dp2": "dp2_value", "dp3": "dp3_value"}
[docs] class SomeNetworkAlgorithm(NetworkAlgorithm): default_params = {"dp1": "dp1_value", "dp2": "dp2_value", "dp3": "dp3_value"}
[docs] class SomeAlgorithmWithInheritance(SomeNodeAlgorithm): required_params = ("rp4",) default_params = { "dp4": "dp4_value", }
[docs] class SomeAlgorithmWithInheritanceChild(SomeAlgorithmWithInheritance): required_params = ("rp5", "rp6") default_params = { "dp2": "overriden_dp2_value", }
[docs] def rp_multiple(): class SomeAlgorithmWhereRpIsRedefined(SomeAlgorithmWithInheritance): required_params = ("rp2",)
[docs] def dp_is_rp(): class SomeAlgorithmWhereDpIsInheritedRp(SomeAlgorithmWithInheritance): default_params = { "rp2": "dp2_value", }
[docs] def rp_is_dp(): class SomeAlgorithmWhereRpIsInheritedDp(SomeAlgorithmWithInheritance): required_params = ("dp2",)
[docs] class TestAlgorithmsSetter(unittest.TestCase):
[docs] def setUp(self): net_gen = NetworkGenerator(100) self.net = net_gen.generate_random_network() self.algorithms_ok = ( ( SomeNodeAlgorithm, { "rp1": 1, "rp2": 2, "rp3": 3, }, ), (SomeNetworkAlgorithm, {}), SomeNetworkAlgorithm, (SomeAlgorithmWithInheritance, {"rp1": 1, "rp2": 2, "rp3": 3, "rp4": 4}), ( SomeAlgorithmWithInheritanceChild, {"rp1": 1, "rp2": 2, "rp3": 3, "rp4": 4, "rp5": 5, "rp6": 6}, ), ) self.check = [ # wrong_format ( SimulationException, [ (SomeNodeAlgorithm, {"rp1": 1, "rp2": 2, "rp3": 3}), ], ), # wrong_base_class (SimulationException, ((Node, {}),)), # missing_req_params ( AlgorithmException, ( ( SomeNodeAlgorithm, { "rp1": 1, }, ), ), ), ( AlgorithmException, ( ( SomeAlgorithmWithInheritance, { "rp1": 1, }, ), ), ), ]
[docs] def test_setter(self): """Test different algorithm initialization formats and params.""" sim = Simulation(self.net) set_algorithms(sim, self.algorithms_ok) for exc, alg in self.check: self.assertRaises(exc, set_algorithms, sim, alg)
[docs] def test_params_inheritance(self): """Test default params inheritance algorithm classes.""" sim = Simulation(self.net) sim.algorithms = ( ( SomeAlgorithmWithInheritanceChild, {"rp1": 1, "rp2": 2, "rp3": 3, "rp4": 4, "rp5": 5, "rp6": 6}, ), ) self.assertTrue(sim.algorithms[0].dp1 == "dp1_value") self.assertTrue(sim.algorithms[0].dp2 == "overriden_dp2_value") self.assertTrue(sim.algorithms[0].dp3 == "dp3_value") self.assertRaises(AssertionError, rp_multiple) self.assertRaises(AssertionError, dp_is_rp) self.assertRaises(AssertionError, rp_is_dp)
[docs] def test_default_params(self): """Test default params.""" sim = Simulation(self.net) sim.algorithms = ( ( SomeNetworkAlgorithm, { "dp2": "overriden_dp2_value", }, ), ) self.assertTrue(sim.algorithms[0].dp1 == "dp1_value") self.assertTrue(sim.algorithms[0].dp2 == "overriden_dp2_value") self.assertTrue(sim.algorithms[0].dp3 == "dp3_value")
[docs] class TestStatusValues(unittest.TestCase):
[docs] def setUp(self): class Status(StatusValues): IDLE = "IDLE" DONE = "DONE" self.Status = Status
[docs] def test_state_action_names(self): """Test if state action names raise exception and correct ones are allowed.""" @self.Status.IDLE def spontaneously(): ... @self.Status.IDLE def receiving(): ... @self.Status.IDLE def alarm(): ... # Test if wrong name raises exception def wrong_name(): ... self.assertRaises(AssertionError, self.Status.IDLE, wrong_name) # Test if good name with capital letters raises exception def ALARM(): ... self.assertRaises(AssertionError, self.Status.IDLE, ALARM)
[docs] def test_implements_method(self): """Test Status.implements method.""" @self.Status.DONE def alarm(): ... # Using correct ActionEnum assert self.Status.DONE.implements(Actions.alarm) # Using wrong ActionEnum assert not self.Status.DONE.implements(Actions.spontaneously) # Using string assert self.Status.DONE.implements("alarm") # Using wrong string assert not self.Status.DONE.implements("spontaneously") # Using string with capital letters assert not self.Status.DONE.implements("ALARM")
[docs] class TimerAlgorithm(NodeAlgorithm):
[docs] class Status(StatusValues): IDLE = "IDLE" DONE = "DONE"
[docs] def initializer(self): for node in self.network.nodes(): node.status = self.Status.IDLE self.set_alarm(node, 3, Message(data=f"ALARM{node._internal_id}")) self.set_alarm(node, 6, Message(data=f"ALARM{node._internal_id}")) node.memory["LAST_ALARM"] = self.set_alarm(node, 9, Message(data=f"ALARM{node._internal_id}"))
@Status.IDLE def alarm(self, node, message): node.memory["alarm"] = message.data self.disable_alarm(node.memory["LAST_ALARM"]) self.disable_all_node_alarms(node) node.status = self.Status.DONE
[docs] class TimerDefaultMessage(TimerAlgorithm):
[docs] def initializer(self): for node in self.network.nodes(): node.status = self.Status.IDLE self.set_alarm(node, 3) self.set_alarm(node, 6) node.memory["LAST_ALARM"] = self.set_alarm(node, 9)
[docs] class TimerDefaultMessageDecreasedTime(TimerAlgorithm): @TimerAlgorithm.Status.IDLE def alarm(self, node, message): node.memory["alarm"] = message.data self.disable_alarm(node.memory["LAST_ALARM"]) self.disable_all_node_alarms(node) node.status = self.Status.DONE @TimerAlgorithm.Status.IDLE def spontaneously(self, node, message): self.update_alarm_time(node.memory["FIRST_ALARM"], -100) self.disable_alarm(node.memory["LAST_ALARM"])
[docs] def initializer(self): for node in self.network.nodes(): node.status = self.Status.IDLE node.memory["FIRST_ALARM"] = self.set_alarm(node, 4) node.memory["SECOND_ALARM"] = self.set_alarm(node, 6) node.memory["LAST_ALARM"] = self.set_alarm(node, 9) node.push_to_inbox(Message(meta_header=NodeAlgorithm.INI))
[docs] class TestAlarms(unittest.TestCase): def test_run_base_algorithm(self): def node_data_test(node: "Node"): return (node.status == TimerAlgorithm.Status.DONE and node.memory["alarm"] == f"ALARM{node._internal_id}",) self.aux_test(TimerAlgorithm, node_data_test) def test_run_base_algorithm_default_message(self): self.aux_test( TimerDefaultMessage, lambda node: node.status == TimerAlgorithm.Status.DONE and node.memory["alarm"] is None, ) def aux_test(self, algo_class, data_test): self.net = NetworkGenerator(10).generate_random_network() sim = Simulation(self.net) sim.algorithms = (algo_class,) algorithm: NodeAlgorithm = sim.get_current_algorithm() some_n = first(self.net.nodes()) assert sim.is_halted() # 1 step for initialization, 3 steps for alarms, 1 step for processing alarm messages sim.run(1) assert len(algorithm._alarms) == 3 * len(self.net.nodes()) assert len(some_n.inbox) == 0 sim.run(1) # alarm tic assert len(algorithm._alarms) == 3 * len(self.net.nodes()) assert len(some_n.inbox) == 0 sim.run(1) # alarm tic assert len(algorithm._alarms) == 3 * len(self.net.nodes()) assert len(some_n.inbox) == 0 sim.run(1) # alarm tic assert len(algorithm._alarms) == 2 * len(self.net.nodes()) assert all(len(node.inbox) == 1 for node in self.net.nodes()) sim.run(1) # 1 step for processing alarm messages assert all([data_test(node) for node in self.net.nodes()]) assert len(algorithm._alarms) == 0 assert sim.is_halted() def test_update_alarm(self): self.net = NetworkGenerator(10).generate_random_network() sim = Simulation(self.net) sim.algorithms = (TimerDefaultMessageDecreasedTime,) algorithm: NodeAlgorithm = sim.get_current_algorithm() some_n = first(self.net.nodes()) assert sim.is_halted() # 1 step for initialization, 3 steps for alarms, 1 step for processing alarm messages sim.run(1) assert len(algorithm._alarms) == 3 * len(self.net.nodes()) assert len(some_n.inbox) == 1 # INI message assert some_n.memory["FIRST_ALARM"].time_left == 4 sim.run(1) # alarm tic assert len(algorithm._alarms) == 3 * len(self.net.nodes()) assert len(some_n.inbox) == 1 # INI message assert some_n.memory["FIRST_ALARM"].time_left == 3 sim.run(1) # alarm tic -> processes INI and disables LAST_ALARM assert len(algorithm._alarms) == 2 * len(self.net.nodes()) assert len(some_n.inbox) == 0 assert some_n.memory["FIRST_ALARM"].time_left == -98 # -100 + 2 sim.run(1) # alarm tic -> FIRST_ALARM triggered assert len(algorithm._alarms) == len(self.net.nodes()) # Only SECOND_ALARM left assert all(len(node.inbox) == 1 for node in self.net.nodes()) # 1 alarm message (generated by FIRST_ALARM) sim.run(1) # 1 step for processing alarm messages, disables SECOND_ALARM assert len(algorithm._alarms) == 0 assert sim.is_halted()