1"""
2Module provides a class for implementing sensor noise (e.g. uncertainty in sensor readings).
3"""
4from typing import Optional, Callable
5from copy import deepcopy
6import numpy
7from numpy.random import default_rng
8
9from .uncertainties import Uncertainty
10from ..serialization import serializable, JsonSerializable, SENSOR_NOISE_ID
11
12
[docs]
13@serializable(SENSOR_NOISE_ID, ".epytflow_sensor_noise")
14class SensorNoise(JsonSerializable):
15 """
16 Class implementing sensor noise/uncertainty.
17
18 Parameters
19 ----------
20 global_uncertainty : :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty`, optional
21 Global sensor uncertainty. If None, no global sensor uncertainties are applied.
22
23 The default is None.
24 local_uncertainties : dict[tuple[int, str], :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty`], optional
25 Local (i.e. sensor specific) uncertainties.
26 If None, no local sensor uncertainties are applied.
27
28 The default is None.
29 seed : `int`, optional
30 Seed for the random number generator.
31
32 The default is None.
33 """
34 def __init__(self, global_uncertainty: Optional[Uncertainty] = None,
35 local_uncertainties: Optional[dict[int, str, Uncertainty]] = None,
36 seed: Optional[int] = None,
37 **kwds):
38 if not isinstance(global_uncertainty, Uncertainty):
39 raise TypeError("'global_uncertainty' must be an instance of " +
40 "'epyt_flow.uncertainty.Uncertainty' but not of " +
41 f"'{type(global_uncertainty)}'")
42 if local_uncertainties is not None:
43 if not isinstance(local_uncertainties, dict):
44 raise TypeError("'local_uncertainties' must be an instance of " +
45 "'dict[tuple[int, str], epyt_flow.uncertainty.Uncertainty]' "+
46 f"but not of '{type(local_uncertainties)}'")
47 if any(not isinstance(key[0], int) or not isinstance(key[1], str) or
48 not isinstance(local_uncertainties[key], Uncertainty)
49 for key in local_uncertainties.keys()):
50 raise TypeError("'local_uncertainties' must be an instance of " +
51 "'dict[tuple[int, str], epyt_flow.uncertainty.Uncertainty]'")
52
53 self._global_uncertainty = global_uncertainty
54 self._local_uncertainties = local_uncertainties
55 self._np_rand_gen = default_rng(seed)
56
57 super().__init__(**kwds)
58
59 @property
60 def global_uncertainty(self) -> Uncertainty:
61 """
62 Returns the global sensor readings uncertainty.
63
64 Returns
65 -------
66 :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty`
67 Global sensor readings uncertainty.
68 """
69 return deepcopy(self._global_uncertainty)
70
71 @property
72 def local_uncertainties(self) -> dict[int, str, Uncertainty]:
73 """
74 Returns the local (i.e. sensor specific) uncertainties.
75
76 Returns
77 -------
78 dict[tuple[int, str], :class:`~epyt_flow.uncertainty.uncertainties.Uncertainty`]
79 Local (i.e. sensor specific) uncertainties.
80 """
81 return deepcopy(self._local_uncertainties)
82
[docs]
83 def get_attributes(self) -> dict:
84 return super().get_attributes() | {"global_uncertainty": self._global_uncertainty,
85 "local_uncertainties": self._local_uncertainties}
86
87 def __eq__(self, other) -> bool:
88 if not isinstance(other, SensorNoise):
89 raise TypeError("Can not compare 'SensorNoise' instance " +
90 f"with '{type(other)}' instance")
91
92 return super().__eq__(other) and self._global_uncertainty == other.global_uncertainty and \
93 self._local_uncertainties == other.local_uncertainties
94
95 def __str__(self) -> str:
96 return f"global_uncertainty: {self._global_uncertainty} " + \
97 f"local_uncertainties: {self._local_uncertainties}"
98
[docs]
99 def apply_local_uncertainty(self, map_sensor_to_idx: Callable[[int, str], int],
100 sensor_readings: numpy.ndarray) -> numpy.ndarray:
101 """
102 Applies the local (i.e. sensor specific) sensor uncertainties -- i.e. sensor readings
103 are perturbed according to the specified uncertainties.
104
105 Parameters
106 ----------
107 map_sensor_to_idx : `Callable[[int, str], int]`
108 Function mapping sensor type (int) and sensor id (e.g. node id, link id, etc.) to indices
109 in the final sensor readings.
110 sensor_readings : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
111 All (global) sensor readings (no matter if ther).
112
113 Returns
114 -------
115 `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
116 Perturbed sensor readings.
117 """
118 if self._local_uncertainties is None:
119 return sensor_readings
120 else:
121 self._local_uncertainties.set_random_generator(self._np_rand_gen)
122
123 for (sensor_type, sensor_id), uncertainty in map_sensor_to_idx.items():
124 idx = map_sensor_to_idx(sensor_type, sensor_id)
125 sensor_readings[:, idx] = uncertainty.apply_batch(sensor_readings[:, idx])
126 return sensor_readings
127
[docs]
128 def apply_global_uncertainty(self, sensor_readings: numpy.ndarray) -> numpy.ndarray:
129 """
130 Applies the global sensor uncertainty to given sensor readings -- i.e. sensor readings
131 are perturbed according to the specified uncertainty.
132
133 .. note::
134 Note that state sensor readings such as valve states, pump states, etc.
135 are NOT affected by sensor noise!
136
137 Parameters
138 ----------
139 sensor_readings : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
140 All (global) senor readings.
141
142 Returns
143 -------
144 `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
145 Perturbed sensor readings.
146 """
147 if self._global_uncertainty is None:
148 return sensor_readings
149 else:
150 self._global_uncertainty.set_random_generator(self._np_rand_gen)
151
152 return self._global_uncertainty.apply_batch(sensor_readings)