Source code for epyt_flow.simulation.scada.custom_control

  1"""
  2Module provides a base class for custom control modules.
  3"""
  4from abc import abstractmethod, ABC
  5import warnings
  6import numpy as np
  7from epanet_plus import EPyT, EpanetConstants
  8
  9from . import ScadaData
 10
 11
[docs] 12class CustomControlModule(ABC): 13 """ 14 Base class for a custom control module. 15 16 Attributes 17 ---------- 18 epanet_api : `epanet_plus.EPyT <https://epanet-plus.readthedocs.io/en/stable/api.html#epanet_plus.epanet_toolkit.EPyT>`_ 19 API to EPANET and EPANET-MSX. Is set in :func:`init`. 20 """ 21 def __init__(self, **kwds): 22 self._epanet_api = None 23 24 super().__init__(**kwds) 25
[docs] 26 def init(self, epanet_api: EPyT) -> None: 27 """ 28 Initializes the control module. 29 30 Parameters 31 ---------- 32 epanet_api : `epanet_plus.EPyT <https://epanet-plus.readthedocs.io/en/stable/api.html#epanet_plus.epanet_toolkit.EPyT>`_ 33 API to EPANET for implementing the control module. 34 """ 35 if not isinstance(epanet_api, EPyT): 36 raise TypeError("'epanet_api' must be an instance of 'epanet_plus.EPyT' but not of " + 37 f"'{type(epanet_api)}'") 38 39 self._epanet_api = epanet_api
40
[docs] 41 def set_pump_status(self, pump_id: str, status: int) -> None: 42 """ 43 Sets the status of a pump. 44 45 Parameters 46 ---------- 47 pump_id : `str` 48 ID of the pump for which the status is set. 49 status : `int` 50 New status of the pump -- either active (i.e. open) or inactive (i.e. closed). 51 52 Must be one of the following constants defined in 53 :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`: 54 55 - EN_CLOSED = 0 56 - EN_OPEN = 1 57 """ 58 pump_link_idx = self._epanet_api.get_link_idx(pump_id) 59 self._epanet_api.setlinkvalue(pump_link_idx, EpanetConstants.EN_STATUS, status)
60
[docs] 61 def set_pump_speed(self, pump_id: str, speed: float) -> None: 62 """ 63 Sets the speed of a pump. 64 65 Parameters 66 ---------- 67 pump_id : `str` 68 ID of the pump for which the pump speed is set. 69 speed : `float` 70 New pump speed. 71 """ 72 pump_idx = self._epanet_api.get_link_idx(pump_id) 73 pattern_idx = self._epanet_api.getlinkvalue(pump_idx, EpanetConstants.EN_LINKPATTERN) 74 75 if pattern_idx == 0: 76 warnings.warn(f"No pattern for pump '{pump_id}' found -- a new pattern is created") 77 78 pattern_id = f"pump_speed_{pump_id}" 79 self._epanet_api.add_pattern(pattern_id, [speed]) 80 pattern_idx = self._epanet_api.getpatternindex(pattern_id) 81 self._epanet_api.setlinkvalue(pump_idx, EpanetConstants.EN_LINKPATTERN, pattern_idx) 82 83 self._epanet_api.setpattern(pattern_idx, [speed], 1)
84
[docs] 85 def set_valve_status(self, valve_id: str, status: int) -> None: 86 """ 87 Sets the status of a valve. 88 89 Parameters 90 ---------- 91 valve_id : `str` 92 ID of the valve for which the status is set. 93 status : `int` 94 New status of the valve -- either open or closed. 95 96 Must be one of the following constants defined in 97 :class:`~epyt_flow.simulation.events.actuator_events.ActuatorConstants`: 98 99 - EN_CLOSED = 0 100 - EN_OPEN = 1 101 """ 102 valve_link_idx = self._epanet_api.get_link_idx(valve_id) 103 self._epanet_api.setlinkvalue(valve_link_idx, EpanetConstants.EN_STATUS, status)
104
[docs] 105 def set_node_quality_source_value(self, node_id: str, pattern_id: str, 106 qual_value: float) -> None: 107 """ 108 Sets the quality source at a particular node to a specific value -- e.g. 109 setting the chlorine concentration injection to a specified value. 110 111 Parameters 112 ---------- 113 node_id : `str` 114 ID of the node. 115 pattern_id : `str` 116 ID of the quality pattern at the specific node. 117 qual_value : `float` 118 New quality source value. 119 """ 120 node_idx = self._epanet_api.get_node_idx(node_id) 121 pattern_idx = self._epanet_api.getpatternindex(pattern_id) 122 self._epanet_api.setnodevalue(node_idx, EpanetConstants.EN_SOURCEQUAL, 1) 123 self._epanet_api.set_pattern(pattern_idx, np.array([qual_value]))
124
[docs] 125 @abstractmethod 126 def step(self, scada_data: ScadaData) -> None: 127 """ 128 Implements the control algorithm -- i.e. mapping of sensor reading to actions. 129 130 Parameters 131 ---------- 132 scada_data : :class:`~epyt_flow.simulation.scada.scada_data.ScadaData` 133 Sensor readings. 134 """ 135 raise NotImplementedError()