
"""
The timeseries module can be used to retrieve timeseries data from the SAP IoT abstract timeseries API.

Interfaces for retrieval are aligned with AssetCentral objects such as equipment_set and indicator_set.
Timeseries data is generally stored in a pandas dataframe, wrapped in a convenience class to make it easier
to interact with the data in AssetCentral terms (see wrappers.py for the convenience class).
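
Example
-------
A minimal usage sketch (assuming 'my_indicator_set' and 'my_equipment_set' were previously
retrieved through the AssetCentral interfaces)::

    from sailor.sap_iot.fetch import get_indicator_data

    data = get_indicator_data('2020-07-02', '2021-01-10', my_indicator_set, my_equipment_set)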
"""
from __future__ import annotations

from collections import defaultdict
from functools import partial
from datetime import datetime
from typing import TYPE_CHECKING, Union, BinaryIO
import logging
import time
import json
import zipfile
import gzip
from io import BytesIO

import pandas as pd

import sailor.assetcentral.indicators as ac_indicators
from sailor.utils.oauth_wrapper import get_oauth_client, RequestError
from sailor.utils.timestamps import _any_to_timestamp, _timestamp_to_date_string, _any_to_timedelta
from sailor.utils.config import SailorConfig
from sailor.utils.utils import DataNotFoundWarning, WarningAdapter
from .wrappers import TimeseriesDataset

if TYPE_CHECKING:
    from ..assetcentral.indicators import IndicatorSet
    from ..assetcentral.equipment import EquipmentSet

LOG = logging.getLogger(__name__)
LOG.addHandler(logging.NullHandler())
LOG = WarningAdapter(LOG)


fixed_timeseries_columns = {
    '_TIME': 'timestamp',
    'modelId': 'model_id',
    'equipmentId': 'equipment_id'
}


def _start_bulk_timeseries_data_export(start_date: str, end_date: str, liot_indicator_group: str) -> str:
    LOG.debug("Triggering raw indicator data export for indicator group: %s.", liot_indicator_group)
    oauth_iot = get_oauth_client('sap_iot')
    base_url = SailorConfig.get('sap_iot', 'export_url')  # todo: figure out what to do about these urls
    request_url = f'{base_url}/v1/InitiateDataExport/{liot_indicator_group}?timerange={start_date}-{end_date}'

    resp = oauth_iot.request('POST', request_url)
    return resp['RequestId']


def _check_bulk_timeseries_export_status(export_id: str) -> bool:
    LOG.debug("Checking export status for export id: %s.", export_id)
    oauth_iot = get_oauth_client('sap_iot')
    base_url = SailorConfig.get('sap_iot', 'export_url')  # todo: figure out what to do about these urls
    request_url = f'{base_url}/v1/DataExportStatus?requestId={export_id}'

    resp = oauth_iot.request('GET', request_url)

    if resp['Status'] == 'The file is available for download.':
        return True
    elif resp['Status'] in ['Request for data download is submitted.', 'Request for data download is initiated.']:
        return False
    else:
        raise RuntimeError(resp['Status'])


def _process_one_file(ifile: BinaryIO, indicator_set: IndicatorSet, equipment_set: EquipmentSet) -> pd.DataFrame:
    # each processed file contains data for some time range (one day it seems), one indicator group and all
    # equipment holding any data for that group in that time period.
    # Since the user might not have requested all indicators in the group we'll filter out any results that were not
    # requested. This is complicated by the fact that it's possible that the same indicator_id is present in the
    # indicator_group through two different templates. If it is requested only through one template it needs to be
    # filtered out after parsing the csv into a pandas dataframe, and converting to a
    # columnar format (one column for each (indicator_id, indicator_group_id, template_id)).
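    # Illustration (hypothetical indicator ids): a raw export csv looks roughly like
    #   _TIME, modelId, equipmentId, indicatorGroupId, templateId, <liot_id_1>, <liot_id_2>, ...
    # and after the pivot below each value column is keyed by the tuple
    #   (<liot_id>, <indicator_group_id>, <template_id>),
    # so an indicator requested through only one of two templates keeps exactly one column, while the
    # column belonging to the other template is dropped via `columns_to_keep`.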

    float_types = ['numeric', 'numericflexible']

    selected_equipment_ids = [equipment.id for equipment in equipment_set]  # noqa: F841
    dtypes = {indicator._liot_id: float for indicator in indicator_set if indicator.datatype in float_types}
    dtypes.update({'equipmentId': 'object', 'indicatorGroupId': 'object', 'templateId': 'object'})

    try:
        df = pd.read_csv(ifile,
                         usecols=lambda x: x != 'modelId',
                         parse_dates=['_TIME'],
                         date_parser=partial(pd.to_datetime, utc=True, unit='ms', errors='coerce'),
                         dtype=dtypes)
    except pd.errors.EmptyDataError:
        LOG.debug('Empty file returned by SAP IoT API, ignoring the file.')
        return pd.DataFrame()

    df = df.pivot(index=['_TIME', 'equipmentId'], columns=['indicatorGroupId', 'templateId'])

    columns_to_keep = {}
    columns_flat = df.columns.to_flat_index()
    for indicator in indicator_set:
        id_tuple = (indicator._liot_id, indicator._liot_group_id, indicator.template_id)
        if id_tuple in columns_flat:
            columns_to_keep[id_tuple] = indicator._unique_id

    df.columns = columns_flat
    df = (
        df.filter(items=columns_to_keep.keys())
          .reset_index()
          .rename(columns=columns_to_keep)
          .rename(columns=fixed_timeseries_columns)
          .query('equipment_id in @selected_equipment_ids')
    )
    return df


def _get_exported_bulk_timeseries_data(export_id: str,
                                       indicator_set: IndicatorSet,
                                       equipment_set: EquipmentSet) -> pd.DataFrame:
    oauth_iot = get_oauth_client('sap_iot')
    base_url = SailorConfig.get('sap_iot', 'download_url')  # todo: figure out what to do about these urls
    request_url = f"{base_url}/v1/DownloadData('{export_id}')"
    resp = oauth_iot.request('GET', request_url, headers={'Accept': 'application/octet-stream'})
    ifile = BytesIO(resp)
    try:
        zip_content = zipfile.ZipFile(ifile)
    except zipfile.BadZipFile:
        raise RuntimeError('Downloaded file is corrupted, cannot process contents.')

    frames = []
    for i, inner_file in enumerate(zip_content.filelist):
        # the end marker below allows us to keep updating the current line for a nicer 'progress update'
        print(f'processing compressed file {i + 1}/{len(zip_content.filelist)}', end='\x1b[2K\r')
        gzip_file = zip_content.read(inner_file)
        if not gzip_file:
            continue

        try:
            gzip_content = gzip.GzipFile(fileobj=BytesIO(gzip_file))
            frames.append(_process_one_file(gzip_content, indicator_set, equipment_set))
        except gzip.BadGzipFile:
            raise RuntimeError('Downloaded file is corrupted, cannot process contents.')

    if frames:
        return pd.concat(frames)
    else:
        raise RuntimeError('Downloaded file did not have any content.')


def get_indicator_data(start_date: Union[str, pd.Timestamp, datetime.timestamp, datetime.date],
                       end_date: Union[str, pd.Timestamp, datetime.timestamp, datetime.date],
                       indicator_set: IndicatorSet, equipment_set: EquipmentSet, *,
                       timeout: Union[str, pd.Timedelta, datetime.timedelta] = None) -> TimeseriesDataset:
    """
    Read indicator data for a certain time period, a set of equipment and a set of indicators.

    Parameters
    ----------
    start_date:
        Date of beginning of requested timeseries data. Time components of the date will be ignored.
    end_date:
        Date of end of requested timeseries data. Time components of the date will be ignored.
    indicator_set:
        IndicatorSet for which timeseries data is returned.
    equipment_set:
        Equipment set for which the timeseries data is read.
    timeout:
        Maximum amount of time the request may take. If None there is no time limit.

    Example
    -------
    Get the indicator set 'my_indicator_set' timeseries data for equipment in the equipment set
    'my_equipment_set' for a period from '2020-07-02' to '2021-01-10'::

        get_indicator_data('2020-07-02', '2021-01-10', my_indicator_set, my_equipment_set)
    """
    # some notes:
    # the bulk export api *only* works on indicator groups. No filtering for equipment_set or indicator_set.
    # so we always need to download data for the whole group. We filter on individual indicator-template combinations
    # as well as individual equipment in `_process_one_file`.
    if start_date is None or end_date is None:
        raise ValueError("Time parameters must be specified")

    if timeout is not None:
        timeout = _any_to_timedelta(timeout)
        timeout = timeout.total_seconds()

    start_date = _any_to_timestamp(start_date)
    end_date = _any_to_timestamp(end_date)

    query_groups = defaultdict(list)
    for indicator in indicator_set:
        query_groups[indicator._liot_group_id].append(indicator)

    request_ids = {}
    for indicator_group, indicator_subset in sorted(query_groups.items()):  # sorted to make query order reproducible
        formatted_start_date = _timestamp_to_date_string(start_date)
        formatted_end_date = _timestamp_to_date_string(end_date)
        try:
            request_id = _start_bulk_timeseries_data_export(formatted_start_date, formatted_end_date, indicator_group)
            request_ids[request_id] = indicator_subset
        except RequestError as e:
            try:
                error_message = json.loads(e.error_text)['message']
            except (json.JSONDecodeError, KeyError):
                raise e
            if error_message == 'Data not found for the requested date range':
                warning = DataNotFoundWarning(
                    f'No data for indicator group {indicator_group} found in the requested time interval!')
                LOG.log_with_warning(warning)
                continue
            else:
                raise e

    LOG.info('Data export triggered for %s indicator group(s).', len(query_groups))
    print(f'Data export triggered for {len(query_groups)} indicator group(s).')

    # string (or really uuid?) might be better data types for equipment_id
    # unfortunately, support for native string datatypes in pandas is still experimental
    # and if we cast to string right when reading the csv files it gets 'upcast' back to object
    # in `pivot` and `merge`. Hence we'll just stick with object for now.
    schema = {'equipment_id': 'object', 'timestamp': pd.DatetimeTZDtype(tz='UTC')}
    results = pd.DataFrame(columns=schema.keys()).astype(schema)

    print('Waiting for data export:')
    start_time = time.monotonic()
    while request_ids:
        for request_id in list(request_ids):
            if _check_bulk_timeseries_export_status(request_id):
                indicator_subset = ac_indicators.IndicatorSet(request_ids.pop(request_id))
                print(f'\nNow downloading export for indicator group {indicator_subset[0].indicator_group_name}.')
                data = _get_exported_bulk_timeseries_data(request_id, indicator_subset, equipment_set)
                print('\nDownload complete')

                for indicator in indicator_subset:
                    if indicator._unique_id not in data.columns:
                        warning = DataNotFoundWarning(f'Could not find any data for indicator {indicator}')
                        LOG.log_with_warning(warning)

                results = pd.merge(results, data, on=['equipment_id', 'timestamp'], how='outer')

        if request_ids:
            time.sleep(5)
            print('.', end='')
            if timeout is not None and timeout < (time.monotonic() - start_time):
                raise TimeoutError(f'Timeout of {timeout:.0f} seconds was reached for fetching indicator data.')
    print()

    wrapper = TimeseriesDataset(results, indicator_set, equipment_set, start_date, end_date)
    return wrapper
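
# A minimal usage sketch for the `timeout` parameter (hypothetical `my_indicator_set` and
# `my_equipment_set` obtained from the AssetCentral interfaces):
#
#   dataset = get_indicator_data('2020-07-02', '2021-01-10', my_indicator_set, my_equipment_set,
#                                timeout=pd.Timedelta(minutes=30))
#
# If the export is not available within 30 minutes, the polling loop in `get_indicator_data`
# raises a TimeoutError.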