Source code for epyt_flow.simulation.events.leakages

  1"""
  2Module provides classes for implementing leakages.
  3"""
  4from copy import deepcopy
  5import math
  6import numpy as np
  7from epanet_plus import EPyT, EpanetConstants
  8
  9from .system_event import SystemEvent
 10from ...serialization import serializable, JsonSerializable, \
 11    LEAKAGE_ID, ABRUPT_LEAKAGE_ID, INCIPIENT_LEAKAGE_ID
 12
 13
[docs] 14@serializable(LEAKAGE_ID, ".epytflow_leakage") 15class Leakage(SystemEvent, JsonSerializable): 16 """ 17 Base class for a leakage. 18 19 Parameters 20 ---------- 21 link_id : `str` 22 ID of the link at which the leak is placed. 23 Note that if the leak is placed at a node, then 'link_id' must be None and the 24 ID of the node must be set in 'node_id' 25 diameter : `float`, optional 26 Diameter of this leak in either *foot* or *meter* (depending on the used flow units). 27 28 Alternatively, 'area' can be used for specifying the size of this leakage -- 29 in this case, 'diameter' must be set to 'None'. 30 31 The default is None. 32 area : `float`, optional 33 Area of this leak in either *foot^2* or *meter^2* (depending on the used flow units). 34 35 Alternatively, 'diameter' can be used for specifying the size of this leakage -- 36 in this case, 'area' must be set to 'None'. 37 38 The default is None. 39 profile : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_ 40 Pattern of this leak. 41 node_id : `str`, optional 42 ID of the node at which the leak is placed. 43 This parameter must only be set if the leak is placed at a node instead of a link. 44 In this case, 'link_id' must be None. 45 46 The default is None. 47 """ 48 def __init__(self, link_id: str, profile: np.ndarray, diameter: float = None, 49 area: float = None, node_id: str = None, **kwds): 50 if link_id is not None and node_id is not None: 51 raise ValueError("Leak can not be placed at a link and node at the same time") 52 if link_id is None and node_id is None: 53 raise ValueError("Leak must be placed at either a link or a node -- " + 54 "expecting either 'link_id' or 'node_id' but both are None") 55 if link_id is not None: 56 if not isinstance(link_id, str): 57 raise TypeError("'link_id' must be an instance of 'str' " + 58 f"but not of '{type(link_id)}'") 59 if area is None and diameter is None: 60 raise ValueError("Either 'diameter' or 'area' must be given") 61 if area is not None and diameter is not None: 62 raise ValueError("Either 'diameter' or 'area' must be given, " + 63 "but not both at the same time") 64 if diameter is not None: 65 if not isinstance(diameter, float): 66 raise TypeError("'diameter must be an instance of 'float' but " + 67 f"not of '{type(diameter)}'") 68 if diameter <= 0: 69 raise ValueError("'diameter' must be greater than zero") 70 if area is not None: 71 if not isinstance(area, float): 72 raise TypeError("'area must be an instance of 'float' but " + 73 f"not of '{type(area)}'") 74 if area <= 0: 75 raise ValueError("'area' must be greater than zero") 76 if profile is not None: 77 if not isinstance(profile, np.ndarray): 78 raise TypeError("'profile' must be an instance of 'numpy.ndarray' but " + 79 f"not of '{type(profile)}'") 80 if len(profile.shape) > 1: 81 raise ValueError("'profile' must be a one-dimensional array " + 82 f"but not of shape '{profile.shape}'") 83 if node_id is not None: 84 if not isinstance(node_id, str): 85 raise TypeError("'node_id' must be an instance of 'str' " + 86 f"but not of '{type(node_id)}'") 87 88 self._link_id = link_id 89 self._node_id = node_id 90 self._diameter = diameter 91 self._area = area 92 self._profile = profile 93 94 self._leaky_node_idx = None 95 self._leak_emitter_coef = None 96 self._time_pattern_idx = 0 97 98 super().__init__(**kwds) 99 100 @property 101 def link_id(self) -> str: 102 """ 103 Gets the ID of the link at which the leak is placed. 104 105 Returns 106 ------- 107 `str` 108 ID of the link at which the leak is placed. 109 """ 110 return self._link_id 111 112 @property 113 def node_id(self) -> str: 114 """ 115 Gets the ID of the node at which the leak is placed. 116 117 Returns 118 ------- 119 `str` 120 ID of the node at which the leak is placed. 121 """ 122 return self._node_id 123 124 @property 125 def diameter(self) -> float: 126 """ 127 Gets the diameter of the leak in either *foot* or *meter* 128 (depending on the sued flow units). 129 130 Returns 131 ------- 132 `float` 133 Diameter (*foot* or *meter*) of the leak. 134 """ 135 return self._diameter 136 137 @property 138 def area(self) -> float: 139 """ 140 Gets the area of the leak in either *foot^2* or *meter^2* 141 (depending on the sued flow units). 142 143 Returns 144 ------- 145 `float` 146 Area of the leak. 147 """ 148 return self._area if self._area is not None else self.compute_leak_area(self._diameter) 149 150 @property 151 def profile(self) -> np.ndarray: 152 """ 153 Gets the pattern of the leak. 154 155 Returns 156 ------- 157 `numpy.ndarray` 158 Pattern of the leak. 159 """ 160 return deepcopy(self._profile) 161 162 @profile.setter 163 def profile(self, pattern: np.ndarray): 164 if not isinstance(pattern, np.ndarray): 165 raise TypeError("'profile' must be an instance of 'numpy.ndarray' but " + 166 f"not of '{type(pattern)}'") 167 if len(pattern.shape) > 1: 168 raise ValueError("'profile' must be a one-dimensional array " + 169 f"but not of shape '{pattern.shape}'") 170 171 self._profile = pattern 172
[docs] 173 def get_attributes(self) -> dict: 174 return super().get_attributes() | {"link_id": self._link_id, "diameter": self._diameter, 175 "area": self._area, "profile": self._profile, 176 "node_id": self._node_id 177 if self._link_id is None else None}
178 179 def __eq__(self, other) -> bool: 180 if not isinstance(other, Leakage): 181 raise TypeError(f"Can not compare 'Leakage' instance with '{type(other)}' instance") 182 183 return super().__eq__(other) and self._link_id == other.link_id \ 184 and self._diameter == other.diameter and np.all(self._profile == other.profile) \ 185 and self._node_id == other.node_id and self.area == other.area 186 187 def __str__(self) -> str: 188 return f"{super().__str__()} link_id: {self._link_id} diameter: {self._diameter} " +\ 189 f"area: {self._area} profile: {self._profile} node_id: {self._node_id}" 190
[docs] 191 def compute_leak_area(self, diameter: float) -> float: 192 """ 193 Computes the leak area given the diameter. 194 195 leak_area = pi * (diameter * .5)^2 196 197 Parameters 198 ---------- 199 diameter : `float` 200 Diameter (*foot* or *meter*) of the leak. 201 202 Returns 203 ------- 204 `float` 205 Leak area in *foot^2* or *meter^2*. 206 """ 207 return np.pi * (diameter / 2) ** 2
208
[docs] 209 def compute_leak_emitter_coefficient(self, area: float, discharge_coef: float = .75) -> float: 210 """ 211 Computes the leak emitter coefficient. 212 213 emitter_coef = discharge_coef * area * sqrt(2*g) 214 where g is the gravitational constant, and discharge_coef = .75 215 216 leak_demand = emitter_coef * pressure^alpha where alpha = .5 217 218 Parameters 219 ---------- 220 area : `float` 221 Leak area (foot^2 or meter^2) as computed in 222 :func:`~epyt_flow.simulation.events.leakages.Leakage.compute_leak_area`. 223 discharge_coef : `float`, optional 224 Discharge coefficient. 225 226 The default is set to 0.75 227 228 Returns 229 ------- 230 `float` 231 Leak emitter coefficient. 232 """ 233 flow_unit = self._epanet_api.getflowunits() 234 if flow_unit == EpanetConstants.EN_CMH: 235 g = 127137600 # m/h^2 236 elif flow_unit == EpanetConstants.EN_CFS: 237 g = 32.17405 # feet/s^2 238 else: 239 raise ValueError("Leakages are only implemented for the following flow units:\n" + 240 " EN_CMH (cubic meter/hr)\n EN_CFS (foot/sec)") 241 242 return discharge_coef * area * np.sqrt(2. * g)
243 244 def _get_new_link_id(self) -> str: 245 return f"leak_pipe_{self._link_id}" 246 247 def _get_new_node_id(self) -> str: 248 return f"leak_node_{self._link_id}" 249
[docs] 250 def init(self, epanet_api: EPyT) -> None: 251 super().init(epanet_api) 252 253 # Split pipe if leak is placed at a link/pipe 254 if self._link_id is not None: 255 if self._link_id not in self._epanet_api.get_all_links_id(): 256 raise ValueError(f"Unknown link/pipe '{self._link_id}'") 257 258 new_link_id = self._get_new_link_id() 259 new_node_id = self._get_new_node_id() 260 261 all_nodes_id = self._epanet_api.get_all_nodes_id() 262 if new_node_id in all_nodes_id: 263 raise ValueError(f"There is already a leak at pipe {self.link_id}") 264 265 self._epanet_api.split_pipe(self.link_id, new_link_id, new_node_id) 266 self._leaky_node_idx = self._epanet_api.get_node_idx(new_node_id) 267 else: 268 if self._node_id not in self._epanet_api.get_all_nodes_id(): 269 raise ValueError(f"Unknown node '{self._node_id}'") 270 271 self._leaky_node_idx = self._epanet_api.get_node_idx(self._node_id) 272 273 self._epanet_api.setnodevalue(self._leaky_node_idx, EpanetConstants.EN_EMITTER, 0.) 274 275 # Compute leak emitter coefficient 276 self._leak_emitter_coef = self.compute_leak_emitter_coefficient( 277 self.compute_leak_area(self.area))
278
[docs] 279 def cleanup(self) -> None: 280 if self._link_id is not None: 281 pipe_idx = self._epanet_api.get_link_idx(self._link_id) 282 link_diameter = self._epanet_api.get_link_diameter(pipe_idx) 283 link_length = self._epanet_api.get_link_length(pipe_idx) 284 link_roughness_coeff = self._epanet_api.get_link_roughness(pipe_idx) 285 link_minor_loss_coeff = self._epanet_api.get_link_minorloss(pipe_idx) 286 link_initial_status = self._epanet_api.get_link_init_status(pipe_idx) 287 link_initial_setting = self._epanet_api.get_link_init_setting(pipe_idx) 288 link_bulk_reaction_coeff = self._epanet_api.get_link_bulk_raction_coeff(pipe_idx) 289 link_wall_reaction_coeff = self._epanet_api.get_link_wall_raction_coeff(pipe_idx) 290 291 node_a_idx = int(self._epanet_api.getlinknodes(pipe_idx)[0]) 292 node_b_idx = int(self._epanet_api.getlinknodes(self._epanet_api.get_link_idx(self._get_new_link_id()))[1]) 293 294 self._epanet_api.deletelink(self._epanet_api.get_link_idx(self._get_new_link_id()), 295 EpanetConstants.EN_UNCONDITIONAL) 296 self._epanet_api.deletelink(self._epanet_api.get_link_idx(self._link_id), 297 EpanetConstants.EN_UNCONDITIONAL) 298 self._epanet_api.deletenode(self._epanet_api.get_node_idx(self._get_new_node_id()), 299 EpanetConstants.EN_UNCONDITIONAL) 300 301 self._epanet_api.addlink(self._link_id, EpanetConstants.EN_PIPE, 302 self._epanet_api.get_node_id(node_a_idx), 303 self._epanet_api.get_node_id(node_b_idx)) 304 link_idx = self._epanet_api.get_link_idx(self._link_id) 305 self._epanet_api.setlinknodes(link_idx, node_a_idx, node_b_idx) 306 self._epanet_api.setlinktype(link_idx, EpanetConstants.EN_PIPE, 307 EpanetConstants.EN_UNCONDITIONAL) 308 self._epanet_api.setpipedata(link_idx, link_length, link_diameter, link_roughness_coeff, 309 link_minor_loss_coeff) 310 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_INITSTATUS, 311 link_initial_status) 312 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_INITSETTING, 313 link_initial_setting) 314 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_KBULK, 315 link_bulk_reaction_coeff) 316 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_KWALL, 317 link_wall_reaction_coeff)
318
[docs] 319 def reset(self) -> None: 320 self._time_pattern_idx = 0
321
[docs] 322 def exit(self, cur_time) -> None: 323 self._epanet_api.setnodevalue(self._leaky_node_idx, EpanetConstants.EN_EMITTER, 0.)
324
[docs] 325 def apply(self, cur_time: int) -> None: 326 self._epanet_api.setnodevalue(self._leaky_node_idx, EpanetConstants.EN_EMITTER, 327 self._leak_emitter_coef * 328 self._profile[self._time_pattern_idx]) 329 self._time_pattern_idx += 1
330 331
[docs] 332@serializable(ABRUPT_LEAKAGE_ID, ".epytflow_leakage_abrupt") 333class AbruptLeakage(Leakage): 334 """ 335 Class implementing an abrupt leakage event. 336 337 Parameters 338 ---------- 339 link_id : `str` 340 ID of the link at which the leak is placed. 341 diameter : `float`, optional 342 Diameter of the leak. 343 344 Alternatively, 'area' can be used to specify the size of this leak -- 345 in this case, 'diameter' must be set to None. 346 347 The default is None. 348 area : `float`, optional 349 Area of the leakd. 350 351 Alternatively, 'diameter' can be used to specify the size of this leak -- 352 in this case, 'area' must be set to None. 353 354 The default is None. 355 """ 356 def __init__(self, link_id: str, diameter: float = None, area: float = None, **kwds): 357 if "profile" not in kwds: 358 super().__init__(link_id=link_id, diameter=diameter, area=area, profile=None, **kwds) 359 else: 360 super().__init__(link_id=link_id, diameter=diameter, area=area, **kwds) 361
[docs] 362 def init(self, epanet_api: EPyT) -> None: 363 super().init(epanet_api) 364 365 # Set pattern 366 total_sim_duration = self._epanet_api.get_simulation_duration() 367 time_step = self._epanet_api.get_hydraulic_time_step() 368 369 if self.end_time is not None: 370 n_leaky_time_points = math.ceil((self.end_time - self.start_time) / time_step) 371 else: 372 n_leaky_time_points = math.ceil((total_sim_duration - self.start_time) / time_step) 373 374 self.profile = np.ones(n_leaky_time_points)
375 376
[docs] 377@serializable(INCIPIENT_LEAKAGE_ID, ".epytflow_leakage_incipient") 378class IncipientLeakage(Leakage): 379 """ 380 Class implementing an incipient leakage event. 381 382 Parameters 383 ---------- 384 link_id : `str` 385 ID of the link at which the leak is placed. 386 diameter : `float`, optional 387 Maximum diameter of the leak -- i.e. small leak diameter in the beginning, 388 growing over time until peak time is reached. 389 390 Alternatively, 'area' can be used to specify the size of this leak -- 391 in this case, 'diameter' must be set to None. 392 393 The default is None. 394 area : `float`, optional 395 Maximum area of the leak -- i.e. small leak area in the beginning, 396 growing over time until peak time is reached. 397 398 Alternatively, 'diameter' can be used to specify the size of this leak -- 399 in this case, 'area' must be set to None. 400 401 The default is None. 402 peak_time : `int` 403 Time (seconds since the simulation start) when this leak reaches 404 its larges size (leak diameter). 405 """ 406 def __init__(self, link_id: str, peak_time: int, diameter: float = None, 407 area: float = None, **kwds): 408 if peak_time < kwds["start_time"] or (kwds["end_time"] is not None and 409 peak_time > kwds["end_time"]): 410 raise ValueError("'peak_time' must be greater than 'start_time' and " + 411 "smaller than 'end_time'") 412 413 self.__peak_time = peak_time 414 415 if "profile" not in kwds: 416 super().__init__(link_id=link_id, diameter=diameter, area=area, profile=None, **kwds) 417 else: 418 super().__init__(link_id=link_id, diameter=diameter, area=area, **kwds) 419 420 @property 421 def peak_time(self) -> int: 422 """ 423 Gets the peak time (seconds since the simulation start) of the leak. 424 425 Returns 426 ------- 427 `int` 428 Peak time of the leak. 429 """ 430 return self.__peak_time 431
[docs] 432 def get_attributes(self) -> dict: 433 return super().get_attributes() | {"peak_time": self.peak_time}
434 435 def __eq__(self, other) -> bool: 436 return super().__eq__(other) and self.peak_time == other.peak_time 437 438 def __str__(self) -> str: 439 return f"{super().__str__()} peak_time: {self.peak_time}" 440
[docs] 441 def init(self, epanet_api: EPyT) -> None: 442 super().init(epanet_api) 443 444 # Set pattern 445 total_sim_duration = self._epanet_api.get_simulation_duration() 446 time_step = self._epanet_api.get_hydraulic_time_step() 447 448 if self.end_time is not None: 449 n_leaky_time_points = math.ceil((self.end_time - self.start_time) / time_step) 450 else: 451 n_leaky_time_points = math.ceil((total_sim_duration - self.start_time) / time_step) 452 453 profile = np.ones(n_leaky_time_points) 454 455 coeff = int((self.peak_time - self.start_time) / time_step) 456 for t in range(coeff): 457 profile[t] = (1. / coeff) + ((1. / coeff) * t) # Linear interpolation! 458 459 self.profile = profile