1"""
2This module provides classes for implementing different types of sensor reading attacks.
3"""
4from copy import deepcopy
5import numpy as np
6
7from .sensor_reading_event import SensorReadingEvent
8from ...serialization import serializable, JsonSerializable, SENSOR_ATTACK_OVERRIDE_ID, \
9 SENSOR_ATTACK_REPLAY_ID
10
11
[docs]
12class SensorReadingAttack(SensorReadingEvent):
13 """
14 Base class of a sensor reading attack.
15 """
16
17
[docs]
18@serializable(SENSOR_ATTACK_OVERRIDE_ID, ".epytflow_sensorattack_override")
19class SensorOverrideAttack(SensorReadingAttack, JsonSerializable):
20 """
21 Class implementing a sensor override attack -- i.e. sensor reading values are overwritten
22 by pre-defined values.
23
24 If the override attack is running out of pre-defined sensor reading values,
25 it repeats the given values from the beginning onwards.
26
27 Parameters
28 ----------
29 new_sensor_values : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
30 New sensor reading values -- i.e. these values replace the true sensor reading values.
31 """
32 def __init__(self, new_sensor_values: np.ndarray, **kwds):
33 if not isinstance(new_sensor_values, np.ndarray):
34 raise TypeError("'new_sensor_values' must be an instance of 'numpy.ndarray' " +
35 f"but not of '{type(new_sensor_values)}'")
36 if len(new_sensor_values.shape) != 1:
37 raise ValueError("'new_sensor_values' must be a 1-dimensional array")
38 if len(new_sensor_values) == 0:
39 raise ValueError("'new_sensor_values' can not be empty")
40
41 self._new_sensor_values = new_sensor_values
42 self._cur_replay_idx = 0
43
44 super().__init__(**kwds)
45
46 @property
47 def new_sensor_values(self) -> np.ndarray:
48 """
49 Returns the new sensor reading values -- i.e. these values replace the
50 true sensor reading values.
51
52 Returns
53 -------
54 `np.ndarray`
55 New sensor readings.
56 """
57 return deepcopy(self._new_sensor_values)
58
[docs]
59 def get_attributes(self) -> dict:
60 return super().get_attributes() | {"new_sensor_values": self._new_sensor_values}
61
62 def __eq__(self, other) -> bool:
63 if not isinstance(other, SensorOverrideAttack):
64 raise TypeError("Can not compare 'SensorOverrideAttack' instance " +
65 f"with '{type(other)}' instance")
66
67 return super().__eq__(other) and np.all(self._new_sensor_values == other.new_sensor_values)
68
69 def __str__(self) -> str:
70 return f"{type(self).__name__} {super().__str__()} " +\
71 f"new_sensor_values: {self._new_sensor_values}"
72
[docs]
73 def apply(self, sensor_readings: np.ndarray,
74 sensor_readings_time: np.ndarray) -> np.ndarray:
75 for i in range(sensor_readings.shape[0]):
76 t = sensor_readings_time[i]
77
78 if self.start_time <= t <= self.end_time:
79 sensor_readings[i] = self._new_sensor_values[self._cur_replay_idx]
80 self._cur_replay_idx = (self._cur_replay_idx + 1) % len(self._new_sensor_values)
81
82 return sensor_readings
83
84
[docs]
85@serializable(SENSOR_ATTACK_REPLAY_ID, ".epytflow_sensorattack_replay")
86class SensorReplayAttack(SensorReadingAttack, JsonSerializable):
87 """
88 Class implementing a sensor replay attack -- i.e. sensor readings are replaced by
89 historical recordings.
90
91 If the provided time window of historical recordings is smaller than the time window of the
92 attack, it repeats the historical values from the beginning onwards.
93
94 Parameters
95 ----------
96 replay_data_time_window_start : `int`
97 Start (seconds since simulation start) of the time window that is used in the replay
98 of sensor readings.
99 replay_data_time_window_end : `int`
100 End (seconds since simulation start) of the time window that is used in the replay
101 of sensor readings.
102 """
103 def __init__(self, replay_data_time_window_start: int, replay_data_time_window_end: int,
104 **kwds):
105 if not isinstance(replay_data_time_window_start, int):
106 raise TypeError("'replay_data_time_window_start' must be an instance of 'int' " +
107 f"but not of {type(replay_data_time_window_start)}")
108 if not isinstance(replay_data_time_window_end, int):
109 raise TypeError("'replay_data_time_window_end' must be an instance of 'int' " +
110 f"but not of {type(replay_data_time_window_end)}")
111 if replay_data_time_window_start > replay_data_time_window_end or \
112 replay_data_time_window_start < 0:
113 raise ValueError("Invalid values for 'replay_data_time_window_start' and/or " +
114 "'replay_data_time_window_end' detected.")
115
116 self._new_sensor_values = np.zeros(replay_data_time_window_end -
117 replay_data_time_window_start)
118 self._sensor_data_time_window_start = replay_data_time_window_start
119 self._sensor_data_time_window_end = replay_data_time_window_end
120 self._cur_hist_idx = 0
121 self._cur_replay_idx = 0
122
123 super().__init__(**kwds)
124
125 if self._sensor_data_time_window_start > self.start_time:
126 raise ValueError("'replay_data_time_window_start' must be less than 'start_time'")
127
128 @property
129 def sensor_data_time_window_start(self) -> int:
130 """
131 Gets the start time (seconds since simulation start) of the time window
132 that is used in the replay of sensor readings.
133
134 Returns
135 -------
136 `int`
137 Start time.
138 """
139 return self._sensor_data_time_window_start
140
141 @property
142 def sensor_data_time_window_end(self) -> int:
143 """
144 Gets the end time (seconds since simulation start) of the time window
145 that is used in the replay of sensor readings.
146
147 Returns
148 -------
149 `int`
150 End time.
151 """
152 return self._sensor_data_time_window_end
153
154 @property
155 def new_sensor_values(self) -> np.ndarray:
156 """
157 Returns the new sensor reading values -- i.e. these values replace the
158 true sensor reading values.
159
160 Returns
161 -------
162 `np.ndarray`
163 New sensor readings.
164 """
165 return deepcopy(self._new_sensor_values)
166
[docs]
167 def get_attributes(self) -> dict:
168 my_attributes = {"new_sensor_values": self._new_sensor_values,
169 "replay_data_time_window_start": self._sensor_data_time_window_start,
170 "replay_data_time_window_end": self._sensor_data_time_window_end}
171
172 return super().get_attributes() | my_attributes
173
174 def __eq__(self, other) -> bool:
175 if not isinstance(other, SensorReplayAttack):
176 raise TypeError("Can not compare 'SensorReplayAttack' instance " +
177 f"with '{type(other)}' instance")
178
179 return super().__eq__(other) and np.all(self._new_sensor_values == other.new_sensor_values)
180
181 def __str__(self) -> str:
182 return f"{type(self).__name__} {super().__str__()} " +\
183 f"new_sensor_values: {self._new_sensor_values}"
184
[docs]
185 def apply(self, sensor_readings: np.ndarray,
186 sensor_readings_time: np.ndarray) -> np.ndarray:
187 for i in range(sensor_readings.shape[0]):
188 t = sensor_readings_time[i]
189
190 if self._sensor_data_time_window_start <= t <= self._sensor_data_time_window_end:
191 self._new_sensor_values[self._cur_hist_idx] = sensor_readings[i]
192 self._cur_hist_idx += 1
193
194 if self.start_time <= t <= self.end_time:
195 sensor_readings[i] = self._new_sensor_values[self._cur_replay_idx]
196 self._cur_replay_idx = (self._cur_replay_idx + 1) % len(self._new_sensor_values)
197
198 return sensor_readings