Source code for pydistsim.network.generator

from collections import OrderedDict
from itertools import product
from typing import TYPE_CHECKING, Optional, TypeVar

from numpy import array, cos, log2, pi, sign, sin, sqrt
from numpy.random import rand

from pydistsim.logging import logger
from pydistsim.network.network import BidirectionalNetwork, DirectedNetwork
from pydistsim.network.node import Node
from pydistsim.network.rangenetwork import BidirectionalRangeNetwork, RangeNetwork

if TYPE_CHECKING:
    from pydistsim.network.environment import Environment2D
    from pydistsim.network.network import NetworkType
    from pydistsim.network.rangenetwork import RangeNetworkType

T = TypeVar("T", bound="NetworkType")


[docs] class NetworkGenerator: """ Class for generating networks with specified properties. Instance mode ------------- Usage example: >>> net_gen = NetworkGenerator(n_count=45, degree=4, directed=False, enforce_connected=False) >>> net = net_gen.generate_random_network() The generated network is returned as a :class:`pydistsim.network.rangenetwork.RangeNetwork` or :class:`pydistsim.network.rangenetwork.BidirectionalRangeNetwork` object. Class mode ---------- Here instancing the class will have no effect at all. The only parameters considered are the ones passed to the class methods. The class methods are: #. :meth:`generate_complete_network` #. :meth:`generate_ring_network` #. :meth:`generate_star_network` #. :meth:`generate_hypercube_network` #. :meth:`generate_mesh_network` Usage example: >>> net = NetworkGenerator.generate_complete_network(5) :param n_count: int, number of nodes, if None, 100 is used :param n_min: int, minimum number of nodes, if not set it is equal to n_count :param n_max: int, maximum number of nodes, if not set it is equal to n_count :param enforce_connected: bool, if True network must be fully connected :param degree: int, average number of neighbors per node :param comm_range: int, nodes communication range, if None, 100 is used and it is a signal that this value can be changed if needed to satisfy other wanted properties (connected and degree) :param method: str, sufix of the name of the method used to generate network :param degree_tolerance: float, tolerance for degree parameter :param directed: bool, if True generated network is directed :param kwargs: network and node __init__ kwargs i.e.: - environment: Environment, environment in which the network should be created, if None Environment2D is used - rangeType: RangeType - algorithms: tuple - commRange: int, overrides `comm_range` - sensors: tuple """ DIRECTED_NETWORK_T = RangeNetwork UNDIRECTED_NETWORK_T = BidirectionalRangeNetwork def __init__( self, n_count=None, n_min=None, n_max=None, enforce_connected=True, degree=None, comm_range=None, method="random_network", degree_tolerance=0.5, directed=False, **kwargs, ): self.n_count = n_count or 100 self.n_min = self.n_count if n_min is None else n_min self.n_max = self.n_count if n_max is None else n_max if self.n_count < self.n_min or self.n_count > self.n_max: raise NetworkGeneratorException("Number of nodes (n_count parameter) must be between n_min and n_max.") if degree and degree >= self.n_max: raise NetworkGeneratorException( "Degree %d must be smaller than maximum number of nodes %d." % (degree, self.n_max) ) # TODO: optimize recalculation of edges on bigger commRanges if degree: logger.warning("Generation could be slow for large degree parameter with bounded n_max.") self.enforce_connected = enforce_connected self.degree = degree self.degree_tolerance = degree_tolerance self.directed = directed self.comm_range = kwargs.pop("commRange", comm_range) # TODO: use subclass based generators instead of method based self.generate = self.__getattribute__("generate_" + method) self.kwargs = kwargs
[docs] def _create_modify_network(self, net: Optional["RangeNetworkType"] = None, step=1) -> Optional["RangeNetworkType"]: """ Helper method for creating a new network or modifying a given network. :param net: NetworkType object, optional The network to modify. If None, create a new network from scratch. :param step: int, optional If step > 0, the new network should be more dense. If step < 0, the new network should be less dense. :return: NetworkType object or None The modified network if successful, None otherwise. """ if net is None: net_class = self.DIRECTED_NETWORK_T if self.directed else self.UNDIRECTED_NETWORK_T net = net_class(**self.kwargs) for _n in range(self.n_count): node = Node(commRange=self.comm_range, **self.kwargs) net.add_node(node) else: if step > 0: if len(net) < self.n_max: node = Node(**self.kwargs) net.add_node(node) logger.trace("Added node, number of nodes: {}", len(net)) elif not self.comm_range: for node in net.nodes(): node.commRange += step logger.trace("Increased commRange to {}", node.commRange) else: return None else: min_node = net.nodes_sorted()[0] if len(net) > self.n_min and len(net) > 1: net.remove_node(min_node) logger.trace("Removed node, nodes left: {}", len(net)) elif not self.comm_range: for node in net: node.commRange += step logger.trace("Decreased commRange to {}", node.commRange) else: return None return net
[docs] def _are_conditions_satisfied(self, net: "RangeNetworkType"): """ Check if the conditions for the network are satisfied. :param net: The network to check the conditions for. :type net: Network :return: The condition value. :rtype: int """ cr = net.nodes_sorted()[0].commRange if self.enforce_connected and not net.is_connected(): logger.debug("Not connected") return round(0.2 * cr) elif self.degree: diff = self.degree - net.avg_degree() if abs(diff) > self.degree_tolerance: logger.debug("Degree not satisfied: {} with {} nodes", net.avg_degree(), len(net)) diff = sign(diff) * min( max(abs(diff), 3), 7 ) # If diff is too big, it will be set to 7, if it is too small, it will be set to 3 condition_returned = round((sign(diff) * (round(diff)) ** 2) * cr / 100) logger.debug("Degree condition returned: {}", condition_returned) return condition_returned return 0
[docs] def generate_random_network( self, net: Optional["RangeNetworkType"] = None, max_steps=1000 ) -> Optional["RangeNetworkType"]: """ Generates a random network with randomly positioned nodes. :param net: The network to modify. If not provided, a new network will be created. :type net: Optional[RangeNetworkType] :param max_steps: The maximum number of steps to take. :type max_steps: int :return: The generated network, optional. :rtype: Optional[RangeNetworkType] """ # TODO: try some more advanced algorithm for situation when # both connected network and too small degree are needed # that is agnostic to actual dimensions of the environment steps = [0] while True: net = self._create_modify_network(net, steps[-1]) if not net: break steps.append(self._are_conditions_satisfied(net)) if len(steps) > max_steps: break if steps[-1] == 0: return net logger.error( "Could not generate connected network with given " "parameters. Try removing and/or modifying some of " "them." )
[docs] def generate_neigborhood_network(self) -> "RangeNetworkType": """ Generates a network where all nodes are in one hop neighborhood of at least one node. Finds out the node in the middle, which is the node with the minimum maximum distance to all other nodes, and sets that distance as the new commRange. This generator ignores all other parameters except comm_range and n counts. :return: The generated network. :rtype: RangeNetworkType """ net = self._create_modify_network() max_distances = [] for node in net: distances = [sqrt(sum((net.pos[node] - net.pos[neighbor]) ** 2)) for neighbor in net] max_distances.append(max(distances)) min_distance = min(max_distances) for node in net: node.commRange = min_distance + 1 return net
[docs] def generate_homogeneous_network(self, randomness=0.11) -> Optional["RangeNetworkType"]: """ Generates a network where nodes are located approximately homogeneous. :param randomness: Controls random perturbation of the nodes. It is given as a part of the environment size. :type randomness: float :return: The generated random network. :rtype: RangeNetworkType """ net = self._create_modify_network() n = len(net) h, w = net.environment.image.shape assert net.environment.dim == 2 # works only for 2d environments size = w positions = generate_mesh_positions(net.environment, n) for i in range(n): pos = array([-1, -1]) # some non space point while not net.environment.is_space(pos): pos = positions[i, :n] + (rand(2) - 0.5) * (size * randomness) net.pos[net.nodes_sorted()[i]] = pos if isinstance(net, RangeNetwork): net.recalculate_edges() # TODO: this is not intuitive but generate_random network with net # given as argument will check if conditions are satisfied and act # accordingly, to change only commRange set limits to number of nodes self.n_count = self.n_max = self.n_min = n return self.generate_random_network(net)
@staticmethod def __get_ring_pos(n: int, env: "Environment2D") -> list[tuple[float, float]]: """ A helper method for generating positions of nodes on a circle. :param n: The number of nodes. :type n: int :param env: The environment object. :type env: Environment2D :return: The list of positions. :rtype: list[tuple[int, int]] """ env_shape = env.image.shape center = (env_shape[0] // 2, env_shape[1] // 2) radius = 0.82 * (min(env_shape) / 2) # 82% of the smallest dimension, leaves enough space for the node labels. nodes = [] for i in range(n): angle = 2 * i * pi / n x = center[0] + radius * cos(angle) y = center[1] + radius * sin(angle) nodes.append((x, y)) return nodes
[docs] @classmethod def generate_complete_network(cls, n: int, network_type: type[T] = None, directed_network: bool = None) -> T: """ Generate a complete network with n nodes. The nodes are placed on a circle. Examples: .. code-block:: python >>> net1 = NetworkGenerator.generate_complete_network(5) <BidirectionalNetwork object with 5 nodes and 10 edges> >>> net2 = NetworkGenerator.generate_complete_network(5, directed_network=True) <DirectedNetwork object with 5 nodes and 20 edges> >>> net3 = NetworkGenerator.generate_complete_network(5, network_type=BidirectionalNetwork) <BidirectionalNetwork object with 5 nodes and 10 edges> DO NOT instantiate the class, this is a class method. :param n: The number of nodes in the network. :type n: int :param network_type: The type of network to generate. :type network_type: type[NetworkType] :param directed_network: Only if True, the network is directed. :type directed_network: bool :return: The generated network. :rtype: NetworkType """ if directed_network is None: directed_network = False if network_type is None: network_type = DirectedNetwork if directed_network else BidirectionalNetwork net = network_type() node_pos_list = cls.__get_ring_pos(n, net.environment) nodes = [net.add_node(pos=node_pos_list) for node_pos_list in node_pos_list] for i, j in product(range(n), range(n)): if i != j: net.add_edge(nodes[i], nodes[j]) return net
[docs] @classmethod def generate_ring_network(cls, n: int, network_type: type[T] = None, directed_network: bool = None) -> T: """ Generate a ring network with n nodes. The nodes are placed on a circle. If network_type is a directed type, the links between the nodes are one-way only so the network is fully connected. Example: .. code-block:: python >>> net = NetworkGenerator.generate_ring_network(6) <BidirectionalNetwork object with 6 nodes and 6 edges> >>> net = NetworkGenerator.generate_ring_network(6, directed_network=False) <DirectedNetwork object with 6 nodes and 6 edges> DO NOT instantiate the class, this is a class method. :param n: The number of nodes in the network. :type n: int :param network_type: The type of network to generate. :type network_type: type[NetworkType] :param directed_network: Only if True, the network is directed. :type directed_network: bool :return: The generated network. :rtype: NetworkType """ if directed_network is None: directed_network = False if network_type is None: network_type = DirectedNetwork if directed_network else BidirectionalNetwork net = network_type() node_pos_list = cls.__get_ring_pos(n, net.environment) nodes = [net.add_node(pos=node_pos_list) for node_pos_list in node_pos_list] for i in range(n): if i != (i + 1) % n: net.add_edge(nodes[i], nodes[(i + 1) % n]) return net
[docs] @classmethod def generate_star_network(cls, n: int, network_type: type[T] = None, directed_network: bool = None) -> T: """ Generate a star network with n nodes. The nodes are placed on a circle with one node in the center. If network_type is a directed type, the links are only from the center node to the other nodes. Example: .. code-block:: python >>> net = NetworkGenerator.generate_star_network(5) DO NOT instantiate the class, this is a class method. :param n: The number of nodes in the network. :type n: int :param network_type: The type of network to generate. :type network_type: type[NetworkType] :param directed_network: Only if True, the network is directed. :type directed_network: bool :return: The generated network. :rtype: NetworkType """ if n <= 1: raise ValueError("Star network requires at least 2 nodes.") if directed_network is None: directed_network = False if network_type is None: network_type = DirectedNetwork if directed_network else BidirectionalNetwork net = network_type() node_pos_list = cls.__get_ring_pos(n - 1, net.environment) center = ( net.environment.image.shape[0] / 2, net.environment.image.shape[1] / 2, ) center_node = net.add_node(pos=center) for i in range(n - 1): node = net.add_node(pos=node_pos_list[i]) net.add_edge(center_node, node) return net
[docs] @staticmethod def generate_hypercube_network( n: int | None = None, dimension: int | None = None, use_binary_labels: bool = True, network_type: type[T] = None, directed_network: bool = None, ) -> T: """ Generate a hypercube network of the given dimension (or node count). The nodes are placed in a hypercube structure. Examples: .. code-block:: python >>> net = NetworkGenerator.generate_hypercube_network(dimension=3) <BidirectionalNetwork object with 8 nodes and 12 edges> >>> net2 = NetworkGenerator.generate_hypercube_network(dimension=3, directed_network=True) <DirectedNetwork object with 8 nodes and 24 edges> >>> net3 = NetworkGenerator.generate_complete_network(n=8) <BidirectionalNetwork object with 8 nodes and 12 edges> DO NOT instantiate the class, this is a class method. :param n: The number of nodes in the network. If None, the dimension parameter must be set. :type n: int :param dimension: The dimension of the hypercube. If None, the n parameter must be set. :type dimension: int :param network_type: The type of network to generate. :type network_type: type[NetworkType] :param directed_network: Only if True, the network is directed. :type directed_network: bool :return: The generated network. :rtype: NetworkType """ if n is None and dimension is None: raise ValueError("Either the number of nodes or the dimension of the hypercube must be set.") if n is not None and dimension is not None: raise ValueError("Only one of the number of nodes or the dimension of the hypercube can be set.") if n is not None: dimension = int(log2(n)) if 2**dimension != n: raise ValueError("The number of nodes must be a power of 2.") if directed_network is None: directed_network = False if network_type is None: network_type = DirectedNetwork if directed_network else BidirectionalNetwork LABEL_KEY = "HYPERCUBE_LABEL" def create_hypercube(dim): # Recursive function to create the hypercube # A hypercube of dimension n is the disjoint union of two hypercubes of dimension n-1, forming a perfect # matching between the nodes of the two hypercubes. node_pos = OrderedDict() edges = [] dist = 1 if dim == 0: node = Node() node.memory[LABEL_KEY] = "0" node_pos[node] = (0, 0) elif dim == 1: node1 = Node() node1.memory[LABEL_KEY] = "0" node2 = Node() node2.memory[LABEL_KEY] = "1" node_pos[node1] = (0, 0) node_pos[node2] = (dist, 0) edges.append((node1, node2)) else: part_1_nodes, part_1_edges = create_hypercube(dim - 1) part_2_nodes, part_2_edges = create_hypercube(dim - 1) edges += part_1_edges edges += part_2_edges part_1_width = max([pos[0] for pos in part_1_nodes.values()]) part_1_height = max([pos[1] for pos in part_1_nodes.values()]) if dim == 1: # Grow only in the x-axis x_offset = dist y_offset = 0 elif dim == 2: # Grow only in the y-axis x_offset = 0 y_offset = dist elif dim == 3: # Grow equally in both axes x_offset = dist / 2 y_offset = dist / 2 elif dim % 2 == 0: # Even dimensions grow in the x-axis x_offset = part_1_width * 1.1 y_offset = dist / 4 else: # Odd dimensions grow in the y-axis x_offset = dist / 4 y_offset = part_1_height * 1.1 for node, pos in part_1_nodes.items(): node_pos[node] = pos node.memory[LABEL_KEY] = "0" + node.memory[LABEL_KEY] for node, pos in part_2_nodes.items(): node_pos[node] = (pos[0] + x_offset, pos[1] + y_offset) node.memory[LABEL_KEY] = "1" + node.memory[LABEL_KEY] for node1, node2 in zip(part_1_nodes.keys(), part_2_nodes.keys()): edges.append((node1, node2)) return node_pos, edges # Create the nodes node_pos_list, edges = create_hypercube(dimension) # Create the network net = network_type() canvas_shape = net.environment.image.shape usable_shape = (canvas_shape[1] * 0.82, canvas_shape[0] * 0.82) # Normalize the positions max_x = max(max([pos[0] for pos in node_pos_list.values()]), 1) max_y = max(max([pos[1] for pos in node_pos_list.values()]), 1) for node, pos in node_pos_list.items(): # If the dimension is 0 or 1, the nodes are in the center x = pos[0] if dimension != 0 else 0.5 y = pos[1] if dimension not in (0, 1) else 0.5 final_pos = (x / max_x * usable_shape[0], y / max_y * usable_shape[1]) # Add the node net.add_node(node, pos=final_pos) if use_binary_labels: net.labels[node] = node.memory[LABEL_KEY] # Remove the label from the memory as it represents some knowledge of the network del node.memory[LABEL_KEY] # Add the edges to the network for node1, node2 in edges: net.add_edge(node1, node2) if net.is_directed(): net.add_edge(node2, node1) return net
[docs] @classmethod def generate_mesh_network( cls, n: int = None, a: int = None, b: int = None, torus: bool = False, network_type: type[T] = None, directed_network: bool = None, ) -> T: """ Generate a mesh (or torus) network with n nodes (or a x b nodes). If network_type is a directed type, the links are only north and east. Only with torus, this gives a fully connected network nonetheless. Example: .. code-block:: python >>> net = NetworkGenerator.generate_mesh_network(a=4, b=7) DO NOT instantiate the class, this is a class method. :param n: The number of nodes in the mesh/torus network. Optional. :type n: int :param a: Width of the mesh/torus. Optional. :type a: int :param b: Height of the mesh/torus. Optional. :type b: int :param torus: Whether or not to add "returning" edges. :type torus: bool :param network_type: The type of network to generate. :type network_type: type[NetworkType] :param directed_network: Only if True, the network is directed. :type directed_network: bool :return: The generated network. :rtype: NetworkType """ if directed_network is None: directed_network = False if network_type is None: network_type = DirectedNetwork if directed_network else BidirectionalNetwork if a is None or b is None: if n is None: raise ValueError("Mesh network generator needs (a, b) or n.") if int(sqrt(n)) != sqrt(n): raise ValueError("Value n given must be a perfect square.") a = b = int(sqrt(n)) node_pos_list = {(i, j): Node() for i in range(a) for j in range(b)} net = network_type() canvas_shape = net.environment.image.shape usable_shape = (canvas_shape[1] * 0.82, canvas_shape[0] * 0.82) bottom_margin = ( canvas_shape[1] * (1 - 0.83) / 4, canvas_shape[0] * (1 - 0.83) / 4, ) # Normalize the positions max_x = max(max([pos[0] for pos in node_pos_list.keys()]), 1) max_y = max(max([pos[1] for pos in node_pos_list.keys()]), 1) for (x, y), node in node_pos_list.items(): # If the dimension is 0 or 1, the nodes are in the center final_pos = ( x / max_x * usable_shape[0] + bottom_margin[0], y / max_y * usable_shape[1] + bottom_margin[1], ) net.add_node(node, pos=final_pos) for (x, y), node in node_pos_list.items(): north_and_east_neighbors = [(x, y + 1), (x + 1, y)] if not torus else [(x, (y + 1) % b), ((x + 1) % a, y)] for pos_neigh in north_and_east_neighbors: if pos_neigh in node_pos_list: net.add_edge(node, node_pos_list[pos_neigh]) return net
generate_grid_network = generate_mesh_network
[docs] def generate_mesh_positions(env, n): """ Generate mesh positions for the given environment and number of intersections. :param env: The environment object. :type env: Environment :param n: The desired number of intersections. :type n: int :return: An array of mesh positions. :rtype: numpy.ndarray """ h, w = env.image.shape # initial d d = sqrt(h * w / n) def get_mesh_pos(d, dx, dy, w, h): return [ (xi_yi[0] * d + dx, xi_yi[1] * d + dy) for xi_yi in product(list(range(int(round(w / d)))), list(range(int(round(h / d))))) ] n_mesh = 0 direction = [] while True: n_mesh = len([1 for pos in get_mesh_pos(d, 0.5 * d, 0.5 * d, w, h) if env.is_space(pos)]) direction.append(sign(n - n_mesh)) if n_mesh == n or (len(direction) >= 10 and abs(sum(direction[-3:])) < 3 and n_mesh > n): break d *= sqrt(n_mesh / float(n)) return array(tuple(pos for pos in get_mesh_pos(d, 0.5 * d, 0.5 * d, w, h) if env.is_space(pos)))
# TODO: n_mesh could be brought closer to n with modification of dx and dy # dx = 0.5*d # dy = 0.5*d
[docs] class NetworkGeneratorException(Exception): pass