1"""
2Module provides a base class for custom control modules.
3"""
4from abc import abstractmethod, ABC
5import warnings
6import numpy as np
7from epanet_plus import EPyT, EpanetConstants
8
9from . import ScadaData
10
11
class CustomControlModule(ABC):
    """
    Base class for a custom control module.

    Sub-classes implement the control logic in :func:`step`. The remaining methods are
    convenience helpers that act on the EPANET API handed over in :func:`init`.

    Attributes
    ----------
    _epanet_api : `epanet_plus.EPyT <https://epanet-plus.readthedocs.io/en/stable/api.html#epanet_plus.epanet_toolkit.EPyT>`_
        API to EPANET and EPANET-MSX. Is `None` until it is set in :func:`init`.
    """
    def __init__(self, **kwds):
        # No API is available before 'init' is called.
        self._epanet_api = None

        super().__init__(**kwds)

    def _require_epanet_api(self) -> EPyT:
        """
        Returns the EPANET API passed to :func:`init`.

        Returns
        -------
        `epanet_plus.EPyT <https://epanet-plus.readthedocs.io/en/stable/api.html#epanet_plus.epanet_toolkit.EPyT>`_
            API to EPANET set in :func:`init`.

        Raises
        ------
        RuntimeError
            If :func:`init` has not been called yet.
        """
        if self._epanet_api is None:
            raise RuntimeError("Control module is not initialized -- call 'init()' with an " +
                               "instance of 'epanet_plus.EPyT' first")

        return self._epanet_api

    def init(self, epanet_api: EPyT) -> None:
        """
        Initializes the control module.

        Parameters
        ----------
        epanet_api : `epanet_plus.EPyT <https://epanet-plus.readthedocs.io/en/stable/api.html#epanet_plus.epanet_toolkit.EPyT>`_
            API to EPANET for implementing the control module.

        Raises
        ------
        TypeError
            If `epanet_api` is not an instance of `epanet_plus.EPyT`.
        """
        if not isinstance(epanet_api, EPyT):
            raise TypeError("'epanet_api' must be an instance of 'epanet_plus.EPyT' but not of " +
                            f"'{type(epanet_api)}'")

        self._epanet_api = epanet_api

    def set_pump_status(self, pump_id: str, status: int) -> None:
        """
        Sets the status of a pump.

        Parameters
        ----------
        pump_id : `str`
            ID of the pump for which the status is set.
        status : `int`
            New status of the pump -- either active (i.e. open) or inactive (i.e. closed).

            Must be one of the following constants defined in
            :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`:

                - EN_CLOSED = 0
                - EN_OPEN = 1

        Raises
        ------
        RuntimeError
            If :func:`init` has not been called yet.
        """
        epanet_api = self._require_epanet_api()

        pump_link_idx = epanet_api.get_link_idx(pump_id)
        epanet_api.setlinkvalue(pump_link_idx, EpanetConstants.EN_STATUS, status)

    def set_pump_speed(self, pump_id: str, speed: float) -> None:
        """
        Sets the speed of a pump.

        The speed is written to the first entry of the pattern linked to the pump.
        If the pump has no pattern, a warning is issued and a new pattern
        named "pump_speed_<pump_id>" is created and linked to the pump.

        Parameters
        ----------
        pump_id : `str`
            ID of the pump for which the pump speed is set.
        speed : `float`
            New pump speed.

        Raises
        ------
        RuntimeError
            If :func:`init` has not been called yet.
        """
        epanet_api = self._require_epanet_api()

        pump_idx = epanet_api.get_link_idx(pump_id)
        # The EPANET toolkit reports link properties as floating point numbers,
        # whereas pattern functions expect an integer index -- coerce explicitly.
        pattern_idx = int(epanet_api.getlinkvalue(pump_idx, EpanetConstants.EN_LINKPATTERN))

        # Index 0 means that no pattern is linked to this pump.
        if pattern_idx == 0:
            warnings.warn(f"No pattern for pump '{pump_id}' found -- a new pattern is created")

            pattern_id = f"pump_speed_{pump_id}"
            epanet_api.add_pattern(pattern_id, [speed])
            pattern_idx = epanet_api.getpatternindex(pattern_id)
            epanet_api.setlinkvalue(pump_idx, EpanetConstants.EN_LINKPATTERN, pattern_idx)

        epanet_api.setpattern(pattern_idx, [speed], 1)

    def set_valve_status(self, valve_id: str, status: int) -> None:
        """
        Sets the status of a valve.

        Parameters
        ----------
        valve_id : `str`
            ID of the valve for which the status is set.
        status : `int`
            New status of the valve -- either open or closed.

            Must be one of the following constants defined in
            :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`:

                - EN_CLOSED = 0
                - EN_OPEN = 1

        Raises
        ------
        RuntimeError
            If :func:`init` has not been called yet.
        """
        epanet_api = self._require_epanet_api()

        valve_link_idx = epanet_api.get_link_idx(valve_id)
        epanet_api.setlinkvalue(valve_link_idx, EpanetConstants.EN_STATUS, status)

    def set_node_quality_source_value(self, node_id: str, pattern_id: str,
                                      qual_value: float) -> None:
        """
        Sets the quality source at a particular node to a specific value -- e.g.
        setting the chlorine concentration injection to a specified value.

        Parameters
        ----------
        node_id : `str`
            ID of the node.
        pattern_id : `str`
            ID of the quality pattern at the specific node.
        qual_value : `float`
            New quality source value.

        Raises
        ------
        RuntimeError
            If :func:`init` has not been called yet.
        """
        epanet_api = self._require_epanet_api()

        node_idx = epanet_api.get_node_idx(node_id)
        pattern_idx = epanet_api.getpatternindex(pattern_id)

        # The source quality of the node is fixed to 1 and the requested value is written
        # to the pattern. NOTE(review): presumably the pattern 'pattern_id' is already
        # assigned as the source pattern of this node -- it is not linked here; confirm.
        epanet_api.setnodevalue(node_idx, EpanetConstants.EN_SOURCEQUAL, 1)
        epanet_api.set_pattern(pattern_idx, np.array([qual_value]))

    @abstractmethod
    def step(self, scada_data: ScadaData) -> None:
        """
        Implements the control algorithm -- i.e. mapping of sensor reading to actions.

        Must be overridden by sub-classes.

        Parameters
        ----------
        scada_data : :class:`~epyt_flow.simulation.scada.scada_data.ScadaData`
            Sensor readings.
        """
        raise NotImplementedError()
129
130 Parameters
131 ----------
132 scada_data : :class:`~epyt_flow.simulation.scada.scada_data.ScadaData`
133 Sensor readings.
134 """
135 raise NotImplementedError()