1"""
2Module provides a base class for control environments.
3"""
4import os
5import uuid
6from abc import abstractmethod, ABC
7from typing import Union
8import warnings
9from epanet_plus import EpanetConstants
10
11from ..simulation import ScenarioSimulator, ScenarioConfig, ScadaData
12from ..utils import get_temp_folder
13
14
[docs]
15class ScenarioControlEnv(ABC):
16 """
17 Base class for a control environment challenge.
18
19 Parameters
20 ----------
21 scenario_config : :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
22 Scenario configuration.
23 autoreset : `bool`, optional
24 If True, environment is automatically reset if terminated.
25
26 The default is False.
27 reapply_uncertainties_at_reset : `bool`, optional
28 If True, the uncertainties are re-applied to the original properties at each reset.
29
30 The default is False.
31
32 Attributes
33 ----------
34 _scenario_sim : :class:`~epyt_flow.simulation.scenario_simulator.ScenarioSimulator`, protected
35 Scenario simulator of the control scenario.
36 _scenario_config : :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
37 Scenario configuration.
38 _sim_generator : Generator[Union[:class:`~epyt_flow.simulation.scada.scada_data.ScadaData`, dict], bool, None], protected
39 Generator for running the step-wise simulation.
40 _hydraulic_scada_data : :class:`~epyt_flow.simulation.scada.scada_data.ScadaData`, protected
41 SCADA data from the hydraulic simulation -- only used if EPANET-MSX is used in the control scenario.
42 """
43 def __init__(self, scenario_config: ScenarioConfig, autoreset: bool = False,
44 reapply_uncertainties_at_reset: bool = False,
45 **kwds):
46 if not isinstance(scenario_config, ScenarioConfig):
47 raise TypeError("'scenario_config' must be an instance of " +
48 "'epyt_flow.simulation.ScenarioConfig' " +
49 "but not of '{type(scenario_config)}'")
50 if not isinstance(autoreset, bool):
51 raise TypeError("'autoreset' must be an instance of 'bool' " +
52 f"but not of '{type(autoreset)}'")
53 if not isinstance(reapply_uncertainties_at_reset, bool):
54 raise TypeError("")
55
56 self._scenario_config = scenario_config
57 self._scenario_sim = None
58 self._sim_generator = None
59 self.__autoreset = autoreset
60 self._hydraulic_scada_data = None
61 self.__reapply_uncertainties_at_reset = reapply_uncertainties_at_reset
62
63 super().__init__(**kwds)
64
65 @property
66 def autoreset(self) -> bool:
67 """
68 True, if environment automatically resets after it terminated.
69
70 Returns
71 -------
72 `bool`
73 True, if environment automatically resets after it terminated.
74 """
75 return self.__autoreset
76
77 @property
78 def reapply_uncertainties_at_reset(self) -> bool:
79 """
80 True, if the uncertainties are re-applied to the original properties at each reset.
81
82 Returns
83 -------
84 `bool`
85 True, if the uncertainties are re-applied to the original properties at each reset.
86 """
87 return self.__reapply_uncertainties_at_reset
88
89 def __enter__(self):
90 return self
91
92 def __exit__(self, *args):
93 self.close()
94
[docs]
95 def close(self) -> None:
96 """
97 Frees all resources.
98 """
99 try:
100 if self._sim_generator is not None:
101 next(self._sim_generator)
102 self._sim_generator.send(True)
103 self._sim_generator = None
104 except StopIteration:
105 pass
106
107 if self._scenario_sim is not None:
108 self._scenario_sim.close()
109 self._scenario_sim = None
110
[docs]
111 def contains_events(self) -> bool:
112 """
113 Check if the scenario contains any events.
114
115 Returns
116 -------
117 `bool`
118 True is the scenario contains any events, False otherwise.
119 """
120 return len(self._scenario_config.system_events) != 0 or \
121 len(self._scenario_config.sensor_reading_events) != 0
122
[docs]
123 def reset(self) -> Union[tuple[ScadaData, bool], ScadaData]:
124 """
125 Resets the environment (i.e. simulation).
126
127 Returns
128 -------
129 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData`
130 Current SCADA data (i.e. sensor readings).
131 """
132 self.close()
133
134 self._scenario_sim = ScenarioSimulator(
135 scenario_config=self._scenario_config)
136
137 if self._scenario_sim.f_msx_in is not None:
138 # Run hydraulic simulation first
139 hyd_export = os.path.join(get_temp_folder(), f"epytflow_env_MSX_{uuid.uuid4()}.hyd")
140 sim = self._scenario_sim.run_hydraulic_simulation
141 self._hydraulic_scada_data = sim(hyd_export=hyd_export,
142 reapply_uncertainties=self.__reapply_uncertainties_at_reset)
143
144 # Run advanced quality analysis (EPANET-MSX) on top of the computed hydraulics
145 gen = self._scenario_sim.run_advanced_quality_simulation_as_generator
146 self._sim_generator = gen(hyd_export, support_abort=True)
147 else:
148 gen = self._scenario_sim.run_hydraulic_simulation_as_generator
149 self._sim_generator = gen(support_abort=True,
150 reapply_uncertainties=self.__reapply_uncertainties_at_reset)
151
152 return self._next_sim_itr()
153
154 def _next_sim_itr(self) -> Union[tuple[ScadaData, bool], ScadaData]:
155 try:
156 next(self._sim_generator)
157 scada_data, terminated = self._sim_generator.send(False)
158
159 if self._scenario_sim.f_msx_in is not None:
160 cur_time = int(scada_data.sensor_readings_time[0])
161 cur_hyd_scada_data = self._hydraulic_scada_data.\
162 extract_time_window(cur_time, cur_time)
163 scada_data.join(cur_hyd_scada_data)
164
165 if self.autoreset is True:
166 return scada_data
167 else:
168 return scada_data, terminated
169 except StopIteration:
170 if self.__autoreset is True:
171 return self.reset()
172 else:
173 return None, True
174
[docs]
175 def set_pump_status(self, pump_id: str, status: int) -> None:
176 """
177 Sets the status of a pump.
178
179 Parameters
180 ----------
181 pump_id : `str`
182 ID of the pump for which the status is set.
183 status : `int`
184 New status of the pump -- either active (i.e. open) or inactive (i.e. closed).
185
186 Must be one of the following constants defined in
187 :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`:
188
189 - EN_CLOSED = 0
190 - EN_OPEN = 1
191 """
192 if self._scenario_sim.f_msx_in is not None:
193 raise RuntimeError("Can not execute actions affecting the hydraulics "+
194 "when running EPANET-MSX")
195
196 pump_link_idx = self._scenario_sim.epanet_api.get_link_idx(pump_id)
197 pattern_idx = self._scenario_sim.epanet_api.getlinkvalue(pump_link_idx,
198 EpanetConstants.EN_LINKPATTERN)
199 if pattern_idx != 0:
200 warnings.warn(f"Can not set pump state of pump {pump_id} because a pump pattern exists")
201 else:
202 self._scenario_sim.epanet_api.setlinkvalue(pump_link_idx, EpanetConstants.EN_STATUS,
203 status)
204
[docs]
205 def set_pump_speed(self, pump_id: str, speed: float) -> None:
206 """
207 Sets the speed of a pump.
208
209 Parameters
210 ----------
211 pump_id : `str`
212 ID of the pump for which the pump speed is set.
213 speed : `float`
214 New pump speed.
215 """
216 if self._scenario_sim.f_msx_in is not None:
217 raise RuntimeError("Can not execute actions affecting the hydraulics "+
218 "when running EPANET-MSX")
219
220 pump_idx = self._scenario_sim.epanet_api.get_link_idx(pump_id)
221 pattern_idx = self._scenario_sim.epanet_api.getlinkvalue(pump_idx,
222 EpanetConstants.EN_LINKPATTERN)
223 if pattern_idx != 0:
224 warnings.warn(f"Pump setting of pump {pump_id} will be multiplied with existing pump pattern")
225
226 self._scenario_sim.epanet_api.setlinkvalue(pump_idx, EpanetConstants.EN_SETTING, speed)
227
[docs]
228 def set_valve_status(self, valve_id: str, status: int) -> None:
229 """
230 Sets the status of a valve.
231
232 Parameters
233 ----------
234 valve_id : `str`
235 ID of the valve for which the status is set.
236 status : `int`
237 New status of the valve -- either open or closed.
238
239 Must be one of the following constants defined in
240 :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`:
241
242 - EN_CLOSED = 0
243 - EN_OPEN = 1
244 """
245 if self._scenario_sim.f_msx_in is not None:
246 raise RuntimeError("Can not execute actions affecting the hydraulics "+
247 "when running EPANET-MSX")
248
249 valve_link_idx = self._scenario_sim.epanet_api.get_link_idx(valve_id)
250 self._scenario_sim.epanet_api.setlinkvalue(valve_link_idx, EpanetConstants.EN_STATUS,
251 status)
252
[docs]
253 def set_node_quality_source_value(self, node_id: str, pattern_id: str,
254 qual_value: float) -> None:
255 """
256 Sets the quality source at a particular node to a specific value -- e.g.
257 setting the chlorine concentration injection to a specified value.
258
259 Parameters
260 ----------
261 node_id : `str`
262 ID of the node.
263 pattern_id : `str`
264 ID of the quality pattern at the specific node.
265 qual_value : `float`
266 New quality source value.
267 """
268 if self._scenario_sim.f_msx_in is not None:
269 raise RuntimeError("Can not execute actions affecting the hydraulics "+
270 "when running EPANET-MSX")
271
272 node_idx = self._scenario_sim.epanet_api.get_node_idx(node_id)
273 pattern_idx = self._scenario_sim.epanet_api.getpatternindex(pattern_id)
274 self._scenario_sim.epanet_api.setnodevalue(node_idx, EpanetConstants.EN_SOURCEQUAL, 1)
275 self._scenario_sim.epanet_api.set_pattern(pattern_idx, [qual_value])
276
[docs]
277 def set_node_species_source_value(self, species_id: str, node_id: str, source_type: int,
278 pattern_id: str, source_strength: float) -> None:
279 """
280 Sets the species source at a particular node to a specific value -- i.e.
281 setting the species injection amount at a particular location.
282
283 Parameters
284 ----------
285 species_id : `str`
286 ID of the species.
287 node_id : `str`
288 ID of the node.
289 source_type : `int`
290 Type of the external species injection source -- must be one of
291 the following EPANET constants:
292
293 - EN_CONCEN = 0
294 - EN_MASS = 1
295 - EN_SETPOINT = 2
296 - EN_FLOWPACED = 3
297
298 Description:
299
300 - E_CONCEN Sets the concentration of external inflow entering a node
301 - EN_MASS Injects a given mass/minute into a node
302 - EN_SETPOINT Sets the concentration leaving a node to a given value
303 - EN_FLOWPACED Adds a given value to the concentration leaving a node
304 pattern_id : `str`
305 ID of the source pattern.
306 source_strength : `float`
307 Amount of the injected species (source strength) --
308 i.e. interpreation of this number depends on `source_type`
309 """
310 if self._scenario_sim.f_msx_in is None:
311 raise RuntimeError("You are not running EPANET-MSX")
312
313 pattern_idx = self._scenario_sim.epanet_api.MSXgetindex(EpanetConstants.MSX_PATTERN,
314 pattern_id)
315 self._scenario_sim.epanet_api.MSXsetpattern(pattern_idx, [1], 1)
316
317 node_idx = self._scenario_sim.epanet_api.get_node_idx(node_id)
318 species_idx = self._scenario_sim.epanet_api.get_msx_species_idx(species_id)
319 self._scenario_sim.epanet_api.MSXsetsource(node_idx, species_idx, source_type,
320 source_strength, pattern_idx)
321
[docs]
322 @abstractmethod
323 def step(self, *actions) -> Union[tuple[ScadaData, float, bool], tuple[ScadaData, float]]:
324 """
325 Performs the next step by applying an action and observing
326 the consequences (SCADA data, reward, terminated).
327
328 Note that `terminated` is only returned if `autoreset=False` otherwise
329 only the current SCADA data and reward are returned.
330
331 Returns
332 -------
333 `(` :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` `, float, bool)` or `(` :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` `, float)`
334 Triple or tuple of observations (:class:`~epyt_flow.simulation.scada.scada_data.ScadaData`),
335 reward (`float`), and terminated (`bool`).
336 """
337 raise NotImplementedError()