"""
System module can be used to retrieve System information from AssetCentral.
Classes are provided for individual Systems as well as groups of Systems (SystemSet).
"""
from __future__ import annotations
import math
import itertools
import logging
from typing import Union, List, Tuple, Dict
from datetime import datetime
from functools import cached_property
from operator import itemgetter
import pandas as pd
from sailor import _base, sap_iot
from sailor.utils.utils import WarningAdapter
from .utils import (AssetcentralEntity, _AssetcentralField, AssetcentralEntitySet,
_ac_application_url, _ac_fetch_data)
from .equipment import find_equipment, EquipmentSet
from .indicators import (IndicatorSet, SystemIndicator, SystemAggregatedIndicator, AggregatedIndicatorSet,
SystemIndicatorSet, SystemAggregatedIndicatorSet)
from .constants import VIEW_SYSTEMS
_SYSTEM_FIELDS = [
_AssetcentralField('name', 'internalId'),
_AssetcentralField('model_name', 'model',
query_transformer=_base.masterdata._qt_non_filterable('model_name')),
_AssetcentralField('status_text', 'systemStatusDescription',
query_transformer=_base.masterdata._qt_non_filterable('status_text')),
_AssetcentralField('short_description', 'shortDescription'),
_AssetcentralField('class_name', 'className'),
_AssetcentralField('id', 'systemId'),
_AssetcentralField('model_id', 'modelID',
query_transformer=_base.masterdata._qt_non_filterable('model_id')),
_AssetcentralField('template_id', 'templateID',
query_transformer=_base.masterdata._qt_non_filterable('template_id')),
_AssetcentralField('_status', 'status'),
_AssetcentralField('_model_version', 'modelVersion'),
_AssetcentralField('_system_provider', 'systemProvider'),
_AssetcentralField('_system_version', 'systemVersion'),
_AssetcentralField('_created_on', 'createdOn'),
_AssetcentralField('_changed_on', 'changedOn'),
_AssetcentralField('_published_on', 'publishedOn'),
_AssetcentralField('_source', 'source'),
_AssetcentralField('_image_URL', 'imageURL'),
_AssetcentralField('_class_id', 'classID'),
_AssetcentralField('_subclass', 'subclass'),
_AssetcentralField('_subclass_id', 'subclassID'),
_AssetcentralField('_system_provider_id', 'systemProviderID'),
_AssetcentralField('_source_search_terms', 'sourceSearchTerms'),
_AssetcentralField('_system_provider_search_terms', 'systemProviderSearchTerms'),
_AssetcentralField('_operator', 'operator'),
_AssetcentralField('_operator_id', 'operatorID'),
_AssetcentralField('_completeness', 'completeness'),
]
LOG = logging.getLogger(__name__)
LOG.addHandler(logging.NullHandler())
LOG = WarningAdapter(LOG)
# Path = list[(str, int)] needs Python 3.9
Path = List[Tuple[str, int]]
[docs]@_base.add_properties
class System(AssetcentralEntity):
"""AssetCentral System Object."""
_field_map = {field.our_name: field for field in _SYSTEM_FIELDS}
@staticmethod
def _traverse_components(component, model_order, equipment_ids, system_ids):
"""Traverse component structure recursively, starting from 'component.' Pydocstyle does not know punctuation."""
compd = {}
compd['key'] = (component['model'], model_order)
compd['id'] = component['id']
compd['name'] = component['name']
compd['order'] = component['order']
if component['order'] is not None:
if component['objectType'] == 'EQU':
equipment_ids.append(component['id'])
else:
system_ids.append(component['id'])
compd['object_type'] = component['objectType']
if 'childNodes' in component.keys():
component['childNodes'] = sorted(component['childNodes'], key=itemgetter('model', 'order'))
compd['child_list'] = []
for _, comps_by_model in itertools.groupby(component['childNodes'], itemgetter('model')):
for model_order, c in enumerate(comps_by_model):
compd0, equipment_ids, system_ids = System._traverse_components(c, model_order,
equipment_ids, system_ids)
compd['child_list'].append(compd0)
compd['child_list'] = sorted(compd['child_list'], key=itemgetter('order'))
return compd, equipment_ids, system_ids
def _update_components(self, component):
"""Add indicators and replace id with model_id in the key."""
if component['object_type'] == 'EQU':
obj = self.__hierarchy['equipment'].filter(id=component['id'])[0]
component['indicators'] = self.__hierarchy['indicators'][component['id']]
else:
if component['id'] == self.id:
obj = self
else:
obj = self.__hierarchy['systems'].filter(id=component['id'])[0]
component['key'] = (obj.model_id, component['key'][1])
if 'child_list' in component.keys():
component['child_nodes'] = {}
for child in component['child_list']:
self._update_components(child)
component['child_nodes'][child['key']] = child
del component['child_nodes'][child['key']]['key']
del component['child_list']
@cached_property
def _hierarchy(self):
"""Prepare component tree and cache it."""
endpoint_url = _ac_application_url() + VIEW_SYSTEMS + f'({self.id})' + '/components'
comps = _ac_fetch_data(endpoint_url)[0]
self.__hierarchy = {}
self.__hierarchy['component_tree'], equipment_ids, system_ids = System._traverse_components(comps, 0, [], [])
if system_ids:
self.__hierarchy['systems'] = find_systems(id=system_ids)
else:
self.__hierarchy['systems'] = SystemSet([])
self.__hierarchy['indicators'] = {}
if equipment_ids:
self.__hierarchy['equipment'] = find_equipment(id=equipment_ids)
for equi in self.__hierarchy['equipment']:
self.__hierarchy['indicators'][equi.id] = equi.find_equipment_indicators(type='Measured')
else:
self.__hierarchy['equipment'] = EquipmentSet([])
self._update_components(self.__hierarchy['component_tree'])
del self.__hierarchy['component_tree']['key']
return self.__hierarchy
@staticmethod
def _create_selection_dictionary(comp_tree):
"""Create a selection dictionary recursively based on 'comp_tree.' Pydocstyle does not know punctuation."""
selection = {}
selection['object_type'] = comp_tree['object_type']
if comp_tree['object_type'] == 'EQU':
selection['indicators'] = comp_tree['indicators']
if 'child_nodes' in comp_tree.keys():
selection['child_nodes'] = []
for child in comp_tree['child_nodes']:
sel = System._create_selection_dictionary(comp_tree['child_nodes'][child])
sel['key'] = child
selection['child_nodes'].append(sel)
return selection
@staticmethod
def _find_first_equipment(comp_tree, level, equi_id, equi_level):
"""Find first equipment on highest level of a tree recursively."""
for child in comp_tree['child_nodes']:
if comp_tree['child_nodes'][child]['object_type'] == 'EQU':
return comp_tree['child_nodes'][child]['id'], level
else:
if (level + 1) < equi_level:
equi_id, equi_level = System._find_first_equipment(comp_tree['child_nodes'][child], level+1,
equi_id, equi_level)
return equi_id, equi_level
[docs] def get_leading_equipment(self, path: Path = None):
"""Get leading piece of equipment (by path or default).
.. versionadded:: 1.9.0
"""
if path:
child_nodes = self._hierarchy['component_tree']['child_nodes']
for p in path:
if p not in child_nodes:
raise RuntimeError(f'Path entry {p} not found in system {self.name}')
object_id = child_nodes[p]['id']
child_nodes = child_nodes[p]['child_nodes']
return object_id
else:
# no path given: find first equipment on highest level
# looks like a breadth-first search problem, but DFS is more efficient
equi_id, _ = System._find_first_equipment(self._hierarchy['component_tree'], 0, 0, math.inf)
return equi_id
[docs] def get_indicator_data(self, start: Union[str, pd.Timestamp, datetime.timestamp, datetime.date],
end: Union[str, pd.Timestamp, datetime.timestamp, datetime.date],
indicator_set: IndicatorSet = None, *,
timeout: Union[str, pd.Timedelta, datetime.timedelta] = None) -> sap_iot.TimeseriesDataset:
"""
Get timeseries data for all Equipment in the System.
This is a wrapper for :meth:`sailor.sap_iot.fetch.get_indicator_data` that limits the fetch query
to the equipment in this System.
Each component equipment will be returned as separate rows in the dataset,
potentially making the dataset very sparse.
Parameters
----------
start
Begin of time series data.
end
End of time series data.
indicator_set
IndicatorSet for which timeseries data is returned.
timeout
Maximum amount of time the request may take. Can be specified as an ISO 8601 string
(like `PT2M` for 2-minute duration) or as a pandas.Timedelta or datetime.timedelta object.
If None, there is no time limit.
"""
if indicator_set is None:
indicator_set = sum((equi.find_equipment_indicators() for equi in self._hierarchy['equipment']),
IndicatorSet([]))
LOG.debug('Requesting indicator data of system "%s" for %d indicators.', self.id, len(indicator_set))
return sap_iot.get_indicator_data(start, end, indicator_set, self._hierarchy['equipment'], timeout=timeout)
[docs]class SystemSet(AssetcentralEntitySet):
"""Class representing a group of Systems."""
_element_type = System
_method_defaults = {
'plot_distribution': {
'by': 'model_name',
},
}
[docs] def get_leading_equipment(self, path: Path = None) -> pd.DataFrame:
"""Get a DataFrame that contains all system ids together with their leading equipment id.
.. versionadded:: 1.9.0
"""
leading_equipments = {system.get_leading_equipment(path=path): system.id for system in self}
return pd.DataFrame([leading_equipments.keys(), leading_equipments.values()],
index=['equipment_id', 'system_id']).transpose()
[docs] def get_indicator_data(self, start: Union[str, pd.Timestamp, datetime.timestamp, datetime.date],
end: Union[str, pd.Timestamp, datetime.timestamp, datetime.date],
indicator_set: IndicatorSet = None, *,
timeout: Union[str, pd.Timedelta, datetime.timedelta] = None) -> sap_iot.TimeseriesDataset:
"""
Fetch data for a set of systems for all component equipment of each system.
This is a wrapper for :meth:`sailor.sap_iot.fetch.get_indicator_data` that limits the fetch query
to the equipment in this SystemSet.
Similar to ``System.get_indicator_data`` each component will be returned as separate rows in the dataset,
potentially making the dataset very sparse.
Parameters
----------
start
Begin of time series data.
end
End of time series data.
indicator_set
IndicatorSet for which timeseries data is returned.
timeout
Maximum amount of time the request may take. Can be specified as an ISO 8601 string
(like `PT2M` for 2-minute duration) or as a pandas.Timedelta or datetime.timedelta object.
If None, there is no time limit.
"""
all_equipment = sum((system._hierarchy['equipment'] for system in self), EquipmentSet([]))
if indicator_set is None:
indicator_set = sum((equipment.find_equipment_indicators() for equipment in all_equipment),
IndicatorSet([]))
LOG.debug("Requesting indicator data of system set for %d equipments and %d indicators.",
len(all_equipment), len(indicator_set))
return sap_iot.get_indicator_data(start, end, indicator_set, all_equipment, timeout=timeout)
@staticmethod
def _fill_nones(sel_nodes, indicator_list, none_positions, equi_counter):
"""Fill None for indicators of missing subtrees recursively."""
for node in sel_nodes:
if node['object_type'] == 'EQU':
equi_counter += 1
for _ in node['indicators']:
none_positions.add(len(indicator_list))
indicator_list.append(None)
if 'child_nodes' in node.keys():
equi_counter = SystemSet._fill_nones(node['child_nodes'], indicator_list, none_positions, equi_counter)
return equi_counter
@staticmethod
def _map_comp_info(sel_nodes, sys_nodes, indicator_list, none_positions, equipment, equi_counter):
"""Map selection dictionary against component dictionary recursively."""
for node in sel_nodes:
if node['object_type'] == 'EQU':
if (node['key'] in sys_nodes.keys()):
equipment[sys_nodes[node['key']]['id']] = equi_counter
equi_counter += 1
for indicator in node['indicators']:
if (node['key'] in sys_nodes.keys()) and (indicator in sys_nodes[node['key']]['indicators']):
indicator_list.append((sys_nodes[node['key']]['id'], indicator))
else:
none_positions.add(len(indicator_list))
indicator_list.append(None)
if 'child_nodes' in node.keys():
if node['key'] in sys_nodes.keys():
equi_counter = SystemSet._map_comp_info(node['child_nodes'], sys_nodes[node['key']]['child_nodes'],
indicator_list, none_positions, equipment, equi_counter)
else:
equi_counter = SystemSet._fill_nones(node['child_nodes'], indicator_list, none_positions,
equi_counter)
return equi_counter
def _map_component_information(self, selection: Dict = None):
"""Map selection dictionary against component dictionary of systems in a system set.
Parameters
----------
selection
dictionary of pieces of equipment and indicators that are to be selected
if selection is None or {}, all pieces of equipment and indicators are selected that appear for all
systems of the system set
Returns
-------
system_indicators: dict
dictionary of selected indicators
system_equipment: dict
dictionary of pieces of equipment and their positions
"""
system_indicators = {}
system_equipment = {}
none_positions = set()
intersection = False
if not selection:
# build selection dictionary from one of the systems
intersection = True
selection = System._create_selection_dictionary(self[0]._hierarchy['component_tree'])
for system in self:
indicator_list = []
equipment = {}
equi_counter = 0
equi_counter = SystemSet._map_comp_info(selection['child_nodes'],
system._hierarchy['component_tree']['child_nodes'],
indicator_list, none_positions, equipment, equi_counter)
system_indicators[system.id] = indicator_list
system_equipment[system.id] = equipment
if intersection:
# keep only indicators that appear for all systems
none_positions = list(none_positions)[::-1]
for system in self:
for p in none_positions:
del system_indicators[system.id][p]
# keep only pieces of equipment for relevant indicators
keep_equi = set()
for indicator in system_indicators[self[0].id]:
keep_equi.add(indicator[0])
equi_map = {}
c = 0
for equi in system_equipment[self[0].id]:
if equi in keep_equi:
equi_map[system_equipment[self[0].id][equi]] = c
c += 1
sys_equipment = {}
for system in self:
equipment = {}
for equi in system_equipment[system.id]:
if system_equipment[system.id][equi] in equi_map.keys():
equipment[equi] = equi_map[system_equipment[system.id][equi]]
sys_equipment[system.id] = equipment
system_equipment = sys_equipment
return system_indicators, system_equipment
def _get_leading_equipment_and_equipment_counter(self, system_equipment: Dict, lead_equi_path: Path = None):
"""Get leading equipment and equipment counter."""
def equi_counter(equi_id, sys):
"""Get equipment counter (function makes apply() nicer)."""
if equi_id in system_equipment[sys].keys():
return system_equipment[sys][equi_id]
else:
return -1
# get leading piece of equipment for every piece of equipment in the hierarchy trees of a system set
for i in range(len(self)):
eq = self[i]._hierarchy['equipment'].as_df()[['id']]
eq.rename(columns={"id": "equipment_id"}, inplace=True)
eq['leading_equipment'] = self[i].get_leading_equipment(lead_equi_path)
eq['equi_counter'] = eq.equipment_id.apply(equi_counter, sys=self[i].id)
if i == 0:
equi_info = eq
else:
equi_info = equi_info.append(eq)
# category gets lost in append(), so we have to do it here, copy = False does not work
return equi_info.astype({'equipment_id': 'category', 'leading_equipment': 'category'})
[docs]def find_systems(*, extended_filters=(), **kwargs) -> SystemSet:
"""Fetch Systems from AssetCentral with the applied filters, return a SystemSet.
This method supports the usual filter criteria, i.e.
- Any named keyword arguments applied as equality filters, i.e. the name of the System property is checked
against the value of the keyword argument. If the value of the keyword argument is an iterable (e.g. a list)
then all objects matching any of the values in the iterable are returned.
Parameters
----------
extended_filters
See :ref:`filter`.
**kwargs
See :ref:`filter`.
Examples
--------
Find all Systems with name 'MySystem'::
find_systems(name='MySystem')
Find all Systems which either have the name 'MySystem' or the name 'MyOtherSystem'::
find_systems(name=['MySystem', 'MyOtherSystem'])
If multiple named arguments are provided then *all* conditions have to match.
Example
-------
Find all Systems with name 'MySystem' which also is published (status_text = 'Published')::
find_systems(name='MySystem', status_text='Published')
The ``extended_filters`` parameter can be used to specify filters that can not be expressed as an equality. Each
extended_filter needs to be provided as a string, multiple filters can be passed as a list of strings. As above,
all filter criteria need to match. Extended filters can be freely combined with named arguments. Here, too all
filter criteria need to match for a System to be returned.
Example
-------
Find all Systems with creation date higher or equal to 01.01.2020::
find_systems(extended_filters=['created_on >= "2020-01-01"'])
"""
unbreakable_filters, breakable_filters = \
_base.parse_filter_parameters(kwargs, extended_filters, System._field_map)
endpoint_url = _ac_application_url() + VIEW_SYSTEMS
object_list = _ac_fetch_data(endpoint_url, unbreakable_filters, breakable_filters, paginate=True)
LOG.debug('Found %d systems for the specified filters.', len(object_list))
return SystemSet([System(obj) for obj in object_list])
[docs]def create_analysis_table(system_set: SystemSet, indicator_data: sap_iot.TimeseriesDataset, selection: Dict = None,
leading_equipment_path: Path = None) -> sap_iot.TimeseriesDataset:
"""Create analysis table for a system set.
An analysis table is a table in which each row contains all indicator data that are valid for a system and a
timestamp. The system is represented by its leading piece of equipment. The data columns are represented by
SystemIndicators or SystemAggregatedIndicators, i.e. their key consists of information about the indicator,
the equipment counter, and for SystemAggregatedIndicators the aggregation function.
.. versionadded:: 1.9.0
Parameters
----------
system_set
Set of systems for which data is collected
indicator_data
TimeseriesDataset containing the relevant indicator data
selection
dictionary that contains information about the pieces of equipment and indicators that are to be selected
leading_equipment_path
path to the leading piece of equipment of a system
"""
_, system_equipment = system_set._map_component_information(selection)
equi_info = system_set._get_leading_equipment_and_equipment_counter(system_equipment, leading_equipment_path)
agg = isinstance(indicator_data.indicator_set, AggregatedIndicatorSet)
id_df = indicator_data.as_df(speaking_names=False).reset_index()
# join with leading equipment
id_df = id_df.merge(equi_info)
# drop equipment id
id_df.drop(['equipment_id'], axis=1, inplace=True)
id_df.rename(columns={'leading_equipment': 'equipment_id'}, inplace=True)
# create really long format
long = id_df.melt(id_vars=['timestamp', 'equipment_id', 'equi_counter'])
long = long[long.equi_counter >= 0]
# get rid of NAs
long = long.dropna(subset=['value'])
# create wide format
wide = long.pivot(index=['equipment_id', 'timestamp'], columns=['variable', 'equi_counter'])
# create columns and System(Aggregated)Indicators
columns = []
rawmap = indicator_data._indicator_set._unique_id_to_raw()
sysindlist = []
for c in wide.columns:
if agg:
sysind = SystemAggregatedIndicator(rawmap[c[1]][0], rawmap[c[1]][1], c[2])
else:
sysind = SystemIndicator(rawmap[c[1]], c[2])
sysindlist.append(sysind)
columns.append(sysind._unique_id)
if agg:
sysindset = SystemAggregatedIndicatorSet(sysindlist)
else:
sysindset = SystemIndicatorSet(sysindlist)
wide.columns = columns
wide.reset_index(inplace=True)
equipment_set = EquipmentSet([e for e in indicator_data.equipment_set
if e.id in list(id_df['equipment_id'].unique())])
return sap_iot.TimeseriesDataset(wide, sysindset, equipment_set, indicator_data.nominal_data_start,
indicator_data.nominal_data_end)