"""
Timeseries module can be used to retrieve timeseries data from the SAP IoT abstract timeseries API.

This is the equivalent to sap_iot.fetch, except that we are retrieving pre-aggregated data from the hot store
rather than raw data from the cold store.
"""
from __future__ import annotations
from typing import Union, Iterable, TYPE_CHECKING
from datetime import datetime, timedelta
import logging
from collections import defaultdict
import pandas as pd
import isodate
from sailor.utils.timestamps import _any_to_timestamp, _timestamp_to_isoformat
from sailor.assetcentral.indicators import AggregatedIndicatorSet, IndicatorSet
from sailor.sap_iot.wrappers import TimeseriesDataset
from sailor.utils.oauth_wrapper import get_oauth_client
from sailor.sap_iot._common import request_aggregates_url
from sailor.utils.utils import DataNotFoundWarning, WarningAdapter
LOG = logging.getLogger(__name__)
# Wrap the logger so that messages can additionally be raised as Python warnings (see `log_with_warning` usage below).
LOG = WarningAdapter(LOG)

if TYPE_CHECKING:
    # Only needed for type annotations. Annotations are evaluated lazily (`from __future__ import annotations`),
    # so the import can be skipped at runtime.
    from ..assetcentral.equipment import EquipmentSet

# Aggregation functions requested when a caller passes `aggregation_functions=None`.
# NOTE(review): list reconstructed from the aggregates named in this module's documentation and code;
# confirm against the SAP IoT aggregates API before relying on it.
_ALL_KNOWN_AGGREGATION_FUNCTIONS = ['MIN', 'MAX', 'AVG', 'STDDEV', 'SUM', 'FIRST', 'LAST',
                                    'COUNT', 'PERCENT_GOOD', 'TMIN', 'TMAX', 'TFIRST', 'TLAST']
def _parse_aggregation_interval(aggregation_interval):
if aggregation_interval is not None:
if isinstance(aggregation_interval, str):
aggregation_interval = isodate.parse_duration(aggregation_interval)
total_seconds = aggregation_interval.total_seconds()
aggregation_interval = f'PT{total_seconds}S'
return aggregation_interval
def _compose_query_params(template: str, equipment_set: EquipmentSet,
                          aggregated_indicator_set: AggregatedIndicatorSet,
                          aggregation_interval: str):
    """
    Build the query parameters for one aggregate timeseries request.

    Parameters
    ----------
    template
        Template id used in the ``templateId`` part of the tags filter.
    equipment_set
        Equipments whose ``id`` values are or-ed together in the ``equipmentId`` part of the tags filter.
    aggregated_indicator_set
        Aggregated indicators whose ``_iot_column_header`` values make up the ``select`` parameter.
    aggregation_interval
        Value for the ``duration`` parameter. The parameter is omitted when this is None or empty.

    Returns
    -------
    dict with the keys ``tagsFilter``, ``select`` and, if an interval was given, ``duration``.
    """
    equipment_selector = ' or '.join(f"\"equipmentId\"='{equipment.id}'" for equipment in equipment_set)
    tags_filter = f"({equipment_selector}) and \"templateId\" = '{template}'"

    query_params = {
        'tagsFilter': tags_filter,
    }
    if aggregation_interval:
        query_params['duration'] = aggregation_interval

    query_params['select'] = ",".join(indicator._iot_column_header for indicator in aggregated_indicator_set)
    LOG.debug('Query parameters for aggregate timeseries request: %s', query_params)
    return query_params
def get_indicator_aggregates(start: Union[str, pd.Timestamp, datetime], end: Union[str, pd.Timestamp, datetime],
                             indicator_set: IndicatorSet, equipment_set: EquipmentSet,
                             aggregation_functions: Iterable[str] = ('AVG',),
                             aggregation_interval: Union[str, pd.Timedelta, timedelta] = 'PT2M'):
    """
    Fetch timeseries data from SAP Internet of Things for Indicators attached to all equipments in this set.

    Unlike :meth:`sailor.assetcentral.equipment.Equipment.get_indicator_data` this function retrieves
    pre-aggregated data from the hot store.

    Parameters
    ----------
    start
        Date of beginning of requested timeseries data.
    end
        Date of end of requested timeseries data.
    indicator_set
        IndicatorSet for which timeseries data is returned.
    equipment_set
        EquipmentSet for which timeseries data is returned.
    aggregation_functions
        Determines which aggregates to retrieve. Possible aggregates are
        'MIN', 'MAX', 'AVG', 'STDDEV', 'SUM', 'FIRST', 'LAST',
        'COUNT', 'PERCENT_GOOD', 'TMIN', 'TMAX', 'TFIRST', 'TLAST'.
        If None, all known aggregation functions are requested.
    aggregation_interval
        Determines the aggregation interval. Can be specified as an ISO 8601 string
        (like `PT2M` for 2-minute aggregates) or as a pandas.Timedelta or datetime.timedelta object.

    Returns
    -------
    TimeseriesDataset
        Dataset with one row per timestamp and equipment, and one column per aggregated indicator.

    Raises
    ------
    ValueError
        If ``start`` or ``end`` is None.

    Example
    -------
    Get indicator data for all Equipment belonging to the Model 'MyModel'
    for a period from 01.06.2020 to 05.12.2020 ::

        equipment_set = find_equipment(model_name='MyModel')
        indicator_set = equipment_set.find_common_indicators()
        get_indicator_aggregates('2020-06-01', '2020-12-05', indicator_set, equipment_set,
                                 aggregation_functions=['MIN'], aggregation_interval=pd.Timedelta(hours=2))
    """
    # The data we get from SAP IoT is basically the first format below, where columns are identified by
    # indicator_id and aggregation_function, indicator_group and template_id are in a separate column.
    # That is confusing, as it's neither a purely horizontal format nor a purely vertical format.
    # Much of the code below is therefore taking care of conversion from the SAP IoT format to our own format
    # which is fully horizontal (ie column headers encode template, indicator group, indicator and
    # aggregation function).
    #
    # SAP IoT format is effectively:
    #   time | equi-id | equi-model-id | indicator-group-id | template-id | indicator_A_value_AGG
    #   x    | equi-1  | model-1       | indi-group-1       | template-1  | 5
    #   x    | equi-1  | model-1       | indi-group-2       | template-1  | 15
    #
    # Our format is effectively:
    #   time | equi-id | equi-model-id | column_id_1 | column_id_2
    #   x    | equi-1  | model-1       | 5           | 15
    #
    # where the column_ids encode all unique identifiers of an aggregated sensor value here
    #   column_id = hash(indicator_id + indicator_group_id + template_id + aggregation_function)

    if start is None or end is None:
        raise ValueError("Time parameters must be specified")

    start = _any_to_timestamp(start)
    end = _any_to_timestamp(end)

    # One request is needed per (template, indicator group) combination.
    query_groups = defaultdict(list)
    for indicator in indicator_set:
        query_groups[(indicator.template_id, indicator._liot_group_id)].append(indicator._liot_id)

    if aggregation_functions is None:
        aggregation_functions = _ALL_KNOWN_AGGREGATION_FUNCTIONS

    aggregated_indicators = AggregatedIndicatorSet._from_indicator_set_and_aggregation_functions(indicator_set,
                                                                                                 aggregation_functions)
    oauth_iot = get_oauth_client('sap_iot')

    # Start from an empty, correctly typed frame so that the result has the key columns even without data.
    schema = {'equipment_id': 'object', 'timestamp': pd.DatetimeTZDtype(tz='UTC')}
    df = pd.DataFrame(columns=schema.keys()).astype(schema)

    duration = None
    aggregation_interval = _parse_aggregation_interval(aggregation_interval)
    params = dict(start=_timestamp_to_isoformat(start, True), end=_timestamp_to_isoformat(end, True),
                  equipment_set=equipment_set, aggregated_indicator_set=aggregated_indicators,
                  aggregation_interval=aggregation_interval)

    # doing this as the aggregates api needs to be called for each indicator group
    for template_id, liot_group_id in query_groups:
        LOG.debug("Fetching indicator data. Indicator Group ID: %s.", liot_group_id)
        results = _fetch_aggregates(liot_group_id, template_id, oauth_iot, **params)
        results_df = _prepare_df(results, aggregated_indicators, liot_group_id, template_id)
        if results_df.empty:
            # nothing to merge for this group, and `results` may be empty so it must not be indexed
            continue
        # remember the interval reported by the first group that returned data
        duration = results[0]['properties']['duration'] if duration is None else duration
        df = pd.merge(df, results_df, on=['timestamp', 'equipment_id'], how='outer')

    df['timestamp'] = pd.to_datetime(df['timestamp'])

    if df.empty:
        warning = DataNotFoundWarning('Could not find any data for the requested period.')
        LOG.log_with_warning(warning)

    if aggregation_interval is not None and duration is not None:
        # the reported duration is prefixed with 'P' here to form a parseable ISO 8601 duration
        duration = isodate.parse_duration('P' + duration)
        aggregation_interval = isodate.parse_duration(aggregation_interval)
        if duration != aggregation_interval:
            LOG.log_with_warning(f'The aggregation interval returned by the query ("{duration}") does ' +
                                 f'not match the requested aggregation interval ("{aggregation_interval}")')

    return TimeseriesDataset(df, aggregated_indicators, equipment_set, start, end)
def _fetch_aggregates(liot_group_id, template_id, oauth_iot, start, end, **params):
    """
    Request all pages of aggregate data for one indicator group and template.

    Parameters
    ----------
    liot_group_id
        Indicator group id used to build the request URL.
    template_id
        Template id used in the tags filter of the query.
    oauth_iot
        Client whose ``request`` method performs the HTTP calls.
    start, end
        Boundaries of the requested period, passed on to ``request_aggregates_url``.
    **params
        Further keyword arguments forwarded to ``_compose_query_params``.

    Returns
    -------
    list with the concatenated ``results`` entries of all fetched pages.
    """
    aggregates_url = request_aggregates_url(liot_group_id, start, end)
    query_params = _compose_query_params(template_id, **params)

    results = []
    response_data = oauth_iot.request("GET", aggregates_url, params=query_params)
    while response_data is not None and len(response_data['results']) > 0:
        results.extend(response_data['results'])
        if next_link := response_data.get('nextLink'):
            LOG.debug('Query incomplete, fetching next page.')
            response_data = oauth_iot.request('GET', next_link)
        else:
            # last page reached; without this the loop would re-process the same response forever
            break
    return results
def _prepare_df(results, aggregated_indicators, liot_group_id, template_id) -> pd.DataFrame:
key_tags = ['equipmentId', 'modelId', 'templateId', 'indicatorGroupId']
grouped_results = defaultdict(list)
for row in results:
key = tuple(row['tags'][k] for k in key_tags)
all_dfs = []
for k, v in grouped_results.items():
df = pd.DataFrame(v)
for i, key_tag in enumerate(key_tags):
df[key_tag] = k[i]
if not all_dfs:
return pd.DataFrame()
tmp = pd.concat(all_dfs)
column_mapping = {}
drop_columns = ['templateId', 'indicatorGroupId', 'duration', 'modelId']
for column in tmp.columns:
if column.startswith('I_'):
indicator_candidates = aggregated_indicators.filter(_iot_column_header=column,
if len(indicator_candidates) == 0:
LOG.warning('No matching indicator found for %s, ignoring data.', column)
assert len(indicator_candidates) == 1, f'More than one indicator matching column {column} found!'
aggregated_indicator = indicator_candidates[0]
column_mapping[column] = aggregated_indicator._unique_id
# this typecasting based on the name of the column is pretty wild...
# should refactor to do it based on indicator data type
# but need to somehow combine that with the aggregation function
if aggregated_indicator.aggregation_function in ['TMIN', 'TMAX', 'TFIRST', 'TLAST']:
tmp[column] = pd.to_datetime(tmp[column])
else: # should always be an aggregated indicator, since we're filtering on 'I_' above.
tmp[column] = tmp[column].astype('double', errors='ignore')
results_df = (
tmp.drop(columns=drop_columns, errors='ignore')
.reindex(columns=['equipmentId', 'time'] + sorted(column_mapping))
.rename(columns={'time': 'timestamp', 'equipmentId': 'equipment_id'})
return results_df