1"""
2Module provides classes for implementing leakages.
3"""
4from copy import deepcopy
5import math
6import numpy as np
7from epanet_plus import EPyT, EpanetConstants
8
9from .system_event import SystemEvent
10from ...serialization import serializable, JsonSerializable, \
11 LEAKAGE_ID, ABRUPT_LEAKAGE_ID, INCIPIENT_LEAKAGE_ID
12
13
[docs]
14@serializable(LEAKAGE_ID, ".epytflow_leakage")
15class Leakage(SystemEvent, JsonSerializable):
16 """
17 Base class for a leakage.
18
19 Parameters
20 ----------
21 link_id : `str`
22 ID of the link at which the leak is placed.
23 Note that if the leak is placed at a node, then 'link_id' must be None and the
24 ID of the node must be set in 'node_id'
25 diameter : `float`, optional
26 Diameter of this leak in either *foot* or *meter* (depending on the used flow units).
27
28 Alternatively, 'area' can be used for specifying the size of this leakage --
29 in this case, 'diameter' must be set to 'None'.
30
31 The default is None.
32 area : `float`, optional
33 Area of this leak in either *foot^2* or *meter^2* (depending on the used flow units).
34
35 Alternatively, 'diameter' can be used for specifying the size of this leakage --
36 in this case, 'area' must be set to 'None'.
37
38 The default is None.
39 profile : `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_
40 Pattern of this leak.
41 node_id : `str`, optional
42 ID of the node at which the leak is placed.
43 This parameter must only be set if the leak is placed at a node instead of a link.
44 In this case, 'link_id' must be None.
45
46 The default is None.
47 """
48 def __init__(self, link_id: str, profile: np.ndarray, diameter: float = None,
49 area: float = None, node_id: str = None, **kwds):
50 if link_id is not None and node_id is not None:
51 raise ValueError("Leak can not be placed at a link and node at the same time")
52 if link_id is None and node_id is None:
53 raise ValueError("Leak must be placed at either a link or a node -- " +
54 "expecting either 'link_id' or 'node_id' but both are None")
55 if link_id is not None:
56 if not isinstance(link_id, str):
57 raise TypeError("'link_id' must be an instance of 'str' " +
58 f"but not of '{type(link_id)}'")
59 if area is None and diameter is None:
60 raise ValueError("Either 'diameter' or 'area' must be given")
61 if area is not None and diameter is not None:
62 raise ValueError("Either 'diameter' or 'area' must be given, " +
63 "but not both at the same time")
64 if diameter is not None:
65 if not isinstance(diameter, float):
66 raise TypeError("'diameter must be an instance of 'float' but " +
67 f"not of '{type(diameter)}'")
68 if diameter <= 0:
69 raise ValueError("'diameter' must be greater than zero")
70 if area is not None:
71 if not isinstance(area, float):
72 raise TypeError("'area must be an instance of 'float' but " +
73 f"not of '{type(area)}'")
74 if area <= 0:
75 raise ValueError("'area' must be greater than zero")
76 if profile is not None:
77 if not isinstance(profile, np.ndarray):
78 raise TypeError("'profile' must be an instance of 'numpy.ndarray' but " +
79 f"not of '{type(profile)}'")
80 if len(profile.shape) > 1:
81 raise ValueError("'profile' must be a one-dimensional array " +
82 f"but not of shape '{profile.shape}'")
83 if node_id is not None:
84 if not isinstance(node_id, str):
85 raise TypeError("'node_id' must be an instance of 'str' " +
86 f"but not of '{type(node_id)}'")
87
88 self._link_id = link_id
89 self._node_id = node_id
90 self._diameter = diameter
91 self._area = area
92 self._profile = profile
93
94 self._leaky_node_idx = None
95 self._leak_emitter_coef = None
96 self._time_pattern_idx = 0
97
98 super().__init__(**kwds)
99
100 @property
101 def link_id(self) -> str:
102 """
103 Gets the ID of the link at which the leak is placed.
104
105 Returns
106 -------
107 `str`
108 ID of the link at which the leak is placed.
109 """
110 return self._link_id
111
112 @property
113 def node_id(self) -> str:
114 """
115 Gets the ID of the node at which the leak is placed.
116
117 Returns
118 -------
119 `str`
120 ID of the node at which the leak is placed.
121 """
122 return self._node_id
123
124 @property
125 def diameter(self) -> float:
126 """
127 Gets the diameter of the leak in either *foot* or *meter*
128 (depending on the sued flow units).
129
130 Returns
131 -------
132 `float`
133 Diameter (*foot* or *meter*) of the leak.
134 """
135 return self._diameter
136
137 @property
138 def area(self) -> float:
139 """
140 Gets the area of the leak in either *foot^2* or *meter^2*
141 (depending on the sued flow units).
142
143 Returns
144 -------
145 `float`
146 Area of the leak.
147 """
148 return self._area if self._area is not None else self.compute_leak_area(self._diameter)
149
150 @property
151 def profile(self) -> np.ndarray:
152 """
153 Gets the pattern of the leak.
154
155 Returns
156 -------
157 `numpy.ndarray`
158 Pattern of the leak.
159 """
160 return deepcopy(self._profile)
161
162 @profile.setter
163 def profile(self, pattern: np.ndarray):
164 if not isinstance(pattern, np.ndarray):
165 raise TypeError("'profile' must be an instance of 'numpy.ndarray' but " +
166 f"not of '{type(pattern)}'")
167 if len(pattern.shape) > 1:
168 raise ValueError("'profile' must be a one-dimensional array " +
169 f"but not of shape '{pattern.shape}'")
170
171 self._profile = pattern
172
[docs]
173 def get_attributes(self) -> dict:
174 return super().get_attributes() | {"link_id": self._link_id, "diameter": self._diameter,
175 "area": self._area, "profile": self._profile,
176 "node_id": self._node_id
177 if self._link_id is None else None}
178
179 def __eq__(self, other) -> bool:
180 if not isinstance(other, Leakage):
181 raise TypeError(f"Can not compare 'Leakage' instance with '{type(other)}' instance")
182
183 return super().__eq__(other) and self._link_id == other.link_id \
184 and self._diameter == other.diameter and np.all(self._profile == other.profile) \
185 and self._node_id == other.node_id and self.area == other.area
186
187 def __str__(self) -> str:
188 return f"{super().__str__()} link_id: {self._link_id} diameter: {self._diameter} " +\
189 f"area: {self._area} profile: {self._profile} node_id: {self._node_id}"
190
[docs]
191 def compute_leak_area(self, diameter: float) -> float:
192 """
193 Computes the leak area given the diameter.
194
195 leak_area = pi * (diameter * .5)^2
196
197 Parameters
198 ----------
199 diameter : `float`
200 Diameter (*foot* or *meter*) of the leak.
201
202 Returns
203 -------
204 `float`
205 Leak area in *foot^2* or *meter^2*.
206 """
207 return np.pi * (diameter / 2) ** 2
208
[docs]
209 def compute_leak_emitter_coefficient(self, area: float, discharge_coef: float = .75) -> float:
210 """
211 Computes the leak emitter coefficient.
212
213 emitter_coef = discharge_coef * area * sqrt(2*g)
214 where g is the gravitational constant, and discharge_coef = .75
215
216 leak_demand = emitter_coef * pressure^alpha where alpha = .5
217
218 Parameters
219 ----------
220 area : `float`
221 Leak area (foot^2 or meter^2) as computed in
222 :func:`~epyt_flow.simulation.events.leakages.Leakage.compute_leak_area`.
223 discharge_coef : `float`, optional
224 Discharge coefficient.
225
226 The default is set to 0.75
227
228 Returns
229 -------
230 `float`
231 Leak emitter coefficient.
232 """
233 flow_unit = self._epanet_api.getflowunits()
234 if flow_unit == EpanetConstants.EN_CMH:
235 g = 127137600 # m/h^2
236 elif flow_unit == EpanetConstants.EN_CFS:
237 g = 32.17405 # feet/s^2
238 else:
239 raise ValueError("Leakages are only implemented for the following flow units:\n" +
240 " EN_CMH (cubic meter/hr)\n EN_CFS (foot/sec)")
241
242 return discharge_coef * area * np.sqrt(2. * g)
243
244 def _get_new_link_id(self) -> str:
245 return f"leak_pipe_{self._link_id}"
246
247 def _get_new_node_id(self) -> str:
248 return f"leak_node_{self._link_id}"
249
[docs]
250 def init(self, epanet_api: EPyT) -> None:
251 super().init(epanet_api)
252
253 # Split pipe if leak is placed at a link/pipe
254 if self._link_id is not None:
255 if self._link_id not in self._epanet_api.get_all_links_id():
256 raise ValueError(f"Unknown link/pipe '{self._link_id}'")
257
258 new_link_id = self._get_new_link_id()
259 new_node_id = self._get_new_node_id()
260
261 all_nodes_id = self._epanet_api.get_all_nodes_id()
262 if new_node_id in all_nodes_id:
263 raise ValueError(f"There is already a leak at pipe {self.link_id}")
264
265 self._epanet_api.split_pipe(self.link_id, new_link_id, new_node_id)
266 self._leaky_node_idx = self._epanet_api.get_node_idx(new_node_id)
267 else:
268 if self._node_id not in self._epanet_api.get_all_nodes_id():
269 raise ValueError(f"Unknown node '{self._node_id}'")
270
271 self._leaky_node_idx = self._epanet_api.get_node_idx(self._node_id)
272
273 self._epanet_api.setnodevalue(self._leaky_node_idx, EpanetConstants.EN_EMITTER, 0.)
274
275 # Compute leak emitter coefficient
276 self._leak_emitter_coef = self.compute_leak_emitter_coefficient(
277 self.compute_leak_area(self.area))
278
[docs]
279 def cleanup(self) -> None:
280 if self._link_id is not None:
281 pipe_idx = self._epanet_api.get_link_idx(self._link_id)
282 link_diameter = self._epanet_api.get_link_diameter(pipe_idx)
283 link_length = self._epanet_api.get_link_length(pipe_idx)
284 link_roughness_coeff = self._epanet_api.get_link_roughness(pipe_idx)
285 link_minor_loss_coeff = self._epanet_api.get_link_minorloss(pipe_idx)
286 link_initial_status = self._epanet_api.get_link_init_status(pipe_idx)
287 link_initial_setting = self._epanet_api.get_link_init_setting(pipe_idx)
288 link_bulk_reaction_coeff = self._epanet_api.get_link_bulk_raction_coeff(pipe_idx)
289 link_wall_reaction_coeff = self._epanet_api.get_link_wall_raction_coeff(pipe_idx)
290
291 node_a_idx = int(self._epanet_api.getlinknodes(pipe_idx)[0])
292 node_b_idx = int(self._epanet_api.getlinknodes(self._epanet_api.get_link_idx(self._get_new_link_id()))[1])
293
294 self._epanet_api.deletelink(self._epanet_api.get_link_idx(self._get_new_link_id()),
295 EpanetConstants.EN_UNCONDITIONAL)
296 self._epanet_api.deletelink(self._epanet_api.get_link_idx(self._link_id),
297 EpanetConstants.EN_UNCONDITIONAL)
298 self._epanet_api.deletenode(self._epanet_api.get_node_idx(self._get_new_node_id()),
299 EpanetConstants.EN_UNCONDITIONAL)
300
301 self._epanet_api.addlink(self._link_id, EpanetConstants.EN_PIPE,
302 self._epanet_api.get_node_id(node_a_idx),
303 self._epanet_api.get_node_id(node_b_idx))
304 link_idx = self._epanet_api.get_link_idx(self._link_id)
305 self._epanet_api.setlinknodes(link_idx, node_a_idx, node_b_idx)
306 self._epanet_api.setlinktype(link_idx, EpanetConstants.EN_PIPE,
307 EpanetConstants.EN_UNCONDITIONAL)
308 self._epanet_api.setpipedata(link_idx, link_length, link_diameter, link_roughness_coeff,
309 link_minor_loss_coeff)
310 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_INITSTATUS,
311 link_initial_status)
312 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_INITSETTING,
313 link_initial_setting)
314 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_KBULK,
315 link_bulk_reaction_coeff)
316 self._epanet_api.setlinkvalue(link_idx, EpanetConstants.EN_KWALL,
317 link_wall_reaction_coeff)
318
[docs]
319 def reset(self) -> None:
320 self._time_pattern_idx = 0
321
[docs]
322 def exit(self, cur_time) -> None:
323 self._epanet_api.setnodevalue(self._leaky_node_idx, EpanetConstants.EN_EMITTER, 0.)
324
[docs]
325 def apply(self, cur_time: int) -> None:
326 self._epanet_api.setnodevalue(self._leaky_node_idx, EpanetConstants.EN_EMITTER,
327 self._leak_emitter_coef *
328 self._profile[self._time_pattern_idx])
329 self._time_pattern_idx += 1
330
331
[docs]
332@serializable(ABRUPT_LEAKAGE_ID, ".epytflow_leakage_abrupt")
333class AbruptLeakage(Leakage):
334 """
335 Class implementing an abrupt leakage event.
336
337 Parameters
338 ----------
339 link_id : `str`
340 ID of the link at which the leak is placed.
341 diameter : `float`, optional
342 Diameter of the leak.
343
344 Alternatively, 'area' can be used to specify the size of this leak --
345 in this case, 'diameter' must be set to None.
346
347 The default is None.
348 area : `float`, optional
349 Area of the leakd.
350
351 Alternatively, 'diameter' can be used to specify the size of this leak --
352 in this case, 'area' must be set to None.
353
354 The default is None.
355 """
356 def __init__(self, link_id: str, diameter: float = None, area: float = None, **kwds):
357 if "profile" not in kwds:
358 super().__init__(link_id=link_id, diameter=diameter, area=area, profile=None, **kwds)
359 else:
360 super().__init__(link_id=link_id, diameter=diameter, area=area, **kwds)
361
[docs]
362 def init(self, epanet_api: EPyT) -> None:
363 super().init(epanet_api)
364
365 # Set pattern
366 total_sim_duration = self._epanet_api.get_simulation_duration()
367 time_step = self._epanet_api.get_hydraulic_time_step()
368
369 if self.end_time is not None:
370 n_leaky_time_points = math.ceil((self.end_time - self.start_time) / time_step)
371 else:
372 n_leaky_time_points = math.ceil((total_sim_duration - self.start_time) / time_step)
373
374 self.profile = np.ones(n_leaky_time_points)
375
376
[docs]
377@serializable(INCIPIENT_LEAKAGE_ID, ".epytflow_leakage_incipient")
378class IncipientLeakage(Leakage):
379 """
380 Class implementing an incipient leakage event.
381
382 Parameters
383 ----------
384 link_id : `str`
385 ID of the link at which the leak is placed.
386 diameter : `float`, optional
387 Maximum diameter of the leak -- i.e. small leak diameter in the beginning,
388 growing over time until peak time is reached.
389
390 Alternatively, 'area' can be used to specify the size of this leak --
391 in this case, 'diameter' must be set to None.
392
393 The default is None.
394 area : `float`, optional
395 Maximum area of the leak -- i.e. small leak area in the beginning,
396 growing over time until peak time is reached.
397
398 Alternatively, 'diameter' can be used to specify the size of this leak --
399 in this case, 'area' must be set to None.
400
401 The default is None.
402 peak_time : `int`
403 Time (seconds since the simulation start) when this leak reaches
404 its larges size (leak diameter).
405 """
406 def __init__(self, link_id: str, peak_time: int, diameter: float = None,
407 area: float = None, **kwds):
408 if peak_time < kwds["start_time"] or (kwds["end_time"] is not None and
409 peak_time > kwds["end_time"]):
410 raise ValueError("'peak_time' must be greater than 'start_time' and " +
411 "smaller than 'end_time'")
412
413 self.__peak_time = peak_time
414
415 if "profile" not in kwds:
416 super().__init__(link_id=link_id, diameter=diameter, area=area, profile=None, **kwds)
417 else:
418 super().__init__(link_id=link_id, diameter=diameter, area=area, **kwds)
419
420 @property
421 def peak_time(self) -> int:
422 """
423 Gets the peak time (seconds since the simulation start) of the leak.
424
425 Returns
426 -------
427 `int`
428 Peak time of the leak.
429 """
430 return self.__peak_time
431
[docs]
432 def get_attributes(self) -> dict:
433 return super().get_attributes() | {"peak_time": self.peak_time}
434
435 def __eq__(self, other) -> bool:
436 return super().__eq__(other) and self.peak_time == other.peak_time
437
438 def __str__(self) -> str:
439 return f"{super().__str__()} peak_time: {self.peak_time}"
440
[docs]
441 def init(self, epanet_api: EPyT) -> None:
442 super().init(epanet_api)
443
444 # Set pattern
445 total_sim_duration = self._epanet_api.get_simulation_duration()
446 time_step = self._epanet_api.get_hydraulic_time_step()
447
448 if self.end_time is not None:
449 n_leaky_time_points = math.ceil((self.end_time - self.start_time) / time_step)
450 else:
451 n_leaky_time_points = math.ceil((total_sim_duration - self.start_time) / time_step)
452
453 profile = np.ones(n_leaky_time_points)
454
455 coeff = int((self.peak_time - self.start_time) / time_step)
456 for t in range(coeff):
457 profile[t] = (1. / coeff) + ((1. / coeff) * t) # Linear interpolation!
458
459 self.profile = profile