Source code for epyt_flow.simulation.events.sensor_reading_attack

  1"""
  2This module provides classes for implementing different types of sensor reading attacks.
  3"""
  4from copy import deepcopy
  5import numpy as np
  6
  7from .sensor_reading_event import SensorReadingEvent
  8from ...serialization import serializable, JsonSerializable, SENSOR_ATTACK_OVERRIDE_ID, \
  9    SENSOR_ATTACK_REPLAY_ID
 10
 11
[docs] 12class SensorReadingAttack(SensorReadingEvent): 13 """ 14 Base class of a sensor reading attack. 15 """
16 17
[docs] 18@serializable(SENSOR_ATTACK_OVERRIDE_ID, ".epytflow_sensorattack_override") 19class SensorOverrideAttack(SensorReadingAttack, JsonSerializable): 20 """ 21 Class implementing a sensor override attack -- i.e. sensor reading values are overwritten 22 by pre-defined values. 23 24 If the override attack is running out of pre-defined sensor reading values, 25 it repeats the given values from the beginning onwards. 26 27 Parameters 28 ---------- 29 new_sensor_values : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 30 New sensor reading values -- i.e. these values replace the true sensor reading values. 31 """ 32 def __init__(self, new_sensor_values: np.ndarray, **kwds): 33 if not isinstance(new_sensor_values, np.ndarray): 34 raise TypeError("'new_sensor_values' must be an instance of 'numpy.ndarray' " + 35 f"but not of '{type(new_sensor_values)}'") 36 if len(new_sensor_values.shape) != 1: 37 raise ValueError("'new_sensor_values' must be a 1-dimensional array") 38 if len(new_sensor_values) == 0: 39 raise ValueError("'new_sensor_values' can not be empty") 40 41 self._new_sensor_values = new_sensor_values 42 self._cur_replay_idx = 0 43 44 super().__init__(**kwds) 45 46 @property 47 def new_sensor_values(self) -> np.ndarray: 48 """ 49 Returns the new sensor reading values -- i.e. these values replace the 50 true sensor reading values. 51 52 Returns 53 ------- 54 `np.ndarray` 55 New sensor readings. 56 """ 57 return deepcopy(self._new_sensor_values) 58
[docs] 59 def get_attributes(self) -> dict: 60 return super().get_attributes() | {"new_sensor_values": self._new_sensor_values}
61 62 def __eq__(self, other) -> bool: 63 if not isinstance(other, SensorOverrideAttack): 64 raise TypeError("Can not compare 'SensorOverrideAttack' instance " + 65 f"with '{type(other)}' instance") 66 67 return super().__eq__(other) and np.all(self._new_sensor_values == other.new_sensor_values) 68 69 def __str__(self) -> str: 70 return f"{type(self).__name__} {super().__str__()} " +\ 71 f"new_sensor_values: {self._new_sensor_values}" 72
[docs] 73 def apply(self, sensor_readings: np.ndarray, 74 sensor_readings_time: np.ndarray) -> np.ndarray: 75 for i in range(sensor_readings.shape[0]): 76 t = sensor_readings_time[i] 77 78 if self.start_time <= t <= self.end_time: 79 sensor_readings[i] = self._new_sensor_values[self._cur_replay_idx] 80 self._cur_replay_idx = (self._cur_replay_idx + 1) % len(self._new_sensor_values) 81 82 return sensor_readings
83 84
[docs] 85@serializable(SENSOR_ATTACK_REPLAY_ID, ".epytflow_sensorattack_replay") 86class SensorReplayAttack(SensorReadingAttack, JsonSerializable): 87 """ 88 Class implementing a sensor replay attack -- i.e. sensor readings are replaced by 89 historical recordings. 90 91 If the provided time window of historical recordings is smaller than the time window of the 92 attack, it repeats the historical values from the beginning onwards. 93 94 Parameters 95 ---------- 96 replay_data_time_window_start : `int` 97 Start (seconds since simulation start) of the time window that is used in the replay 98 of sensor readings. 99 replay_data_time_window_end : `int` 100 End (seconds since simulation start) of the time window that is used in the replay 101 of sensor readings. 102 """ 103 def __init__(self, replay_data_time_window_start: int, replay_data_time_window_end: int, 104 **kwds): 105 if not isinstance(replay_data_time_window_start, int): 106 raise TypeError("'replay_data_time_window_start' must be an instance of 'int' " + 107 f"but not of {type(replay_data_time_window_start)}") 108 if not isinstance(replay_data_time_window_end, int): 109 raise TypeError("'replay_data_time_window_end' must be an instance of 'int' " + 110 f"but not of {type(replay_data_time_window_end)}") 111 if replay_data_time_window_start > replay_data_time_window_end or \ 112 replay_data_time_window_start < 0: 113 raise ValueError("Invalid values for 'replay_data_time_window_start' and/or " + 114 "'replay_data_time_window_end' detected.") 115 116 self._new_sensor_values = np.zeros(replay_data_time_window_end - 117 replay_data_time_window_start) 118 self._sensor_data_time_window_start = replay_data_time_window_start 119 self._sensor_data_time_window_end = replay_data_time_window_end 120 self._cur_hist_idx = 0 121 self._cur_replay_idx = 0 122 123 super().__init__(**kwds) 124 125 if self._sensor_data_time_window_start > self.start_time: 126 raise ValueError("'replay_data_time_window_start' must be less than 'start_time'") 127 128 @property 129 def sensor_data_time_window_start(self) -> int: 130 """ 131 Gets the start time (seconds since simulation start) of the time window 132 that is used in the replay of sensor readings. 133 134 Returns 135 ------- 136 `int` 137 Start time. 138 """ 139 return self._sensor_data_time_window_start 140 141 @property 142 def sensor_data_time_window_end(self) -> int: 143 """ 144 Gets the end time (seconds since simulation start) of the time window 145 that is used in the replay of sensor readings. 146 147 Returns 148 ------- 149 `int` 150 End time. 151 """ 152 return self._sensor_data_time_window_end 153 154 @property 155 def new_sensor_values(self) -> np.ndarray: 156 """ 157 Returns the new sensor reading values -- i.e. these values replace the 158 true sensor reading values. 159 160 Returns 161 ------- 162 `np.ndarray` 163 New sensor readings. 164 """ 165 return deepcopy(self._new_sensor_values) 166
[docs] 167 def get_attributes(self) -> dict: 168 my_attributes = {"new_sensor_values": self._new_sensor_values, 169 "replay_data_time_window_start": self._sensor_data_time_window_start, 170 "replay_data_time_window_end": self._sensor_data_time_window_end} 171 172 return super().get_attributes() | my_attributes
173 174 def __eq__(self, other) -> bool: 175 if not isinstance(other, SensorReplayAttack): 176 raise TypeError("Can not compare 'SensorReplayAttack' instance " + 177 f"with '{type(other)}' instance") 178 179 return super().__eq__(other) and np.all(self._new_sensor_values == other.new_sensor_values) 180 181 def __str__(self) -> str: 182 return f"{type(self).__name__} {super().__str__()} " +\ 183 f"new_sensor_values: {self._new_sensor_values}" 184
[docs] 185 def apply(self, sensor_readings: np.ndarray, 186 sensor_readings_time: np.ndarray) -> np.ndarray: 187 for i in range(sensor_readings.shape[0]): 188 t = sensor_readings_time[i] 189 190 if self._sensor_data_time_window_start <= t <= self._sensor_data_time_window_end: 191 self._new_sensor_values[self._cur_hist_idx] = sensor_readings[i] 192 self._cur_hist_idx += 1 193 194 if self.start_time <= t <= self.end_time: 195 sensor_readings[i] = self._new_sensor_values[self._cur_replay_idx] 196 self._cur_replay_idx = (self._cur_replay_idx + 1) % len(self._new_sensor_values) 197 198 return sensor_readings