1"""
2Module provides functions for simulating several scenarios in parallel.
3"""
4from typing import Callable, Any
5import os
6import warnings
7import shutil
8from multiprocess import Pool, cpu_count
9import psutil
10
11from .scenario_config import ScenarioConfig
12from .scada import ScadaData
13from .scenario_simulator import ScenarioSimulator
14
15
def callback_save_to_file(folder_out: str = "") -> Callable[[ScadaData, ScenarioConfig, int], None]:
    """
    Creates a callback for storing the simulation results in a .epytflow_scada_data file.
    The returned callback can be directly passed to
    :func:`~epyt_flow.simulation.parallel_simulation.ParallelScenarioSimulation.run`.

    Each scenario is saved by calling `save_to_file` on its simulation results with the
    path `<folder_out>/<scenario_idx>`, i.e. the index of the scenario is used as
    the file name.

    Parameters
    ----------
    folder_out : `str`, optional
        Path to the folder where the simulation results will be stored.

        The default is the current working directory.

    Returns
    -------
    `Callable[[ScadaData, ScenarioConfig, int], None]`
        Callback storing the simulation results.
    """
    def callback(scada_data: ScadaData, _, scenario_idx: int) -> None:
        # The scenario configuration (second argument) is not needed for storing the results.
        scada_data.save_to_file(os.path.join(folder_out, f"{scenario_idx}"))

    return callback
38
39
def _run_scenario_simulation(scenario_config: ScenarioConfig, scenario_idx: int,
                             callback: Callable[[ScadaData, ScenarioConfig, int], Any]) -> Any:
    """
    Simulates a single scenario and passes the simulation results to a callback.

    Parameters
    ----------
    scenario_config : :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
        Configuration of the scenario to be simulated.
    scenario_idx : `int`
        Index of the scenario -- forwarded unchanged to the callback.
    callback : `Callable[[ScadaData, ScenarioConfig, int], Any]`
        Function that is called with the simulation results, the scenario configuration,
        and the scenario index.

    Returns
    -------
    `Any`
        Whatever the callback returns.
    """
    # The simulator is used as a context manager and the callback is invoked
    # while the simulator is still open.
    with ScenarioSimulator(scenario_config=scenario_config) as sim:
        return callback(sim.run_simulation(), scenario_config, scenario_idx)
44
45
class ParallelScenarioSimulation():
    """
    Class providing functions to run scenario simulations in parallel.
    """
    @staticmethod
    def run(scenarios: list[ScenarioConfig], n_jobs: int = -1,
            max_working_memory_consumption: int = None,
            callback: Callable[[ScadaData, ScenarioConfig, int], Any] = callback_save_to_file()
            ) -> Any:
        """
        Simulates multiple scenarios in parallel.

        Parameters
        ----------
        scenarios : list[:class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`]
            List of scenarios to be simulated.
        n_jobs : `int`, optional
            Number of CPUs that can be used by the simulations -- usually, this translates to
            the number of scenarios that are simulated in parallel.

            If -1, all CPUs are used.

            The default is -1
        max_working_memory_consumption : `int`, optional
            Maximum amount of working memory in MB that can be used by the simulations.
            Note that this might limit the number of scenarios that can be simulated in parallel.

            The default is None.
        callback: `Callable[[ScadaData, ScenarioConfig, int], None]`, optional
            Callback that is called after the simulation of a scenario finished.

            The callback gets the simulation results as a
            :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` instance, the scenario
            configuration as a :class:`~epyt_flow.simulation.scenario_config.ScenarioConfig`
            instance, and the index of the scenario in 'scenarios' as arguments.

            The default is :func:`~epyt_flow.simulation.parallel_simulation.callback_save_to_file`.

        Returns
        -------
        `list`
            Return values of the callback -- one entry per scenario, in the order of 'scenarios'.
            An empty list is returned if 'scenarios' is empty.

        Raises
        ------
        TypeError
            If 'scenarios', 'n_jobs', or 'callback' are of the wrong type.
        ValueError
            If 'n_jobs' or 'max_working_memory_consumption' have an invalid value.
        RuntimeError
            If the largest memory consumption estimate among the scenarios exceeds
            the working memory that can be used.
        """
        if not isinstance(scenarios, list):
            raise TypeError("'scenarios' must be an instance of 'list[ScenarioConfig]' " +
                            f"but not of '{type(scenarios)}'")
        if any(not isinstance(item, ScenarioConfig) for item in scenarios):
            raise TypeError("Each item in 'scenarios' must be an instance of 'ScenarioConfig'")

        if not isinstance(n_jobs, int):
            raise TypeError(f"'n_jobs' must be an instance of 'int' but not of '{type(n_jobs)}'")
        if not (n_jobs == -1 or n_jobs > 0):
            raise ValueError("'n_jobs' must be either -1 or a positive integer")

        if max_working_memory_consumption is not None:
            if not isinstance(max_working_memory_consumption, int) or \
                    max_working_memory_consumption <= 0:
                raise ValueError("'max_working_memory_consumption' must be a positive integer")

        if not callable(callback):
            raise TypeError("'callback' mut be a callable " +
                            "'Callable[[ScadaData, ScenarioConfig, int], None]'")

        # Nothing to simulate -- avoids calling max() on an empty sequence
        # and spawning a process pool without any work.
        if not scenarios:
            return []

        # Get free memory in MB
        ram_free_memory = psutil.virtual_memory().free * .000001
        if max_working_memory_consumption is not None:
            # The user-specified limit is an upper bound: it may shrink the
            # usable memory but must never raise it above what is actually free.
            ram_free_memory = min(ram_free_memory, max_working_memory_consumption)

        harddisk_free_memory = shutil.disk_usage(".").free * .000001

        # Check memory requirements of each scenario -- scenarios without
        # an estimate are counted as zero.
        memory_estimates = [estimate if estimate is not None else 0
                            for estimate in (s_config.memory_consumption_estimate
                                             for s_config in scenarios)]

        max_memory_required = max(memory_estimates)
        if max_memory_required > ram_free_memory:
            raise RuntimeError("Not enough working memory avaialble! " +
                               f"Requested {max_memory_required} MB but only " +
                               f"{ram_free_memory} MB are available")

        if sum(memory_estimates) >= harddisk_free_memory:
            warnings.warn("There might not be enough free space on the hard disk " +
                          "to store all scenario results")

        # Compute number of processes that can run in parallel
        n_available_cpus = cpu_count()
        if n_jobs != -1:
            n_available_cpus = min(n_available_cpus, n_jobs)

        required_memory_bound = min(ram_free_memory, harddisk_free_memory)
        n_max_parallel_scenarios = n_available_cpus
        if max_memory_required != 0:
            n_max_parallel_scenarios = int(required_memory_bound / max_memory_required)

        # The memory bound can round down to zero (e.g. little free disk space);
        # a process pool needs at least one worker.
        n_parallel_scenarios = max(1, min(n_available_cpus, n_max_parallel_scenarios))

        # Use a single worker as soon as any scenario specifies an MSX file ('f_msx_in').
        if any(s_config.f_msx_in is not None for s_config in scenarios):
            n_parallel_scenarios = 1

        # Run scenario simulations
        scenarios_task = [(scenario, scenario_idx, callback)
                          for scenario_idx, scenario in enumerate(scenarios)]

        # Every worker process handles exactly one scenario (maxtasksperchild=1).
        with Pool(processes=n_parallel_scenarios, maxtasksperchild=1) as pool:
            return pool.starmap(_run_scenario_simulation, scenarios_task)