Source code for epyt_flow.data.benchmarks.battledim

  1"""
  2The Battle of the Leakage Detection and Isolation Methods (*BattLeDIM*) 2020, organized by
  3S. G. Vrachimis, D. G. Eliades, R. Taormina, Z. Kapelan, A. Ostfeld, S. Liu, M. Kyriakou,
  4P. Pavlou, M. Qiu, and M. M. Polycarpou, as part of the 2nd International CCWI/WDSA Joint
  5Conference in Beijing, China, aims at objectively comparing the performance of methods for
  6the detection and localization of leakage events, relying on SCADA measurements of flow and
  7pressure sensors installed within water distribution networks.
  8
  9See https://github.com/KIOS-Research/BattLeDIM for details.
 10
 11This module provides functions for loading the original BattLeDIM data set
 12:func:`~epyt_flow.data.benchmarks.battledim.load_data`, as well as methods for loading the scenarios
 13:func:`~epyt_flow.data.benchmarks.battledim.load_scenario` and pre-generated SCADA data
 14:func:`~epyt_flow.data.benchmarks.battledim.load_scada_data`.
 15The official scoring/evaluation is implemented in
 16:func:`~epyt_flow.data.benchmarks.battledim.compute_evaluation_score` -- i.e. those results can be
 17directly compared to the official leaderboard results.
 18Besides this, the user can choose to evaluate predictions using any other metric from
 19:mod:`~epyt_flow.metrics`.
 20"""
 21from typing import Any, Union
 22import os
 23import math
 24from datetime import datetime
 25import functools
 26import scipy
 27import pandas as pd
 28import numpy as np
 29from scipy.sparse import bsr_array
 30
 31from .battledim_data import START_TIME_TEST, START_TIME_TRAIN, LEAKS_CONFIG_TEST, \
 32    LEAKS_CONFIG_TRAIN
 33from ..networks import load_ltown
 34from ...simulation.events import AbruptLeakage, IncipientLeakage, Leakage
 35from ...simulation import ScenarioConfig, EpanetConstants
 36from ...topology import NetworkTopology
 37from ...simulation.scada import ScadaData
 38from ...utils import get_temp_folder, to_seconds, create_path_if_not_exist, download_if_necessary
 39
 40
 41def __parse_leak_config(start_time: str, leaks_config: str) -> list[Leakage]:
 42    leakages = []
 43    for leak in leaks_config.splitlines():
 44        # Parse entry
 45        items = [i.strip() for i in leak.split(",")]
 46        leaky_pipe_id = items[0]
 47        leak_start_time = int((datetime.strptime(items[1], "%Y-%m-%d %H:%M") - start_time)
 48                              .total_seconds())
 49        leak_end_time = int((datetime.strptime(items[2], "%Y-%m-%d %H:%M") - start_time)
 50                            .total_seconds())
 51        leak_diameter = float(items[3])
 52        leak_type = items[4]
 53        leak_peak_time = int((datetime.strptime(items[5], "%Y-%m-%d %H:%M") - start_time)
 54                             .total_seconds())
 55
 56        # Create leak config
 57        if leak_type == "incipient":
 58            leak = IncipientLeakage(link_id=leaky_pipe_id, diameter=leak_diameter,
 59                                    start_time=leak_start_time, end_time=leak_end_time,
 60                                    peak_time=leak_peak_time)
 61        elif leak_type == "abrupt":
 62            leak = AbruptLeakage(link_id=leaky_pipe_id, diameter=leak_diameter,
 63                                 start_time=leak_start_time, end_time=leak_end_time)
 64
 65        leakages.append(leak)
 66
 67    return leakages
 68
 69
 70def __create_labels(n_time_steps: int, return_test_scenario: bool,
 71                    links: list[str]) -> tuple[np.ndarray, scipy.sparse.bsr_array]:
 72    y = np.zeros(n_time_steps)
 73
 74    start_time = START_TIME_TEST if return_test_scenario is True else START_TIME_TRAIN
 75    leaks_config = LEAKS_CONFIG_TEST if return_test_scenario is True else LEAKS_CONFIG_TRAIN
 76    leakages = __parse_leak_config(start_time, leaks_config)
 77
 78    def leak_time_to_idx(t: int, round_up: bool = False):
 79        if round_up is False:
 80            return math.floor(t / 300)
 81        else:
 82            return math.ceil(t / 300)
 83
 84    leak_locations_row = []
 85    leak_locations_col = []
 86    for leak in leakages:
 87        t_idx_start = leak_time_to_idx(leak.start_time)
 88        t_idx_end = leak_time_to_idx(leak.end_time, round_up=True)
 89        y[t_idx_start:t_idx_end] = 1
 90
 91        leak_link_idx = links.index(leak.link_id)
 92        for t in range(t_idx_end - t_idx_start):
 93            leak_locations_row.append(t_idx_start + t)
 94            leak_locations_col.append(leak_link_idx)
 95
 96    y_leak_locations = bsr_array(
 97        (np.ones(len(leak_locations_row)), (leak_locations_row, leak_locations_col)),
 98        shape=(n_time_steps, len(links)))
 99
100    return y, y_leak_locations
101
102
[docs] 103def compute_evaluation_score(y_leak_locations_pred: list[tuple[str, int]], 104 test_scenario: bool, verbose: bool = True) -> dict: 105 """ 106 Evaluates the predictions (i.e. start time and location of leakages) as it was done in the 107 BattLeDIM competition -- i.e. the output of this functions can be directly compared 108 to the official leaderboard results. 109 110 Parameters 111 ---------- 112 y_leak_locations_pred : `list[tuple[str, int]]` 113 Predictions of location (link/pipe ID) and start time 114 (in seconds since simulation start) of leakages. 115 test_scenario : `bool` 116 True if the given predictions are made for the test scenario, False otherwise. 117 verbose : `bool`, optional 118 If True, a progress bar is shown while downloading files. 119 120 The default is True. 121 122 Returns 123 ------- 124 `dict` 125 Dictionary containing the true positive rate, true positives, false positives, 126 false negatives, and total monetary (Euro) savings (only available if `test_scenario` 127 is True). 128 """ 129 # Original MATLAB implementation: 130 # https://github.com/KIOS-Research/BattLeDIM/blob/master/Scoring%20Algorithm/Scoring_Algorithm.m 131 132 # Scoring parameters 133 dist_max = 300 # Max pipe distance for leakage detection (meters) 134 cost_water = .8 # Cost of water per m3 (Euro) 135 cost_crew = 500 # Max repair crew cost per assignment (Euro) 136 137 hydraulic_time_step = to_seconds(minutes=5) 138 139 # Get WDN topology and find minimum topological distance (using the pipe lengths) 140 # between all nodes 141 f_topology_in = os.path.join(get_temp_folder(), "BattLeDIM", "ltown.epytflow_topology") 142 url_topology = "https://filedn.com/lumBFq2P9S74PNoLPWtzxG4/EPyT-Flow/BattLeDIM/" +\ 143 "ltown.epytflow_topology" 144 145 download_if_necessary(f_topology_in, url_topology, verbose) 146 topology = NetworkTopology.load_from_file(f_topology_in) 147 148 all_pairs_shortest_path_length = topology.get_all_pairs_shortest_path_length() 149 150 # Load ground truth 151 sim_start_time = START_TIME_TEST if test_scenario is True else START_TIME_TRAIN 152 leaks_config = LEAKS_CONFIG_TEST if test_scenario is True else LEAKS_CONFIG_TRAIN 153 leakages = __parse_leak_config(sim_start_time, leaks_config) 154 n_leakages = len(leakages) 155 156 leak_demands = {} 157 if test_scenario is True: 158 # Download leak demands 159 for leak in leakages: 160 f_in = f"Leak_{leak.link_id}.xlsx" 161 url = "https://raw.githubusercontent.com/KIOS-Research/BattLeDIM/master/" + \ 162 f"Scoring%20Algorithm/competition_leakages/{f_in}" 163 164 f_local_in = os.path.join(get_temp_folder(), "BattLeDIM", f_in) 165 download_if_necessary(f_local_in, url, verbose) 166 167 df_leak_demand = pd.read_excel(f_local_in, sheet_name="Demand (m3_h)") 168 leak_demand = df_leak_demand[leak.link_id].to_numpy() 169 leak_demands[leak.link_id] = leak_demand 170 171 # Evaluate given predictions/alarms 172 total_savings = 0 173 true_positives = 0 174 false_positives = 0 175 detected_leaks = [] 176 177 leak_data = [] 178 for leak in leakages: 179 leak_data.append((leak.link_id, leak.start_time, leak.end_time)) 180 181 def __find_closest_leaky_pipe(link_id) -> tuple[str, float, int]: 182 closest_leaky_pipe_id = None 183 closest_dist = float("inf") 184 closest_start_time = None 185 closest_end_time = None 186 187 node_a, node_b = topology.get_link_info(link_id)["nodes"] 188 189 for leak_pipe_id, start_time_leak, end_time_leak in leak_data: 190 link_info = topology.get_link_info(leak_pipe_id) 191 end_node_a, end_node_b = link_info["nodes"] 192 link_length = link_info["length"] 193 194 dists = [] 195 dists.append(all_pairs_shortest_path_length[node_a][end_node_a] + .5 * link_length) 196 dists.append(all_pairs_shortest_path_length[node_a][end_node_b] + .5 * link_length) 197 dists.append(all_pairs_shortest_path_length[node_b][end_node_a] + .5 * link_length) 198 dists.append(all_pairs_shortest_path_length[node_b][end_node_b] + .5 * link_length) 199 if min(dists) < closest_dist: 200 closest_dist = min(dists) 201 closest_leaky_pipe_id = leak_pipe_id 202 closest_start_time = start_time_leak 203 closest_end_time = end_time_leak 204 205 return closest_leaky_pipe_id, closest_dist, closest_start_time, closest_end_time 206 207 for pipe_id, start_time in y_leak_locations_pred: 208 # Check if leakages was found and if so, how far away it is from the ground truth 209 leaky_pipe_dist = None 210 leaky_pipe = None 211 if any(pipe_id == leaky_pipe_id and start_time >= start_time_leak and 212 start_time <= end_time_leak and 213 pipe_id not in detected_leaks 214 for leaky_pipe_id, start_time_leak, end_time_leak in leak_data): 215 leaky_pipe_dist = 0 216 leaky_pipe = pipe_id 217 else: 218 closest_leaky_pipe_id, dist, start_time_leak, end_time_leak = \ 219 __find_closest_leaky_pipe(pipe_id) 220 if start_time >= start_time_leak and start_time <= end_time_leak: 221 leaky_pipe_dist = dist 222 leaky_pipe = closest_leaky_pipe_id 223 224 # Compute score of current alarm 225 if leaky_pipe is not None: 226 detected_leaks.append(leaky_pipe) 227 true_positives += 1 228 229 water_saved = 0 230 if leaky_pipe in leak_demands: 231 leak_demand = leak_demands[leaky_pipe] 232 start_time_idx = math.ceil(start_time / hydraulic_time_step) 233 water_saved = np.sum(leak_demand[start_time_idx:]) 234 total_savings += water_saved * cost_water - (leaky_pipe_dist / dist_max) * cost_crew 235 else: 236 false_positives += 1 237 total_savings += -1. * cost_crew 238 239 # Compute final scores 240 false_negatives = n_leakages - true_positives 241 true_positive_rate = true_positives / (true_positives + false_negatives) 242 243 return {"true_positive_rate": true_positive_rate, "true_positives": true_positives, 244 "false_positives": false_positives, "false_negatives": false_negatives, 245 "total_savings": total_savings if test_scenario is True else None}
246 247
[docs] 248def load_data(return_test_scenario: bool, download_dir: str = None, return_X_y: bool = False, 249 return_features_desc: bool = False, return_leak_locations: bool = False, 250 verbose: bool = True) -> Union[pd.DataFrame, Any]: 251 """ 252 Loads the original BattLeDIM benchmark data set. 253 Note that the data set exists in two different version -- 254 a training version and an evaluation/test version. 255 256 Parameters 257 ---------- 258 return_test_scenario : `bool` 259 If True, the evaluation/test data set is returned, otherwise the historical 260 (i.e. training) data set is returned. 261 download_dir : `str`, optional 262 Path to the data files -- if None, the temp folder will be used. 263 If the path does not exist, the data files will be downloaded to the given path. 264 265 The default is None. 266 return_X_y : `bool`, optional 267 If True, the data is returned together with the labels (presence of a leakage) as 268 two Numpy arrays, otherwise, the data is returned as a 269 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance. 270 271 The default is False. 272 return_features_desc : `bool`, optional 273 If True and if `return_X_y` is True, the returned dictionary contains the 274 features' descriptions (i.e. names) under the key "features_desc". 275 276 The default is False. 277 return_leak_locations : `bool` 278 If True, the leak locations are returned as well -- 279 as an instance of `scipy.sparse.bsr_array`. 280 281 The default is False. 282 verbose : `bool`, optional 283 If True, a progress bar is shown while downloading files. 284 285 The default is True. 286 287 Returns 288 ------- 289 Either a `pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`_ instance or a tuple of `Numpy arrays <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_. 290 Benchmark data set. 291 """ 292 # Download data files if necessary 293 if return_test_scenario is True: 294 url_data = "https://zenodo.org/records/4017659/files/2018_SCADA.xlsx?download=1" 295 f_in = "2018_SCADA.xlsx" 296 else: 297 url_data = "https://zenodo.org/records/4017659/files/2019_SCADA.xlsx?download=1" 298 f_in = "2019_SCADA.xlsx" 299 300 download_dir = download_dir if download_dir is not None else get_temp_folder() 301 download_dir = os.path.join(download_dir, "BattLeDIM") 302 create_path_if_not_exist(download_dir) 303 f_in = os.path.join(download_dir, f_in) 304 305 download_if_necessary(f_in, url_data, verbose) 306 307 # Load and parse data files 308 df_pressures = pd.read_excel(f_in, sheet_name="Pressures (m)") 309 df_pressures.columns = ["Timestamp"] + [f"Pressure_{n_id}" for n_id in df_pressures.columns[1:]] 310 311 df_demands = pd.read_excel(f_in, sheet_name="Demands (L_h)") 312 df_demands.columns = ["Timestamp"] + [f"Demand_{n_id}" for n_id in df_demands.columns[1:]] 313 314 df_flows = pd.read_excel(f_in, sheet_name="Flows (m3_h)") 315 df_flows.columns = ["Timestamp"] + [f"Flow_{l_id}" for l_id in df_flows.columns[1:]] 316 317 df_levels = pd.read_excel(f_in, sheet_name="Levels (m)") 318 df_levels.columns = ["Timestamp"] + [f"Level_{t_id}" for t_id in df_levels.columns[1:]] 319 320 df_final = functools.reduce(lambda left, right: pd.merge(left, right, on="Timestamp"), 321 [df_pressures, df_flows, df_levels, df_demands]) 322 323 # Prepare and return final data 324 if return_X_y is True: 325 features_desc = list(df_final.columns) 326 features_desc.remove("Timestamp") 327 328 network_config = load_ltown(download_dir) 329 links = network_config.sensor_config.links 330 331 X = df_final[features_desc].to_numpy() 332 y, y_leak_locations = __create_labels(X.shape[0], return_test_scenario, links) 333 334 if return_features_desc is True: 335 if return_leak_locations is True: 336 return X, y, features_desc, y_leak_locations 337 else: 338 return X, y, features_desc 339 else: 340 if return_leak_locations is True: 341 return X, y, y_leak_locations 342 else: 343 return X, y 344 else: 345 return df_final
346 347
[docs] 348def load_scada_data(return_test_scenario: bool, download_dir: str = None, 349 return_X_y: bool = False, return_leak_locations: bool = False, 350 verbose: bool = True) -> list[Union[ScadaData, Any]]: 351 """ 352 Loads the SCADA data of the simulated BattLeDIM benchmark scenario -- note that due to 353 randomness, these differ from the original data set which can be loaded by calling 354 :func:`~epyt_flow.data.benchmarks.battledim.load_data`. 355 356 .. warning:: 357 358 A large file (approx. 4GB) will be downloaded and loaded into memory -- 359 this might take some time. 360 361 Parameters 362 ---------- 363 return_test_scenario : `bool` 364 If True, the evaluation/test scenario is returned, otherwise the historical 365 (i.e. training) scenario is returned. 366 download_dir : `str`, optional 367 Path to the data files -- if None, the temp folder will be used. 368 If the path does not exist, the data files will be downloaded to the given path. 369 370 The default is None. 371 return_X_y : `bool`, optional 372 If True, the data is returned together with the labels (presence of a leakage) as 373 two Numpy arrays, otherwise, the data is returned as a 374 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance. 375 376 The default is False. 377 return_leak_locations : `bool` 378 If True, the leak locations are returned as well -- 379 as an instance of `scipy.sparse.bsr_array`. 380 381 The default is False. 382 verbose : `bool`, optional 383 If True, a progress bar is shown while downloading files. 384 385 The default is True. 386 387 Returns 388 ------- 389 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` or `list[tuple[numpy.ndarray, numpy.ndarray]]` 390 The simulated benchmark scenario as either a 391 :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance or as a tuple of 392 (X, y) `Numpy arrays <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`_. 393 If 'return_leak_locations' is True, the leak locations are included 394 as an instance of `scipy.sparse.bsr_array <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.bsr_array.html>`_ as well. 395 """ 396 download_dir = download_dir if download_dir is not None else get_temp_folder() 397 398 url_data = "https://filedn.com/lumBFq2P9S74PNoLPWtzxG4/EPyT-Flow/BattLeDIM/" 399 400 f_in = f"{'battledim_test' if return_test_scenario else 'battledim_train'}.epytflow_scada_data" 401 download_if_necessary(os.path.join(download_dir, f_in), url_data + f_in, verbose) 402 403 data = ScadaData.load_from_file(os.path.join(download_dir, f_in)) 404 405 X = data.get_data() 406 y, y_leak_locations = __create_labels(X.shape[0], return_test_scenario, 407 data.sensor_config.links) 408 409 if return_X_y is True: 410 if return_leak_locations is True: 411 return X, y, y_leak_locations 412 else: 413 return X, y 414 else: 415 if return_leak_locations is True: 416 return data, y_leak_locations 417 else: 418 return data
419 420
[docs] 421def load_scenario(return_test_scenario: bool, download_dir: str = None, 422 verbose: bool = True) -> ScenarioConfig: 423 """ 424 Creates and returns the BattLeDIM scenario -- it can be either modified or 425 passed directly to the simulator 426 :class:`~epyt_flow.simulation.scenario_simulator.ScenarioSimulator`. 427 428 .. note:: 429 430 Note that due to randomness, the simulation results differ from the original data set which 431 can be loaded by calling :func:`~epyt_flow.data.benchmarks.battledim.load_data`. 432 433 Parameters 434 ---------- 435 return_test_scenario : `bool` 436 If True, the evaluation/test scenario is returned, otherwise the historical 437 (i.e. training) scenario is returned. 438 download_dir : `str`, optional 439 Path to the L-TOWN.inp file -- if None, the temp folder will be used. 440 If the path does not exist, the .inp will be downloaded to the given path. 441 442 The default is None. 443 verbose : `bool`, optional 444 If True, a progress bar is shown while downloading files. 445 446 The default is True. 447 448 Returns 449 ------- 450 :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig` 451 Complete scenario configuration of the BattLeDIM benchmark scenario. 452 """ 453 454 # Load L-Town network including the sensor placement 455 if download_dir is not None: 456 ltown_config = load_ltown(download_dir=download_dir, use_realistic_demands=True, 457 include_default_sensor_placement=True, verbose=verbose) 458 else: 459 ltown_config = load_ltown(use_realistic_demands=True, include_default_sensor_placement=True, 460 verbose=verbose) 461 462 # Set simulation duration and other general parameters such as the demand model 463 general_params = {"simulation_duration": to_seconds(days=365), # One year 464 "hydraulic_time_step": to_seconds(minutes=5), # 5min time steps 465 "reporting_time_step": to_seconds(minutes=5), 466 "demand_model": {"type": EpanetConstants.EN_PDA, "pressure_min": 0, 467 "pressure_required": 0.1, 468 "pressure_exponent": 0.5} 469 } | ltown_config.general_params 470 471 # Add events 472 start_time = START_TIME_TEST if return_test_scenario is True else START_TIME_TRAIN 473 leaks_config = LEAKS_CONFIG_TEST if return_test_scenario is True else LEAKS_CONFIG_TRAIN 474 leakages = __parse_leak_config(start_time, leaks_config) 475 476 # Build final scenario 477 return ScenarioConfig(f_inp_in=ltown_config.f_inp_in, general_params=general_params, 478 sensor_config=ltown_config.sensor_config, system_events=leakages)