1"""
2Module provides a base class for sensor reading events such as sensor faults.
3"""
4from abc import abstractmethod
5import warnings
6import numpy
7
8from .event import Event
9from ..sensor_config import SensorConfig, SENSOR_TYPE_NODE_PRESSURE, SENSOR_TYPE_NODE_QUALITY, \
10 SENSOR_TYPE_NODE_DEMAND, SENSOR_TYPE_LINK_FLOW, SENSOR_TYPE_LINK_QUALITY, \
11 SENSOR_TYPE_VALVE_STATE, SENSOR_TYPE_PUMP_STATE, SENSOR_TYPE_TANK_VOLUME, \
12 SENSOR_TYPE_NODE_BULK_SPECIES, SENSOR_TYPE_LINK_BULK_SPECIES, SENSOR_TYPE_SURFACE_SPECIES
13
14
[docs]
class SensorReadingEvent(Event):
    """
    Base class for a sensor reading event -- i.e. an event directly affecting sensor readings.

    Parameters
    ----------
    sensor_id : `str`
        ID of the sensor that is affected by this event.
    sensor_type : `int`
        Type of the sensor that is specified in 'sensor_id'.
        Must be one of the following pre-defined constants:

            - SENSOR_TYPE_NODE_PRESSURE     = 1
            - SENSOR_TYPE_NODE_QUALITY      = 2
            - SENSOR_TYPE_NODE_DEMAND       = 3
            - SENSOR_TYPE_LINK_FLOW         = 4
            - SENSOR_TYPE_LINK_QUALITY      = 5
            - SENSOR_TYPE_VALVE_STATE       = 6
            - SENSOR_TYPE_PUMP_STATE        = 7
            - SENSOR_TYPE_TANK_VOLUME       = 8
            - SENSOR_TYPE_NODE_BULK_SPECIES = 9
            - SENSOR_TYPE_LINK_BULK_SPECIES = 10
            - SENSOR_TYPE_SURFACE_SPECIES   = 11
    **kwds
        Additional keyword arguments; forwarded unchanged to the constructor of the parent class.

    Raises
    ------
    TypeError
        If 'sensor_id' is not a `str` or 'sensor_type' is not an `int`.
    ValueError
        If 'sensor_type' is not one of the pre-defined sensor type constants.
    """
    def __init__(self, sensor_id: str, sensor_type: int, **kwds):
        if not isinstance(sensor_id, str):
            raise TypeError("'sensor_id' must be an instance of 'str' but not of " +
                            f"'{type(sensor_id)}'")
        if not isinstance(sensor_type, int):
            raise TypeError("'sensor_type' must be an instance of 'int' but not of " +
                            f"'{type(sensor_type)}'")

        # Check against the constants themselves rather than a hard-coded numeric range:
        # the former range check (1..10) wrongly rejected SENSOR_TYPE_SURFACE_SPECIES.
        valid_sensor_types = (SENSOR_TYPE_NODE_PRESSURE, SENSOR_TYPE_NODE_QUALITY,
                              SENSOR_TYPE_NODE_DEMAND, SENSOR_TYPE_LINK_FLOW,
                              SENSOR_TYPE_LINK_QUALITY, SENSOR_TYPE_VALVE_STATE,
                              SENSOR_TYPE_PUMP_STATE, SENSOR_TYPE_TANK_VOLUME,
                              SENSOR_TYPE_NODE_BULK_SPECIES, SENSOR_TYPE_LINK_BULK_SPECIES,
                              SENSOR_TYPE_SURFACE_SPECIES)
        if sensor_type not in valid_sensor_types:
            raise ValueError("Invalid value of 'sensor_type'")

        self._sensor_id = sensor_id
        self._sensor_type = sensor_type

        super().__init__(**kwds)

    def _sensor_exists(self, sensor_config: SensorConfig) -> bool:
        """
        Checks whether the sensor affected by this event is listed in the given
        sensor configuration under this event's sensor type.

        Only the attribute of 'sensor_config' that belongs to this event's sensor type
        is accessed.

        Parameters
        ----------
        sensor_config : :class:`~epyt_flow.simulation.sensor_config.SensorConfig`
            Sensor configuration.

        Returns
        -------
        `bool`
            False if the sensor is not found; True otherwise.
        """
        sensor_id = self._sensor_id
        sensor_type = self._sensor_type

        # Sensor types whose configuration entry is a flat collection of IDs.
        if sensor_type == SENSOR_TYPE_NODE_PRESSURE:
            return sensor_id in sensor_config.pressure_sensors
        if sensor_type == SENSOR_TYPE_NODE_QUALITY:
            return sensor_id in sensor_config.quality_node_sensors
        if sensor_type == SENSOR_TYPE_NODE_DEMAND:
            return sensor_id in sensor_config.demand_sensors
        if sensor_type == SENSOR_TYPE_LINK_FLOW:
            return sensor_id in sensor_config.flow_sensors
        if sensor_type == SENSOR_TYPE_LINK_QUALITY:
            return sensor_id in sensor_config.quality_link_sensors
        if sensor_type == SENSOR_TYPE_VALVE_STATE:
            return sensor_id in sensor_config.valve_state_sensors
        if sensor_type == SENSOR_TYPE_PUMP_STATE:
            return sensor_id in sensor_config.pump_state_sensors
        if sensor_type == SENSOR_TYPE_TANK_VOLUME:
            return sensor_id in sensor_config.tank_volume_sensors

        # Species sensors are stored as a mapping whose values are collections of IDs;
        # the sensor counts as present if it shows up in any of these collections.
        if sensor_type == SENSOR_TYPE_NODE_BULK_SPECIES:
            return any(sensor_id in ids
                       for ids in sensor_config.bulk_species_node_sensors.values())
        if sensor_type == SENSOR_TYPE_LINK_BULK_SPECIES:
            return any(sensor_id in ids
                       for ids in sensor_config.bulk_species_link_sensors.values())
        if sensor_type == SENSOR_TYPE_SURFACE_SPECIES:
            return any(sensor_id in ids
                       for ids in sensor_config.surface_species_sensors.values())

        # Unrecognized sensor type: nothing to check against, so do not trigger a warning.
        return True

    def validate(self, sensor_config: SensorConfig) -> None:
        """
        Validates this sensor reading event -- i.e. checks whether the affected
        sensor is part of the given sensor configuration.

        A missing sensor is not treated as an error: a warning is issued because
        the event would not have any effect.

        Parameters
        ----------
        sensor_config : :class:`~epyt_flow.simulation.sensor_config.SensorConfig`
            Sensor configuration.

        Raises
        ------
        TypeError
            If 'sensor_config' is not a
            :class:`~epyt_flow.simulation.sensor_config.SensorConfig` instance.
        """
        if not isinstance(sensor_config, SensorConfig):
            raise TypeError("'sensor_config' must be an instance of " +
                            "'epyt_flow.simulation.SensorConfig' but not of " +
                            f"'{type(sensor_config)}'")

        if not self._sensor_exists(sensor_config):
            warnings.warn("Event does not have any effect because there is " +
                          f"no sensor at '{self._sensor_id}'")

    @property
    def sensor_id(self) -> str:
        """
        Gets the ID of the node or link that is affected by this event.

        Returns
        -------
        `str`
            Node or link ID.
        """
        return self._sensor_id

    @property
    def sensor_type(self) -> int:
        """
        Gets the sensor type code.

        Returns
        -------
        `int`
            Sensor type code.
        """
        return self._sensor_type

    def get_attributes(self) -> dict:
        """
        Gets the attributes of this event -- i.e. the attributes reported by the
        parent class extended by 'sensor_id' and 'sensor_type'.

        Returns
        -------
        `dict`
            Attributes of the parent class merged with the sensor ID and sensor type.
        """
        return super().get_attributes() | {"sensor_id": self._sensor_id,
                                           "sensor_type": self._sensor_type}

    def __eq__(self, other) -> bool:
        """
        Compares this event with another sensor reading event.

        Two events are equal if the parent class considers them equal and
        both the sensor ID and the sensor type match.

        Raises
        ------
        TypeError
            If 'other' is not an instance of `SensorReadingEvent`.
        """
        if not isinstance(other, SensorReadingEvent):
            raise TypeError("Can not compare 'SensorReadingEvent' instance " +
                            f"with '{type(other)}' instance")

        return super().__eq__(other) and self._sensor_id == other.sensor_id \
            and self._sensor_type == other.sensor_type

    def __str__(self) -> str:
        """
        Returns the string representation of the parent class followed by
        the sensor ID and the sensor type.
        """
        return f"{super().__str__()} sensor_id: {self._sensor_id} " +\
            f"sensor_type: {self._sensor_type}"

    def __call__(self, sensor_readings: numpy.ndarray,
                 sensor_readings_time: numpy.ndarray) -> numpy.ndarray:
        """
        Shortcut for :func:`apply` -- makes the event callable on sensor readings.
        """
        return self.apply(sensor_readings, sensor_readings_time)

    @abstractmethod
    def apply(self, sensor_readings: numpy.ndarray,
              sensor_readings_time: numpy.ndarray) -> numpy.ndarray:
        """
        Applies the sensor reading event to sensor reading values -- i.e.
        modify the sensor readings.

        Must be implemented by sub-classes; this base implementation always raises.

        Parameters
        ----------
        sensor_readings : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Original sensor readings.
        sensor_readings_time : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Time (seconds since simulation start) for each sensor reading row in 'sensor_readings'.

        Returns
        -------
        `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
            Modified sensor readings.
        """
        raise NotImplementedError()