Source code for epyt_flow.serialization

  1"""
  2Module provides functions and classes for serialization.
  3"""
  4from typing import Any, Union
  5from abc import abstractmethod, ABC
  6from io import BufferedIOBase
  7import pathlib
  8import json
  9import gzip
 10import umsgpack
 11import numpy as np
 12import networkx
 13import scipy
 14
 15
 16SCIPY_BSRARRAY_ID                       = -3
 17NETWORKX_GRAPH_ID                       = -2
 18NUMPY_ARRAY_ID                          = -1
 19SENSOR_CONFIG_ID                        = 0
 20SCENARIO_CONFIG_ID                      = 1
 21MODEL_UNCERTAINTY_ID                    = 2
 22SENSOR_NOISE_ID                         = 3
 23ABSOLUTE_GAUSSIAN_UNCERTAINTY_ID        = 4
 24RELATIVE_GAUSSIAN_UNCERTAINTY_ID        = 5
 25ABSOLUTE_UNIFORM_UNCERTAINTY_ID         = 6
 26RELATIVE_UNIFORM_UNCERTAINTY_ID         = 7
 27PERCENTAGE_DEVIATON_UNCERTAINTY_ID      = 8
 28ABSOLUTE_DEEP_UNIFORM_UNCERTAINTY_ID    = 9
 29RELATIVE_DEEP_UNIFORM_UNCERTAINTY_ID    = 10
 30ABSOLUTE_DEEP_GAUSSIAN_UNCERTAINTY_ID   = 11
 31RELATIVE_DEEP_GAUSSIAN_UNCERTAINTY_ID   = 12
 32ABSOLUTE_DEEP_UNCERTAINTY_ID            = 13
 33RELATIVE_DEEP_UNCERTAINTY_ID            = 14
 34SENSOR_FAULT_CONSTANT_ID                = 15
 35SENSOR_FAULT_DRIFT_ID                   = 16
 36SENSOR_FAULT_GAUSSIAN_ID                = 17
 37SENSOR_FAULT_PERCENTAGE_ID              = 18
 38SENSOR_FAULT_STUCKATZERO_ID             = 19
 39LEAKAGE_ID                              = 20
 40ABRUPT_LEAKAGE_ID                       = 21
 41INCIPIENT_LEAKAGE_ID                    = 22
 42SCADA_DATA_ID                           = 23
 43SENSOR_ATTACK_OVERRIDE_ID               = 24
 44SENSOR_ATTACK_REPLAY_ID                 = 25
 45NETWORK_TOPOLOGY_ID                     = 26
 46PUMP_STATE_EVENT_ID                     = 28
 47PUMP_SPEED_EVENT_ID                     = 29
 48VALVE_STATE_EVENT_ID                    = 30
 49SPECIESINJECTION_EVENT_ID               = 31
 50SIMPLE_CONTROL_ID                       = 32
 51COMPLEX_CONTROL_ID                      = 33
 52COMPLEX_CONTROL_CONDITION_ID            = 34
 53COMPLEX_CONTROL_ACTION_ID               = 35
 54COLOR_SCHEMES_ID                        = 36
 55
 56
 57JSON_SERIALIZABLE = {
 58}
 59
 60
def my_packb(data: Any) -> bytes:
    """
    Wrapper around `umsgpack.packb` that always applies the custom
    serialization handlers of this module (`ext_handler_pack`).

    Parameters
    ----------
    data : `Any`
        Data to be serialized.

    Returns
    -------
    `bytes`
        Serialized data.
    """
    packed = umsgpack.packb(data, ext_handlers=ext_handler_pack)
    return packed
67 68
def my_unpackb(data: Any) -> Any:
    """
    Wrapper around `umsgpack.unpackb` that always applies the custom
    deserialization handlers of this module (`ext_handler_unpack`).

    Parameters
    ----------
    data : `Any`
        Serialized data.

    Returns
    -------
    `Any`
        Deserialized data.
    """
    unpacked = umsgpack.unpackb(data, ext_handlers=ext_handler_unpack)
    return unpacked
75 76
def serializable(my_id: int, my_file_ext: str) -> Any:
    """
    Class decorator that registers a subclass of
    :class:`~epyt_flow.serialization.Serializable` as a serializable class.

    The decorated class gets two static methods attached (`unpackb` and
    `file_ext`) and is registered with `umsgpack` under the given ID.

    Parameters
    ----------
    my_id : `int`
        ID of the class.
    my_file_ext : `str`
        File extension.

    Returns
    -------
    `Any`
        The actual decorator, which takes the class and returns the result of
        `umsgpack.ext_serializable(my_id)` applied to it.
    """
    def wrapper(my_class):
        # JSON-capable classes are additionally entered into the registry that
        # `my_load_from_json` uses to look up allowed types.
        if issubclass(my_class, JsonSerializable):
            registry_key = (my_class.__module__, my_class.__name__)
            JSON_SERIALIZABLE[registry_key] = my_class

        def unpackb(data: bytes) -> Any:
            # The unpacked attributes are passed to the constructor as keywords.
            attributes = my_unpackb(data)
            return my_class(**attributes)

        def file_ext() -> str:
            return my_file_ext

        my_class.unpackb = staticmethod(unpackb)
        my_class.file_ext = staticmethod(file_ext)

        return umsgpack.ext_serializable(my_id)(my_class)

    return wrapper
108 109
class Serializable(ABC):
    """
    Base class for a serializable class -- must be used in conjunction with the
    :func:`~epyt_flow.serialization.serializable` decorator.

    Parameters
    ----------
    _parent_path : `str`, optional
        Value of the `parent_path` property. :func:`load_from_file` overwrites
        it with the directory of the file an instance was loaded from.

        The default is an empty string.
    """
    def __init__(self, _parent_path: str = "", **kwds):
        self._parent_path = _parent_path

        super().__init__(**kwds)

    def __eq__(self, other) -> bool:
        # Objects that are not `Serializable` have no `parent_path`; treat them
        # as unequal instead of failing with an AttributeError.
        if not isinstance(other, Serializable):
            return False

        return self._parent_path == other.parent_path

    @property
    def parent_path(self) -> str:
        """
        Returns the parent path stored in this instance.

        Returns
        -------
        `str`
            Parent path.
        """
        return self._parent_path

    @abstractmethod
    def get_attributes(self) -> dict:
        """
        Gets all attributes to be serialized -- these attributes are passed to the
        constructor when the object is deserialized.

        Returns
        -------
        `dict`
            Dictionary of attributes -- i.e. pairs of attribute name + value.
        """
        return {}

    def file_ext(self) -> str:
        """
        Returns the file extension of this class.

        This function is automatically implemented by applying the
        :func:`~epyt_flow.serialization.serializable` decorator.

        Returns
        -------
        `str`
            File extension.

        Raises
        ------
        NotImplementedError
            If the class was not decorated with
            :func:`~epyt_flow.serialization.serializable`.
        """
        raise NotImplementedError()

    def packb(self) -> bytes:
        """
        Serializes the attributes of this object.

        Returns
        -------
        `bytes`
            Serialized object.
        """
        return my_packb(self.get_attributes())

    @staticmethod
    def load(data: Union[bytes, BufferedIOBase]) -> Any:
        """
        Deserializes an instance of this class.

        Parameters
        ----------
        data : `bytes` or `io.BufferedIOBase`
            Serialized data or stream from which serialized data can be read.

        Returns
        -------
        `Any`
            Deserialized object.
        """
        # Refers to the module-level function `load`, not to this method.
        return load(data)

    @staticmethod
    def load_from_file(f_in: str, use_zip: bool = True) -> Any:
        """
        Deserializes an instance of this class from a (compressed) file.

        Parameters
        ----------
        f_in : `str`
            Path to the file from which to deserialize the object.
        use_zip : `bool`, optional
            If True, the file `f_in` is supposed to be gzip compressed -- False,
            if no compression was used when serializing the object.

            The default is True.

        Returns
        -------
        `Any`
            Deserialized object.
        """
        # Refers to the module-level function `load_from_file`.
        return load_from_file(f_in, use_zip)

    def dump(self, stream_out: BufferedIOBase = None) -> Any:
        """
        Serializes this object to a byte array.

        Parameters
        ----------
        stream_out : `io.BufferedIOBase`, optional
            Stream to which the serialized object is written.
            If None, the serialized object is returned as a `bytes` array.

            The default is None.

        Returns
        -------
        `bytes`
            If `stream_out` is None, the serialized object is returned as a `bytes` array.
        """
        # Refers to the module-level function `dump`.
        return dump(self, stream_out)

    def save_to_file(self, f_out: str, use_zip: bool = True) -> None:
        """
        Serializes this instance and stores it in a (compressed) file.

        The file extension of this class is appended to `f_out` if it is missing.

        Parameters
        ----------
        f_out : `str`
            Path to the file where this serialized object will be stored.
        use_zip : `bool`, optional
            If True, the file `f_out` will be gzip compressed -- False,
            if no compression is wanted.

            The default is True.
        """
        if not f_out.endswith(self.file_ext()):
            f_out += self.file_ext()

        # Refers to the module-level function `save_to_file`.
        return save_to_file(f_out, self, use_zip)
241 242
def my_to_json(obj: Any) -> str:
    """
    Serializes a given object to JSON.

    Besides the types natively supported by `json.dumps`, instances of
    `JsonSerializable`, `numpy.ndarray`, and numpy scalars are supported.

    Parameters
    ----------
    obj : `Any`
        Object to be serialized.

    Returns
    -------
    `str`
        JSON data.

    Raises
    ------
    TypeError
        If `obj` contains an object that can not be converted to JSON.
    """
    def _json_default(obj_: Any) -> Any:
        # Only called by `json.dumps` for objects it can not encode natively.
        if isinstance(obj_, np.ndarray):
            return obj_.tolist()
        if isinstance(obj_, np.generic):
            # numpy scalars (e.g. np.int64) -> plain Python scalars
            return obj_.item()
        if isinstance(obj_, JsonSerializable):
            # The "__type__" tag is what `my_load_from_json` uses to find the
            # class to instantiate.
            my_class_name = (obj_.__module__, obj_.__class__.__name__)
            return obj_.get_attributes() | {"__type__": my_class_name}

        # Returning the object unchanged would make `json.dumps` fail with a
        # misleading "Circular reference detected" error.
        raise TypeError(f"Object of type {type(obj_).__name__} is not JSON serializable")

    return json.dumps(obj, default=_json_default)
267 268
def my_load_from_json(data: str) -> Any:
    """
    Deserializes an object from JSON data.

    JSON objects carrying a non-empty "__type__" entry are turned into an
    instance of the class registered under that type in `JSON_SERIALIZABLE`;
    all remaining entries are passed to the constructor as keyword arguments.

    Parameters
    ----------
    data : `str`
        JSON data.

    Returns
    -------
    `Any`
        Deserialized object.

    Raises
    ------
    ValueError
        If a "__type__" entry is malformed or refers to a type that is not registered.
    """
    def _revive(raw: dict):
        type_tag = raw.get("__type__")
        if not type_tag:
            # Plain JSON object -- nothing to instantiate.
            return raw

        # A valid tag is a pair of strings: (module name, class name).
        is_pair = isinstance(type_tag, (list, tuple)) and len(type_tag) == 2
        if not is_pair or any(not isinstance(part, str) for part in type_tag):
            raise ValueError("Invalid __type__")

        key = (type_tag[0], type_tag[1])
        target_cls = JSON_SERIALIZABLE.get(key)
        if target_cls is None:
            raise ValueError(f"Type not allowed: {key}")

        ctor_kwargs = dict(raw)
        del ctor_kwargs["__type__"]
        return target_cls(**ctor_kwargs)

    return json.loads(data, object_hook=_revive)
301 302
class JsonSerializable(Serializable):
    """
    Base class for JSON serializable classes.
    Inherits from :class:`~epyt_flow.serialization.Serializable`.
    """

    def to_json(self) -> str:
        """
        Serializes this instance to JSON.

        Returns
        -------
        `str`
            JSON data.
        """
        return my_to_json(self)

    @staticmethod
    def load_from_json(data: str) -> Any:
        """
        Loads (i.e. deserializes) an instance of this class from given JSON data.

        Parameters
        ----------
        data : `str`
            JSON data.

        Returns
        -------
        `Any`
            Deserialized instance of this class.
        """
        return my_load_from_json(data)

    @staticmethod
    def load_from_json_file(f_in: str) -> Any:
        """
        Deserializes an instance of this class from a JSON file.

        The file is read as UTF-8 text.

        Parameters
        ----------
        f_in : `str`
            Path to the JSON file from which to deserialize the object.

        Returns
        -------
        `Any`
            Deserialized object.
        """
        with open(f_in, "r", encoding="utf-8") as f:
            return my_load_from_json(f.read())

    def save_to_json_file(self, f_out: str) -> None:
        """
        Serializes this instance and stores it in a JSON file.

        The file extension of this class is appended to `f_out` if it is missing.
        The file is written as UTF-8 text.

        Parameters
        ----------
        f_out : `str`
            Path to the JSON file where this serialized object will be stored.
        """
        if not f_out.endswith(self.file_ext()):
            f_out += self.file_ext()

        with open(f_out, "w", encoding="utf-8") as f:
            f.write(self.to_json())
369 370
def load(data: Union[bytes, BufferedIOBase]) -> Any:
    """
    Deserializes data given either directly as bytes or as a readable stream.

    Parameters
    ----------
    data : `bytes` or `io.BufferedIOBase`
        Serialized data or stream from which serialized data can be read.
        A stream is read completely.

    Returns
    -------
    `Any`
        Deserialized data.

    Raises
    ------
    TypeError
        If `data` is neither `bytes` nor an `io.BufferedIOBase` instance.
    """
    if isinstance(data, bytes):
        raw = data
    elif isinstance(data, BufferedIOBase):
        raw = data.read()
    else:
        raise TypeError("Invalid type of 'data' -- must be either instance of 'bytes' or " +
                        f"'io.BufferedIOBase' but not of '{type(data)}'")

    return my_unpackb(raw)
392 393
def dump(data: Any, stream_out: BufferedIOBase = None) -> Union[bytes, None]:
    """
    Serializes some given data -- the result is either returned or written to a stream.

    Parameters
    ----------
    data : `Any`
        Data to be serialized.
    stream_out : `io.BufferedIOBase`, optional
        Stream to which the serialized data is written.
        If None, the serialized data is returned as a `bytes` array.

        The default is None.

    Returns
    -------
    `bytes`
        Serialized data if `stream_out` is None -- otherwise, nothing is returned.

    Raises
    ------
    TypeError
        If `stream_out` is given but is not an `io.BufferedIOBase` instance.
    """
    write_to_stream = stream_out is not None

    # Reject an invalid stream before any serialization work is done.
    if write_to_stream and not isinstance(stream_out, BufferedIOBase):
        raise TypeError("'stream_out' must be an instance of 'io.BufferedIOBase' " +
                        f"but not of '{type(stream_out)}'")

    packed = my_packb(data)
    if not write_to_stream:
        return packed

    stream_out.write(packed)
    return None
419 420
def load_from_file(f_in: str, use_compression: bool = True) -> Any:
    """
    Reads a (compressed) file and deserializes its content.

    If the deserialized object is a `Serializable`, its `_parent_path` is set
    to the resolved directory containing `f_in`.

    Parameters
    ----------
    f_in : `str`
        Path to the file from which to deserialize the data.
    use_compression : `bool`, optional
        If True, the file `f_in` is supposed to be gzip compressed -- False,
        if no compression was used when serializing the data.

        The default is True.

    Returns
    -------
    `Any`
        Deserialized data.
    """
    # Only the literal `False` disables decompression.
    opener = open if use_compression is False else gzip.open

    with opener(f_in, "rb") as f_handle:
        inst = load(f_handle.read())

    if isinstance(inst, Serializable):
        # NOTE(review): the stored value is a `pathlib.Path`, whereas
        # `Serializable` annotates `_parent_path` as `str` -- confirm which
        # type callers expect.
        inst._parent_path = pathlib.Path(f_in).parent.resolve()

    return inst
453 454
def save_to_file(f_out: str, data: Any, use_compression: bool = True) -> None:
    """
    Serializes data and stores it in a (compressed) file.

    Parameters
    ----------
    f_out : `str`
        Path to the file where the serialized data will be stored.
    data : `Any`
        Data to be serialized.
    use_compression : `bool`, optional
        If True, the file `f_out` will be gzip compressed -- False, if no compression is wanted.

        The default is True.
    """
    # Only the literal `False` disables compression.
    if use_compression is False:
        with open(f_out, "wb") as f:
            # Packs directly into the file, using the custom extension handlers.
            umsgpack.pack(data, f, ext_handlers=ext_handler_pack)
    else:
        with gzip.open(f_out, "wb") as f:
            f.write(dump(data))
# Add numpy.ndarray, networkx.Graph, and scipy.sparse.bsr_array support
def __encode_bsr_array(array: scipy.sparse.bsr_array
                       ) -> tuple[tuple[int, int], tuple[list[float], tuple[list[int], list[int]]]]:
    """
    Converts a sparse BSR array into plain Python data: the shape together with
    the non-zero values and their row/column indices.
    """
    shape = array.shape
    data = []
    rows = []
    cols = []

    array_ = array.tocsr()  # Bug workaround: BSR arrays do not implement __getitem__
    for i, j in zip(*array_.nonzero()):
        rows.append(int(i))
        cols.append(int(j))
        data.append(float(array_[i, j]))

    return shape, (data, (rows, cols))


def __decode_bsr_array(ext_data: tuple[tuple[int, int],
                                       tuple[list[float], tuple[list[int], list[int]]]]
                       ) -> scipy.sparse.bsr_array:
    """
    Rebuilds a sparse BSR array from the output of `__encode_bsr_array`.
    """
    shape, data = ext_data
    return scipy.sparse.bsr_array((data[0], (data[1][0], data[1][1])), shape=(shape[0], shape[1]))


def __encode_numpy_array(array: np.ndarray) -> tuple[str, list[int], bytes]:
    """
    Converts a numpy array into its dtype string, shape, and raw bytes.
    """
    # NOTE(review): only the type string of the first `descr` entry is kept, so
    # structured dtypes with several fields would not round-trip -- confirm
    # that such arrays never occur.
    return array.dtype.descr[0][1], array.shape, array.tobytes()


def __decode_numpy_array(ext_data: tuple[str, list[int], bytes]) -> np.ndarray:
    """
    Rebuilds a numpy array from the output of `__encode_numpy_array`.
    """
    dtype_descr, shape, buffer = ext_data
    # NOTE(review): `np.frombuffer` on a `bytes` object yields a read-only
    # array -- callers that modify it in place need to copy it first.
    return np.frombuffer(buffer, dtype=np.dtype(dtype_descr)).reshape(shape)


# Maps a type to the function that wraps an instance of it in a `umsgpack.Ext`.
ext_handler_pack = {np.ndarray:
                    lambda arr: umsgpack.Ext(NUMPY_ARRAY_ID,
                                             umsgpack.packb(__encode_numpy_array(arr))),
                    networkx.Graph:
                    # Graphs are stored in node-link format, because that is
                    # what `networkx.node_link_graph` in `ext_handler_unpack`
                    # expects when rebuilding the graph.
                    lambda graph:
                    umsgpack.Ext(NETWORKX_GRAPH_ID,
                                 umsgpack.packb(networkx.node_link_data(graph))),
                    scipy.sparse.bsr_array:
                    lambda arr: umsgpack.Ext(SCIPY_BSRARRAY_ID,
                                             umsgpack.packb(__encode_bsr_array(arr)))}
# Maps an extension type ID to the function that rebuilds the original object.
ext_handler_unpack = {NUMPY_ARRAY_ID: lambda ext: __decode_numpy_array(umsgpack.unpackb(ext.data)),
                      NETWORKX_GRAPH_ID:
                      lambda ext: networkx.node_link_graph(umsgpack.unpackb(ext.data)),
                      SCIPY_BSRARRAY_ID: lambda ext: __decode_bsr_array(umsgpack.unpackb(ext.data))}