Source code for epyt_flow.simulation.parallel_simulation

  1"""
  2Module provides functions for simulating several scenarios in parallel.
  3"""
  4from typing import Callable, Any
  5import os
  6import warnings
  7import shutil
  8from multiprocess import Pool, cpu_count
  9import psutil
 10
 11from .scenario_config import ScenarioConfig
 12from .scada import ScadaData
 13from .scenario_simulator import ScenarioSimulator
 14
 15
def callback_save_to_file(folder_out: str = "") -> Callable[[ScadaData, ScenarioConfig, int], None]:
    """
    Creates a callback for storing the simulation results in a .epytflow_scada_data file.
    The returned callback can be directly passed to
    :func:`~epyt_flow.simulation.parallel_simulation.ParallelScenarioSimulation.run`.

    Each scenario's results are written via `ScadaData.save_to_file` to a file whose name is
    the scenario's index, located inside `folder_out`.

    Parameters
    ----------
    folder_out : `str`, optional
        Path to the folder where the simulation results will be stored.

        The default is the current working directory.

    Returns
    -------
    `Callable[[ScadaData, ScenarioConfig, int], None]`
        Callback storing the simulation results. The scenario configuration
        (second argument) is ignored.
    """
    def _store_scada_data(scada_data: ScadaData, _, scenario_idx: int) -> None:
        # The scenario index doubles as the (unique) file name within folder_out
        f_out = os.path.join(folder_out, f"{scenario_idx}")
        scada_data.save_to_file(f_out)

    return _store_scada_data
def _run_scenario_simulation(scenario_config: ScenarioConfig, scenario_idx: int,
                             callback: Callable[[ScadaData, ScenarioConfig, int], Any]) -> Any:
    """
    Simulates a single scenario and hands its results to `callback`.

    Used as the worker function of the process pool in
    :func:`ParallelScenarioSimulation.run`.

    Parameters
    ----------
    scenario_config : :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
        Configuration of the scenario to be simulated.
    scenario_idx : `int`
        Index of the scenario in the list of scenarios passed to the parallel runner.
    callback : `Callable[[ScadaData, ScenarioConfig, int], Any]`
        Called with the SCADA data, the scenario configuration and the scenario index.

    Returns
    -------
    `Any`
        Whatever `callback` returns.
    """
    with ScenarioSimulator(scenario_config=scenario_config) as simulator:
        scada_data = simulator.run_simulation()
        # The callback runs while the simulator is still open; it is closed afterwards
        return callback(scada_data, scenario_config, scenario_idx)
class ParallelScenarioSimulation():
    """
    Class providing functions to run scenario simulations in parallel.
    """
    @staticmethod
    def run(scenarios: list[ScenarioConfig], n_jobs: int = -1,
            max_working_memory_consumption: int = None,
            callback: Callable[[ScadaData, ScenarioConfig, int], Any] = callback_save_to_file()
            ) -> Any:
        """
        Simulates multiple scenarios in parallel.

        Parameters
        ----------
        scenarios : list[:class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`]
            List of scenarios to be simulated.
        n_jobs : `int`, optional
            Number of CPUs that can be used by the simulations -- usually, this translates to
            the number of scenarios that are simulated in parallel.

            If -1, all CPUs are used.

            The default is -1.
        max_working_memory_consumption : `int`, optional
            Maximum amount of working memory in MB that can be used by the simulations.
            Note that this might limit the number of scenarios that can be simulated in parallel.

            The default is None.
        callback: `Callable[[ScadaData, ScenarioConfig, int], None]`, optional
            Callback that is called after the simulation of a scenario finished.

            The callback gets the simulation results as a
            :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance, the scenario
            configuration as a :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
            instance, and the index of the scenario in 'scenarios' as arguments.

            The default is :func:`~epyt_flow.simulation.parallel_simulation.callback_save_to_file`.

        Returns
        -------
        `list[Any]`
            The return values of `callback`, one per scenario, in the same order as
            `scenarios`. An empty list if `scenarios` is empty.

        Raises
        ------
        TypeError
            If `scenarios`, `n_jobs` or `callback` have an invalid type.
        ValueError
            If `n_jobs` or `max_working_memory_consumption` have an invalid value.
        RuntimeError
            If a single scenario is estimated to need more working memory than is available
            (or than `max_working_memory_consumption` allows).
        """
        if not isinstance(scenarios, list):
            raise TypeError("'scenarios' must be an instance of 'list[ScenarioConfig]' " +
                            f"but not of '{type(scenarios)}'")
        if any(not isinstance(item, ScenarioConfig) for item in scenarios):
            raise TypeError("Each item in 'scenarios' must be an instance of 'ScenarioConfig'")

        if not isinstance(n_jobs, int):
            raise TypeError(f"'n_jobs' must be an instance of 'int' but not of '{type(n_jobs)}'")
        if not (n_jobs == -1 or n_jobs > 0):
            raise ValueError("'n_jobs' must be either -1 or a positive integer")

        if max_working_memory_consumption is not None:
            if not isinstance(max_working_memory_consumption, int) or \
                    max_working_memory_consumption <= 0:
                raise ValueError("'max_working_memory_consumption' must be a positive integer")

        if not callable(callback):
            raise TypeError("'callback' must be a callable " +
                            "'Callable[[ScadaData, ScenarioConfig, int], None]'")

        # Nothing to simulate -- avoid max() on an empty sequence and spawning an idle pool
        if len(scenarios) == 0:
            return []

        # Working memory (in MB) that can be handed to new processes without swapping.
        # psutil's "available" (not "free") is the documented cross-platform measure for this.
        ram_free_memory = psutil.virtual_memory().available * .000001
        if max_working_memory_consumption is not None:
            # A user-supplied limit can only restrict the memory budget, never enlarge it
            ram_free_memory = min(ram_free_memory, max_working_memory_consumption)

        harddisk_free_memory = shutil.disk_usage(".").free * .000001

        # Per-scenario memory estimates (in MB); unknown estimates count as 0.
        # Computed once because both the max and the sum are needed below.
        memory_estimates = [s_config.memory_consumption_estimate
                            if s_config.memory_consumption_estimate is not None else 0
                            for s_config in scenarios]

        # Every single scenario must fit into the working memory budget
        max_memory_required = max(memory_estimates)
        if max_memory_required > ram_free_memory:
            raise RuntimeError("Not enough working memory available! " +
                               f"Requested {max_memory_required} MB but only " +
                               f"{ram_free_memory} MB are available")

        if sum(memory_estimates) >= harddisk_free_memory:
            warnings.warn("There might not be enough free space on the hard disk " +
                          "to store all scenario results")

        # Compute number of processes that can run in parallel
        n_available_cpus = cpu_count()
        if n_jobs != -1:
            n_available_cpus = min(n_available_cpus, n_jobs)

        required_memory_bound = min(ram_free_memory, harddisk_free_memory)
        n_max_parallel_scenarios = n_available_cpus
        if max_memory_required != 0:
            n_max_parallel_scenarios = int(required_memory_bound / max_memory_required)

        # Pool requires at least one process (the disk-space shortfall that could drive the
        # bound to 0 has already been warned about), and more processes than scenarios
        # would only sit idle.
        n_parallel_scenarios = max(1, min(n_available_cpus, n_max_parallel_scenarios,
                                          len(scenarios)))

        # Scenarios with an MSX input file are run one at a time.
        # NOTE(review): presumably the MSX toolkit cannot be used concurrently -- confirm.
        if any(s_config.f_msx_in is not None for s_config in scenarios):
            n_parallel_scenarios = 1

        # Run scenario simulations; starmap preserves the order of 'scenarios'
        scenarios_task = [(scenario, scenario_idx, callback)
                          for scenario_idx, scenario in enumerate(scenarios)]

        # maxtasksperchild=1: every scenario is simulated in a fresh worker process
        with Pool(processes=n_parallel_scenarios, maxtasksperchild=1) as pool:
            return pool.starmap(_run_scenario_simulation, scenarios_task)