Source code for epyt_flow.simulation.events.sensor_faults

  1"""
  2Module provides classes for implementing different sensor faults.
  3"""
  4from abc import abstractmethod
  5import numpy as np
  6
  7from .sensor_reading_event import SensorReadingEvent
  8from ...serialization import serializable, JsonSerializable, SENSOR_FAULT_CONSTANT_ID, \
  9    SENSOR_FAULT_DRIFT_ID, SENSOR_FAULT_GAUSSIAN_ID, SENSOR_FAULT_PERCENTAGE_ID, \
 10    SENSOR_FAULT_STUCKATZERO_ID
 11
 12
class SensorFault(SensorReadingEvent):
    """
    Common base class of all sensor faults.

    A sensor fault perturbs sensor readings. Sub-classes define the kind of
    perturbation by implementing `apply_sensor_fault`, while this class
    computes the time-dependent fault strength and iterates over the readings.
    """
    # Acknowledgement: This Python implementation is based on
    # https://github.com/eldemet/sensorfaultmodels/blob/main/sensorfaultmodels.m
    # and https://github.com/Mariosmsk/sensorfaultmodels/blob/main/sensorfaultmodels.py

    def compute_multiplier(self, cur_time: int) -> float:
        """
        Computes the fault strength (multiplier) at a given point in time.

        The multiplier is the difference of two saturating exponentials:
        one that starts rising at `self.start_time` and one that starts rising
        at `self.end_time`. Before `self.start_time`, the multiplier is zero.

        Parameters
        ----------
        cur_time : `int`
            Time in seconds.

        Returns
        -------
        `float`
            Multiplier.
        """
        # Rate constants of the two exponentials.
        onset_rate = 1
        offset_rate = 1

        onset_term = 0
        if cur_time >= self.start_time:
            onset_term = 1 - np.exp(- onset_rate * (cur_time - self.start_time))

        offset_term = 0
        if cur_time >= self.end_time:
            offset_term = 1 - np.exp(- offset_rate * (cur_time - self.end_time))

        return onset_term - offset_term

    @abstractmethod
    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Perturbs a single sensor reading value according to this sensor fault.
        Must be implemented by every sub-class.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        raise NotImplementedError()

    def apply(self, sensor_readings: np.ndarray,
              sensor_readings_time: np.ndarray) -> np.ndarray:
        """
        Applies this sensor fault to a sequence of sensor readings.

        Note that the given array is modified in place -- every entry is
        replaced by its perturbed value.

        Parameters
        ----------
        sensor_readings : `numpy.ndarray`
            Sensor readings; iterated along the first dimension.
        sensor_readings_time : `numpy.ndarray`
            Time stamp of each entry in `sensor_readings`.

        Returns
        -------
        `numpy.ndarray`
            The (in place) perturbed sensor readings.
        """
        n_readings = sensor_readings.shape[0]

        for idx in range(n_readings):
            time_stamp = sensor_readings_time[idx]
            multiplier = self.compute_multiplier(time_stamp)
            reading = sensor_readings[idx]

            sensor_readings[idx] = self.apply_sensor_fault(multiplier, reading, time_stamp)

        return sensor_readings
79 80
@serializable(SENSOR_FAULT_CONSTANT_ID, ".epytflow_sensorfault_constant")
class SensorFaultConstant(SensorFault, JsonSerializable):
    """
    Class implementing a constant shift sensor fault.

    Parameters
    ----------
    constant_shift : `float`
        Constant that is added to the sensor reading.
        Any real number (e.g. `int` or a NumPy scalar) is accepted and converted to `float`.

    Raises
    ------
    TypeError
        If `constant_shift` is not a real number.
    """
    def __init__(self, constant_shift: float, **kwds):
        # 'bool' is a sub-class of 'int' and must therefore be rejected explicitly.
        is_real_number = isinstance(constant_shift, (int, float, np.integer, np.floating)) \
            and not isinstance(constant_shift, bool)
        if not is_real_number:
            raise TypeError("'constant_shift' must be an instance of 'float' but not of " +
                            f"'{type(constant_shift)}'")

        # Normalize to a built-in float so that integers and NumPy scalars are
        # stored in the same way as a plain float argument.
        self._constant_shift = float(constant_shift)

        super().__init__(**kwds)

    @property
    def constant_shift(self) -> float:
        """
        Gets the constant that is added to the sensor reading.

        Returns
        -------
        `float`
            Constant that is added to the sensor reading.
        """
        return self._constant_shift

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class together with the constant shift.

        Returns
        -------
        `dict`
            Attributes of the parent class extended by the entry "constant_shift".
        """
        return super().get_attributes() | {"constant_shift": self._constant_shift}

    def __eq__(self, other) -> bool:
        # The shift is only compared if the parent class considers both objects equal.
        return super().__eq__(other) and self._constant_shift == other.constant_shift

    def __str__(self) -> str:
        return f"{type(self).__name__} {super().__str__()} constant: {self._constant_shift}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds the constant shift, scaled by the current multiplier, to the sensor reading.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        return sensor_reading + cur_multiplier * self._constant_shift
124 125
@serializable(SENSOR_FAULT_DRIFT_ID, ".epytflow_sensorfault_drift")
class SensorFaultDrift(SensorFault, JsonSerializable):
    """
    Sensor fault that lets the sensor reading drift away over time.

    The added offset is proportional to the time that has passed since
    the start time of the fault.

    Parameters
    ----------
    coef : `float`
        Coefficient of the drift.
    """
    def __init__(self, coef: float, **kwds):
        self._coef = coef

        super().__init__(**kwds)

    @property
    def coef(self) -> float:
        """
        Returns the drift coefficient.

        Returns
        -------
        `float`
            Coefficient of the drift.
        """
        return self._coef

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class together with the drift coefficient.

        Returns
        -------
        `dict`
            Attributes of the parent class extended by the entry "coef".
        """
        parent_attributes = super().get_attributes()
        return parent_attributes | {"coef": self._coef}

    def __eq__(self, other) -> bool:
        # Coefficients are only compared if the parent class reports equality.
        parent_result = super().__eq__(other)
        if not parent_result:
            return parent_result

        return self._coef == other.coef

    def __str__(self) -> str:
        class_name = type(self).__name__
        parent_desc = super().__str__()
        return f"{class_name} {parent_desc} coef: {self._coef}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds a drift that grows linearly with the time passed since `self.start_time`.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        elapsed_time = cur_time - self.start_time
        drift = self._coef * elapsed_time

        return sensor_reading + cur_multiplier * drift
165 166
@serializable(SENSOR_FAULT_GAUSSIAN_ID, ".epytflow_sensorfault_gaussian")
class SensorFaultGaussian(SensorFault, JsonSerializable):
    """
    Sensor fault that perturbs the sensor reading by zero-mean Gaussian noise.

    Parameters
    ----------
    std : `float`
        Standard deviation of the Gaussian noise -- must be greater than zero.

    Raises
    ------
    ValueError
        If `std` is not an instance of `float` or is not greater than zero.
    """
    def __init__(self, std: float, **kwds):
        if not (isinstance(std, float) and std > 0):
            raise ValueError("'std' must be an instance of 'float' and be greater than 0")

        self._std = std

        super().__init__(**kwds)

    @property
    def std(self) -> float:
        """
        Returns the standard deviation of the Gaussian noise.

        Returns
        -------
        `float`
            Standard deviation of the Gaussian noise.
        """
        return self._std

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class together with the standard deviation.

        Returns
        -------
        `dict`
            Attributes of the parent class extended by the entry "std".
        """
        parent_attributes = super().get_attributes()
        return parent_attributes | {"std": self._std}

    def __eq__(self, other) -> bool:
        # Standard deviations are only compared if the parent class reports equality.
        parent_result = super().__eq__(other)
        if not parent_result:
            return parent_result

        return self._std == other.std

    def __str__(self) -> str:
        class_name = type(self).__name__
        parent_desc = super().__str__()
        return f"{class_name} {parent_desc} std: {self._std}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds a freshly drawn Gaussian noise sample, scaled by the current multiplier,
        to the sensor reading.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        # Uses NumPy's global random state -- a new sample is drawn on every call.
        noise = np.random.normal(loc=0, scale=self._std)

        return sensor_reading + cur_multiplier * noise
210 211
@serializable(SENSOR_FAULT_PERCENTAGE_ID, ".epytflow_sensorfault_percentage")
class SensorFaultPercentage(SensorFault, JsonSerializable):
    """
    Sensor fault that shifts the sensor reading by a fraction of its own value.

    Parameters
    ----------
    coef : `float`
        Coefficient (percentage) of the shift -- must be greater than zero.

    Raises
    ------
    ValueError
        If `coef` is not an instance of `float` or is not greater than zero.
    """
    def __init__(self, coef: float, **kwds):
        if not (isinstance(coef, float) and coef > 0):
            raise ValueError("'coef' must be an instance of 'float' and be greater than zero.")

        self._coef = coef

        super().__init__(**kwds)

    @property
    def coef(self) -> float:
        """
        Returns the coefficient (percentage) of the shift.

        Returns
        -------
        `float`
            Coefficient (percentage) of the shift.
        """
        return self._coef

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class together with the shift coefficient.

        Returns
        -------
        `dict`
            Attributes of the parent class extended by the entry "coef".
        """
        parent_attributes = super().get_attributes()
        return parent_attributes | {"coef": self._coef}

    def __eq__(self, other) -> bool:
        # Coefficients are only compared if the parent class reports equality.
        parent_result = super().__eq__(other)
        if not parent_result:
            return parent_result

        return self._coef == other.coef

    def __str__(self) -> str:
        class_name = type(self).__name__
        parent_desc = super().__str__()
        return f"{class_name} {parent_desc} coef: {self._coef}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds a fraction of the sensor reading, scaled by the current multiplier,
        to the sensor reading itself.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        relative_shift = cur_multiplier * self._coef

        return sensor_reading + relative_shift * sensor_reading
254 255
@serializable(SENSOR_FAULT_STUCKATZERO_ID, ".epytflow_sensorfault_zero")
class SensorFaultStuckZero(SensorFault, JsonSerializable):
    """
    Sensor fault that pulls the sensor reading towards zero -- i.e. for
    a multiplier of one, the sensor reading becomes zero.
    """

    def __str__(self) -> str:
        class_name = type(self).__name__
        parent_desc = super().__str__()
        return f"{class_name} {parent_desc}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Subtracts the sensor reading, scaled by the current multiplier, from itself.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        # Adding the negated reading cancels the original value.
        cancellation = -1. * sensor_reading

        return sensor_reading + cur_multiplier * cancellation