Source code for sailor.assetcentral.notification

"""
Retrieve Notification information from AssetCentral.

Classes are provided for individual Notifications as well as groups of Notifications (NotificationSet).
"""
import logging

import pandas as pd
import plotnine as p9

import sailor.assetcentral.equipment
from sailor import _base
from sailor._base.masterdata import _nested_put_setter
from sailor.utils.utils import WarningAdapter
from ..utils.oauth_wrapper import get_oauth_client
from ..utils.timestamps import _string_to_timestamp_parser
from ..utils.plot_helper import _default_plot_theme
from .constants import VIEW_NOTIFICATIONS
from .utils import (AssetcentralEntity, _AssetcentralField, _AssetcentralWriteRequest, AssetcentralEntitySet,
                    _ac_application_url, _ac_fetch_data)

_NOTIFICATION_FIELDS = [
    _AssetcentralField('name', 'internalId'),
    _AssetcentralField('equipment_name', 'equipmentName'),
    _AssetcentralField('priority_description', 'priorityDescription'),
    _AssetcentralField('status_text', 'statusDescription'),
    _AssetcentralField('short_description', 'shortDescription', 'description', is_mandatory=True,
                       put_setter=_nested_put_setter('description', 'shortDescription')),
    _AssetcentralField('malfunction_start_date', 'malfunctionStartDate', 'malfunctionStartDate',
                       get_extractor=_string_to_timestamp_parser(),
                       query_transformer=_base.masterdata._qt_timestamp),
    _AssetcentralField('malfunction_end_date', 'malfunctionEndDate', 'malfunctionEndDate',
                       get_extractor=_string_to_timestamp_parser(),
                       query_transformer=_base.masterdata._qt_timestamp),
    _AssetcentralField('breakdown', 'breakdown', 'breakdown', get_extractor=lambda v: bool(int(v)),
                       query_transformer=_base.masterdata._qt_boolean_int_string),
    _AssetcentralField('confirmed_failure_mode_description', 'confirmedFailureModeDesc'),
    _AssetcentralField('cause_description', 'causeDesc'),
    _AssetcentralField('effect_description', 'effectDesc'),
    _AssetcentralField('notification_type', 'notificationType', 'type', is_mandatory=True),
    _AssetcentralField('status', 'status', 'status', is_mandatory=True,
                       put_setter=lambda p, v: p.update({'status': [v] if isinstance(v, str) else v})),
    _AssetcentralField('long_description', 'longDescription', 'description',
                       put_setter=_nested_put_setter('description', 'longDescription')),
    _AssetcentralField('id', 'notificationId', 'notificationID'),
    _AssetcentralField('priority', 'priority', 'priority', is_mandatory=True),
    _AssetcentralField('equipment_id', 'equipmentId', 'equipmentID', is_mandatory=True),
    _AssetcentralField('cause_id', 'causeID', 'causeID'),
    _AssetcentralField('cause_display_id', 'causeDisplayID'),
    _AssetcentralField('effect_id', 'effectID', 'effectID'),
    _AssetcentralField('effect_display_id', 'effectDisplayID'),
    _AssetcentralField('instruction_id', 'instructionID', 'instructionID'),
    _AssetcentralField('instruction_title', 'instructionTitle'),
    _AssetcentralField('operator_id', 'operatorId', 'operator'),  # setting 'operator' has no effect
    _AssetcentralField('confirmed_failure_mode_id', 'confirmedFailureModeID', 'confirmedFailureModeID'),
    _AssetcentralField('confirmed_failure_mode_name', 'confirmedFailureModeDisplayID'),
    _AssetcentralField('end_date', 'endDate', 'endDate', get_extractor=_string_to_timestamp_parser(),
                       query_transformer=_base.masterdata._qt_timestamp),
    _AssetcentralField('functional_location_id', 'functionalLocationID', 'functionalLocationID'),
    _AssetcentralField('location_id', 'locationId', 'locationID'),
    _AssetcentralField('location_name', 'location'),
    _AssetcentralField('model_id', 'modelId'),
    _AssetcentralField('notification_type_description', 'notificationTypeDescription'),
    _AssetcentralField('root_equipment_id', 'rootEquipmentId'),
    _AssetcentralField('root_equipment_name', 'rootEquipmentName'),
    _AssetcentralField('start_date', 'startDate', 'startDate',
                       get_extractor=_string_to_timestamp_parser(),
                       query_transformer=_base.masterdata._qt_timestamp),
    # can not filter systemProposedFailureModeID (no error but filter is ignored...)
    # that's probably not enough to use a _base.masterdata._qt_non_filterable though.
    _AssetcentralField('system_failure_mode_id', 'systemProposedFailureModeID', 'systemProposedFailureModeID'),
    _AssetcentralField('system_failure_mode_description', 'systemProposedFailureModeDesc'),
    _AssetcentralField('system_failure_mode_name', 'systemProposedFailureModeDisplayID'),
    _AssetcentralField('user_failure_mode_id', 'proposedFailureModeID', 'proposedFailureModeID'),
    _AssetcentralField('user_failure_mode_description', 'proposedFailureModeDesc'),
    _AssetcentralField('user_failure_mode_name', 'proposedFailureModeDisplayID'),
    _AssetcentralField('_is_internal', 'isInternal'),
    _AssetcentralField('_created_by', 'createdBy'),
    _AssetcentralField('_creation_datetime', 'creationDateTime'),
    _AssetcentralField('_lastchanged_by', 'lastChangedBy'),
    _AssetcentralField('_lastchange_datetime', 'lastChangeDateTime'),
    _AssetcentralField('_progress_status', 'progressStatus'),
    _AssetcentralField('_progress_status_description', 'progressStatusDescription'),
    _AssetcentralField('_coordinates', 'coordinates'),
    _AssetcentralField('_source', 'source'),
    _AssetcentralField('_assetcore_equipment_id', 'assetCoreEquipmentId'),  # duplicate of equipmentId?
    _AssetcentralField('_operator', 'operator'),
]

LOG = logging.getLogger(__name__)
LOG.addHandler(logging.NullHandler())
LOG = WarningAdapter(LOG)


[docs]@_base.add_properties class Notification(AssetcentralEntity): """AssetCentral Notification Object.""" _field_map = {field.our_name: field for field in _NOTIFICATION_FIELDS}
[docs] def update(self, **kwargs) -> 'Notification': """Write the current state of this object to AssetCentral with updated values supplied. After the update in the remote system was successful, this object reflects the updated state. Accepts keyword arguments which names correspond to the available properties. Example ------- .. code-block:: notf2 = notf.update(notification_type='M1') assert notf.notification_type == 'M1' assert notf2 == notf Returns ------- Notification self See Also -------- :meth:`update_notification` """ updated_obj = update_notification(self, **kwargs) self.raw = updated_obj.raw return self
[docs] def plot_context(self, data=None, window_before=pd.Timedelta(days=7), window_after=pd.Timedelta(days=2)): """ Plot a notification in the context of the timeseries data around the time of the notification. This plot can be used to gain insight into the sensor behaviour around the time that a malfunction occurs. If the `data` parameter is left as `None` the data required for plotting is automatically retrieved from SAP IoT. Parameters ---------- data TimeseriesDataset to use for plotting indicator data near the Notification. window_before Time interval plotted before a notification. Default value is 7 days before a notification window_after Time interval plotted after a notification. Default value is 2 days after a notification """ equipment_set = sailor.assetcentral.equipment.find_equipment(id=self.equipment_id) if self.start_date and self.end_date: data_start = self.start_date - window_before area_start = max(data_start, self.start_date) data_end = self.end_date + window_after area_end = min(data_end, self.end_date) elif self.start_date: data_start = self.start_date - window_before area_start = max(data_start, self.start_date) data_end = self.start_date + window_after area_end = data_end elif self.end_date: data_start = self.end_date - window_before area_start = data_start data_end = self.end_date + window_after area_end = min(data_end, self.end_date) else: raise RuntimeError('Either notification start_date or notification end_date must be known to plot context.') if data is None: data = equipment_set.get_indicator_data(data_start, data_end) plot = ( data.plot(data_start, data_end, equipment_set=equipment_set) + p9.annotate('vline', xintercept=[self.start_date, self.end_date], size=2, linetype='dotted') + p9.annotate('rect', xmin=area_start, xmax=area_end, ymin=-float('inf'), ymax=float('inf'), alpha=0.2) ) return plot
[docs]class NotificationSet(AssetcentralEntitySet): """Class representing a group of Notifications.""" _element_type = Notification _method_defaults = { 'plot_distribution': { 'by': 'equipment_name', }, }
[docs] def plot_overview(self): """ Plot an overview over all notifications in the set as a function of time. Each notification will be shown by a rectangle, on a y-scale representing the affected equipment and with a color representing the confirmed failure mode description. Example ------- Plot an overview over all notifications in the dataset "my_notifications" by time:: my_notifications.plot_overview() """ data = self.as_df(columns=['malfunction_start_date', 'malfunction_end_date', 'equipment_name', 'confirmed_failure_mode_description']) # if there are any `NA` values in the equipment_name the plot gets messed up. # this turns the NAs into an 'nan' string, which works fine. data['equipment_name'] = data['equipment_name'].astype(str) aes = { 'x': 'malfunction_start_date', 'xend': 'malfunction_end_date', 'y': 'equipment_name', 'yend': 'equipment_name', 'color': 'confirmed_failure_mode_description', } plot = p9.ggplot(data, p9.aes(**aes)) plot += p9.geom_segment(size=6, alpha=0.7) plot += _default_plot_theme() return plot
[docs]def find_notifications(*, extended_filters=(), **kwargs) -> NotificationSet: """Fetch Notifications from AssetCentral with the applied filters, return a NotificationSet. This method supports the common filter language explained at :ref:`filter`. **Allowed entries for filter terms** Type of notifications and its meanings M1: Maintenance Request, M2: BreakDown Priorities and its meanings 5: Low, 10: Medium, 15: High, 20: Very High, 25: Emergency Status types and its meanings NEW: New, PBD: Published, CPT: Completed, IPR: InProcess Parameters ---------- extended_filters See :ref:`filter`. **kwargs See :ref:`filter`. Examples -------- Find all notifications with short_description 'MyNotification':: find_notifications(short_description='MyNotification') Find all notifications which either have the short_description 'MyNotification' or the short_description 'MyOtherNotification':: find_notifications(short_description=['MyNotification', 'MyOtherNotification']) Find all notifications with short_description 'MyNotification' which also have the start date '2020-07-01':: find_notifications(short_description='MyNotification', start_date='2020-07-01') Find all notifications with a confirmed failure mode description is not empty:: find_notifications(extended_filters=['confirmed_failure_mode_description != "None"']) Find all notifications in a given timeframe for specific equipment:: find_notifications(extended_filters=['malfunctionStartDate > "2020-08-01"', 'malfunctionEndDate <= "2020-09-01"'], equipment_id=['id1', 'id2']) """ unbreakable_filters, breakable_filters = \ _base.parse_filter_parameters(kwargs, extended_filters, Notification._field_map) endpoint_url = _ac_application_url() + VIEW_NOTIFICATIONS object_list = _ac_fetch_data(endpoint_url, unbreakable_filters, breakable_filters, paginate=True) LOG.debug('Found %d notifications for the specified filters.', len(object_list)) return NotificationSet([Notification(obj) for obj in object_list])
def _create_or_update_notification(request, method) -> Notification: request.validate() endpoint_url = _ac_application_url() + VIEW_NOTIFICATIONS oauth_client = get_oauth_client('asset_central') response = oauth_client.request(method, endpoint_url, json=request.data) result = find_notifications(id=response['notificationID']) if len(result) != 1: raise RuntimeError('Unexpected error when creating or updating the notification "%s" Please try again.', response['notificationID']) return result[0]
[docs]def create_notification(**kwargs) -> Notification: """Create a new notification. Parameters ---------- **kwargs Keyword arguments which names correspond to the available properties. Returns ------- Notification A new notification object as retrieved from AssetCentral after the create succeeded. Example ------- notf = create_notification(equipment_id='123', short_description='test', notification_type='M2', status='NEW', priority=5) """ request = _AssetcentralWriteRequest(Notification._field_map) request.insert_user_input(kwargs, forbidden_fields=['id']) return _create_or_update_notification(request, 'POST')
[docs]def update_notification(notification: Notification, **kwargs) -> Notification: """Update an existing notification. Write the current state of the given notification object to AssetCentral with updated values supplied. This equals a PUT request in the traditional REST programming model. Parameters ---------- **kwargs Keyword arguments which names correspond to the available properties. Returns ------- Notification A new notification object as retrieved from AssetCentral after the update succeeded. Examples -------- notf = update_notification(notf, status='IPR', long_description='hello world') """ request = _AssetcentralWriteRequest.from_object(notification) request.insert_user_input(kwargs, forbidden_fields=['id', 'equipment_id']) return _create_or_update_notification(request, 'PUT')