Source code for pydistsim.demo_algorithms.santoro2007.yoyo

from pydistsim.algorithm import NodeAlgorithm, StatusValues
from pydistsim.message import Message
from pydistsim.restrictions.communication import BidirectionalLinks
from pydistsim.restrictions.knowledge import InitialDistinctValues
from pydistsim.restrictions.reliability import TotalReliability
from pydistsim.restrictions.topological import Connectivity


[docs] class YoYo(NodeAlgorithm): default_params = { "neighborsKey": "Neighbors", "inNeighborsKey": "InNeighbors", "outNeighborsKey": "OutNeighbors", }
[docs] class Status(StatusValues): INITIATOR = "INITIATOR" IDLE = "IDLE" SOURCE = "SOURCE" INTERMEDIATE = "INTERMEDIATE" SINK = "SINK" PRUNED = "PRUNED" LEADER = "LEADER"
S_init = (Status.INITIATOR, Status.IDLE) S_term = (Status.LEADER, Status.PRUNED) algorithm_restrictions = ( BidirectionalLinks, TotalReliability, Connectivity, InitialDistinctValues, ) # Store assigned id (assigned in SetupYoYo) ID_KEY = InitialDistinctValues.KEY # Store received ids, I'll use a dict {id_value: [source_nodes]} RECEIVED_IDS_KEY = "received_ids" # Store received ids that were received while waiting for responses, # I'll use a dict {id_value: [source_nodes]} RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY = "received_ids_while_waiting" # Store received responses, # I'll use a dict {response_value: [source_nodes]} RECEIVED_RESPONSES_KEY = "received_responses" # Store nodes that requested pruning, list [source_nodes] REQUESTED_PRUNING_KEY = "requested_pruning" # Store number of sent ids SENT_IDS_KEY = "sent_ids" # This is sent when a PRUNE is requested PRUNE_REQUEST = "prune"
[docs] def initializer(self): InitialDistinctValues.apply(self.network) for node in self.network.nodes(): node.memory[self.inNeighborsKey] = [] node.memory[self.outNeighborsKey] = [] node.status = self.Status.INITIATOR node.memory[self.RECEIVED_IDS_KEY] = {} node.memory[self.RECEIVED_RESPONSES_KEY] = {} node.memory[self.REQUESTED_PRUNING_KEY] = [] node.memory[self.SENT_IDS_KEY] = 0 node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] = {} node.push_to_inbox(Message(meta_header=NodeAlgorithm.INI, destination=node))
def invert_edges(self, node, nodes_to_process, invert_from): for node_to_process in nodes_to_process: if invert_from == "outNeighbors": node.memory[self.outNeighborsKey].remove(node_to_process) node.memory[self.inNeighborsKey].append(node_to_process) elif invert_from == "inNeighbors": node.memory[self.inNeighborsKey].remove(node_to_process) node.memory[self.outNeighborsKey].append(node_to_process) def prune_nodes(self, node, nodes_to_process, prune_from): for node_to_process in nodes_to_process: if prune_from == "outNeighbors": node.memory[self.outNeighborsKey].remove(node_to_process) elif prune_from == "inNeighbors": node.memory[self.inNeighborsKey].remove(node_to_process) def send_responses(self, node, no_received=False): # Find min id min_id = min(node.memory[self.RECEIVED_IDS_KEY]) no_nodes = [] prune_nodes = [] received_ids = node.memory[self.RECEIVED_IDS_KEY] for received_id in received_ids: if no_received: # Send NO to all node that sent an id self.send( node, destination=received_ids[received_id], header="response", data=(False,), ) no_nodes.extend(received_ids[received_id]) elif received_id == min_id: # Send YES responses to all inNeighbors that send min_id if len(received_ids) == 1 and len(node.memory[self.outNeighborsKey]) == 0: # If node received only min_id and has no outNeighbors # remaining after inverting and pruning # it will become a LEAF SINK, # so send a PRUNE request with the YES response as well self.send( node, destination=received_ids[received_id][0], header="response", data=(True, self.PRUNE_REQUEST), ) prune_nodes.append(received_ids[received_id][0]) else: self.send( node, destination=received_ids[received_id][0], header="response", data=(True,), ) # Send PRUNE request to extra nodes that sent min_id # and add them to prune_nodes to be pruned self.send( node, destination=received_ids[received_id][1:], header="response", data=(True, self.PRUNE_REQUEST), ) prune_nodes.extend(received_ids[received_id][1:]) else: # Send NO responses to all inNeighbors that didn't # send min_id and add them to no_nodes to be inverted self.send( node, destination=received_ids[received_id], header="response", data=(False,), ) no_nodes.extend(received_ids[received_id]) return no_nodes, prune_nodes def change_status(self, node): if node.status == self.Status.SOURCE: if len(node.memory[self.inNeighborsKey]) == 0: if len(node.memory[self.outNeighborsKey]) == 0: node.status = self.Status.LEADER else: if len(node.memory[self.outNeighborsKey]) > 0: node.status = self.Status.INTERMEDIATE else: node.status = self.Status.SINK elif node.status == self.Status.INTERMEDIATE: if len(node.memory[self.outNeighborsKey]) == 0: if len(node.memory[self.inNeighborsKey]) == 0: node.status = self.Status.PRUNED elif node.status == self.Status.SINK: if len(node.memory[self.inNeighborsKey]) == 0: node.status = self.Status.PRUNED else: node.status = self.Status.INTERMEDIATE elif node.status == self.Status.IDLE: if node.memory[self.inNeighborsKey]: node.status = self.Status.SINK if node.memory[self.outNeighborsKey]: node.status = self.Status.INTERMEDIATE else: node.status = self.Status.SOURCE self.do(node) def receive_id(self, node, message): if node.memory[self.SENT_IDS_KEY]: ids = node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] if message.data in ids: ids[message.data].append(message.source) else: ids[message.data] = [message.source] else: ids = node.memory[self.RECEIVED_IDS_KEY] if message.data in ids: ids[message.data].append(message.source) else: ids[message.data] = [message.source] def receive_response(self, node, message): responses = node.memory[self.RECEIVED_RESPONSES_KEY] response = message.data[0] if response in responses: responses[response].append(message.source) else: responses[response] = [message.source] if self.PRUNE_REQUEST in message.data: node.memory[self.REQUESTED_PRUNING_KEY].append(message.source) def do_source(self, node): if node.memory[self.SENT_IDS_KEY] == 0: node.memory[self.RECEIVED_RESPONSES_KEY] = {} node.memory[self.REQUESTED_PRUNING_KEY] = [] self.send( node, destination=node.memory[self.outNeighborsKey], header="id", data=node.memory[self.ID_KEY], ) node.memory[self.SENT_IDS_KEY] = len(node.memory[self.outNeighborsKey]) else: responses = node.memory[self.RECEIVED_RESPONSES_KEY] # If responses received for all sent ids handle them num_of_responses = sum([len(sources) for sources in list(responses.values())]) if num_of_responses >= node.memory[self.SENT_IDS_KEY]: if False in responses: nodes_to_invert = responses[False] else: nodes_to_invert = [] nodes_to_prune = node.memory[self.REQUESTED_PRUNING_KEY] # Invert edges self.invert_edges(node, nodes_to_invert, "outNeighbors") # Prune nodes self.prune_nodes(node, nodes_to_prune, "outNeighbors") node.memory[self.RECEIVED_RESPONSES_KEY] = {} node.memory[self.REQUESTED_PRUNING_KEY] = [] node.memory[self.SENT_IDS_KEY] = 0 node.memory[self.RECEIVED_IDS_KEY] = node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] = {} # End iteration and change status if needed self.change_status(node) def do_intermediate(self, node): if node.memory[self.SENT_IDS_KEY] == 0: ids = node.memory[self.RECEIVED_IDS_KEY] # If ids received from all inNeighbors handle them num_of_ids = sum([len(sources) for sources in list(ids.values())]) if num_of_ids >= len(node.memory[self.inNeighborsKey]): node.memory[self.RECEIVED_RESPONSES_KEY] = {} node.memory[self.REQUESTED_PRUNING_KEY] = [] # Find min id min_id = min(ids) # Forward min id to outNeighbors self.send( node, destination=node.memory[self.outNeighborsKey], header="id", data=min_id, ) node.memory[self.SENT_IDS_KEY] = len(node.memory[self.outNeighborsKey]) else: responses = node.memory[self.RECEIVED_RESPONSES_KEY] # If responses received for all sent ids handle them num_of_responses = sum([len(sources) for sources in list(responses.values())]) if num_of_responses >= node.memory[self.SENT_IDS_KEY]: if False in responses: nodes_to_invert = responses[False] no_received = True else: nodes_to_invert = [] no_received = False nodes_to_prune = node.memory[self.REQUESTED_PRUNING_KEY] # Invert edges self.invert_edges(node, nodes_to_invert, "outNeighbors") # Prune nodes self.prune_nodes(node, nodes_to_prune, "outNeighbors") # Add edges to invert from NO responses sent # Add nodes to prune from PRUNE requests sent no_response_nodes, prune_nodes = self.send_responses(node, no_received=no_received) # Invert edges self.invert_edges(node, no_response_nodes, "inNeighbors") # Prune nodes self.prune_nodes(node, prune_nodes, "inNeighbors") node.memory[self.RECEIVED_RESPONSES_KEY] = {} node.memory[self.REQUESTED_PRUNING_KEY] = [] node.memory[self.RECEIVED_IDS_KEY] = node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] = {} node.memory[self.SENT_IDS_KEY] = 0 # End iteration and change status if needed self.change_status(node) def do_sink(self, node): ids = node.memory[self.RECEIVED_IDS_KEY] # If ids received from all inNeighbors handle them num_of_ids = sum([len(sources) for sources in list(ids.values())]) if num_of_ids >= len(node.memory[self.inNeighborsKey]): no_response_nodes, prune_nodes = self.send_responses(node) # Invert edges self.invert_edges(node, no_response_nodes, "inNeighbors") # Prune nodes self.prune_nodes(node, prune_nodes, "inNeighbors") node.memory[self.RECEIVED_IDS_KEY] = node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] node.memory[self.RECEIVED_IDS_WHILE_WAITING_RESPONSE_KEY] = {} node.memory[self.SENT_IDS_KEY] = 0 # End iteration and change status if needed self.change_status(node) def do(self, node): if node.status == self.Status.SOURCE: self.do_source(node) elif node.status == self.Status.INTERMEDIATE: self.do_intermediate(node) elif node.status == self.Status.SINK: self.do_sink(node) @Status.INITIATOR def spontaneously(self, node, message): # Special case. Only one node in graph # If node has no neighbors set its status to LEADER if not node.neighbors(): node.status = self.Status.LEADER return self.send( node, header="init_id", data=node.memory[self.ID_KEY], destination=node.neighbors(), ) node.status = self.Status.IDLE @Status.IDLE def receiving(self, node, message): if message.header == "init_id": if message.data < node.memory[self.ID_KEY]: node.memory[self.inNeighborsKey].append(message.source) else: node.memory[self.outNeighborsKey].append(message.source) num_of_in_neighbors = len(node.memory[self.inNeighborsKey]) num_of_out_neighbors = len(node.memory[self.outNeighborsKey]) if num_of_in_neighbors + num_of_out_neighbors >= len( node.neighbors() ): # if all neighbors have sent their ids self.change_status(node) elif message.header == "id": self.receive_id(node, message) @Status.SOURCE def receiving(self, node, message): if message.header == "response": self.receive_response(node, message) elif message.header == "id": self.receive_id(node, message) self.do_source(node) @Status.INTERMEDIATE def receiving(self, node, message): if message.header == "id": self.receive_id(node, message) elif message.header == "response": self.receive_response(node, message) self.do_intermediate(node) @Status.SINK def receiving(self, node, message): if message.header == "id": self.receive_id(node, message) elif message.header == "response": self.receive_response(node, message) self.do_sink(node) @Status.PRUNED def default(self, node, message): pass @Status.LEADER def default(self, node, message): pass