Source code for epyt_flow.simulation.events.sensor_reading_event

  1"""
  2Module provides a base class for sensor reading events such as sensor faults.
  3"""
  4from abc import abstractmethod
  5import warnings
  6import numpy
  7
  8from .event import Event
  9from ..sensor_config import SensorConfig, SENSOR_TYPE_NODE_PRESSURE, SENSOR_TYPE_NODE_QUALITY, \
 10    SENSOR_TYPE_NODE_DEMAND, SENSOR_TYPE_LINK_FLOW, SENSOR_TYPE_LINK_QUALITY, \
 11    SENSOR_TYPE_VALVE_STATE, SENSOR_TYPE_PUMP_STATE, SENSOR_TYPE_TANK_VOLUME, \
 12    SENSOR_TYPE_NODE_BULK_SPECIES, SENSOR_TYPE_LINK_BULK_SPECIES, SENSOR_TYPE_SURFACE_SPECIES
 13
 14
class SensorReadingEvent(Event):
    """
    Base class for a sensor reading event -- i.e. an event directly affecting sensor readings.

    Parameters
    ----------
    sensor_id : `str`
        ID of the sensor that is affected by this event.
    sensor_type : `int`
        Type of the sensor that is specified in 'sensor_id'.
        Must be one of the following pre-defined constants:

            - SENSOR_TYPE_NODE_PRESSURE     = 1
            - SENSOR_TYPE_NODE_QUALITY      = 2
            - SENSOR_TYPE_NODE_DEMAND       = 3
            - SENSOR_TYPE_LINK_FLOW         = 4
            - SENSOR_TYPE_LINK_QUALITY      = 5
            - SENSOR_TYPE_VALVE_STATE       = 6
            - SENSOR_TYPE_PUMP_STATE        = 7
            - SENSOR_TYPE_TANK_VOLUME       = 8
            - SENSOR_TYPE_NODE_BULK_SPECIES = 9
            - SENSOR_TYPE_LINK_BULK_SPECIES = 10
            - SENSOR_TYPE_SURFACE_SPECIES   = 11

    Raises
    ------
    TypeError
        If 'sensor_id' is not a `str` or 'sensor_type' is not an `int`.
    ValueError
        If 'sensor_type' is not one of the pre-defined sensor type constants.
    """
    def __init__(self, sensor_id: str, sensor_type: int, **kwds):
        if not isinstance(sensor_id, str):
            raise TypeError("'sensor_id' must be an instance of 'str' but not of " +
                            f"'{type(sensor_id)}'")
        if not isinstance(sensor_type, int):
            raise TypeError("'sensor_type' must be an instance of 'int' but not of " +
                            f"'{type(sensor_type)}'")

        # Check against the named constants instead of a hard-coded numeric range:
        # the former range check (1..10) rejected SENSOR_TYPE_SURFACE_SPECIES even
        # though validate() supports it.
        valid_sensor_types = (SENSOR_TYPE_NODE_PRESSURE, SENSOR_TYPE_NODE_QUALITY,
                              SENSOR_TYPE_NODE_DEMAND, SENSOR_TYPE_LINK_FLOW,
                              SENSOR_TYPE_LINK_QUALITY, SENSOR_TYPE_VALVE_STATE,
                              SENSOR_TYPE_PUMP_STATE, SENSOR_TYPE_TANK_VOLUME,
                              SENSOR_TYPE_NODE_BULK_SPECIES, SENSOR_TYPE_LINK_BULK_SPECIES,
                              SENSOR_TYPE_SURFACE_SPECIES)
        if sensor_type not in valid_sensor_types:
            raise ValueError("Invalid value of 'sensor_type'")

        self._sensor_id = sensor_id
        self._sensor_type = sensor_type

        super().__init__(**kwds)

    def validate(self, sensor_config: SensorConfig) -> None:
        """
        Validates this sensor reading event -- i.e. checks whether the affected
        sensor is part of the given sensor configuration.

        A warning (instead of an exception) is issued if the sensor is missing.

        Parameters
        ----------
        sensor_config : :class:`~epyt_flow.simulation.sensor_config.SensorConfig`
            Sensor configuration.

        Raises
        ------
        TypeError
            If 'sensor_config' is not an instance of
            :class:`~epyt_flow.simulation.sensor_config.SensorConfig`.

        Warns
        -----
        UserWarning
            If there is no sensor of the given type at 'sensor_id'.
        """
        if not isinstance(sensor_config, SensorConfig):
            raise TypeError("'sensor_config' must be an instance of " +
                            "'epyt_flow.simulation.SensorConfig' but not of " +
                            f"'{type(sensor_config)}'")

        sensor_id = self._sensor_id
        sensor_type = self._sensor_type

        # Only the attribute that belongs to this event's sensor type is accessed.
        if sensor_type == SENSOR_TYPE_NODE_PRESSURE:
            sensor_present = sensor_id in sensor_config.pressure_sensors
        elif sensor_type == SENSOR_TYPE_NODE_QUALITY:
            sensor_present = sensor_id in sensor_config.quality_node_sensors
        elif sensor_type == SENSOR_TYPE_NODE_DEMAND:
            sensor_present = sensor_id in sensor_config.demand_sensors
        elif sensor_type == SENSOR_TYPE_LINK_FLOW:
            sensor_present = sensor_id in sensor_config.flow_sensors
        elif sensor_type == SENSOR_TYPE_LINK_QUALITY:
            sensor_present = sensor_id in sensor_config.quality_link_sensors
        elif sensor_type == SENSOR_TYPE_VALVE_STATE:
            sensor_present = sensor_id in sensor_config.valve_state_sensors
        elif sensor_type == SENSOR_TYPE_PUMP_STATE:
            sensor_present = sensor_id in sensor_config.pump_state_sensors
        elif sensor_type == SENSOR_TYPE_TANK_VOLUME:
            sensor_present = sensor_id in sensor_config.tank_volume_sensors
        elif sensor_type == SENSOR_TYPE_NODE_BULK_SPECIES:
            # Species sensors are stored as a mapping whose values are collections
            # of sensor IDs; the sensor may be registered under any key.
            sensor_present = any(sensor_id in sensors_id for sensors_id
                                 in sensor_config.bulk_species_node_sensors.values())
        elif sensor_type == SENSOR_TYPE_LINK_BULK_SPECIES:
            sensor_present = any(sensor_id in sensors_id for sensors_id
                                 in sensor_config.bulk_species_link_sensors.values())
        elif sensor_type == SENSOR_TYPE_SURFACE_SPECIES:
            sensor_present = any(sensor_id in sensors_id for sensors_id
                                 in sensor_config.surface_species_sensors.values())
        else:
            # Unknown sensor types are rejected in __init__; nothing to check here.
            return

        if not sensor_present:
            warnings.warn("Event does not have any effect because there is " +
                          f"no sensor at '{self._sensor_id}'")

    @property
    def sensor_id(self) -> str:
        """
        Gets the ID of the node or link that is affected by this event.

        Returns
        -------
        `str`
            Node or link ID.
        """
        return self._sensor_id

    @property
    def sensor_type(self) -> int:
        """
        Gets the sensor type code.

        Returns
        -------
        `int`
            Sensor type code.
        """
        return self._sensor_type

    def get_attributes(self) -> dict:
        """
        Gets the attributes of this event -- i.e. the attributes of the parent class
        extended by 'sensor_id' and 'sensor_type'.

        Returns
        -------
        `dict`
            Attributes of the parent class merged with the sensor ID and sensor type.
        """
        return super().get_attributes() | {"sensor_id": self._sensor_id,
                                           "sensor_type": self._sensor_type}

    def __eq__(self, other) -> bool:
        """
        Checks whether this event is equal to another sensor reading event.

        Raises
        ------
        TypeError
            If 'other' is not an instance of :class:`SensorReadingEvent`.
        """
        if not isinstance(other, SensorReadingEvent):
            raise TypeError("Can not compare 'SensorReadingEvent' instance " +
                            f"with '{type(other)}' instance")

        return super().__eq__(other) and self._sensor_id == other.sensor_id \
            and self._sensor_type == other.sensor_type

    def __str__(self) -> str:
        """
        Returns the parent's string representation extended by the sensor ID and type.
        """
        return f"{super().__str__()} sensor_id: {self._sensor_id} " +\
            f"sensor_type: {self._sensor_type}"

    def __call__(self, sensor_readings: numpy.ndarray,
                 sensor_readings_time: numpy.ndarray) -> numpy.ndarray:
        """
        Shortcut for :func:`apply` -- makes the event callable.
        """
        return self.apply(sensor_readings, sensor_readings_time)

    @abstractmethod
    def apply(self, sensor_readings: numpy.ndarray,
              sensor_readings_time: numpy.ndarray) -> numpy.ndarray:
        """
        Applies the sensor reading event to sensor reading values -- i.e.
        modify the sensor readings.

        Must be implemented by sub-classes.

        Parameters
        ----------
        sensor_readings : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Original sensor readings.
        sensor_readings_time : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Time (seconds since simulation start) for each sensor reading row in 'sensor_readings'.

        Returns
        -------
        `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Modified sensor readings.
        """
        raise NotImplementedError()