1"""
2Module provides functions and classes for serialization.
3"""
4from typing import Any, Union
5from abc import abstractmethod, ABC
6from io import BufferedIOBase
7import pathlib
8import json
9import gzip
10import umsgpack
11import numpy as np
12import networkx
13import scipy
14
15
# Type IDs used by the serialization format.
# The negative IDs are used in this module for the extension handlers of third-party
# types (see `ext_handler_pack` / `ext_handler_unpack`).
SCIPY_BSRARRAY_ID = -3
NETWORKX_GRAPH_ID = -2
NUMPY_ARRAY_ID = -1
# The non-negative IDs are not referenced in this module -- presumably each one is passed
# as `my_id` to the `serializable` decorator by a class defined elsewhere (TODO confirm).
# NOTE(review): no constant with the value 27 is defined in this list.
SENSOR_CONFIG_ID = 0
SCENARIO_CONFIG_ID = 1
MODEL_UNCERTAINTY_ID = 2
SENSOR_NOISE_ID = 3
ABSOLUTE_GAUSSIAN_UNCERTAINTY_ID = 4
RELATIVE_GAUSSIAN_UNCERTAINTY_ID = 5
ABSOLUTE_UNIFORM_UNCERTAINTY_ID = 6
RELATIVE_UNIFORM_UNCERTAINTY_ID = 7
PERCENTAGE_DEVIATON_UNCERTAINTY_ID = 8
ABSOLUTE_DEEP_UNIFORM_UNCERTAINTY_ID = 9
RELATIVE_DEEP_UNIFORM_UNCERTAINTY_ID = 10
ABSOLUTE_DEEP_GAUSSIAN_UNCERTAINTY_ID = 11
RELATIVE_DEEP_GAUSSIAN_UNCERTAINTY_ID = 12
ABSOLUTE_DEEP_UNCERTAINTY_ID = 13
RELATIVE_DEEP_UNCERTAINTY_ID = 14
SENSOR_FAULT_CONSTANT_ID = 15
SENSOR_FAULT_DRIFT_ID = 16
SENSOR_FAULT_GAUSSIAN_ID = 17
SENSOR_FAULT_PERCENTAGE_ID = 18
SENSOR_FAULT_STUCKATZERO_ID = 19
LEAKAGE_ID = 20
ABRUPT_LEAKAGE_ID = 21
INCIPIENT_LEAKAGE_ID = 22
SCADA_DATA_ID = 23
SENSOR_ATTACK_OVERRIDE_ID = 24
SENSOR_ATTACK_REPLAY_ID = 25
NETWORK_TOPOLOGY_ID = 26
PUMP_STATE_EVENT_ID = 28
PUMP_SPEED_EVENT_ID = 29
VALVE_STATE_EVENT_ID = 30
SPECIESINJECTION_EVENT_ID = 31
SIMPLE_CONTROL_ID = 32
COMPLEX_CONTROL_ID = 33
COMPLEX_CONTROL_CONDITION_ID = 34
COMPLEX_CONTROL_ACTION_ID = 35
COLOR_SCHEMES_ID = 36


# Registry of JSON serializable classes, keyed by `(module name, class name)`.
# It is filled by the `serializable` decorator and consulted by `my_load_from_json`
# to resolve the "__type__" entry of a JSON object to a class.
JSON_SERIALIZABLE = {
}
59
60
[docs]
def my_packb(data: Any) -> bytes:
    """
    Serializes some given data by calling `umsgpack.packb` with the custom
    extension handlers of this module (`ext_handler_pack`).

    Parameters
    ----------
    data : `Any`
        Data to be serialized.

    Returns
    -------
    `bytes`
        Serialized data.
    """
    packed = umsgpack.packb(data, ext_handlers=ext_handler_pack)
    return packed
67
68
[docs]
def my_unpackb(data: Any) -> Any:
    """
    Deserializes some given data by calling `umsgpack.unpackb` with the custom
    extension handlers of this module (`ext_handler_unpack`).

    Parameters
    ----------
    data : `Any`
        Serialized data.

    Returns
    -------
    `Any`
        Deserialized data.
    """
    unpacked = umsgpack.unpackb(data, ext_handlers=ext_handler_unpack)
    return unpacked
75
76
[docs]
def serializable(my_id: int, my_file_ext: str) -> Any:
    """
    Class decorator that registers a subclass of
    :class:`~epyt_flow.serialization.Serializable` as a serializable class.

    The decorated class gets two static methods attached -- `unpackb` (creates an
    instance from serialized attributes) and `file_ext` (returns `my_file_ext`) --
    and is then registered under `my_id` by means of `umsgpack.ext_serializable`.
    Subclasses of :class:`~epyt_flow.serialization.JsonSerializable` are additionally
    added to the `JSON_SERIALIZABLE` registry.

    Parameters
    ----------
    my_id : `int`
        ID of the class.
    my_file_ext : `str`
        File extension.

    Returns
    -------
    `Any`
        The actual class decorator.
    """
    def register(my_class):
        # JSON serializable classes must be resolvable by (module name, class name)
        if issubclass(my_class, JsonSerializable):
            registry_key = (my_class.__module__, my_class.__name__)
            JSON_SERIALIZABLE[registry_key] = my_class

        def unpackb(data: bytes) -> Any:
            # The deserialized attributes are passed to the constructor as keyword arguments
            attributes = my_unpackb(data)
            return my_class(**attributes)

        def file_ext() -> str:
            return my_file_ext

        my_class.unpackb = staticmethod(unpackb)
        my_class.file_ext = staticmethod(file_ext)

        return umsgpack.ext_serializable(my_id)(my_class)

    return register
108
109
[docs]
class Serializable(ABC):
    """
    Base class for a serializable class -- must be used in conjunction with the
    :func:`~epyt_flow.serialization.serializable` decorator.

    Parameters
    ----------
    _parent_path : `str`, optional
        Path of the folder this object is associated with --
        :func:`~epyt_flow.serialization.load_from_file` sets it to the folder
        of the file the object was loaded from.

        The default is an empty string.
    """
    def __init__(self, _parent_path: str = "", **kwds):
        self._parent_path = _parent_path

        super().__init__(**kwds)

    def __eq__(self, other) -> bool:
        # Objects of unrelated types have no 'parent_path' -- let Python fall back to its
        # default comparison instead of failing with an AttributeError.
        if not isinstance(other, Serializable):
            return NotImplemented

        return self._parent_path == other.parent_path

    @property
    def parent_path(self) -> str:
        """
        Gets the parent path of this object.

        Returns
        -------
        `str`
            Parent path.
        """
        return self._parent_path

    @abstractmethod
    def get_attributes(self) -> dict:
        """
        Gets all attributes to be serialized -- these attributes are passed to the
        constructor when the object is deserialized.

        Returns
        -------
        `dict`
            Dictionary of attributes -- i.e. pairs of attribute name + value.
        """
        return {}

    def file_ext(self) -> str:
        """
        Returns the file extension of this class.

        This function is automatically implemented by applying the
        :func:`~epyt_flow.serialization.serializable` decorator.

        Returns
        -------
        `str`
            File extension.

        Raises
        ------
        NotImplementedError
            If the decorator has not been applied to the class.
        """
        raise NotImplementedError()

    def packb(self) -> bytes:
        """
        Serializes the attributes of this object.

        Returns
        -------
        `bytes`
            Serialized object.
        """
        return my_packb(self.get_attributes())

    @staticmethod
    def load(data: Union[bytes, BufferedIOBase]) -> Any:
        """
        Deserializes an instance of this class.

        Parameters
        ----------
        data : `bytes` or `io.BufferedIOBase`
            Serialized data or stream from which serialized data can be read.

        Returns
        -------
        `Any`
            Deserialized object.
        """
        return load(data)

    @staticmethod
    def load_from_file(f_in: str, use_zip: bool = True) -> Any:
        """
        Deserializes an instance of this class from a (compressed) file.

        Parameters
        ----------
        f_in : `str`
            Path to the file from which to deserialize the object.
        use_zip : `bool`, optional
            If True, the file `f_in` is supposed to be gzip compressed -- False,
            if no compression was used when serializing the object.

            The default is True.

        Returns
        -------
        `Any`
            Deserialized object.
        """
        return load_from_file(f_in, use_zip)

    def dump(self, stream_out: BufferedIOBase = None) -> Union[bytes, None]:
        """
        Serializes this object to a byte array.

        Parameters
        ----------
        stream_out : `io.BufferedIOBase`, optional
            Stream to which the serialized object is written.
            If None, the serialized object is returned as a `bytes` array.

            The default is None.

        Returns
        -------
        `bytes`
            If `stream_out` is None, the serialized object is returned as a `bytes` array --
            otherwise, nothing is returned.
        """
        return dump(self, stream_out)

    def save_to_file(self, f_out: str, use_zip: bool = True) -> None:
        """
        Serializes this instance and stores it in a (compressed) file.

        Parameters
        ----------
        f_out : `str`
            Path to the file where this serialized object will be stored.
            The file extension of this class is appended if `f_out` does not end with it.
        use_zip : `bool`, optional
            If True, the file `f_out` will be gzip compressed -- False,
            if no compression is wanted.

            The default is True.
        """
        if not f_out.endswith(self.file_ext()):
            f_out += self.file_ext()

        return save_to_file(f_out, self, use_zip)
241
242
[docs]
def my_to_json(obj: Any) -> str:
    """
    Serializes a given object to JSON.

    Besides the types natively supported by `json`, instances of
    :class:`~epyt_flow.serialization.JsonSerializable`, NumPy arrays,
    and NumPy scalars are supported.

    Parameters
    ----------
    obj : `Any`
        Object to be serialized.

    Returns
    -------
    `str`
        JSON data.

    Raises
    ------
    TypeError
        If `obj` contains an object that can not be serialized to JSON.
    """
    def _json_default(obj_: Any) -> Any:
        # Only called by json.dumps for objects that it can not encode natively.
        if isinstance(obj_, np.ndarray):
            return obj_.tolist()
        if isinstance(obj_, np.generic):
            # NumPy scalar (e.g. numpy.int64) -> equivalent built-in Python scalar
            return obj_.item()
        if isinstance(obj_, JsonSerializable):
            my_class_name = (obj_.__module__, obj_.__class__.__name__)
            return obj_.get_attributes() | {"__type__": my_class_name}

        # Returning the object unchanged would make json fail with a misleading
        # "Circular reference detected" error -- report the real problem instead.
        raise TypeError(f"Object of type {type(obj_).__name__} is not JSON serializable")

    return json.dumps(obj, default=_json_default)
267
268
[docs]
def my_load_from_json(data: str) -> Any:
    """
    Loads (i.e. deserializes) an object from given JSON data.

    JSON objects carrying a non-empty "__type__" entry are turned into an instance of
    the class registered under that entry in `JSON_SERIALIZABLE` -- all remaining
    entries are passed to the constructor as keyword arguments.

    Parameters
    ----------
    data : `str`
        JSON data.

    Returns
    -------
    `Any`
        Deserialized object.

    Raises
    ------
    ValueError
        If a "__type__" entry is malformed or refers to a class that is not registered.
    """
    def _revive(node: dict):
        type_tag = node.get("__type__")
        if not type_tag:
            # Plain JSON object -- nothing to instantiate
            return node

        # A valid type tag is a pair of strings: (module name, class name)
        is_pair = isinstance(type_tag, (list, tuple)) and len(type_tag) == 2
        if not is_pair or not all(isinstance(part, str) for part in type_tag):
            raise ValueError("Invalid __type__")

        key = (type_tag[0], type_tag[1])
        cls = JSON_SERIALIZABLE.get(key)
        if cls is None:
            # Only classes from the registry may be instantiated
            raise ValueError(f"Type not allowed: {key}")

        kwargs = {name: value for name, value in node.items() if name != "__type__"}
        return cls(**kwargs)

    return json.loads(data, object_hook=_revive)
301
302
[docs]
class JsonSerializable(Serializable):
    """
    Base class for JSON serializable classes.
    Inherits from :class:`~epyt_flow.serialization.Serializable`.
    """

    def to_json(self) -> str:
        """
        Serializes this instance to JSON.

        Returns
        -------
        `str`
            JSON data.
        """
        return my_to_json(self)

    @staticmethod
    def load_from_json(data: str) -> Any:
        """
        Loads (i.e. deserializes) an instance of this class from given JSON data.

        Parameters
        ----------
        data : `str`
            JSON data.

        Returns
        -------
        `Any`
            Deserialized instance of this class.
        """
        return my_load_from_json(data)

    @staticmethod
    def load_from_json_file(f_in: str) -> Any:
        """
        Deserializes an instance of this class from a JSON file.

        Parameters
        ----------
        f_in : `str`
            Path to the JSON file from which to deserialize the object.

        Returns
        -------
        `Any`
            Deserialized object.
        """
        with open(f_in, "r", encoding="utf-8") as f:
            return my_load_from_json(f.read())

    def save_to_json_file(self, f_out: str) -> None:
        """
        Serializes this instance and stores it in a JSON file.

        Parameters
        ----------
        f_out : `str`
            Path to the JSON file where this serialized object will be stored.
            The file extension of this class is appended if `f_out` does not end with it.
        """
        if not f_out.endswith(self.file_ext()):
            f_out += self.file_ext()

        # Serialize before opening the file -- opening in "w" mode truncates it, so a
        # failing serialization would otherwise leave an empty file behind.
        json_data = self.to_json()

        with open(f_out, "w", encoding="utf-8") as f:
            f.write(json_data)
369
370
[docs]
def load(data: Union[bytes, BufferedIOBase]) -> Any:
    """
    Deserializes data.

    Parameters
    ----------
    data : `bytes` or `io.BufferedIOBase`
        Serialized data or stream from which serialized data can be read.

    Returns
    -------
    `Any`
        Deserialized data.

    Raises
    ------
    TypeError
        If `data` is neither an instance of `bytes` nor of `io.BufferedIOBase`.
    """
    if isinstance(data, bytes):
        raw = data
    elif isinstance(data, BufferedIOBase):
        # Streams are read completely before deserialization
        raw = data.read()
    else:
        raise TypeError("Invalid type of 'data' -- must be either instance of 'bytes' or " +
                        f"'io.BufferedIOBase' but not of '{type(data)}'")

    return my_unpackb(raw)
392
393
[docs]
def dump(data: Any, stream_out: BufferedIOBase = None) -> Union[bytes, None]:
    """
    Serializes some given data to a byte array.

    Parameters
    ----------
    data : `Any`
        Data to be serialized.
    stream_out : `io.BufferedIOBase`, optional
        Stream to which the serialized data is written.
        If None, the serialized data is returned as a `bytes` array.

        The default is None.

    Returns
    -------
    `bytes`
        Serialized data if `stream_out` is None -- otherwise, nothing is returned.

    Raises
    ------
    TypeError
        If `stream_out` is given but is not an instance of `io.BufferedIOBase`.
    """
    if stream_out is None:
        return my_packb(data)

    if isinstance(stream_out, BufferedIOBase):
        stream_out.write(my_packb(data))
        return None

    # The stream is validated before any data is serialized
    raise TypeError("'stream_out' must be an instance of 'io.BufferedIOBase' " +
                    f"but not of '{type(stream_out)}'")
419
420
[docs]
def load_from_file(f_in: str, use_compression: bool = True) -> Any:
    """
    Deserializes data from a (compressed) file.

    If the deserialized object is an instance of
    :class:`~epyt_flow.serialization.Serializable`, its parent path is set to the
    resolved folder of `f_in` (a `pathlib.Path` instance).

    Parameters
    ----------
    f_in : `str`
        Path to the file from which to deserialize the data.
    use_compression : `bool`, optional
        If True, the file `f_in` is supposed to be gzip compressed -- False,
        if no compression was used when serializing the data.

        The default is True.

    Returns
    -------
    `Any`
        Deserialized data.
    """
    # Only an explicit False disables the gzip decompression
    open_file = open if use_compression is False else gzip.open

    with open_file(f_in, "rb") as stream:
        inst = load(stream.read())

    if isinstance(inst, Serializable):
        # Remember the folder the object was loaded from
        inst._parent_path = pathlib.Path(f_in).parent.resolve()

    return inst
453
454
[docs]
def save_to_file(f_out: str, data: Any, use_compression: bool = True) -> None:
    """
    Serializes data and stores it in a (compressed) file.

    Parameters
    ----------
    f_out : `str`
        Path to the file where the serialized data will be stored.
    data : `Any`
        Data to be serialized.
    use_compression : `bool`, optional
        If True, the file `f_out` will be gzip compressed -- False, if no compression is wanted.

        The default is True.
    """
    # Serialize before opening the file -- opening in "wb" mode truncates it, so a
    # failing serialization would otherwise leave an empty or partial file behind.
    serialized_data = dump(data)

    if use_compression is False:
        with open(f_out, "wb") as f:
            f.write(serialized_data)
    else:
        with gzip.open(f_out, "wb") as f:
            f.write(serialized_data)
474
475
476# Add numpy.ndarray, networkx.Graph, and scipy.sparse.bsr_array support
def __encode_bsr_array(array: scipy.sparse.bsr_array
                       ) -> tuple[tuple[int, int], tuple[list[float], tuple[list[int], list[int]]]]:
    """
    Converts a sparse BSR array into its shape and the coordinates + values
    of its non-zero entries -- i.e. `(shape, (data, (rows, cols)))`.
    """
    # Bug workaround: BSR arrays do not implement __getitem__
    csr = array.tocsr()
    row_idx, col_idx = csr.nonzero()

    rows = [int(r) for r in row_idx]
    cols = [int(c) for c in col_idx]
    data = [float(csr[r, c]) for r, c in zip(row_idx, col_idx)]

    return array.shape, (data, (rows, cols))
491
492
def __decode_bsr_array(ext_data: tuple[tuple[int, int],
                                       tuple[list[float], tuple[list[int], list[int]]]]
                       ) -> scipy.sparse.bsr_array:
    """
    Creates a sparse BSR array from `(shape, (data, (rows, cols)))`.
    """
    dims, entries = ext_data
    values = entries[0]
    row_idx, col_idx = entries[1][0], entries[1][1]

    return scipy.sparse.bsr_array((values, (row_idx, col_idx)), shape=(dims[0], dims[1]))
498
499
def __encode_numpy_array(array: np.ndarray) -> tuple[str, list[int], bytes]:
    """
    Converts a NumPy array into `(dtype description, shape, raw bytes)`.
    """
    # Type string of the first entry of the dtype description
    dtype_descr = array.dtype.descr[0][1]
    raw_bytes = array.tobytes()

    return dtype_descr, array.shape, raw_bytes
502
503
def __decode_numpy_array(ext_data: tuple[str, list[int], bytes]) -> np.ndarray:
    """
    Creates a NumPy array from `(dtype description, shape, raw bytes)`.

    The returned array owns its memory and is writeable.
    """
    dtype_descr, shape, buffer = ext_data
    array_view = np.frombuffer(buffer, dtype=np.dtype(dtype_descr)).reshape(shape)

    # numpy.frombuffer on an immutable buffer yields a read-only view -- copy it, so that
    # the deserialized array can be modified in-place like the original one.
    return array_view.copy()
507
508
# Handlers for serializing third-party types -- each handler wraps the encoded object
# into an `umsgpack.Ext` instance carrying the type ID.
# Graphs are encoded as node-link data because this is the format that
# `networkx.node_link_graph` (used by the corresponding unpack handler) expects.
ext_handler_pack = {
    np.ndarray:
        lambda arr: umsgpack.Ext(NUMPY_ARRAY_ID,
                                 umsgpack.packb(__encode_numpy_array(arr))),
    networkx.Graph:
        lambda graph: umsgpack.Ext(NETWORKX_GRAPH_ID,
                                   umsgpack.packb(networkx.node_link_data(graph))),
    scipy.sparse.bsr_array:
        lambda arr: umsgpack.Ext(SCIPY_BSRARRAY_ID,
                                 umsgpack.packb(__encode_bsr_array(arr)))}
# Handlers for deserializing third-party types -- the counterparts of the handlers above,
# looked up by type ID.
ext_handler_unpack = {
    NUMPY_ARRAY_ID:
        lambda ext: __decode_numpy_array(umsgpack.unpackb(ext.data)),
    NETWORKX_GRAPH_ID:
        lambda ext: networkx.node_link_graph(umsgpack.unpackb(ext.data)),
    SCIPY_BSRARRAY_ID:
        lambda ext: __decode_bsr_array(umsgpack.unpackb(ext.data))}