1"""
2Module provides classes for implementing different sensor faults.
3"""
4from abc import abstractmethod
5import numpy as np
6
7from .sensor_reading_event import SensorReadingEvent
8from ...serialization import serializable, JsonSerializable, SENSOR_FAULT_CONSTANT_ID, \
9 SENSOR_FAULT_DRIFT_ID, SENSOR_FAULT_GAUSSIAN_ID, SENSOR_FAULT_PERCENTAGE_ID, \
10 SENSOR_FAULT_STUCKATZERO_ID
11
12
[docs]
class SensorFault(SensorReadingEvent):
    """
    Base class for a sensor fault.

    A concrete fault implements `apply_sensor_fault`, which perturbs a single
    sensor reading. `apply` calls it for every reading, passing the
    time-dependent multiplier computed by `compute_multiplier`.
    """
    # Acknowledgement: This Python implementation is based on
    # https://github.com/eldemet/sensorfaultmodels/blob/main/sensorfaultmodels.m
    # and https://github.com/Mariosmsk/sensorfaultmodels/blob/main/sensorfaultmodels.py

    def compute_multiplier(self, cur_time: int) -> float:
        """
        Computes the multiplier (i.e. "strength" of the fault) for a given time stamp.

        The result is the difference of two terms of the form `1 - exp(-(t - t0))`:
        one anchored at `self.start_time` and one anchored at `self.end_time`.
        Each term is zero as long as `cur_time` is smaller than its anchor.

        Parameters
        ----------
        cur_time : `int`
            Time in seconds.

        Returns
        -------
        `float`
            Multiplier.
        """
        # Rate constants of the two exponential terms.
        onset_rate = 1
        offset_rate = 1

        # Term that becomes active once the fault has started.
        onset = (1 - np.exp(-onset_rate * (cur_time - self.start_time))
                 if cur_time >= self.start_time else 0)

        # Term that becomes active once the fault has ended.
        offset = (1 - np.exp(-offset_rate * (cur_time - self.end_time))
                  if cur_time >= self.end_time else 0)

        return onset - offset

    @abstractmethod
    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Perturbs a single sensor reading value according to this sensor fault.

        Must be implemented by every concrete sensor fault.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        raise NotImplementedError()

    def apply(self, sensor_readings: np.ndarray,
              sensor_readings_time: np.ndarray) -> np.ndarray:
        """
        Applies this sensor fault to every entry of the given sensor readings.

        Note that `sensor_readings` is modified in place; the same array is returned.

        Parameters
        ----------
        sensor_readings : `numpy.ndarray`
            Sensor readings -- the entry at index i is perturbed using the
            time stamp at index i of `sensor_readings_time`.
        sensor_readings_time : `numpy.ndarray`
            Time stamps of the sensor readings.

        Returns
        -------
        `numpy.ndarray`
            The perturbed sensor readings.
        """
        n_readings = sensor_readings.shape[0]

        for idx in range(n_readings):
            time_stamp = sensor_readings_time[idx]
            multiplier = self.compute_multiplier(time_stamp)
            sensor_readings[idx] = self.apply_sensor_fault(multiplier, sensor_readings[idx],
                                                           time_stamp)

        return sensor_readings
79
80
[docs]
@serializable(SENSOR_FAULT_CONSTANT_ID, ".epytflow_sensorfault_constant")
class SensorFaultConstant(SensorFault, JsonSerializable):
    """
    Class implementing a constant shift sensor fault.

    Parameters
    ----------
    constant_shift : `float`
        Constant that is added to the sensor reading.
    **kwds
        Remaining keyword arguments are passed on to the parent class constructor.

    Raises
    ------
    TypeError
        If `constant_shift` is not an instance of `float`.
    """
    def __init__(self, constant_shift: float, **kwds):
        if not isinstance(constant_shift, float):
            raise TypeError("'constant_shift' must be an instance of 'float' but not of " +
                            f"'{type(constant_shift)}'")

        self._constant_shift = constant_shift

        super().__init__(**kwds)

    @property
    def constant_shift(self) -> float:
        """
        Gets the constant that is added to the sensor reading.

        Returns
        -------
        `float`
            Constant that is added to the sensor reading.
        """
        return self._constant_shift

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class extended by `constant_shift`.

        Returns
        -------
        `dict`
            Attributes of the parent class merged with the "constant_shift" entry.
        """
        return super().get_attributes() | {"constant_shift": self._constant_shift}

    def __eq__(self, other) -> bool:
        # Equal iff the parent class comparison succeeds and the shifts are identical.
        return super().__eq__(other) and self._constant_shift == other.constant_shift

    def __str__(self) -> str:
        return f"{type(self).__name__} {super().__str__()} constant: {self._constant_shift}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds the constant shift, scaled by the current multiplier, to the sensor reading.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        return sensor_reading + cur_multiplier * self._constant_shift
124
125
[docs]
@serializable(SENSOR_FAULT_DRIFT_ID, ".epytflow_sensorfault_drift")
class SensorFaultDrift(SensorFault, JsonSerializable):
    """
    Class implementing a drift sensor fault -- i.e. the perturbation grows
    linearly with the time elapsed since the start of the fault.

    Parameters
    ----------
    coef : `float`
        Coefficient of the drift.
    **kwds
        Remaining keyword arguments are passed on to the parent class constructor.
    """
    def __init__(self, coef: float, **kwds):
        # Note: 'coef' is stored as given -- no validation is performed here.
        self._coef = coef

        super().__init__(**kwds)

    @property
    def coef(self) -> float:
        """
        Gets the coefficient of the drift.

        Returns
        -------
        `float`
            Coefficient of the drift.
        """
        return self._coef

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class extended by `coef`.

        Returns
        -------
        `dict`
            Attributes of the parent class merged with the "coef" entry.
        """
        parent_attributes = super().get_attributes()
        return parent_attributes | {"coef": self._coef}

    def __eq__(self, other) -> bool:
        # Equal iff the parent class comparison succeeds and the coefficients are identical.
        parent_equal = super().__eq__(other)
        return parent_equal and self._coef == other.coef

    def __str__(self) -> str:
        event_desc = super().__str__()
        return f"{type(self).__name__} {event_desc} coef: {self._coef}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds a drift term, scaled by the current multiplier, to the sensor reading.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        # Time that has passed since the start of this fault.
        elapsed = cur_time - self.start_time
        drift = self._coef * elapsed

        return sensor_reading + cur_multiplier * drift
165
166
[docs]
@serializable(SENSOR_FAULT_GAUSSIAN_ID, ".epytflow_sensorfault_gaussian")
class SensorFaultGaussian(SensorFault, JsonSerializable):
    """
    Class implementing a Gaussian shift sensor fault -- i.e.
    adding Gaussian noise (centered at zero) to the sensor reading.

    Parameters
    ----------
    std : `float`
        Standard deviation of the Gaussian noise -- must be greater than zero.
    **kwds
        Remaining keyword arguments are passed on to the parent class constructor.

    Raises
    ------
    ValueError
        If `std` is not an instance of `float` or is not greater than zero.
    """
    def __init__(self, std: float, **kwds):
        if not (isinstance(std, float) and std > 0):
            raise ValueError("'std' must be an instance of 'float' and be greater than 0")

        self._std = std

        super().__init__(**kwds)

    @property
    def std(self) -> float:
        """
        Gets the standard deviation of the Gaussian noise.

        Returns
        -------
        `float`
            Standard deviation of the Gaussian noise.
        """
        return self._std

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class extended by `std`.

        Returns
        -------
        `dict`
            Attributes of the parent class merged with the "std" entry.
        """
        parent_attributes = super().get_attributes()
        return parent_attributes | {"std": self._std}

    def __eq__(self, other) -> bool:
        # Equal iff the parent class comparison succeeds and the standard deviations match.
        parent_equal = super().__eq__(other)
        return parent_equal and self._std == other.std

    def __str__(self) -> str:
        event_desc = super().__str__()
        return f"{type(self).__name__} {event_desc} std: {self._std}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds zero-mean Gaussian noise, scaled by the current multiplier,
        to the sensor reading.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        # A fresh sample is drawn from NumPy's global random state on every call.
        noise = np.random.normal(loc=0, scale=self._std)

        return sensor_reading + cur_multiplier * noise
210
211
[docs]
@serializable(SENSOR_FAULT_PERCENTAGE_ID, ".epytflow_sensorfault_percentage")
class SensorFaultPercentage(SensorFault, JsonSerializable):
    """
    Class implementing a percentage shift sensor fault -- i.e. a fraction
    of the sensor reading is added to the sensor reading.

    Parameters
    ----------
    coef : `float`
        Coefficient (percentage) of the shift -- must be greater than zero.
    **kwds
        Remaining keyword arguments are passed on to the parent class constructor.

    Raises
    ------
    ValueError
        If `coef` is not an instance of `float` or is not greater than zero.
    """
    def __init__(self, coef: float, **kwds):
        if not (isinstance(coef, float) and coef > 0):
            raise ValueError("'coef' must be an instance of 'float' and be greater than zero.")

        self._coef = coef

        super().__init__(**kwds)

    @property
    def coef(self) -> float:
        """
        Gets the coefficient (percentage) of the shift.

        Returns
        -------
        `float`
            Coefficient (percentage) of the shift.
        """
        return self._coef

    def get_attributes(self) -> dict:
        """
        Gets the attributes of the parent class extended by `coef`.

        Returns
        -------
        `dict`
            Attributes of the parent class merged with the "coef" entry.
        """
        parent_attributes = super().get_attributes()
        return parent_attributes | {"coef": self._coef}

    def __eq__(self, other) -> bool:
        # Equal iff the parent class comparison succeeds and the coefficients are identical.
        parent_equal = super().__eq__(other)
        return parent_equal and self._coef == other.coef

    def __str__(self) -> str:
        event_desc = super().__str__()
        return f"{type(self).__name__} {event_desc} coef: {self._coef}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Adds a fraction of the sensor reading, scaled by the current multiplier,
        to the sensor reading.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        shift = cur_multiplier * self._coef * sensor_reading

        return sensor_reading + shift
254
255
[docs]
@serializable(SENSOR_FAULT_STUCKATZERO_ID, ".epytflow_sensorfault_zero")
class SensorFaultStuckZero(SensorFault, JsonSerializable):
    """
    Class implementing a stuck-at-zero sensor fault -- i.e. sensor reading is set to zero.

    Note that the sensor reading is pulled towards zero proportionally to the
    current multiplier: a multiplier of one yields zero, a multiplier of zero
    leaves the sensor reading unchanged.
    """

    def __str__(self) -> str:
        event_desc = super().__str__()
        return f"{type(self).__name__} {event_desc}"

    def apply_sensor_fault(self, cur_multiplier: float, sensor_reading: float,
                           cur_time: int) -> float:
        """
        Subtracts the sensor reading, scaled by the current multiplier,
        from the sensor reading.

        Parameters
        ----------
        cur_multiplier : `float`
            Current multiplier -- i.e. controls the "strength" of the fault.
        sensor_reading : `float`
            Sensor reading value.
        cur_time : `int`
            Current time stamp (in seconds) in the simulation -- not used by this fault.

        Returns
        -------
        `float`
            Perturbed sensor reading value.
        """
        negated_reading = -1. * sensor_reading

        return sensor_reading + cur_multiplier * negated_reading