Source code for epyt_flow.gym.scenario_control_env

  1"""
  2Module provides a base class for control environments.
  3"""
  4import os
  5import uuid
  6from abc import abstractmethod, ABC
  7from typing import Union
  8import warnings
  9from epanet_plus import EpanetConstants
 10
 11from ..simulation import ScenarioSimulator, ScenarioConfig, ScadaData
 12from ..utils import get_temp_folder
 13
 14
class ScenarioControlEnv(ABC):
    """
    Base class for a control environment challenge.

    Parameters
    ----------
    scenario_config : :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
        Scenario configuration.
    autoreset : `bool`, optional
        If True, environment is automatically reset if terminated.

        The default is False.
    reapply_uncertainties_at_reset : `bool`, optional
        If True, the uncertainties are re-applied to the original properties at each reset.

        The default is False.

    Attributes
    ----------
    _scenario_sim : :class:`~epyt_flow.simulation.scenario_simulator.ScenarioSimulator`, protected
        Scenario simulator of the control scenario.
    _scenario_config : :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
        Scenario configuration.
    _sim_generator : Generator[Union[:class:`~epyt_flow.simulation.scada.scada_data.ScadaData`, dict], bool, None], protected
        Generator for running the step-wise simulation.
    _hydraulic_scada_data : :class:`~epyt_flow.simulation.scada.scada_data.ScadaData`, protected
        SCADA data from the hydraulic simulation -- only used if EPANET-MSX is used in the control scenario.

    Raises
    ------
    TypeError
        If any of the constructor arguments has an unexpected type.
    """
    def __init__(self, scenario_config: ScenarioConfig, autoreset: bool = False,
                 reapply_uncertainties_at_reset: bool = False,
                 **kwds):
        if not isinstance(scenario_config, ScenarioConfig):
            raise TypeError("'scenario_config' must be an instance of " +
                            "'epyt_flow.simulation.ScenarioConfig' " +
                            f"but not of '{type(scenario_config)}'")
        if not isinstance(autoreset, bool):
            raise TypeError("'autoreset' must be an instance of 'bool' " +
                            f"but not of '{type(autoreset)}'")
        if not isinstance(reapply_uncertainties_at_reset, bool):
            raise TypeError("'reapply_uncertainties_at_reset' must be an instance of 'bool' " +
                            f"but not of '{type(reapply_uncertainties_at_reset)}'")

        self._scenario_config = scenario_config
        # Simulator and generator are created lazily by reset().
        self._scenario_sim = None
        self._sim_generator = None
        self.__autoreset = autoreset
        self._hydraulic_scada_data = None
        self.__reapply_uncertainties_at_reset = reapply_uncertainties_at_reset

        super().__init__(**kwds)

    @property
    def autoreset(self) -> bool:
        """
        True, if environment automatically resets after it terminated.

        Returns
        -------
        `bool`
            True, if environment automatically resets after it terminated.
        """
        return self.__autoreset

    @property
    def reapply_uncertainties_at_reset(self) -> bool:
        """
        True, if the uncertainties are re-applied to the original properties at each reset.

        Returns
        -------
        `bool`
            True, if the uncertainties are re-applied to the original properties at each reset.
        """
        return self.__reapply_uncertainties_at_reset

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self) -> None:
        """
        Frees all resources.

        The step-wise simulation generator (if any) is asked to abort and the
        scenario simulator (if any) is closed. Both references are cleared in
        every case, so calling this method repeatedly is safe.
        """
        # Detach both references up front: the environment must never keep a
        # stale generator or simulator, however shutting them down ends.
        sim_generator, self._sim_generator = self._sim_generator, None
        scenario_sim, self._scenario_sim = self._scenario_sim, None

        try:
            if sim_generator is not None:
                try:
                    # The generator is created with support_abort=True;
                    # sending True signals that the simulation should stop.
                    next(sim_generator)
                    sim_generator.send(True)
                except StopIteration:
                    # Generator already finished (or finished on abort).
                    pass
        finally:
            # Close the simulator even if the generator raised unexpectedly.
            if scenario_sim is not None:
                scenario_sim.close()

    def contains_events(self) -> bool:
        """
        Check if the scenario contains any events.

        Returns
        -------
        `bool`
            True if the scenario contains any events, False otherwise.
        """
        return len(self._scenario_config.system_events) != 0 or \
            len(self._scenario_config.sensor_reading_events) != 0

    def reset(self) -> Union[tuple[ScadaData, bool], ScadaData]:
        """
        Resets the environment (i.e. simulation).

        Returns
        -------
        :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` or `(` :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` `, bool)`
            Current SCADA data (i.e. sensor readings) if `autoreset=True`,
            otherwise a tuple of the current SCADA data and the `terminated` flag.
        """
        self.close()

        self._scenario_sim = ScenarioSimulator(
            scenario_config=self._scenario_config)

        if self._scenario_sim.f_msx_in is not None:
            # Run hydraulic simulation first
            # NOTE(review): the exported .hyd file is not removed by this class --
            # confirm whether it is cleaned up elsewhere.
            hyd_export = os.path.join(get_temp_folder(), f"epytflow_env_MSX_{uuid.uuid4()}.hyd")
            sim = self._scenario_sim.run_hydraulic_simulation
            self._hydraulic_scada_data = sim(hyd_export=hyd_export,
                                             reapply_uncertainties=self.__reapply_uncertainties_at_reset)

            # Run advanced quality analysis (EPANET-MSX) on top of the computed hydraulics
            gen = self._scenario_sim.run_advanced_quality_simulation_as_generator
            self._sim_generator = gen(hyd_export, support_abort=True)
        else:
            gen = self._scenario_sim.run_hydraulic_simulation_as_generator
            self._sim_generator = gen(support_abort=True,
                                      reapply_uncertainties=self.__reapply_uncertainties_at_reset)

        return self._next_sim_itr()

    def _next_sim_itr(self) -> Union[tuple[ScadaData, bool], ScadaData]:
        """
        Advances the step-wise simulation by one iteration.

        Returns
        -------
        :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` or `(` :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` `, bool)`
            Current SCADA data if `autoreset=True`, otherwise a tuple of the
            current SCADA data and the `terminated` flag. If the simulation is
            exhausted, the environment is reset (`autoreset=True`) or
            `(None, True)` is returned (`autoreset=False`).
        """
        try:
            # Sending False tells the generator not to abort.
            next(self._sim_generator)
            scada_data, terminated = self._sim_generator.send(False)

            if self._scenario_sim.f_msx_in is not None:
                # Merge the pre-computed hydraulic readings of the current
                # time step into the EPANET-MSX readings.
                cur_time = int(scada_data.sensor_readings_time[0])
                cur_hyd_scada_data = self._hydraulic_scada_data.\
                    extract_time_window(cur_time, cur_time)
                scada_data.join(cur_hyd_scada_data)

            if self.autoreset is True:
                return scada_data
            else:
                return scada_data, terminated
        except StopIteration:
            if self.__autoreset is True:
                return self.reset()
            else:
                return None, True

    def set_pump_status(self, pump_id: str, status: int) -> None:
        """
        Sets the status of a pump.

        If a pump pattern exists for the given pump, a warning is issued and
        the status is left unchanged.

        Parameters
        ----------
        pump_id : `str`
            ID of the pump for which the status is set.
        status : `int`
            New status of the pump -- either active (i.e. open) or inactive (i.e. closed).

            Must be one of the following constants defined in
            :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`:

                - EN_CLOSED  = 0
                - EN_OPEN    = 1

        Raises
        ------
        RuntimeError
            If EPANET-MSX is used in the control scenario.
        """
        if self._scenario_sim.f_msx_in is not None:
            raise RuntimeError("Can not execute actions affecting the hydraulics "+
                               "when running EPANET-MSX")

        pump_link_idx = self._scenario_sim.epanet_api.get_link_idx(pump_id)
        pattern_idx = self._scenario_sim.epanet_api.getlinkvalue(pump_link_idx,
                                                                 EpanetConstants.EN_LINKPATTERN)
        if pattern_idx != 0:
            warnings.warn(f"Can not set pump state of pump {pump_id} because a pump pattern exists")
        else:
            self._scenario_sim.epanet_api.setlinkvalue(pump_link_idx, EpanetConstants.EN_STATUS,
                                                       status)

    def set_pump_speed(self, pump_id: str, speed: float) -> None:
        """
        Sets the speed of a pump.

        If a pump pattern exists for the given pump, a warning is issued and
        the speed is set nevertheless.

        Parameters
        ----------
        pump_id : `str`
            ID of the pump for which the pump speed is set.
        speed : `float`
            New pump speed.

        Raises
        ------
        RuntimeError
            If EPANET-MSX is used in the control scenario.
        """
        if self._scenario_sim.f_msx_in is not None:
            raise RuntimeError("Can not execute actions affecting the hydraulics "+
                               "when running EPANET-MSX")

        pump_idx = self._scenario_sim.epanet_api.get_link_idx(pump_id)
        pattern_idx = self._scenario_sim.epanet_api.getlinkvalue(pump_idx,
                                                                 EpanetConstants.EN_LINKPATTERN)
        if pattern_idx != 0:
            warnings.warn(f"Pump setting of pump {pump_id} will be multiplied with existing pump pattern")

        self._scenario_sim.epanet_api.setlinkvalue(pump_idx, EpanetConstants.EN_SETTING, speed)

    def set_valve_status(self, valve_id: str, status: int) -> None:
        """
        Sets the status of a valve.

        Parameters
        ----------
        valve_id : `str`
            ID of the valve for which the status is set.
        status : `int`
            New status of the valve -- either open or closed.

            Must be one of the following constants defined in
            :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`:

                - EN_CLOSED  = 0
                - EN_OPEN    = 1

        Raises
        ------
        RuntimeError
            If EPANET-MSX is used in the control scenario.
        """
        if self._scenario_sim.f_msx_in is not None:
            raise RuntimeError("Can not execute actions affecting the hydraulics "+
                               "when running EPANET-MSX")

        valve_link_idx = self._scenario_sim.epanet_api.get_link_idx(valve_id)
        self._scenario_sim.epanet_api.setlinkvalue(valve_link_idx, EpanetConstants.EN_STATUS,
                                                   status)

    def set_node_quality_source_value(self, node_id: str, pattern_id: str,
                                      qual_value: float) -> None:
        """
        Sets the quality source at a particular node to a specific value -- e.g.
        setting the chlorine concentration injection to a specified value.

        Parameters
        ----------
        node_id : `str`
            ID of the node.
        pattern_id : `str`
            ID of the quality pattern at the specific node.
        qual_value : `float`
            New quality source value.

        Raises
        ------
        RuntimeError
            If EPANET-MSX is used in the control scenario.
        """
        if self._scenario_sim.f_msx_in is not None:
            raise RuntimeError("Can not execute actions affecting the hydraulics "+
                               "when running EPANET-MSX")

        node_idx = self._scenario_sim.epanet_api.get_node_idx(node_id)
        pattern_idx = self._scenario_sim.epanet_api.getpatternindex(pattern_id)
        # The source quality is fixed to 1 and the requested value is written
        # into the (single-entry) pattern instead.
        self._scenario_sim.epanet_api.setnodevalue(node_idx, EpanetConstants.EN_SOURCEQUAL, 1)
        self._scenario_sim.epanet_api.set_pattern(pattern_idx, [qual_value])

    def set_node_species_source_value(self, species_id: str, node_id: str, source_type: int,
                                      pattern_id: str, source_strength: float) -> None:
        """
        Sets the species source at a particular node to a specific value -- i.e.
        setting the species injection amount at a particular location.

        Parameters
        ----------
        species_id : `str`
            ID of the species.
        node_id : `str`
            ID of the node.
        source_type : `int`
            Type of the external species injection source -- must be one of
            the following EPANET constants:

                - EN_CONCEN     = 0
                - EN_MASS       = 1
                - EN_SETPOINT   = 2
                - EN_FLOWPACED  = 3

            Description:

                - EN_CONCEN     Sets the concentration of external inflow entering a node
                - EN_MASS       Injects a given mass/minute into a node
                - EN_SETPOINT   Sets the concentration leaving a node to a given value
                - EN_FLOWPACED  Adds a given value to the concentration leaving a node
        pattern_id : `str`
            ID of the source pattern.
        source_strength : `float`
            Amount of the injected species (source strength) --
            i.e. interpretation of this number depends on `source_type`

        Raises
        ------
        RuntimeError
            If EPANET-MSX is not used in the control scenario.
        """
        if self._scenario_sim.f_msx_in is None:
            raise RuntimeError("You are not running EPANET-MSX")

        pattern_idx = self._scenario_sim.epanet_api.MSXgetindex(EpanetConstants.MSX_PATTERN,
                                                                pattern_id)
        # The pattern is set to a single multiplier of 1; the injected amount
        # is passed as the source strength below.
        self._scenario_sim.epanet_api.MSXsetpattern(pattern_idx, [1], 1)

        node_idx = self._scenario_sim.epanet_api.get_node_idx(node_id)
        species_idx = self._scenario_sim.epanet_api.get_msx_species_idx(species_id)
        self._scenario_sim.epanet_api.MSXsetsource(node_idx, species_idx, source_type,
                                                   source_strength, pattern_idx)

    @abstractmethod
    def step(self, *actions) -> Union[tuple[ScadaData, float, bool], tuple[ScadaData, float]]:
        """
        Performs the next step by applying an action and observing
        the consequences (SCADA data, reward, terminated).

        Note that `terminated` is only returned if `autoreset=False` otherwise
        only the current SCADA data and reward are returned.

        Returns
        -------
        `(` :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` `, float, bool)` or `(` :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` `, float)`
            Triple or tuple of observations (:class:`~epyt_flow.simulation.scada.scada_data.ScadaData`),
            reward (`float`), and terminated (`bool`).
        """
        raise NotImplementedError()