Custom observer tutorial

The goal of this tutorial is to set up a custom observer that monitors selected events of the simulation.

[2]:
%matplotlib inline

from pydistsim import NetworkGenerator, Simulation
from pydistsim.logging import set_log_level, LogLevels, enable_logger

set_log_level(LogLevels.INFO)
enable_logger()

Algorithm

To demonstrate this, we will reuse the Echo algorithm from the “Hello World” example. It is a simple flooding algorithm: the initiator sends a message to all of its neighbors and waits for a response from each of them. Once every neighbor has responded, the algorithm terminates.
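
The message flow described above can be sketched without the library. This is a minimal, library-independent illustration, not pydistsim code: the function `run_echo`, the adjacency dictionary, and the tuple-based messages are all hypothetical names chosen for this sketch.

```python
from collections import deque


def run_echo(adjacency, initiator, payload="Hello world!"):
    """Simulate one echo exchange and return the events seen at the initiator.

    Hypothetical helper for illustration only; it is not part of pydistsim.
    """
    events = []
    inbox = deque()

    # The initiator sends an "Echo" message to every neighbor.
    for neighbor in adjacency[initiator]:
        inbox.append(("Echo", initiator, neighbor, payload))

    response_count = 0
    while inbox:
        header, source, destination, data = inbox.popleft()
        if header == "Echo":
            # A neighbor answers the sender with the same payload.
            inbox.append(("Echo response", destination, source, data))
        elif header == "Echo response" and destination == initiator:
            events.append(("echo_response_received", source))
            # Count the response only if it carries the original payload.
            if data == payload:
                response_count += 1
            # Terminate once every neighbor has answered.
            if response_count == len(adjacency[initiator]):
                events.append(("echo_terminated", initiator))
    return events


# Small star network: node 1 is the hub, nodes 2..5 are the leaves.
star = {1: [2, 3, 4, 5], 2: [1], 3: [1], 4: [1], 5: [1]}
events = run_echo(star, initiator=1)
```

The two event names recorded here are the same ones the observer in this tutorial subscribes to.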

The observer will be able to monitor the following events:

  • ‘echo_response_received’: This event is triggered when a response is received from a neighbor.

  • ‘echo_terminated’: This event is triggered when the algorithm terminates.

The implementation of the Echo algorithm is given below. Notice the call to self.notify_observers(), which is made whenever one of these events occurs.

[3]:
from enum import StrEnum
from pydistsim.algorithm.node_algorithm import NodeAlgorithm, StatusValues
from pydistsim.algorithm.node_wrapper import NodeAccess
from pydistsim.message import Message
from pydistsim.restrictions.communication import BidirectionalLinks
from pydistsim.restrictions.reliability import TotalReliability
from pydistsim.restrictions.topological import Connectivity, UniqueInitiator


class EchoEvents(StrEnum):
    echo_response_received = "echo_response_received"
    echo_terminated = "echo_terminated"


class Echo(NodeAlgorithm):
    default_params = {
        "echo_message": "Hello world!",
    }

    class Status(StatusValues):
        INITIATOR = "INITIATOR"
        AWAITING_ECHO = "AWAITING_ECHO"
        AWAITING_ECHO_RESPONSE = "AWAITING_ECHO_RESPONSE"
        DONE = "DONE"

    S_init = (Status.INITIATOR, Status.AWAITING_ECHO)
    S_term = (Status.DONE,)

    # Here we define under which restrictions the algorithm is designed to work
    algorithm_restrictions = (
        BidirectionalLinks,  # The algorithm requires bidirectional links, so that the response can be sent back
        TotalReliability,  # The algorithm requires that all messages are delivered without loss
        Connectivity,  # The algorithm requires that the network is connected
        UniqueInitiator,  # The algorithm requires that only one node is the initiator
    )

    def initializer(self):
        # Set all nodes to AWAITING_ECHO
        for node in self.network.nodes():
            node.status = self.Status.AWAITING_ECHO

        # Choose the initiator
        ini_node = self.network.nodes_sorted()[0]

        # Send the initial message to the initiator
        ini_node.push_to_inbox(Message(meta_header=NodeAlgorithm.INI, destination=ini_node))

        # Set the initiator status to INITIATOR and store the initial information
        ini_node.status = self.Status.INITIATOR
        ini_node.memory["message"] = self.echo_message

    @Status.INITIATOR
    def spontaneously(self, node: NodeAccess, message: Message):
        self.send(
            node,
            data=node.memory["message"],
            destination=list(node.neighbors()),  # send to all neighbors
            header="Echo",
        )
        node.memory["responseCount"] = 0
        node.status = self.Status.AWAITING_ECHO_RESPONSE

    @Status.AWAITING_ECHO_RESPONSE
    def receiving(self, node: NodeAccess, message: Message):
        if message.header == "Echo response":
            self.notify_observers(EchoEvents.echo_response_received, node=node, message=message)

            # Count the number of responses received, only if the response is the same as the original message
            node.memory["responseCount"] += 1 if message.data == node.memory["message"] else 0

            # If all responses have been received, set the node status to DONE
            if node.memory["responseCount"] == len(list(node.neighbors())):
                self.notify_observers(EchoEvents.echo_terminated, node=node)
                node.status = self.Status.DONE

    @Status.AWAITING_ECHO
    def receiving(self, node: NodeAccess, message: Message):
        if message.header == "Echo":
            node.memory["message"] = message.data

            self.send(
                node,
                data=message.data,
                destination=message.source,
                header="Echo response",
            )
        node.status = self.Status.DONE

    @Status.DONE
    def default(self, *args, **kwargs):
        "Do nothing, for all inputs."
        pass

Observer

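The observer is implemented as a class that inherits from both AlgorithmObserver and SimulationObserver. Its events attribute lists the events it listens for, and each event is handled by a method named on_ followed by the event name, as the code below shows.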

[4]:
from pydistsim.observers import AlgorithmObserver, SimulationObserver
from pydistsim.network import Node


class EchoObserver(AlgorithmObserver, SimulationObserver):
    events = [
        EchoEvents.echo_response_received,
        EchoEvents.echo_terminated,
    ]

    def __init__(self):
        self.list_of_events = []

    def on_echo_response_received(self, node: NodeAccess, message: Message):
        actual_node: Node = node.unbox()
        actual_source: Node = message.source.unbox()
        print(f"Node {actual_node} received echo response from {actual_source}.")
        self.list_of_events.append("received")

    def on_echo_terminated(self, node: NodeAccess):
        actual_node: Node = node.unbox()
        print(f"Node {actual_node} terminated.")
        self.list_of_events.append("terminated")
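
How an event name could be routed to an `on_<event>` method can be illustrated with a small standalone sketch. The classes `SketchObserver` and `SketchObservable` below are hypothetical and written only for this illustration; pydistsim's actual dispatch mechanism is not shown in this tutorial and may differ.

```python
class SketchObserver:
    """Hypothetical observer: handles an event via a method named on_<event>."""

    events = ["echo_response_received", "echo_terminated"]

    def __init__(self):
        self.log = []

    def notify(self, event, **kwargs):
        # Ignore events this observer did not subscribe to.
        if event not in self.events:
            return
        # Look up the handler by name and forward the keyword arguments.
        handler = getattr(self, f"on_{event}", None)
        if handler is not None:
            handler(**kwargs)

    def on_echo_response_received(self, node, message):
        self.log.append(f"{node} received {message!r}")

    def on_echo_terminated(self, node):
        self.log.append(f"{node} terminated")


class SketchObservable:
    """Hypothetical subject that forwards events to its registered observers."""

    def __init__(self):
        self.observers = []

    def add_observers(self, *observers):
        self.observers.extend(observers)

    def notify_observers(self, event, **kwargs):
        for observer in self.observers:
            observer.notify(event, **kwargs)


subject = SketchObservable()
watcher = SketchObserver()
subject.add_observers(watcher)

subject.notify_observers("echo_response_received", node="n1", message="hi")
subject.notify_observers("echo_terminated", node="n1")
subject.notify_observers("unknown_event", node="n1")  # silently ignored
```

In this sketch the keyword arguments passed to `notify_observers` must match the handler's parameter names, which mirrors how the Echo algorithm passes `node=` and `message=` and the handlers in EchoObserver accept parameters of the same names.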

Assigning algorithm and observer to simulation

[5]:
MESSAGE = "Hello distributed and observable world"

net = NetworkGenerator.generate_star_network(16)
observer = EchoObserver()
sim = Simulation(net)

sim.algorithms = ((Echo, {"echo_message": MESSAGE}),)
sim.add_observers(observer)
2024-10-22 23:39:48.901 | INFO     | pydistsim.simulation:__init__:59 - Simulation 0x7f58b45e3190 created successfully.
[6]:
net.show()
[6]:
[Figure: the generated 16-node star network, as drawn by net.show()]

Running

Finally, run the simulation:

[7]:
sim.run()
Node <Node id=1> received echo response from <Node id=2>.
Node <Node id=1> received echo response from <Node id=3>.
Node <Node id=1> received echo response from <Node id=4>.
Node <Node id=1> received echo response from <Node id=5>.
Node <Node id=1> received echo response from <Node id=6>.
Node <Node id=1> received echo response from <Node id=7>.
Node <Node id=1> received echo response from <Node id=8>.
Node <Node id=1> received echo response from <Node id=9>.
Node <Node id=1> received echo response from <Node id=10>.
Node <Node id=1> received echo response from <Node id=11>.
Node <Node id=1> received echo response from <Node id=12>.
Node <Node id=1> received echo response from <Node id=13>.
Node <Node id=1> received echo response from <Node id=14>.
Node <Node id=1> received echo response from <Node id=15>.
Node <Node id=1> received echo response from <Node id=16>.
Node <Node id=1> terminated.
2024-10-22 23:39:52.911 | INFO     | pydistsim.simulation:_run_algorithm:148 - [Echo] Algorithm finished

As we can see, the observer executed the print statements we defined in its handler methods.

Now we check whether the observer has kept track of the events.

[8]:
observer.list_of_events
[8]:
['received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'received',
 'terminated']
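
The recorded list can also be checked programmatically. The sketch below rebuilds the list shown above rather than reading it from a live observer, so it runs on its own; the variable names are chosen for this illustration. In a 16-node star network the hub has 15 neighbors, so we expect 15 responses followed by a single termination.

```python
from collections import Counter

# Same contents as observer.list_of_events in the output above.
list_of_events = ["received"] * 15 + ["terminated"]

counts = Counter(list_of_events)
neighbor_count = 16 - 1  # the hub of a 16-node star has 15 neighbors

# Every neighbor responded exactly once, then the initiator terminated.
all_responded = counts["received"] == neighbor_count
terminated_last = list_of_events[-1] == "terminated" and counts["terminated"] == 1

print(f"responses: {counts['received']}, terminations: {counts['terminated']}")
```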