Source code for pydistsim.network.rangenetwork

from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import TYPE_CHECKING, Optional

from numpy import allclose, sign, sqrt
from numpy.random import random

from pydistsim.logging import logger
from pydistsim.network.environment import Environment
from pydistsim.network.network import BidirectionalNetwork, DirectedNetwork
from pydistsim.utils.helpers import with_typehint

if TYPE_CHECKING:
    from pydistsim.network.behavior import NetworkBehaviorModel
    from pydistsim.network.node import Node


[docs] class RangeType(ABC): """RangeType abstract base class. This class represents an abstract base class for different types of channels. Subclasses of RangeType should implement the `in_comm_range` method. :param environment: The environment in which the channel operates. :type environment: Environment """ def __init__(self, environment: "Environment"): self.environment = environment
[docs] @abstractmethod def in_comm_range(self, network: "RangeNetworkType", node1: "Node", node2: "Node"): """Check if two nodes are within communication range. This method should be implemented by subclasses to determine if two nodes are within communication range. :param network: The network in which the nodes are connected. :type network: RangeNetworkType :param node1: The first node. :type node1: Node :param node2: The second node. :type node2: Node :return: True if the nodes are within communication range, False otherwise. :rtype: bool """ ...
[docs] class UdgRangeType(RangeType): """Unit disc graph range type. This class represents the Unit Disc Graph (UDG) channel type. It determines if two nodes are within communication range based on their positions and communication range. :param environment: The environment in which the channel operates. :type environment: Environment """
[docs] def in_comm_range(self, network: "RangeNetworkType", node1: "Node", node2: "Node"): """Check if two nodes are within communication range. Two nodes are in communication range if they can see each other and are positioned so that their distance is smaller than the communication range. :param network: The network in which the nodes are connected. :type network: RangeNetworkType :param node1: The first node. :type node1: Node :param node2: The second node. :type node2: Node :return: True if the nodes are within communication range, False otherwise. :rtype: bool """ p1 = network.pos[node1] p2 = network.pos[node2] d = sqrt(sum(pow(p1 - p2, 2))) if d < node1.commRange and d < node2.commRange: if self.environment.are_visible(p1, p2): return True return False
[docs] class CompleteRangeType(RangeType): """Complete range type. This class represents the Complete channel type. It always returns True, indicating that any two nodes are within communication range. :param environment: The environment in which the channel operates. :type environment: Environment """
[docs] def in_comm_range(self, network: "RangeNetworkType", node1: "Node", node2: "Node"): """Check if two nodes are within communication range. This method always returns True, indicating that any two nodes are within communication range. :param network: The network in which the nodes are connected. :type network: RangeNetworkType :param node1: The first node. :type node1: Node :param node2: The second node. :type node2: Node :return: True if the nodes are within communication range, False otherwise. :rtype: bool """ return True
[docs] class SquareDiscRangeType(RangeType): """Square Disc channel type. This class represents the Square Disc channel type. It determines if two nodes are within communication range based on their positions, communication range, and a probability of connection. :param environment: The environment in which the channel operates. :type environment: Environment """
[docs] def in_comm_range(self, network: "RangeNetworkType", node1: "Node", node2: "Node"): """Check if two nodes are within communication range. Two nodes are in communication range if they can see each other, are positioned so that their distance is smaller than the communication range, and satisfy a probability of connection. :param network: The network in which the nodes are connected. :type network: RangeNetworkType :param node1: The first node. :type node1: Node :param node2: The second node. :type node2: Node :return: True if the nodes are within communication range, False otherwise. :rtype: bool """ p1 = network.pos[node1] p2 = network.pos[node2] d = sqrt(sum(pow(p1 - p2, 2))) if random() > d**2 / node1.commRange**2: if self.environment.are_visible(p1, p2): assert node1.commRange == node2.commRange return True return False
[docs] class RangeNetworkMixin(with_typehint(DirectedNetwork)): """ Mixin to define a type of network that decides which nodes are connected based on their communication range. :param environment: The environment in which the network operates. If not provided, a new Environment instance will be created. :type environment: Environment, optional :param rangeType: The type of channel to be used for communication. If not provided, a new RangeType instance will be created using the environment. :type rangeType: RangeType, optional :param graph: The graph representing the network topology. Defaults to None. :type graph: NetworkX graph, optional :param kwargs: Additional keyword arguments. """ def __init__( self, incoming_graph_data=None, environment: Optional["Environment"] = None, rangeType: RangeType | None = None, behavioral_properties: Optional["NetworkBehaviorModel"] = None, **kwargs, ): super().__init__(incoming_graph_data, environment, behavioral_properties, **kwargs) self.rangeType = rangeType or UdgRangeType(self._environment) self.rangeType.environment = self._environment @staticmethod def to_directed_class(): return RangeNetwork @staticmethod def to_undirected_class(): return BidirectionalRangeNetwork def __deepcopy__(self, memo, nodes=None, edges=None, init_args=None, cls=None): init_args = init_args or {} init_args["rangeType"] = self.rangeType return super().__deepcopy__(memo, nodes, edges, init_args, cls) def add_node(self, node=None, pos=None, ori=None, commRange=None): node = super().add_node(node, pos, ori, commRange) self.recalculate_edges([node]) return node def remove_node(self, node, skip_check=False): super().remove_node(node, skip_check) self.recalculate_edges()
[docs] def recalculate_edges(self, nodes: Iterable | None = None): """ Recalculate edges for given nodes or for all self.nodes(). :param nodes: A list of nodes to recalculate edges for. If not provided, edges will be recalculated for all nodes in the network. :type nodes: list, optional Edge between nodes n1 and n2 are added if both are RangeType.in_comm_range of each other. """ if not nodes: nodes = self.nodes() for n1 in nodes: for n2 in self.nodes(): if n1 != n2: for x, y in ((n1, n2), (n2, n1)): if self.rangeType.in_comm_range(self, x, y): super().add_edge(x, y) elif self.has_edge(x, y): self.remove_edge(x, y)
[docs] def add_edge(self, u_of_edge, v_of_edge, **attr): """ Add an edge to the network. :param u_of_edge: The source node of the edge. :param v_of_edge: The target node of the edge. :param attr: Additional attributes to be assigned to the edge. """ logger.warning("Edges are auto-calculated from rangeType and commRange") super().add_edge(u_of_edge, v_of_edge, **attr)
def _set_environment(self, environment: Environment): super()._set_environment(environment) self.rangeType.environment = environment for node in self.nodes_sorted(): self.remove_node(node, skip_check=True) self.add_node(node) logger.debug("All nodes are moved into new environment.") def validate_params(self, params: dict): super().validate_params(params) for param, value in params.items(): if param == "rangeType": assert self.rangeType.__class__ == value.__class__ elif param == "comm_range": for node in self: assert node.commRange == value
[docs] def modify_avg_degree(self, value): """ DEPRECATED AND UNTESTED Modifies (increases) average degree based on the given value by modifying nodes' commRange. :param value: The desired average degree value. :type value: float :raises AssertionError: If all nodes do not have the same commRange. :raises AssertionError: If the given value is not greater than the current average degree. This method increases the average degree of the network by modifying the communication range (`commRange`) of the nodes. It ensures that all nodes have the same communication range. The method uses a step-wise approach to gradually increase the average degree until it reaches the desired value. It adjusts the communication range of each node in the network by adding a step size calculated based on the difference between the desired average degree and the current average degree. The step size is determined by the `step_factor` parameter, which controls the rate of change in the communication range. If the step size overshoots or undershoots the desired average degree, the `step_factor` is halved to reduce the step size for the next iteration. Once the average degree reaches the desired value, the method logs the modified degree. Note: This method assumes that the network is initially connected and all nodes have the same communication range. Example usage: network.modify_avg_degree(5.0) """ # assert all nodes have the same commRange assert allclose([n.commRange for n in self], self.nodes_sorted()[0].commRange) # TODO: implement decreasing of degree, preserve connected network assert value + 1 > self.avg_degree() # only increment step_factor = 7.0 steps = [0] # TODO: while condition should call validate while not allclose(self.avg_degree(), value, atol=1): steps.append((value - self.avg_degree()) * step_factor) for node in self: node.commRange += steps[-1] # variable step_factor for step size for over/undershoot cases if len(steps) > 2 and sign(steps[-2]) != sign(steps[-1]): step_factor /= 2 logger.trace("Modified degree to {}", self.avg_degree())
[docs] class RangeNetwork(RangeNetworkMixin, DirectedNetwork): """ Type of network that decides which nodes are connected based on their communication range. Aims to represent a wireless network where nodes can only communicate with each other if they are within a certain range. Manual edge modification is not recommended. Edges are automatically calculated and any edge can be removed by moving the nodes out of communication range or by addition/removal of nodes. """
[docs] class BidirectionalRangeNetwork(RangeNetworkMixin, BidirectionalNetwork): """ Same as RangeNetwork but with bidirectional edges (undirected graph). """
RangeNetworkType = RangeNetwork | BidirectionalRangeNetwork "Type of network that decides which nodes are connected based on their communication range."