Source code for epyt_flow.uncertainty.sensor_noise

  1"""
  2Module provides a class for implementing sensor noise (e.g. uncertainty in sensor readings).
  3"""
  4from typing import Optional, Callable
  5from copy import deepcopy
  6import numpy
  7from numpy.random import default_rng
  8
  9from .uncertainties import Uncertainty
 10from ..serialization import serializable, JsonSerializable, SENSOR_NOISE_ID
 11
 12
[docs] 13@serializable(SENSOR_NOISE_ID, ".epytflow_sensor_noise") 14class SensorNoise(JsonSerializable): 15 """ 16 Class implementing sensor noise/uncertainty. 17 18 Parameters 19 ---------- 20 global_uncertainty : :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty`, optional 21 Global sensor uncertainty. If None, no global sensor uncertainties are applied. 22 23 The default is None. 24 local_uncertainties : dict[tuple[int, str], :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty`], optional 25 Local (i.e. sensor specific) uncertainties. 26 If None, no local sensor uncertainties are applied. 27 28 The default is None. 29 seed : `int`, optional 30 Seed for the random number generator. 31 32 The default is None. 33 """ 34 def __init__(self, global_uncertainty: Optional[Uncertainty] = None, 35 local_uncertainties: Optional[dict[int, str, Uncertainty]] = None, 36 seed: Optional[int] = None, 37 **kwds): 38 if not isinstance(global_uncertainty, Uncertainty): 39 raise TypeError("'global_uncertainty' must be an instance of " + 40 "'epyt_flow.uncertainty.Uncertainty' but not of " + 41 f"'{type(global_uncertainty)}'") 42 if local_uncertainties is not None: 43 if not isinstance(local_uncertainties, dict): 44 raise TypeError("'local_uncertainties' must be an instance of " + 45 "'dict[tuple[int, str], epyt_flow.uncertainty.Uncertainty]' "+ 46 f"but not of '{type(local_uncertainties)}'") 47 if any(not isinstance(key[0], int) or not isinstance(key[1], str) or 48 not isinstance(local_uncertainties[key], Uncertainty) 49 for key in local_uncertainties.keys()): 50 raise TypeError("'local_uncertainties' must be an instance of " + 51 "'dict[tuple[int, str], epyt_flow.uncertainty.Uncertainty]'") 52 53 self._global_uncertainty = global_uncertainty 54 self._local_uncertainties = local_uncertainties 55 self._np_rand_gen = default_rng(seed) 56 57 super().__init__(**kwds) 58 59 @property 60 def global_uncertainty(self) -> Uncertainty: 61 """ 62 Returns the global sensor readings uncertainty. 63 64 Returns 65 ------- 66 :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty` 67 Global sensor readings uncertainty. 68 """ 69 return deepcopy(self._global_uncertainty) 70 71 @property 72 def local_uncertainties(self) -> dict[int, str, Uncertainty]: 73 """ 74 Returns the local (i.e. sensor specific) uncertainties. 75 76 Returns 77 ------- 78 dict[tuple[int, str], :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty`] 79 Local (i.e. sensor specific) uncertainties. 80 """ 81 return deepcopy(self._local_uncertainties) 82
[docs] 83 def get_attributes(self) -> dict: 84 return super().get_attributes() | {"global_uncertainty": self._global_uncertainty, 85 "local_uncertainties": self._local_uncertainties}
86 87 def __eq__(self, other) -> bool: 88 if not isinstance(other, SensorNoise): 89 raise TypeError("Can not compare 'SensorNoise' instance " + 90 f"with '{type(other)}' instance") 91 92 return super().__eq__(other) and self._global_uncertainty == other.global_uncertainty and \ 93 self._local_uncertainties == other.local_uncertainties 94 95 def __str__(self) -> str: 96 return f"global_uncertainty: {self._global_uncertainty} " + \ 97 f"local_uncertainties: {self._local_uncertainties}" 98
[docs] 99 def apply_local_uncertainty(self, map_sensor_to_idx: Callable[[int, str], int], 100 sensor_readings: numpy.ndarray) -> numpy.ndarray: 101 """ 102 Applies the local (i.e. sensor specific) sensor uncertainties -- i.e. sensor readings 103 are perturbed according to the specified uncertainties. 104 105 Parameters 106 ---------- 107 map_sensor_to_idx : `Callable[[int, str], int]` 108 Function mapping sensor type (int) and sensor id (e.g. node id, link id, etc.) to indices 109 in the final sensor readings. 110 sensor_readings : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 111 All (global) sensor readings (no matter if ther). 112 113 Returns 114 ------- 115 `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 116 Perturbed sensor readings. 117 """ 118 if self._local_uncertainties is None: 119 return sensor_readings 120 else: 121 self._local_uncertainties.set_random_generator(self._np_rand_gen) 122 123 for (sensor_type, sensor_id), uncertainty in map_sensor_to_idx.items(): 124 idx = map_sensor_to_idx(sensor_type, sensor_id) 125 sensor_readings[:, idx] = uncertainty.apply_batch(sensor_readings[:, idx]) 126 return sensor_readings
127
[docs] 128 def apply_global_uncertainty(self, sensor_readings: numpy.ndarray) -> numpy.ndarray: 129 """ 130 Applies the global sensor uncertainty to given sensor readings -- i.e. sensor readings 131 are perturbed according to the specified uncertainty. 132 133 .. note:: 134 Note that state sensor readings such as valve states, pump states, etc. 135 are NOT affected by sensor noise! 136 137 Parameters 138 ---------- 139 sensor_readings : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 140 All (global) senor readings. 141 142 Returns 143 ------- 144 `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 145 Perturbed sensor readings. 146 """ 147 if self._global_uncertainty is None: 148 return sensor_readings 149 else: 150 self._global_uncertainty.set_random_generator(self._np_rand_gen) 151 152 return self._global_uncertainty.apply_batch(sensor_readings)