1"""
2The Battle of the Leakage Detection and Isolation Methods (*BattLeDIM*) 2020, organized by
3S. G. Vrachimis, D. G. Eliades, R. Taormina, Z. Kapelan, A. Ostfeld, S. Liu, M. Kyriakou,
4P. Pavlou, M. Qiu, and M. M. Polycarpou, as part of the 2nd International CCWI/WDSA Joint
5Conference in Beijing, China, aims at objectively comparing the performance of methods for
6the detection and localization of leakage events, relying on SCADA measurements of flow and
7pressure sensors installed within water distribution networks.
8
9See https://github.com/KIOS-Research/BattLeDIM for details.
10
11This module provides functions for loading the original BattLeDIM data set
12:func:`~epyt_flow.data.benchmarks.battledim.load_data`, as well as methods for loading the scenarios
13:func:`~epyt_flow.data.benchmarks.battledim.load_scenario` and pre-generated SCADA data
14:func:`~epyt_flow.data.benchmarks.battledim.load_scada_data`.
15The official scoring/evaluation is implemented in
16:func:`~epyt_flow.data.benchmarks.battledim.compute_evaluation_score` -- i.e. those results can be
17directly compared to the official leaderboard results.
18Besides this, the user can choose to evaluate predictions using any other metric from
19:mod:`~epyt_flow.metrics`.
20"""
21from typing import Any, Union
22import os
23import math
24from datetime import datetime
25import functools
26import scipy
27import pandas as pd
28import numpy as np
29from scipy.sparse import bsr_array
30
31from .battledim_data import START_TIME_TEST, START_TIME_TRAIN, LEAKS_CONFIG_TEST, \
32 LEAKS_CONFIG_TRAIN
33from ..networks import load_ltown
34from ...simulation.events import AbruptLeakage, IncipientLeakage, Leakage
35from ...simulation import ScenarioConfig, EpanetConstants
36from ...topology import NetworkTopology
37from ...simulation.scada import ScadaData
38from ...utils import get_temp_folder, to_seconds, create_path_if_not_exist, download_if_necessary
39
40
41def __parse_leak_config(start_time: str, leaks_config: str) -> list[Leakage]:
42 leakages = []
43 for leak in leaks_config.splitlines():
44 # Parse entry
45 items = [i.strip() for i in leak.split(",")]
46 leaky_pipe_id = items[0]
47 leak_start_time = int((datetime.strptime(items[1], "%Y-%m-%d %H:%M") - start_time)
48 .total_seconds())
49 leak_end_time = int((datetime.strptime(items[2], "%Y-%m-%d %H:%M") - start_time)
50 .total_seconds())
51 leak_diameter = float(items[3])
52 leak_type = items[4]
53 leak_peak_time = int((datetime.strptime(items[5], "%Y-%m-%d %H:%M") - start_time)
54 .total_seconds())
55
56 # Create leak config
57 if leak_type == "incipient":
58 leak = IncipientLeakage(link_id=leaky_pipe_id, diameter=leak_diameter,
59 start_time=leak_start_time, end_time=leak_end_time,
60 peak_time=leak_peak_time)
61 elif leak_type == "abrupt":
62 leak = AbruptLeakage(link_id=leaky_pipe_id, diameter=leak_diameter,
63 start_time=leak_start_time, end_time=leak_end_time)
64
65 leakages.append(leak)
66
67 return leakages
68
69
70def __create_labels(n_time_steps: int, return_test_scenario: bool,
71 links: list[str]) -> tuple[np.ndarray, scipy.sparse.bsr_array]:
72 y = np.zeros(n_time_steps)
73
74 start_time = START_TIME_TEST if return_test_scenario is True else START_TIME_TRAIN
75 leaks_config = LEAKS_CONFIG_TEST if return_test_scenario is True else LEAKS_CONFIG_TRAIN
76 leakages = __parse_leak_config(start_time, leaks_config)
77
78 def leak_time_to_idx(t: int, round_up: bool = False):
79 if round_up is False:
80 return math.floor(t / 300)
81 else:
82 return math.ceil(t / 300)
83
84 leak_locations_row = []
85 leak_locations_col = []
86 for leak in leakages:
87 t_idx_start = leak_time_to_idx(leak.start_time)
88 t_idx_end = leak_time_to_idx(leak.end_time, round_up=True)
89 y[t_idx_start:t_idx_end] = 1
90
91 leak_link_idx = links.index(leak.link_id)
92 for t in range(t_idx_end - t_idx_start):
93 leak_locations_row.append(t_idx_start + t)
94 leak_locations_col.append(leak_link_idx)
95
96 y_leak_locations = bsr_array(
97 (np.ones(len(leak_locations_row)), (leak_locations_row, leak_locations_col)),
98 shape=(n_time_steps, len(links)))
99
100 return y, y_leak_locations
101
102
[docs]
103def compute_evaluation_score(y_leak_locations_pred: list[tuple[str, int]],
104 test_scenario: bool, verbose: bool = True) -> dict:
105 """
106 Evaluates the predictions (i.e. start time and location of leakages) as it was done in the
107 BattLeDIM competition -- i.e. the output of this functions can be directly compared
108 to the official leaderboard results.
109
110 Parameters
111 ----------
112 y_leak_locations_pred : `list[tuple[str, int]]`
113 Predictions of location (link/pipe ID) and start time
114 (in seconds since simulation start) of leakages.
115 test_scenario : `bool`
116 True if the given predictions are made for the test scenario, False otherwise.
117 verbose : `bool`, optional
118 If True, a progress bar is shown while downloading files.
119
120 The default is True.
121
122 Returns
123 -------
124 `dict`
125 Dictionary containing the true positive rate, true positives, false positives,
126 false negatives, and total monetary (Euro) savings (only available if `test_scenario`
127 is True).
128 """
129 # Original MATLAB implementation:
130 # https://github.com/KIOS-Research/BattLeDIM/blob/master/Scoring%20Algorithm/Scoring_Algorithm.m
131
132 # Scoring parameters
133 dist_max = 300 # Max pipe distance for leakage detection (meters)
134 cost_water = .8 # Cost of water per m3 (Euro)
135 cost_crew = 500 # Max repair crew cost per assignment (Euro)
136
137 hydraulic_time_step = to_seconds(minutes=5)
138
139 # Get WDN topology and find minimum topological distance (using the pipe lengths)
140 # between all nodes
141 f_topology_in = os.path.join(get_temp_folder(), "BattLeDIM", "ltown.epytflow_topology")
142 url_topology = "https://filedn.com/lumBFq2P9S74PNoLPWtzxG4/EPyT-Flow/BattLeDIM/" +\
143 "ltown.epytflow_topology"
144
145 download_if_necessary(f_topology_in, url_topology, verbose)
146 topology = NetworkTopology.load_from_file(f_topology_in)
147
148 all_pairs_shortest_path_length = topology.get_all_pairs_shortest_path_length()
149
150 # Load ground truth
151 sim_start_time = START_TIME_TEST if test_scenario is True else START_TIME_TRAIN
152 leaks_config = LEAKS_CONFIG_TEST if test_scenario is True else LEAKS_CONFIG_TRAIN
153 leakages = __parse_leak_config(sim_start_time, leaks_config)
154 n_leakages = len(leakages)
155
156 leak_demands = {}
157 if test_scenario is True:
158 # Download leak demands
159 for leak in leakages:
160 f_in = f"Leak_{leak.link_id}.xlsx"
161 url = "https://raw.githubusercontent.com/KIOS-Research/BattLeDIM/master/" + \
162 f"Scoring%20Algorithm/competition_leakages/{f_in}"
163
164 f_local_in = os.path.join(get_temp_folder(), "BattLeDIM", f_in)
165 download_if_necessary(f_local_in, url, verbose)
166
167 df_leak_demand = pd.read_excel(f_local_in, sheet_name="Demand (m3_h)")
168 leak_demand = df_leak_demand[leak.link_id].to_numpy()
169 leak_demands[leak.link_id] = leak_demand
170
171 # Evaluate given predictions/alarms
172 total_savings = 0
173 true_positives = 0
174 false_positives = 0
175 detected_leaks = []
176
177 leak_data = []
178 for leak in leakages:
179 leak_data.append((leak.link_id, leak.start_time, leak.end_time))
180
181 def __find_closest_leaky_pipe(link_id) -> tuple[str, float, int]:
182 closest_leaky_pipe_id = None
183 closest_dist = float("inf")
184 closest_start_time = None
185 closest_end_time = None
186
187 node_a, node_b = topology.get_link_info(link_id)["nodes"]
188
189 for leak_pipe_id, start_time_leak, end_time_leak in leak_data:
190 link_info = topology.get_link_info(leak_pipe_id)
191 end_node_a, end_node_b = link_info["nodes"]
192 link_length = link_info["length"]
193
194 dists = []
195 dists.append(all_pairs_shortest_path_length[node_a][end_node_a] + .5 * link_length)
196 dists.append(all_pairs_shortest_path_length[node_a][end_node_b] + .5 * link_length)
197 dists.append(all_pairs_shortest_path_length[node_b][end_node_a] + .5 * link_length)
198 dists.append(all_pairs_shortest_path_length[node_b][end_node_b] + .5 * link_length)
199 if min(dists) < closest_dist:
200 closest_dist = min(dists)
201 closest_leaky_pipe_id = leak_pipe_id
202 closest_start_time = start_time_leak
203 closest_end_time = end_time_leak
204
205 return closest_leaky_pipe_id, closest_dist, closest_start_time, closest_end_time
206
207 for pipe_id, start_time in y_leak_locations_pred:
208 # Check if leakages was found and if so, how far away it is from the ground truth
209 leaky_pipe_dist = None
210 leaky_pipe = None
211 if any(pipe_id == leaky_pipe_id and start_time >= start_time_leak and
212 start_time <= end_time_leak and
213 pipe_id not in detected_leaks
214 for leaky_pipe_id, start_time_leak, end_time_leak in leak_data):
215 leaky_pipe_dist = 0
216 leaky_pipe = pipe_id
217 else:
218 closest_leaky_pipe_id, dist, start_time_leak, end_time_leak = \
219 __find_closest_leaky_pipe(pipe_id)
220 if start_time >= start_time_leak and start_time <= end_time_leak:
221 leaky_pipe_dist = dist
222 leaky_pipe = closest_leaky_pipe_id
223
224 # Compute score of current alarm
225 if leaky_pipe is not None:
226 detected_leaks.append(leaky_pipe)
227 true_positives += 1
228
229 water_saved = 0
230 if leaky_pipe in leak_demands:
231 leak_demand = leak_demands[leaky_pipe]
232 start_time_idx = math.ceil(start_time / hydraulic_time_step)
233 water_saved = np.sum(leak_demand[start_time_idx:])
234 total_savings += water_saved * cost_water - (leaky_pipe_dist / dist_max) * cost_crew
235 else:
236 false_positives += 1
237 total_savings += -1. * cost_crew
238
239 # Compute final scores
240 false_negatives = n_leakages - true_positives
241 true_positive_rate = true_positives / (true_positives + false_negatives)
242
243 return {"true_positive_rate": true_positive_rate, "true_positives": true_positives,
244 "false_positives": false_positives, "false_negatives": false_negatives,
245 "total_savings": total_savings if test_scenario is True else None}
246
247
[docs]
248def load_data(return_test_scenario: bool, download_dir: str = None, return_X_y: bool = False,
249 return_features_desc: bool = False, return_leak_locations: bool = False,
250 verbose: bool = True) -> Union[pd.DataFrame, Any]:
251 """
252 Loads the original BattLeDIM benchmark data set.
253 Note that the data set exists in two different version --
254 a training version and an evaluation/test version.
255
256 Parameters
257 ----------
258 return_test_scenario : `bool`
259 If True, the evaluation/test data set is returned, otherwise the historical
260 (i.e. training) data set is returned.
261 download_dir : `str`, optional
262 Path to the data files -- if None, the temp folder will be used.
263 If the path does not exist, the data files will be downloaded to the given path.
264
265 The default is None.
266 return_X_y : `bool`, optional
267 If True, the data is returned together with the labels (presence of a leakage) as
268 two Numpy arrays, otherwise, the data is returned as a
269 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance.
270
271 The default is False.
272 return_features_desc : `bool`, optional
273 If True and if `return_X_y` is True, the returned dictionary contains the
274 features' descriptions (i.e. names) under the key "features_desc".
275
276 The default is False.
277 return_leak_locations : `bool`
278 If True, the leak locations are returned as well --
279 as an instance of `scipy.sparse.bsr_array`.
280
281 The default is False.
282 verbose : `bool`, optional
283 If True, a progress bar is shown while downloading files.
284
285 The default is True.
286
287 Returns
288 -------
289 Either a `pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`_ instance or a tuple of `Numpy arrays <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_.
290 Benchmark data set.
291 """
292 # Download data files if necessary
293 if return_test_scenario is True:
294 url_data = "https://zenodo.org/records/4017659/files/2018_SCADA.xlsx?download=1"
295 f_in = "2018_SCADA.xlsx"
296 else:
297 url_data = "https://zenodo.org/records/4017659/files/2019_SCADA.xlsx?download=1"
298 f_in = "2019_SCADA.xlsx"
299
300 download_dir = download_dir if download_dir is not None else get_temp_folder()
301 download_dir = os.path.join(download_dir, "BattLeDIM")
302 create_path_if_not_exist(download_dir)
303 f_in = os.path.join(download_dir, f_in)
304
305 download_if_necessary(f_in, url_data, verbose)
306
307 # Load and parse data files
308 df_pressures = pd.read_excel(f_in, sheet_name="Pressures (m)")
309 df_pressures.columns = ["Timestamp"] + [f"Pressure_{n_id}" for n_id in df_pressures.columns[1:]]
310
311 df_demands = pd.read_excel(f_in, sheet_name="Demands (L_h)")
312 df_demands.columns = ["Timestamp"] + [f"Demand_{n_id}" for n_id in df_demands.columns[1:]]
313
314 df_flows = pd.read_excel(f_in, sheet_name="Flows (m3_h)")
315 df_flows.columns = ["Timestamp"] + [f"Flow_{l_id}" for l_id in df_flows.columns[1:]]
316
317 df_levels = pd.read_excel(f_in, sheet_name="Levels (m)")
318 df_levels.columns = ["Timestamp"] + [f"Level_{t_id}" for t_id in df_levels.columns[1:]]
319
320 df_final = functools.reduce(lambda left, right: pd.merge(left, right, on="Timestamp"),
321 [df_pressures, df_flows, df_levels, df_demands])
322
323 # Prepare and return final data
324 if return_X_y is True:
325 features_desc = list(df_final.columns)
326 features_desc.remove("Timestamp")
327
328 network_config = load_ltown(download_dir)
329 links = network_config.sensor_config.links
330
331 X = df_final[features_desc].to_numpy()
332 y, y_leak_locations = __create_labels(X.shape[0], return_test_scenario, links)
333
334 if return_features_desc is True:
335 if return_leak_locations is True:
336 return X, y, features_desc, y_leak_locations
337 else:
338 return X, y, features_desc
339 else:
340 if return_leak_locations is True:
341 return X, y, y_leak_locations
342 else:
343 return X, y
344 else:
345 return df_final
346
347
[docs]
348def load_scada_data(return_test_scenario: bool, download_dir: str = None,
349 return_X_y: bool = False, return_leak_locations: bool = False,
350 verbose: bool = True) -> list[Union[ScadaData, Any]]:
351 """
352 Loads the SCADA data of the simulated BattLeDIM benchmark scenario -- note that due to
353 randomness, these differ from the original data set which can be loaded by calling
354 :func:`~epyt_flow.data.benchmarks.battledim.load_data`.
355
356 .. warning::
357
358 A large file (approx. 4GB) will be downloaded and loaded into memory --
359 this might take some time.
360
361 Parameters
362 ----------
363 return_test_scenario : `bool`
364 If True, the evaluation/test scenario is returned, otherwise the historical
365 (i.e. training) scenario is returned.
366 download_dir : `str`, optional
367 Path to the data files -- if None, the temp folder will be used.
368 If the path does not exist, the data files will be downloaded to the given path.
369
370 The default is None.
371 return_X_y : `bool`, optional
372 If True, the data is returned together with the labels (presence of a leakage) as
373 two Numpy arrays, otherwise, the data is returned as a
374 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance.
375
376 The default is False.
377 return_leak_locations : `bool`
378 If True, the leak locations are returned as well --
379 as an instance of `scipy.sparse.bsr_array`.
380
381 The default is False.
382 verbose : `bool`, optional
383 If True, a progress bar is shown while downloading files.
384
385 The default is True.
386
387 Returns
388 -------
389 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` or `list[tuple[numpy.ndarray, numpy.ndarray]]`
390 The simulated benchmark scenario as either a
391 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance or as a tuple of
392 (X, y) `Numpy arrays <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_.
393 If 'return_leak_locations' is True, the leak locations are included
394 as an instance of `scipy.sparse.bsr_array <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.bsr_array.html>`_ as well.
395 """
396 download_dir = download_dir if download_dir is not None else get_temp_folder()
397
398 url_data = "https://filedn.com/lumBFq2P9S74PNoLPWtzxG4/EPyT-Flow/BattLeDIM/"
399
400 f_in = f"{'battledim_test' if return_test_scenario else 'battledim_train'}.epytflow_scada_data"
401 download_if_necessary(os.path.join(download_dir, f_in), url_data + f_in, verbose)
402
403 data = ScadaData.load_from_file(os.path.join(download_dir, f_in))
404
405 X = data.get_data()
406 y, y_leak_locations = __create_labels(X.shape[0], return_test_scenario,
407 data.sensor_config.links)
408
409 if return_X_y is True:
410 if return_leak_locations is True:
411 return X, y, y_leak_locations
412 else:
413 return X, y
414 else:
415 if return_leak_locations is True:
416 return data, y_leak_locations
417 else:
418 return data
419
420
[docs]
421def load_scenario(return_test_scenario: bool, download_dir: str = None,
422 verbose: bool = True) -> ScenarioConfig:
423 """
424 Creates and returns the BattLeDIM scenario -- it can be either modified or
425 passed directly to the simulator
426 :class:`~epyt_flow.simulation.scenario_simulator.ScenarioSimulator`.
427
428 .. note::
429
430 Note that due to randomness, the simulation results differ from the original data set which
431 can be loaded by calling :func:`~epyt_flow.data.benchmarks.battledim.load_data`.
432
433 Parameters
434 ----------
435 return_test_scenario : `bool`
436 If True, the evaluation/test scenario is returned, otherwise the historical
437 (i.e. training) scenario is returned.
438 download_dir : `str`, optional
439 Path to the L-TOWN.inp file -- if None, the temp folder will be used.
440 If the path does not exist, the .inp will be downloaded to the given path.
441
442 The default is None.
443 verbose : `bool`, optional
444 If True, a progress bar is shown while downloading files.
445
446 The default is True.
447
448 Returns
449 -------
450 :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
451 Complete scenario configuration of the BattLeDIM benchmark scenario.
452 """
453
454 # Load L-Town network including the sensor placement
455 if download_dir is not None:
456 ltown_config = load_ltown(download_dir=download_dir, use_realistic_demands=True,
457 include_default_sensor_placement=True, verbose=verbose)
458 else:
459 ltown_config = load_ltown(use_realistic_demands=True, include_default_sensor_placement=True,
460 verbose=verbose)
461
462 # Set simulation duration and other general parameters such as the demand model
463 general_params = {"simulation_duration": to_seconds(days=365), # One year
464 "hydraulic_time_step": to_seconds(minutes=5), # 5min time steps
465 "reporting_time_step": to_seconds(minutes=5),
466 "demand_model": {"type": EpanetConstants.EN_PDA, "pressure_min": 0,
467 "pressure_required": 0.1,
468 "pressure_exponent": 0.5}
469 } | ltown_config.general_params
470
471 # Add events
472 start_time = START_TIME_TEST if return_test_scenario is True else START_TIME_TRAIN
473 leaks_config = LEAKS_CONFIG_TEST if return_test_scenario is True else LEAKS_CONFIG_TRAIN
474 leakages = __parse_leak_config(start_time, leaks_config)
475
476 # Build final scenario
477 return ScenarioConfig(f_inp_in=ltown_config.f_inp_in, general_params=general_params,
478 sensor_config=ltown_config.sensor_config, system_events=leakages)