Source code for pydsol.model.source

""" """
"""
Created on: 22-7-2021 10:19

@author: IvS
"""
import numpy as np
import itertools

from pydsol.model.entities import Entity
from pydsol.model.basic_logger import get_module_logger

logger = get_module_logger(__name__)

__all__ = ["Source"]


[docs]class Source(object): """This class defines a basic source for a discrete event simulation model. A Source is the start-station of each entity, meaning the entities enter the system via the source. Entities are created at a source, given a specific interarrival time.""" id_iter = itertools.count(1) def __init__(self, simulator, interarrival_time="default", num_entities=1, entity_type=Entity, **kwargs): """ Parameters ---------- simulator: Simulator object simulator of the model. interarrival_time: time between the creation of entities. Default is np.random.exponential(0.25). num_entities: int number of entities to create at once. Default is 1. kwargs: kwargs are the keyword arguments that are used to expand the source class. *name: str user-specified source for the link """ self.simulator = simulator self.num_entities = num_entities self.entity_type = entity_type self.next = None # this should be the next process self.id = next(self.id_iter) self.name = "{0} {1}".format(self.__class__.__name__, str(self.id)) if "name" in kwargs: self.name = kwargs["name"] self.interarrival_time = interarrival_time if "distribution" in kwargs: self.distribution = kwargs["distribution"] self.kwargs = kwargs self.simulator.schedule_event_now(self, "create_entities")
[docs] def create_entities(self, **kwargs): """ Create entities via SimEvent, given the interarrival time and the number of entities. Parameters ---------- entity_type: class class where to make instances of, for example class Entity kwargs: kwargs are the keyword arguments that are used to invoke the method or expand the function. """ # To make it work with the set seed of the simulator if self.interarrival_time == "default": self.interarrival_time = np.random.exponential(0.25) # Create new entity for _ in range(self.num_entities): entity = self.entity_type(self.simulator, self.simulator.simulator_time, **kwargs) logger.info("Time {0:.2f}: {1} is created at {2}".format(self.simulator.simulator_time, entity.name, self.name)) self.exit_source(entity) if "distribution" in self.__dict__: try: interarrival_time = self.distribution(*self.interarrival_time) except TypeError: try: interarrival_time = self.distribution(self.interarrival_time) except TypeError: raise "Wrong types for transfer in time ({1}) and/or distribution ({0}) for scheduling an event".format( self.interarrival_time, self.distribution) else: interarrival_time = self.interarrival_time relative_delay = interarrival_time # Schedule event to create next entity according to the interarrival time self.simulator.schedule_event_rel(relative_delay, self, "create_entities")
[docs] def exit_source(self, entity, **kwargs): """Schedules the event to exit the source and enter the output node. Parameters ---------- entity: object the target on which a state change is scheduled. kwargs: kwargs are the keyword arguments that are used to expand the function. """ self.simulator.schedule_event_now(self, "enter_output_node", entity=entity)
[docs] def enter_output_node(self, entity, **kwargs): """Combine the entity with a Vehicle if an vehicle type is given. Combined or not, it schedules an event for exiting the output node. Parameters ---------- entity: object the target on which a state change is scheduled. kwargs: kwargs are the keyword arguments that are used to expand the function. *vehicle_type: Vehicle class subclass of Vehicle on which the entity should travel to the next destination. *vehicle_speed: int, optional speed of the vehicle. """ if "vehicle_type" in self.kwargs: if "vehicle_speed" in self.kwargs: vehicle = self.kwargs["vehicle_type"](self.simulator, self.kwargs["vehicle_speed"]) else: vehicle = self.kwargs["vehicle_type"](self.simulator) vehicle.entities_on_vehicle.append(entity) logger.info("Time {0:.2f}: {1} loaded on {2}".format(self.simulator.simulator_time, entity.name, vehicle.name)) entity = vehicle self.simulator.schedule_event_now(self, "exit_output_node", entity=entity)
[docs] def exit_output_node(self, entity, **kwargs): """Exit the resource by selecting a link on which the entity should travel to the next destination by weighted choice. Parameters ---------- entity: object the target on which a state change is scheduled. kwargs: kwargs are the keyword arguments that are used to expand the function. """ try: # Selection based on weights in links next_list = self.next if isinstance(self.next, list) else [self.next] weights = [link.selection_weight for link in next_list] link_by_weight = np.random.choice(np.array(next_list), p=weights / np.sum(weights)) link_by_weight.enter_input_node(entity) except AttributeError: try: if len(next_list) > 1: next_process = np.random.choice(np.array(next_list)) next_process.enter_input_node(entity) if len(next_list) == 1: self.next.enter_input_node(entity) except AttributeError: raise AttributeError("{0} has no next process assigned".format(self.name))