PyHydroQC Functions

The PyHydroQC Python package contains functions for anomaly detection and correction of aquatic sensor data.

The package may be installed from the Test Python Package Index.

Also see the GitHub repository.

All functions available in the PyHydroQC package are documented here. See the example notebooks and scripts for implementation of the steps and workflow for anomaly detection and correction.

Rules Based Detection and Correction Functions (module: rules_detect)

add_labels(df, value=-9999)

add_labels adds an indicator that there is an anomalous value that should have been labeled by the expert but was not. Considers a specified 'no data value' (default is -9999) as well as null values. Only relevant if comparing to technician/expert labeled data.

Parameters:
  • df (``) – data frame with columns: 'raw' of raw data, 'cor' of corrected data, and 'labeled_anomaly' booleans where True=1 corresponds to anomalies.

  • value (``) – the 'no data value' in the data for which the function checks.

Returns:
  • df – data frame with column 'labeled_anomaly' modified.

Source code in PyHydroQC/rules_detect.py
def add_labels(df, value=-9999):
    """
    add_labels adds an indicator that there is an anomalous value that should have been labeled by the expert but was not. Considers a specified 'no data value' (default is -9999) as well as null values. Only relevant if comparing to technician/expert labeled data.
    Arguments:
        df: data frame with columns:
            'raw' of raw data
            'cor' of corrected data
            'labeled_anomaly' booleans where True=1 corresponds to anomalies
        value: the 'no data value' in the data for which the function checks.
    Returns:
        df: data frame with column 'labeled_anomaly' modified.
    """
    df['labeled_anomaly'] = np.where((df['raw'] == value) | (df['cor'] == value) | (df['cor'].isnull()), True, df['labeled_anomaly'])

    return df
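
Example usage (a minimal sketch, assuming the package is installed and imported as below; the values are illustrative):

import numpy as np
import pandas as pd
from PyHydroQC import rules_detect

df = pd.DataFrame({
    'raw': [7.2, -9999.0, 7.4],
    'cor': [7.2, np.nan, 7.4],
    'labeled_anomaly': [False, False, False],
})
df = rules_detect.add_labels(df, value=-9999)
# The second row is now flagged True because 'raw' holds the no data value.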

group_size(df)

group_size determines the size of the largest consecutive group of anomalous points.

Parameters:
  • df (``) – data frame with column 'anomaly'.

Returns:
  • size – length of the largest consecutive group of anomalous points.

Source code in PyHydroQC/rules_detect.py
def group_size(df):
    """
    group_size determines the size of the largest consecutive group of anomalous points.
    Arguments:
        df: data frame with column 'anomaly'.
    Returns:
        size: length of the largest consecutive group of anomalous points.
    """
    temp = df[['anomaly']].copy(deep=True)
    temp['value_grp'] = anomaly_utilities.anomaly_events(temp['anomaly'], 0, 1)
    size = 0
    if max(temp['value_grp']) > 0:
        size = len(temp['value_grp'][temp['value_grp'] == 1])
        for i in range(2, max(temp['value_grp']) + 1):
            if(len(temp['value_grp'][temp['value_grp'] == i]) > size):
                size = len(temp['value_grp'][temp['value_grp'] == i])

    return size

interpolate(df, limit=10000)

interpolate performs linear interpolation on points identified as anomalous (typically by rules based approaches).

Parameters:
  • df (``) – data frame with a column 'raw' of raw data and a boolean column 'anomaly'.

  • limit (``) – maximum length/number of points of acceptable interpolation. If an event exceeds this length, it will not be interpolated.

Returns:
  • df – data frame with added column 'observed' for interpolated data.

Source code in PyHydroQC/rules_detect.py
def interpolate(df, limit=10000):
    """
    interpolate performs linear interpolation on points identified as anomalous (typically by rules based approaches).
    Arguments:
        df: data frame with a column 'raw' of raw data and a boolean column 'anomaly'.
        limit: maximum length/number of points of acceptable interpolation. If an event exceeds this length, it will not be interpolated.
    Returns:
        df: data frame with added column 'observed' for interpolated data.
    """
    df['observed'] = np.where(df['anomaly'], np.nan, df['raw'])
    df['observed'].interpolate(method='linear', inplace=True, limit=limit, limit_direction='both')

    return df

persistence(df, length, output_grp=False)

persistence adds an anomalous label in the data frame if data repeat for a specified length.

Parameters:
  • df (``) – data frame with a column 'raw' of raw data and a boolean column 'anomaly' (typically output of range_check)

  • length (``) – duration of persistent/repeated values to be flagged

  • output_grp (``) – boolean to indicate whether the length of persistence should be output as a column in the original dataframe.

Returns:
  • df – dataframe with column 'anomaly' modified and added column 'persist_grp' that indexes points as part of persistent groups.

  • persist_count – total number of persistent points in the data frame.

Source code in PyHydroQC/rules_detect.py
def persistence(df, length, output_grp=False):
    """
    persistence adds an anomalous label in the data frame if data repeat for specified length.
    Arguments:
        df: data frame with a column 'raw' of raw data and a boolean column 'anomaly' (typically output of range_check)
        length: duration of persistent/repeated values to be flagged
        output_grp: boolean to indicate whether the length of persistence should be output as a column in the original dataframe.
    Returns:
        df: dataframe with column 'anomaly' modified and added column 'persist_grp' that indexes points as part of persistent groups
        persist_count: total number of persistent points in the data frame
    """
    temp = df[['raw', 'anomaly']].copy(deep=True)
    temp['persist_grp'] = (temp.raw.diff(1) == 0)
    temp['persist_grp'] = anomaly_utilities.anomaly_events(temp['persist_grp'], 0, 1)
    for i in range(1, max(temp['persist_grp']) + 1):
        if(len(temp['persist_grp'][temp['persist_grp'] == i]) >= length):
            temp['anomaly'][temp['persist_grp'] == i] = True
    persist_count = sum(temp['persist_grp'] != 0)
    df['anomaly'] = temp['anomaly']
    if output_grp:
        df['persist_grp'] = temp['persist_grp']

    return df, persist_count

range_check(df, maximum, minimum)

range_check adds a column to data frame with label if data are out of range.

Parameters:
  • df (``) – data frame with a column 'raw' of raw data.

  • maximum (``) – maximum acceptable value - above this value, data are anomalous

  • minimum (``) – minimum acceptable value - below this value, data are anomalous

Returns:
  • df – data frame with an added column 'anomaly' with boolean where 1 = True for anomalous.

  • range_count – total number of anomalies from this check.

Source code in PyHydroQC/rules_detect.py
def range_check(df, maximum, minimum):
    """
    range_check adds a column to data frame with label if data are out of range.
    Arguments:
        df: data frame with a column 'raw' of raw data.
        maximum: maximum acceptable value - above this value, data are anomalous
        minimum: minimum acceptable value - below this value, data are anomalous
    Returns:
        df: data frame with an added column 'anomaly' with boolean where 1 = True for anomalous
        range_count: total number of anomalies from this check
    """
    # could do some sort of look up table with the values for each sensor
    # could also add seasonal checks
    df = df.eval('anomaly = raw > @maximum or raw < @minimum')
    range_count = sum(df['anomaly'])

    return df, range_count
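
The rules based functions are typically chained. A minimal sketch (assuming the package is installed; the limits and persistence length are illustrative, not recommended values):

import pandas as pd
from PyHydroQC import rules_detect

raw = [7.1, 7.2, 55.0, 7.3, 7.3, 7.3, 7.3, 7.3, 7.4, 7.5]
df = pd.DataFrame({'raw': raw},
                  index=pd.date_range('2021-01-01', periods=len(raw), freq='15min'))
df, range_count = rules_detect.range_check(df, maximum=30, minimum=0)  # flags the 55.0 spike
df, persist_count = rules_detect.persistence(df, length=4)             # flags the run of repeated values
size = rules_detect.group_size(df)                                     # longest consecutive anomalous group
df = rules_detect.interpolate(df)                                      # adds 'observed' with flagged points interpolated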

Calibration Detection and Correction Functions (module: calibration)

calib_edge_detect(observed, width, calib_params, threshold=nan, num_events=1, alpha=nan)

calib_edge_detect seeks to find likely calibration event candidates by using edge filtering

Parameters:
  • observed (``) – time series of observations

  • width (``) – the width of the edge detection filter

  • calib_params (``) – parameters defined in the parameters file: hour_low (earliest hour for calibrations to have occurred) and hour_high (latest hour for calibrations to have occurred).

  • threshold (``) – used for determining candidates from edge filter results

  • num_events (``) – the number of calibration event candidates to return

  • alpha (``) – used for determining a threshold from the data

Returns:
  • candidates – datetimes of the most likely calibration event candidates.

  • edge_diff – differences indicating degree of edges.

Source code in PyHydroQC/calibration.py
def calib_edge_detect(observed, width, calib_params, threshold=float("nan"), num_events=1, alpha=float("nan")):
    """
   calib_edge_detect seeks to find likely calibration event candidates by using edge filtering
   Arguments:
       observed: time series of observations
       width: the width of the edge detection filter
       calib_params: parameters defined in the parameters file
            hour_low: earliest hour for calibrations to have occurred
            hour_high: latest hour for calibrations to have occurred
       threshold: used for determining candidates from edge filter results
       num_events: the number of calibration event candidates to return
       alpha: used for determining a threshold from the data
   Returns:
       candidates: datetimes of the most likely calibration event candidates
       edge_diff: differences indicating degree of edges
    """
    # TODO: add functionality for num_events and alpha

    candidates = []
    edge_diff = pd.DataFrame(index=observed.index)  # diff['val'] is the filter output
    edge_diff['val'] = 0
    for i in range(width, len(observed) - width):  # loop over every possible difference calculation
        # implement the edge detection filter - difference of the sums of before and after data
        edge_diff.iloc[i] = (sum(observed[i - width:i]) - sum(observed[i:i + width])) / width

    if not np.isnan(threshold):  # if the function is being called with a threshold

        # iterate over each day, this assumes that a sensor will not be calibrated twice in one day
        for idx, day in edge_diff.groupby(edge_diff.index.date):
            if max(abs((day['val']))) > threshold:  # if any value is above the threshold in that day
                candidates.append(pd.to_datetime(day.idxmax()['val']))  # add it to the list of calibration candidates

    # specify that calibrations would only occur on work days and between specified hours of the day
    candidates = np.array(candidates)
    candidates = candidates[(pd.to_datetime(candidates).dayofweek <= 4) &
                            (pd.to_datetime(candidates).hour >= calib_params['hour_low']) &
                            (pd.to_datetime(candidates).hour <= calib_params['hour_high'])]

    return candidates, edge_diff
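
A minimal sketch (assuming the package is installed; the filter width and threshold are illustrative, not recommended values):

import numpy as np
import pandas as pd
from PyHydroQC import calibration

# one weekday (a Friday) of 15-minute data with a downward shift at 10:00
idx = pd.date_range('2021-01-08 00:00', periods=96, freq='15min')
observed = pd.Series(np.where(idx.hour < 10, 8.5, 8.0), index=idx)
calib_params = {'hour_low': 7, 'hour_high': 17}
candidates, edge_diff = calibration.calib_edge_detect(observed, width=4,
                                                      calib_params=calib_params,
                                                      threshold=0.2)
# candidates should contain a timestamp near the 10:00 shift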

calib_overlap(sensor_names, input_array, calib_params)

calib_overlap seeks to identify calibration events by identifying where overlaps occur between multiple sensors. Calls the calib_persist_detect function to identify events with a defined persistence length during certain days of the week (M-F) and hours of the day.

Parameters:
  • sensor_names (``) – list of sensors to be considered for overlap.

  • input_array (``) – array of data frames, each with columns: 'observed' of observed data, 'anomaly' booleans where True=1 corresponds to anomalies, and 'persist_grp' with indices of persistent groups (output of the persist function).

  • calib_params (``) – parameters defined in the parameters file: persist_high (longest length of the persistent group), persist_low (shortest length of the persistent group), hour_low (earliest hour for calibrations to have occurred), and hour_high (latest hour for calibrations to have occurred).

Returns:
  • all_calib – array of data frames (one for each sensor) of booleans indicating whether the conditions were met for a possible calibration event.

  • all_calib_dates – array of datetimes (one for each sensor) for which the conditions were met for a possible calibration event.

  • df_all_calib – data frame with columns for each sensor's observations, columns of booleans for each sensor indicating whether a calibration event may have occurred, and a column 'all_calib' that indicates if the conditions were met for all sensors.

  • calib_dates_overlap – datetimes for which the conditions were met for a possible calibration event for all sensors.

Source code in PyHydroQC/calibration.py
def calib_overlap(sensor_names, input_array, calib_params):
    """
    calib_overlap seeks to identify calibration events by identifying where overlaps occur between multiple sensors.
    Calls the calib_persist_detect function to identify events with a defined persistence length during certain days of the
    week (M-F) and hours of the day.
    Arguments:
        sensor_names: list of sensors to be considered for overlap.
        input_array: array of data frames each with columns:
            'observed' of observed data
            'anomaly' booleans where True=1 corresponds to anomalies
            'persist_grp' with indices of persistent groups (output of the persist function)
        calib_params: parameters defined in the parameters file
            persist_high: longest length of the persistent group
            persist_low: shortest length of the persistent group
            hour_low: earliest hour for calibrations to have occurred
            hour_high: latest hour for calibrations to have occurred
    Returns:
        all_calib: array of data frames (one for each sensor) of booleans indicating whether the conditions were met
        for a possible calibration event.
        all_calib_dates: array of datetimes (one for each sensor) for which the conditions were met for a possible
        calibration event.
        df_all_calib: data frame with columns for each sensor observations, columns of booleans for each sensor
        indicating whether a calibration event may have occurred, and a column 'all_calib' that indicates if the
        conditions were met for all sensors.
        calib_dates_overlap: datetimes for which the conditions were met for a possible calibration event for
        all sensors.
    """
    all_calib = dict()
    all_calib_dates = dict()
    df_all_calib = pd.DataFrame(index=input_array[sensor_names[0]].index)
    df_all_calib['all_calib'] = True
    for snsr in sensor_names:
        calib, calib_dates = calib_persist_detect(input_array[snsr], calib_params)
        all_calib[snsr] = calib
        all_calib_dates[snsr] = calib_dates
        df_all_calib[snsr] = input_array[snsr]['observed']
        df_all_calib[snsr + '_calib'] = all_calib[snsr]['anomaly']
        df_all_calib[snsr + '_event'] = anomaly_utilities.anomaly_events(df_all_calib[snsr + '_calib'], wf=1, sf=1)
        df_all_calib['all_calib'] = np.where(df_all_calib['all_calib'] & (df_all_calib[snsr + '_event'] != 0), True, False)
    calib_dates_overlap = df_all_calib[df_all_calib['all_calib']].index

    return all_calib, all_calib_dates, df_all_calib, calib_dates_overlap

calib_persist_detect(df, calib_params)

calib_persist_detect seeks to find calibration events based on 2 conditions: persistence of a defined length, which often occurs when sensors are out of the water, during certain days of the week (M-F) and times of the day.

Parameters:
  • df (``) – data frame with columns: 'observed' of observed data, 'anomaly' booleans where True=1 corresponds to anomalies, and 'persist_grp' with indices of persistent groups (output of the persist function).

  • calib_params (``) – parameters defined in the parameters file: persist_high (longest length of the persistent group), persist_low (shortest length of the persistent group), hour_low (earliest hour for calibrations to have occurred), and hour_high (latest hour for calibrations to have occurred).

Returns:
  • calib – data frame of booleans indicating whether the conditions were met for a possible calibration event.

  • calib_dates – datetimes for which the conditions were met for a possible calibration event.

Source code in PyHydroQC/calibration.py
def calib_persist_detect(df, calib_params):
    """
    calib_persist_detect seeks to find calibration events based on 2 conditions: persistence of a defined length, which often occurs when sensors are out of the water, during certain days of the week (M-F) and times of the day.
    Arguments:
        df: data frame with columns:
            'observed' of observed data
            'anomaly' booleans where True=1 corresponds to anomalies
            'persist_grp' with indices of persistent groups (output of the persist function)
        calib_params: parameters defined in the parameters file
            persist_high: longest length of the persistent group
            persist_low: shortest length of the persistent group
            hour_low: earliest hour for calibrations to have occurred
            hour_high: latest hour for calibrations to have occurred
    Returns:
        calib: data frame of booleans indicating whether the conditions were met for a possible calibration event.
        calib_dates: datetimes for which the conditions were met for a possible calibration event.
    """
    if 'persist_grp' in df:
        temp = df[['observed', 'anomaly', 'persist_grp']].copy(deep=True)
        for i in range(1, max(temp['persist_grp']) + 1):
            temp['persist_grp'][temp.loc[temp['persist_grp'].shift(-1) == i].index[0]] = i
            if ((len(temp['persist_grp'][temp['persist_grp'] == i]) >= calib_params['persist_low']) and
                    (len(temp['persist_grp'][temp['persist_grp'] == i]) <= calib_params['persist_high'])):
                temp['anomaly'][temp['persist_grp'] == i] = True
    else:
        temp = df[['observed', 'anomaly']].copy(deep=True)
        temp['persist_grp'] = (temp.observed.diff(1) == 0)
        temp['persist_grp'] = anomaly_utilities.anomaly_events(temp['persist_grp'], 0, 1)
        for i in range(1, max(temp['persist_grp']) + 1):
            temp['persist_grp'][temp.loc[temp['persist_grp'].shift(-1) == i].index[0]] = i
            if ((len(temp['persist_grp'][temp['persist_grp'] == i]) >= calib_params['persist_low']) and
                    (len(temp['persist_grp'][temp['persist_grp'] == i]) <= calib_params['persist_high'])):
                temp['anomaly'][temp['persist_grp'] == i] = True

    dayofweek = temp.index.dayofweek
    hour = temp.index.hour
    business = temp.iloc[((dayofweek == 0) | (dayofweek == 1) | (dayofweek == 2) | (dayofweek == 3) | (dayofweek == 4))
                         & (hour >= calib_params['hour_low']) & (hour <= calib_params['hour_high'])]
    calib = pd.DataFrame(index=temp.index)
    calib['anomaly'] = False
    calib['anomaly'].loc[business[business['anomaly']].index] = True
    calib_dates = calib[calib['anomaly']].index

    return calib, calib_dates
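
A minimal sketch (assuming the package is installed; the persistence bounds and hours are illustrative):

import numpy as np
import pandas as pd
from PyHydroQC import calibration

# a Thursday morning record with a run of repeated values (e.g., a sensor out of the water)
idx = pd.date_range('2021-01-07 08:00', periods=24, freq='15min')
vals = np.full(24, 8.2)
vals[:6] = np.linspace(8.0, 8.1, 6)
vals[14:] = np.linspace(8.3, 8.5, 10)
df = pd.DataFrame({'observed': vals, 'anomaly': False}, index=idx)
calib_params = {'persist_low': 4, 'persist_high': 12, 'hour_low': 7, 'hour_high': 17}
calib, calib_dates = calibration.calib_persist_detect(df, calib_params)
# calib_dates holds the timestamps of the repeated-value block flagged as a possible calibration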

find_gap(observed, calib_date, hours=2, show_shift=False)

find_gap determines the gap value of a calibration event based on the largest single difference. Uses a given time stamp and searches within a designated window. Accounts for large spikes immediately following the difference.

Parameters:
  • observed (``) – time series of observations

  • calib_date (``) – datetime for performing the correction

  • hours (``) – window on each side of the calib_date to consider for finding the greatest difference. To use the exact datetime and not consider a window, use hours=0.

  • show_shift (``) – boolean indicating if subset used to determine the gap value should be output

Returns:
  • gap – the resulting value of the gap.

  • end – the ending timestamp corresponding to applying the gap. Used as input to linear drift correction.

  • shifted – the subset of data used to determine the gap value with the gap applied.

Source code in PyHydroQC/calibration.py
def find_gap(observed, calib_date, hours=2, show_shift=False):
    """
    find_gap determines the gap value of a calibration event based on the largest single difference.
    Uses a given time stamp and searches within a designated window. Accounts for large spikes
    immediately following the difference.
    Args:
        observed: time series of observations
        calib_date: datetime for performing the correction
        hours: window on each side of the calib_date to consider for finding the greatest difference. To use the exact datetime and not consider a window, use hours=0.
        show_shift: boolean indicating if subset used to determine the gap value should be output
    Returns:
        gap: the resulting value of the gap
        end: the ending timestamp corresponding to applying the gap. Used as input to linear drift correction.
        shifted: the subset of data used to determine the gap value with the gap applied

    """
    # time window to consider
    subset = observed.loc[
             pd.to_datetime(calib_date) - pd.Timedelta(hours=hours):
             pd.to_datetime(calib_date) + pd.Timedelta(hours=hours)
             ]
    # shift index by 1
    shifted = subset.shift(-1)
    # timestamp of greatest difference
    maxtime = abs(subset.diff()).idxmax()
    # if the two subsequent signs are different, then add them together for the gap. This should address/eliminate
    #   spikes following the calibration.
    if subset.diff().loc[maxtime] * shifted.diff().loc[maxtime] < 0:
        gap = subset.diff().loc[maxtime] + shifted.diff().loc[maxtime]
    else:
        gap = subset.diff().loc[maxtime]
    # the last timestamp for the shift to occur
    end = abs(shifted.diff()).idxmax()

    if show_shift:
        # shift subset to compare
        shifted = subset.loc[subset.index[0]:end] + gap
        return gap, end, shifted
    else:
        return gap, end

lin_drift_cor(observed, start, end, gap, replace=True)

lin_drift_cor performs linear drift correction on data. Typical correction for calibration events. This function operates on the basis of a single event.

Parameters:
  • observed (``) – time series of observations

  • start (``) – datetime for the beginning of the correction

  • end (``) – datetime for the end of the correction

  • gap (``) – gap value that determines the degree of the shift, which occurs at the end date.

  • replace (``) – indicates whether the values of the correction should replace the associated values in the data frame

Returns:
  • result – data frame of corrected values.

  • observed – time series of observations with corrected values if replace was selected.

Source code in PyHydroQC/calibration.py
def lin_drift_cor(observed, start, end, gap, replace=True):
    """
   lin_drift_cor performs linear drift correction on data. Typical correction for calibration events. This function operates on the basis of a single event.
   Arguments:
       observed: time series of observations
       start: datetime for the beginning of the correction
       end: datetime for the end of the correction
       gap: gap value that determines the degree of the shift, which occurs at the end date.
       replace: indicates whether the values of the correction should replace the associated values in the data frame
   Returns:
       result: data frame of corrected values
       observed: time series of observations with corrected values if replace was selected
    """
    # todo: ignore - 9999 values

    result = pd.DataFrame(index=observed.loc[start:end].index)
    result['ldc'] = pd.Series(dtype=float)
    for i in range(len(observed.loc[start:end])):
        # y = original_value[i] - i * gap / total_num_points
        y = observed.iloc[observed.index.get_loc(start) + i] + gap/(len(observed.loc[start:end])-1) * i
        result['ldc'].iloc[i] = y
    if replace:
        observed.loc[start:end] = result['ldc']

    return result, observed
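
find_gap and lin_drift_cor are typically used together. A minimal sketch (assuming the package is installed; the synthetic drift and calibration time are illustrative):

import numpy as np
import pandas as pd
from PyHydroQC import calibration

# a series that drifts downward and jumps back up when the sensor is calibrated
idx = pd.date_range('2021-03-01 00:00', periods=48, freq='15min')
observed = pd.Series(10.0 - np.linspace(0, 0.5, 48), index=idx)
observed.iloc[36:] += 0.5  # calibration restores the reading
gap, end = calibration.find_gap(observed, calib_date=idx[36], hours=2)
result, observed = calibration.lin_drift_cor(observed, start=idx[0], end=end, gap=gap)
# result['ldc'] applies the gap as a linear ramp from start to end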

Utilities Functions (module: anomaly_utilities)

aggregate_results(df, models, verbose=False, compare=False)

aggregate_results combines the assessments of detections from multiple models to give a single output of anomalies. If any model detects an anomaly, the point is labeled as anomalous.

Parameters:
  • df (``) – data frame with required column 'observed' of observed data values.

  • models (``) – dictionary of model outputs consisting of dataframes with the required column 'detected_event' of booleans indicating anomalies.

  • verbose (``) – if True, includes columns for each model type in the output.

  • compare (``) – if True, includes columns for technician labeled anomalies and labeled events in the output (as gathered from the input df) for determination of metrics.

Returns:
  • results_all – data frame containing columns 'detected_event' of booleans representing anomalies aggregated from all of the models and 'observed' of observed values. Additional columns are added if verbose and compare options are selected.

  • metrics_all – if compare is selected, then metrics are output for the aggregate anomalies.

Source code in PyHydroQC/anomaly_utilities.py
def aggregate_results(df, models, verbose=False, compare=False):
    """
    aggregate_results combines the assessments of detections from multiple models to give a single output of anomalies.
    If any model detects an anomaly, the point is labeled as anomalous.
    Arguments:
        df: data frame with required column 'observed' of observed data values.
        models: dictionary of model outputs consisting of dataframes with the required column 'detected_event' of booleans indicating anomalies.
        verbose: if True, includes columns for each model type in the output.
        compare: if True, includes columns for technician labeled anomalies and labeled events in the output (as gathered from the input df) for determination of metrics.
    Returns:
        results_all: data frame containing columns 'detected_event' of booleans representing anomalies aggregated
        from all of the models and 'observed' of observed values.
            Additional columns are added if verbose and compare options are selected.
        metrics_all: if compare is selected, then metrics are output for the aggregate anomalies.
    """
    results_all = pd.DataFrame(index=df.index)
    for model in models:
        results_all[model] = models[model]['detected_event'] > 0
    results_all['detected_event'] = results_all.any(axis=1)
    results_all['observed'] = df['observed']

    if not verbose:
        for model in models:
            results_all = results_all.drop(model, 1)

    if compare:
        results_all['labeled_anomaly'] = df['labeled_anomaly']
        results_all['labeled_event'] = anomaly_events(results_all['labeled_anomaly'], wf=1)
        compare_events(results_all, wf=1)
        metrics_all = metrics(results_all)
        return results_all, metrics_all
    else:
        return results_all
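
A minimal sketch (assuming the package is installed; the two model outputs below are stand-ins for real detection results):

import pandas as pd
from PyHydroQC import anomaly_utilities

idx = pd.date_range('2021-01-01', periods=4, freq='15min')
df = pd.DataFrame({'observed': [1.0, 1.1, 9.9, 1.2]}, index=idx)
models = {
    'arima': pd.DataFrame({'detected_event': [0, 0, 1, 0]}, index=idx),
    'lstm': pd.DataFrame({'detected_event': [0, 1, 1, 0]}, index=idx),
}
results_all = anomaly_utilities.aggregate_results(df, models, verbose=True)
# results_all['detected_event'] is True wherever any model flagged an event;
# verbose=True keeps the per-model columns alongside the aggregate.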

anomaly_events(anomaly, wf=1, sf=0.05)

anomaly_events groups consecutively labeled data points as anomalous events by adding an index. Events may also be widened.

Parameters:
  • anomaly (``) – boolean series of labeled or detected anomalies where True (1) = anomalous data point. e.g., 0 0 0 0 1 1 1 1 0 0 0 0 0 0 0 1 1 1 0 0 0 1 0 0 0 1 1 1 1

  • wf (``) – positive integer, a widening factor that is used to determine how much to widen each event before and after the true values. Default = 1 adds a single anomalous point before/after the labeled point.

  • sf (``) – ratio between 0.0 and 1.0; a significance factor used to warn the user when an event size is greater than this ratio of the entire data set. Default = 0.05 = 5%.

Returns:
  • event – integer series of enumerated event labels corresponding to each widened group of consecutive anomalous points. e.g., 0 0 0 1 1 1 1 1 1 0 0 0 0 0 2 2 2 2 2 0 3 3 3 0 4 4 4 4 4

Source code in PyHydroQC/anomaly_utilities.py
def anomaly_events(anomaly, wf=1, sf=0.05):
    """
    anomaly_events groups consecutively labeled data points as anomalous events by adding an index.
    Events may also be widened.
    Arguments:
        anomaly: boolean series of labeled or detected anomalies where True (1) = anomalous data point.
            e.g., 0 0 0 0 1 1 1 1 0 0 0 0 0 0 0 1 1 1 0 0 0 1 0 0 0 1 1 1 1
        wf: positive integer, a widening factor that is used to determine how much to widen each event
            before and after the true values. Default = 1 adds a single anomalous point before/after the labeled point.
        sf: ratio between 0.0-1.0. a significance factor used to warn user when an event size is greater
            than this ratio compared to the entire data set. Default = 0.05 = 5%.
    Returns:
        event: integer series of enumerated event labels corresponding to each widened group of consecutive anomalous points.
        e.g., 0 0 0 1 1 1 1 1 1 0 0 0 0 0 2 2 2 2 2 0 3 3 3 0 4 4 4 4 4
    """
    # initialize event variables
    event_count = 0
    event = []
    # handle the first wf data points in the entire time series
    for i in range(0, wf):
        event.append(0)

    # search through data assigning each point an event (positive integer) or 0 for points not belonging to an event
    for i in range(wf, len(anomaly) - wf):
        if (sum(anomaly[i - wf:i + wf + 1]) != 0):  # if there are anomalies within the wf window
            if (len(event) == 0):  # if event is empty
                event_count += 1  # increment the event counter
            elif (event[-1] == 0):  # if the last event value is 0, then a new event has been encountered
                event_count += 1  # increment the event counter
            event.append(event_count)  # append the event array with the current event number
        else:  # no anomalies are within the wf window
            event.append(0)  # this data point does not belong to an event, append 0

    # handle the last wf data points in the entire time series
    for i in range(0, wf):
        event.append(0)

    # determine if an event is greater than the significance factor
    event_values = pd.Series(event).value_counts()
    for i in range(1, len(event_values)):
        if event_values[i] > (sf * len(event)):
            print("WARNING: an event was found to be greater than the significance factor!")

    return event
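
A minimal sketch (assuming the package is installed):

import pandas as pd
from PyHydroQC import anomaly_utilities

flags = pd.Series([0, 0, 1, 1, 0, 0, 0, 1, 0, 0], dtype=bool)
events = anomaly_utilities.anomaly_events(flags, wf=1)
# events: [0, 1, 1, 1, 1, 0, 2, 2, 2, 0] - each flagged point is widened by one
# index on either side and consecutive points are grouped into numbered events.
# (On a series this short the significance factor warning will also print.)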

assign_cm(val, len, wf)

assign_cm is a simple helper function used in compare_events

Parameters:
  • val (``) – string value to specify which area of the confusion matrix this point belongs to: 'tp', 'fp', or 'fn'

  • len (``) – how long the total array should be

  • wf (``) – integer widening factor that determines how many points should be turned into 'tn' on both edges

Returns:
  • cm – series of length len, with wf 'tn' values at the beginning and end and val filling the positions in between.

Source code in PyHydroQC/anomaly_utilities.py
def assign_cm(val, len, wf):
    """
    assign_cm is a simple helper function used in compare_events
    Arguments:
        val: string value to specify which area of the confusion matrix this point belongs to: 'tp', 'fp', or 'fn'
        len: how long the total array should be
        wf: integer widening factor that determines how many points should be turned into 'tn' on both edges
    Returns:
        cm: series of length len, with wf 'tn' values at the beginning and end and val filling the positions in between
    """
    cm = ['tn' for i in range(len)]
    for i in range(wf, len - wf):
        cm[i] = val
    return cm

compare_events(df, wf=1)

compare_events compares anomalous events that are technician labeled and machine detected. Labeled and detected data may have been widened to increase the window of overlap.

Parameters:
  • wf (``) – integer widening factor used when generating events

  • df (``) – data frame with required columns: 'labeled_anomaly', a series of booleans based on expert labeled anomalies; 'detected_anomaly', a series of machine detected anomaly booleans based on modeling; 'labeled_event', a series of numbered events based on expert labeled anomalies (output of the anomaly_events function); and 'detected_event', a series of numbered events based on machine detected anomalies (output of the anomaly_events function).

Returns:
  • df – original data frame with additional columns: 'grp', representing the indices of event groups, and 'conf_mtx', giving the confusion matrix value for each data point.

Source code in PyHydroQC/anomaly_utilities.py
def compare_events(df, wf=1):
    """
    compare_events compares anomalous events that are technician labeled and machine detected.
    Labeled and detected data may have been widened to increase the window of overlap.
    Arguments:
        wf: integer widening factor used when generating events
        df: data frame with required columns:
            'labeled_anomaly': series of booleans based on expert labeled anomalies.
            'detected_anomaly': series of machine detected anomaly booleans based on modeling.
            'labeled_event': series of numbered events based on expert labeled anomalies
            (output of anomaly_events function).
            'detected_event': series of numbered events based on machine detected anomalies
            (output of anomaly_events function).
    Returns:
        df: original data frame with additional columns:
            'grp': a new column representing the indices of event groups.
            'conf_mtx': a new column that gives the confusion matrix value for each data point.
    """

    # initialize variables
    grp_idx = 0
    df['grp'] = -1  # initialize to error flag
    df['conf_mtx'] = 'tn'
    prev_la = df['labeled_event'][0]
    prev_da = df['detected_event'][0]
    prev_gi = 0

    for i in range(0, len(df['labeled_event'])):  # for every row of data

        # if this row is an event transition case
        if (prev_la != df['labeled_event'][i] or prev_da != df['detected_event'][i]):

            # if coming from a true negative case
            if (prev_la == 0 and prev_da == 0):
                grp_idx += 1

            # if entering a true negative case
            elif (df['labeled_event'][i] == 0 and df['detected_event'][i] == 0):
                grp_idx += 1

            # if it's a complete flip-flop case
            elif (prev_la != df['labeled_event'][i] and prev_da != df['detected_event'][i]):
                grp_idx += 1

        df['grp'][i] = grp_idx  # update the group index for this row

        # if this row is a group transition
        if (grp_idx != prev_gi):
            # add confusion matrix category to previous group

            # if this event group is both labeled and detected as anomalous case
            if (any(df['detected_event'][df['grp'] == prev_gi]) and any(df['labeled_event'][df['grp'] == prev_gi])):
                # True Positive group
                df['conf_mtx'][df['grp'] == prev_gi] = assign_cm('tp', len(df[df['grp'] == prev_gi]), wf)
            # if this event group is detected as anomalous but not labeled
            elif (any(df['detected_event'][df['grp'] == prev_gi])):
                # False Positive group
                df['conf_mtx'][df['grp'] == prev_gi] = assign_cm('fp', len(df[df['grp'] == prev_gi]), wf)
            # if this event group is labeled as anomalous but not detected
            elif (any(df['labeled_event'][df['grp'] == prev_gi])):
                # False Negative group
                df['conf_mtx'][df['grp'] == prev_gi] = assign_cm('fn', len(df[df['grp'] == prev_gi]), wf)

        # update previous state variables
        prev_la = df['labeled_event'][i]
        prev_da = df['detected_event'][i]
        prev_gi = grp_idx

    df = df.drop(columns='grp')  # delete group index column

    return df
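
A minimal sketch (assuming the package is installed; the labeled and detected flags are illustrative) that builds events, compares them, and computes the point-based metrics:

import pandas as pd
from PyHydroQC import anomaly_utilities

df = pd.DataFrame({
    'labeled_anomaly':  [False, False, True, True, False, False, False, False],
    'detected_anomaly': [False, False, False, True, True, False, False, False],
})
df['labeled_event'] = anomaly_utilities.anomaly_events(df['labeled_anomaly'], wf=1)
df['detected_event'] = anomaly_utilities.anomaly_events(df['detected_anomaly'], wf=1)
df = anomaly_utilities.compare_events(df, wf=1)
perf = anomaly_utilities.metrics(df)
print(perf.true_positives, perf.false_positives, perf.false_negatives)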

detect_anomalies(observed, predictions, residuals, threshold, summary=True)

detect_anomalies compares model residuals to thresholds to determine which points are anomalous.

Parameters:
  • observed (``) – data frame or series of observed data.

  • predictions (``) – series of model predictions.

  • residuals (``) – series of model residuals.

  • threshold (``) – data frame with the columns 'low' and 'high' corresponding to the acceptable range of the residual.

  • summary (``) – if True, will print the ratio of detections.

Returns:
  • detections – data frame with columns for observations, predictions, residuals, anomalies (boolean where True (1) = anomalous data point)

Source code in PyHydroQC/anomaly_utilities.py
def detect_anomalies(observed, predictions, residuals, threshold, summary=True):
    """
    detect_anomalies compares model residuals to thresholds to determine which points are anomalous.
    Arguments:
        observed: data frame or series of observed data.
        predictions: series of model predictions.
        residuals: series of model residuals.
        threshold: data frame with the columns 'low' and 'high' corresponding to the acceptable range of the residual.
        summary: if True, will print the ratio of detections.
    Returns:
        detections: data frame with columns for observations, predictions, residuals, anomalies
        (boolean where True (1) = anomalous data point)
    """
    detections = pd.DataFrame(observed)
    detections['prediction'] = np.array(predictions)
    detections['residual'] = np.array(residuals)
    detections['anomaly'] = (detections['residual'] < threshold['low']) | (threshold['high'] < detections['residual'])
    # anomalies = test_score_df[test_score_df.anomaly == True]

    # output summary
    if summary:
        print('ratio of detections: %f' % ((sum(detections.anomaly) / len(detections.anomaly)) * 100), '%')

    return detections

detect_anomalies_cons(residuals, threshold, summary=True)

Compares residuals to a constant threshold to identify anomalies. Can use a set threshold level or a threshold determined by the set_cons_threshold function.

Parameters:
  • residuals (``) – series of model residuals.

  • threshold (``) – constant threshold with 'low' and 'high' values applied to all points.

  • summary (``) – if True, will print the ratio of detections.

Returns:
  • detected_anomaly – boolean series where True (1) = anomalous data point

Source code in PyHydroQC/anomaly_utilities.py
def detect_anomalies_cons(residuals, threshold, summary=True):
    """
    Compares residuals to a constant threshold to identify anomalies. Can use set threshold level or threshold
    determined by set_cons_threshold function.
    Arguments:
        residuals: series of model residuals.
        threshold: constant threshold value.
        summary: if True, will print the ratio of detections.
    Returns:
        detected_anomaly: boolean series where True (1) = anomalous data point
    """
    # DETERMINE ANOMALIES
    detected_anomaly = (residuals[0] < threshold['low']) | (threshold['high'] < residuals[0])  # gives bools
    # output summary
    if summary:
        print('ratio of detections: %f' % ((sum(detected_anomaly) / len(detected_anomaly)) * 100), '%')

    return detected_anomaly

event_metrics(df)

event_metrics calculates an alternative set of metrics where every event is treated with equal weight regardless of size.

Parameters:
  • df (``) – data frame with required columns: 'conf_mtx': strings corresponding to confusion matrix categories: tp, tn, fp, fn

Returns:
  • true_positives – count of valid detection events.

  • false_negatives – count of missed events.

  • false_positives – count of incorrect detections.

  • prc – precision of detections.

  • npv – negative predicted value.

  • acc – accuracy of detections.

  • rcl – recall of detections.

  • f1 – statistic that balances true positives and false negatives.

  • f2 – statistic that gives more weight to true positives.

Source code in PyHydroQC/anomaly_utilities.py
def event_metrics(df):
    """
    event_metrics calculates an alternative set of metrics where every event is treated with equal weight
    regardless of size.
    Arguments:
        df: data frame with required columns:
            'conf_mtx': strings corresponding to confusion matrix categories: tp, tn, fp, fn
    Returns:
        true_positives: count of valid detection events.
        false_negatives: count of missed events.
        false_positives: count of incorrect detections.
        prc: precision of detections.
        npv: negative predicted value.
        acc: accuracy of detections.
        rcl: recall of detections.
        f1: statistic that balances true positives and false negatives.
        f2: statistic that gives more weight to true positives.
    """
    metrics = MetricsContainer()
    tp_events = 0
    fp_events = 0
    fn_events = 0
    prev_cm = 'tn'
    for i in range(0, len(df['conf_mtx'])):  # for every row of data

        # if the confusion matrix class has changed
        if (df['conf_mtx'][i] != prev_cm):

            if (df['conf_mtx'][i] == 'tp'):  # true positive case
                tp_events += 1
            elif (df['conf_mtx'][i] == 'fp'):  # false positive case
                fp_events += 1
            elif (df['conf_mtx'][i] == 'fn'):  # false negative case
                fn_events += 1
            prev_cm = df['conf_mtx'][i]

    # calculate metrics
    metrics.true_positives = tp_events
    metrics.false_positives = fp_events
    metrics.false_negatives = fn_events
    metrics.prc = metrics.ppv = tp_events / (tp_events + fp_events)
    metrics.rcl = tp_events / (tp_events + fn_events)
    metrics.f1 = 2.0 * (metrics.prc * metrics.rcl) / (metrics.prc + metrics.rcl)
    metrics.f2 = 5.0 * tp_events / \
                 (5.0 * tp_events + 4.0 * fn_events + fp_events)
    return metrics

get_data(sensors, filename='', site='', years='', path='')

get_data imports time series data from csv files. Files may be specified explicitly by file name, or a series of files may be imported that follow a naming pattern with site and year (e.g. "MainStreet2014.csv"). Files should have columns corresponding to each sensor. If technician labels and corrections exist, they may be imported by naming columns sensor_cor and labeled_anomaly.

Parameters:
  • sensors (``) – list of name(s) of the sensor/variable data of interest. These must be the column names in data file(s).

  • filename (``) – string of the file name containing input data

  • site (``) – string of name of the data collection site

  • years (``) – list of the year(s) of interest

  • path (``) – path to .csv files containing the data of interest

Returns:
  • sensor_array – array of pandas DataFrames, each with 3 columns for the variable/sensor of interest: 'raw', 'cor', 'labeled_anomaly'.

Source code in PyHydroQC/anomaly_utilities.py
def get_data(sensors, filename="", site="", years="", path=""):
    """
    get_data imports time series data from csv files. Files may be specified explicitly by file name, or a series of files
    may be imported that follow a naming pattern with site and year (e.g. "MainStreet2014.csv").
    Files should have columns corresponding to each sensor. If technician labels and corrections exist, they may be
    imported by naming columns sensor_cor and labeled_anomaly.
    Arguments:
        sensors: list of name(s) of the sensor/variable data of interest. These must be the column names in data file(s).
        filename: string of the file name containing input data
        site: string of name of the data collection site
        years: list of the year(s) of interest
        path: path to .csv files containing the data of interest
    Returns:
        sensor_array: array of pandas DataFrames, each with 3 columns for the variable/sensor of interest:
        'raw', 'cor', 'labeled_anomaly'.
    """
    if path == "":  # use default directory when none is provided
        path = os.getcwd() + "/"  # default directory is ./
    df_full = pd.DataFrame()  # start with empty dataframe and concatenate each file

    if filename:
        df_full = pd.read_csv(path + filename,
                              skipinitialspace=True,
                              engine='python',
                              header=0,
                              index_col=0,
                              parse_dates=True,
                              infer_datetime_format=True)
    if years:
        for yr in years:  # loop over each file
            df_year = pd.read_csv(path + site + str(yr) + ".csv",
                                  skipinitialspace=True,
                                  engine='python',
                                  header=0,
                                  index_col=0,
                                  parse_dates=True,
                                  infer_datetime_format=True)
            df_full = pd.concat([df_full, df_year], axis=0)

    # create data frames with raw, corrected, and labeled data (if the corrected and labeled data exist)
    sensor_array = dict()
    for snsr in sensors:
        df = []
        df = pd.DataFrame(index=df_full.index)
        df['raw'] = df_full[snsr]

        # if corrected data is available in dataset
        if snsr + '_cor' in df_full.columns:
            df['cor'] = df_full[snsr + '_cor']
        if snsr + "_qual" in df_full.columns:
            df['labeled_anomaly'] = ~df_full[snsr + '_qual'].isnull()
        sensor_array[snsr] = df

    return sensor_array
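
A minimal sketch (the path and sensor column names below are hypothetical and must match your own CSV files):

from PyHydroQC import anomaly_utilities

# 'temp' and 'cond' are hypothetical column names in ./data/MainStreet2014.csv
sensor_array = anomaly_utilities.get_data(sensors=['temp', 'cond'],
                                          filename='MainStreet2014.csv',
                                          path='./data/')
df_temp = sensor_array['temp']  # columns: 'raw' (plus 'cor' and 'labeled_anomaly' if present)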

group_bools(df, column_in, column_out)

group_bools indexes each grouping of anomalies (1) and valid points (0) as numbered sets. Used for anomaly correction.

Parameters:
  • df (``) – data frame containing the boolean column named by column_in (e.g., 'detected_event' of classified data points).

  • column_in (``) – name of the boolean column whose groupings are to be indexed.

  • column_out (``) – name of the new column that holds the group index.

Returns:
  • df – original data frame with the additional column named by column_out containing an index of boolean groupings.

Source code in PyHydroQC/anomaly_utilities.py
def group_bools(df, column_in, column_out):
    """
    group_bools indexes each grouping of anomalies (1) and valid points (0) as numbered sets.
    Used for anomaly correction.
    Arguments:
        df: data frame with required columns:
            'detected_event': boolean array of classified data points
    Returns:
        df: original data frame with additional column:
            'group' containing an index of boolean groupings
    """
    # initialize the 'group' column to zeros
    df[column_out] = 0
    # initialize placeholder for boolean state of previous group
    last = df.iloc[0][column_in]
    # initialize the group index to zero
    gi = 0

    # loop over every row in dataframe
    for i in range(0, len(df[column_out])):

        # if the anomaly bool has changed
        if last != df.iloc[i][column_in]:
            gi += 1  # increment the group index
        # assign this row to the group index
        df.iloc[i, df.columns.get_loc(column_out)] = gi

        # update last boolean state
        last = df.iloc[i][column_in]

    return df

metrics(df)

metrics evaluates the performance of anomaly detection comparing detected anomalies to technician labeled anomalies. Output is contained in an object of the class MetricsContainer.

Parameters:
  • df (``) – data frame with required column: 'conf_mtx': strings corresponding to confusion matrix categories: tp, tn, fp, fn

Returns:
  • true_positives – count of data points from valid detections.

  • false_negatives – count of data points from missed events.

  • false_positives – count of data points from incorrect detections.

  • true_negatives – count of valid undetected data.

  • prc – precision of detections.

  • npv – negative predicted value.

  • acc – accuracy of detections.

  • rcl – recall of detections.

  • f1 – statistic that balances true positives and false negatives.

  • f2 – statistic that gives more weight to true positives.

Source code in PyHydroQC/anomaly_utilities.py
def metrics(df):
    """
    metrics evaluates the performance of anomaly detection comparing detected anomalies to technician labeled anomalies.
    Output is contained in an object of the class MetricsContainer.
    Arguments:
        df: data frame with required column:
            'conf_mtx': strings corresponding to confusion matrix categories: tp, tn, fp, fn
    Returns:
        true_positives: count of data points from valid detections.
        false_negatives: count of data points from missed events.
        false_positives: count of data points from incorrect detections.
        true_negatives: count of valid undetected data.
        prc: is the precision of detections.
        npv: negative predicted value.
        acc: accuracy of detections.
        rcl: recall of detections.
        f1: statistic that balances true positives and false negatives.
        f2: statistic that gives more weight to true positives.
    """
    metrics = MetricsContainer()
    metrics.true_positives = len(df['conf_mtx'][df['conf_mtx'] == 'tp'])
    metrics.false_negatives = len(df['conf_mtx'][df['conf_mtx'] == 'fn'])
    metrics.false_positives = len(df['conf_mtx'][df['conf_mtx'] == 'fp'])
    metrics.true_negatives = len(df['conf_mtx'][df['conf_mtx'] == 'tn'])
    metrics.prc = metrics.ppv = metrics.true_positives / (metrics.true_positives + metrics.false_positives)
    metrics.npv = metrics.true_negatives / (metrics.true_negatives + metrics.false_negatives)
    metrics.acc = (metrics.true_positives + metrics.true_negatives) / len(df['conf_mtx'])
    metrics.rcl = metrics.true_positives / (metrics.true_positives + metrics.false_negatives)
    metrics.f1 = 2.0 * (metrics.prc * metrics.rcl) / (metrics.prc + metrics.rcl)
    metrics.f2 = 5.0 * metrics.true_positives / \
                 (5.0 * metrics.true_positives + 4.0 * metrics.false_negatives + metrics.false_positives)

    return metrics

set_cons_threshold(model_fit, alpha_in)

set_cons_threshold determines a threshold based on confidence interval and specified alpha for an ARIMA model.

Parameters:
  • model_fit (``) – SARIMAX model object.

  • alpha_in (``) – scalar between 0 and 1 representing the acceptable uncertainty.

Returns:
  • threshold – single value.

Source code in PyHydroQC/anomaly_utilities.py
def set_cons_threshold(model_fit, alpha_in):
    """
    set_cons_threshold determines a threshold based on confidence interval and specified alpha for an ARIMA model.
    Arguments:
        model_fit: SARIMAX model object.
        alpha_in: scalar between 0 and 1 representing the acceptable uncertainty.
    Returns:
        threshold: single value.
    """
    predict = model_fit.get_prediction()
    predict_ci = predict.conf_int(alpha=alpha_in)
    predict_ci.columns = ["lower", "upper"]
    predict_ci["lower"][0] = predict_ci["lower"][1]

    # This gives a constant interval for all points.
    # Could also try a threshold to maximize F2, but that requires having labeled data. Could base on a portion of data?
    thresholds = predict[0] - predict_ci["lower"]
    threshold = thresholds[-1]

    return threshold

set_dynamic_threshold(residuals, window_sz=96, alpha=0.01, min_range=0.0)

set_dynamic_threshold determines a threshold for each point based on the local confidence interval considering the model residuals looking forward and backward a specified number of steps.

Parameters:
  • residuals (``) – series like object or a data frame of model residuals.

  • alpha (``) – scalar between 0 and 1 representing the acceptable uncertainty.

  • window_sz (``) – integer representing how many data points to use in both directions. default = 96 for one day for 15-minute data.

  • min_range (``) – minimum half-width of the threshold band; if the computed range (z * sigma) is smaller, min_range is used instead.

Returns:
  • threshold – data frame of columns of low and high threshold values.

Source code in PyHydroQC/anomaly_utilities.py
def set_dynamic_threshold(residuals, window_sz=96, alpha=0.01, min_range=0.0):
    """
    set_dynamic_threshold determines a threshold for each point based on the local confidence interval
    considering the model residuals looking forward and backward a specified number of steps.
    Arguments:
        residuals: series like object or a data frame of model residuals.
        alpha: scalar between 0 and 1 representing the acceptable uncertainty.
        window_sz: integer representing how many data points to use in both directions.
            default = 96 for one day for 15-minute data.
    Returns:
        threshold: data frame of columns of low and high threshold values.
    """
    threshold = []  # initialize empty list to hold thresholds
    z = norm.ppf(1 - alpha / 2)

    # if the window size parameter is too big for this data set
    if (window_sz > len(residuals)):
        print("WARNING: in set_dynamic_threshold(), window_sz > len(data)! Reducing window_sz.")
        window_sz = len(residuals)  # reduce the window to the max allowable

    # loop through data and add each threshold pair
    for i in range(0, len(residuals)):
        if (window_sz > i):  # index is closer than window size to left edge of data
            lo = 0
        else:  # look back as far as the window size
            lo = i - window_sz
        if (i + window_sz > len(residuals)):  # index is close to right edge of data
            hi = len(residuals)
        else:  # look forward as far as the window size
            hi = i + window_sz

        # calculate the range of probable values using given alpha
        mean = residuals[lo:(hi + 1)].mean()
        sigma = residuals[lo:(hi + 1)].std()
        th_range = z * sigma
        if (th_range < min_range):
            th_range = min_range
        # append pair of upper and lower thresholds
        threshold.append([mean - th_range, mean + th_range])

    threshold = pd.DataFrame(threshold, columns=['low', 'high'])

    return threshold
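
The dynamic threshold pairs with detect_anomalies_cons. A minimal sketch (assuming the package is installed; the residuals are synthetic):

import numpy as np
import pandas as pd
from PyHydroQC import anomaly_utilities

rng = np.random.default_rng(0)
residuals = pd.DataFrame(rng.normal(0.0, 0.1, 500))
residuals.iloc[250] = 2.0  # one injected outlier
threshold = anomaly_utilities.set_dynamic_threshold(residuals[0], window_sz=96, alpha=0.01)
detected = anomaly_utilities.detect_anomalies_cons(residuals, threshold, summary=False)
# detected is a boolean series; the injected point at index 250 should be flagged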

xfade(xfor, xbac)

xfade ("cross-fade") blends two data sets of matching length with a ramp function (weighted average).

Parameters:
  • xfor (``) – forecasted data to be more weighted at the front

  • xbac (``) – backcasted data to be more weighted at the back

Returns:
  • x – the blended data

Source code in PyHydroQC/anomaly_utilities.py
def xfade(xfor, xbac):
    """
    xfade ("cross-fade") blends two data sets of matching length with a ramp function (weighted average).
    Arguments:
        xfor: forecasted data to be more weighted at the front
        xbac: backcasted data to be more weighted at the back
    Returns:
        x: the blended data
    """
    # if arrays are not matching in length
    if (len(xfor) != len(xbac)):
        # send error message
        print("ERROR in xfade() call: mismatched array lengths!")
    else:
        # initialize a weighting function
        fader = []

        # loop over the length of data
        for i in range(0, len(xfor)):
            # calculate the weights at each index
            fader.append((i + 1) / (len(xfor) + 1))
        # fader should be a ramp with positive slope between 0.0 and 1.0
        # use this to fade the back data
        xbac_faded = np.multiply(xbac, fader)

        # now flip the ramp to negative slope and fade the front data
        fader = np.flip(fader)
        xfor_faded = np.multiply(xfor, fader)

        # add the results
        x = xfor_faded + xbac_faded

    return x
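
A minimal sketch (the arrays are illustrative):

import numpy as np
from PyHydroQC import anomaly_utilities

xfor = np.array([5.0, 5.0, 5.0, 5.0])  # forecast, weighted more at the front
xbac = np.array([6.0, 6.0, 6.0, 6.0])  # backcast, weighted more at the back
blended = anomaly_utilities.xfade(xfor, xbac)
# blended ramps from 5.2 to 5.8 across the gap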

Model Development and Training Functions (module: modeling_utilities)

build_arima_model(data, p, d, q, summary=True, suppress_warnings=True)

build_arima_model constructs and trains an ARIMA model.

Parameters:
  • data (``) – series or data frame of time series inputs.

  • p (``) – ARIMA hyperparameter that can be determined by manual assessment or by automated means.

  • d (``) – ARIMA hyperparameter that can be determined by manual assessment or by automated means.

  • q (``) – ARIMA hyperparameter that can be determined by manual assessment or by automated means.

  • summary (``) – indicates if the model summary should be printed.

  • suppress_warnings (``) – indicates whether warnings associated with ARIMA model development and fitting should be suppressed.

Returns:
  • model_fit – SARIMAX model object.

  • residuals – series of model errors.

  • predictions – model predictions determined as the in-sample, one step ahead model forecasted values.

Source code in PyHydroQC/modeling_utilities.py
def build_arima_model(data, p, d, q, summary=True, suppress_warnings=True):
    """
    build_arima_model constructs and trains an ARIMA model.
    Arguments:
        data: series or data frame of time series inputs.
        p: ARIMA hyperparameter that can be determined by manual assessment or by automated means.
        d: ARIMA hyperparameter that can be determined by manual assessment or by automated means.
        q: ARIMA hyperparameter that can be determined by manual assessment or by automated means.
        summary: indicates if the model summary should be printed.
        suppress_warnings: indicates whether warnings associated with ARIMA model development and fitting should be suppressed.
    Returns:
        model_fit: SARIMAX model object.
        residuals: series of model errors.
        predictions: model predictions determined as the in-sample, one step ahead model forecasted values.
    """

    if suppress_warnings:
        warnings.filterwarnings('ignore', message='A date index has been provided, but it has no associated frequency information and so will be ignored when e.g. forecasting')

    model = api.tsa.SARIMAX(data, order=(p, d, q))

    if suppress_warnings:
        warnings.filterwarnings('ignore', message='Non-stationary starting autoregressive parameters')
        warnings.filterwarnings('ignore', message='Non-invertible starting MA parameters found.')
        warnings.filterwarnings('ignore', message='ConvergenceWarning: Maximum Likelihood optimization failed to converge.')

    model_fit = model.fit(disp=0, warn_convergence=False)

    if suppress_warnings:
        warnings.filterwarnings('default')

    residuals = pd.DataFrame(model_fit.resid)
    predict = model_fit.get_prediction()
    predictions = pd.DataFrame(predict.predicted_mean)
    residuals[0][0] = 0
    predictions.predicted_mean[0] = data[0]

    # output summary
    if summary:
        print('\n\n')
        print(model_fit.summary())
        print('\n\nresiduals description:')
        print(residuals.describe())

    return model_fit, residuals, predictions
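
A usage sketch with synthetic data standing in for a series of sensor observations; the (p, d, q) order shown here is illustrative and could instead come from the pdq helper documented below:

    import numpy as np
    import pandas as pd
    from PyHydroQC import modeling_utilities

    # synthetic stand-in for a series of sensor observations
    observed = pd.Series(np.sin(np.linspace(0, 20, 500)) + np.random.normal(0, 0.1, 500))
    model_fit, residuals, predictions = modeling_utilities.build_arima_model(
        observed, p=1, d=1, q=3, summary=False)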

create_bidir_model(cells, time_steps, num_features, dropout, input_loss='mae', input_optimizer='adam')

Uses the sequential model class from keras. Wraps a vanilla LSTM layer in a bidirectional layer and adds a dense output layer.

Parameters:
  • time_steps (``) – number of steps to consider for each point.

  • num_features (``) – number of variables being considered

  • cells (``) – number of cells or nodes to be used to construct the model.

  • dropout (``) – ratio of cells to ignore for model training.

  • input_loss (``) – metric to be minimized for training. Default is mean absolute error ('mae').

  • input_optimizer (``) – algorithm for training model. Default is 'adam'.

Returns:
  • model – keras model structure

Source code in PyHydroQC/modeling_utilities.py
def create_bidir_model(cells, time_steps, num_features, dropout, input_loss='mae', input_optimizer='adam'):
    """
    Uses sequential model class from keras. Adds bidirectional layer. Adds LSTM vanilla layer.
    Arguments:
        time_steps: number of steps to consider for each point.
        num_features: number of variables being considered
        cells: number of cells or nodes to be used to construct the model.
        dropout: ratio of cells to ignore for model training.
        input_loss: metric to be minimized for training. Default is mean average error ('mae').
        input_optimizer: algorithm for training model. Default is 'adam'.
    Returns:
        model: keras model structure
    """
    model = Sequential()
    model.add(Bidirectional(LSTM(cells, dropout=dropout), input_shape=(time_steps*2, num_features)))
    model.add(Dense(num_features))
    model.compile(loss=input_loss, optimizer=input_optimizer)

    return model

create_bidir_sequenced_dataset(X, time_steps=10)

create_bidir_sequenced_dataset reshapes data to temporalize it into (samples, timestamps, features). Uses data before and after the point of interest (bidirectional).

Parameters:
  • X (``) – series of data to be reshaped.

  • time_steps (``) – number of time steps before and after the point of interest to consider for each sample/row.

Returns:
  • Xs – array of data reshaped for input into an LSTM model.

  • ys – array of data outputs corresponding to each Xs input.

Source code in PyHydroQC/modeling_utilities.py
def create_bidir_sequenced_dataset(X, time_steps=10):
    """
    create_bidir_sequenced_dataset reshapes data to temporalize it into (samples, timestamps, features). Uses data before and after the point of interest (bidirectional).
    Arguments:
        X: series of data to be reshaped.
        time_steps: number of past time steps to consider for each sample/row.
    Returns:
        Xs: array of data reshaped for input into an LSTM model.
        ys: array of data outputs corresponding to each Xs input.
    """
    Xs, ys = [], []  # start empty list
    for i in range(time_steps, len(X) - time_steps):  # loop within range of data frame minus the time steps
        v = pd.concat([X.iloc[(i - time_steps):i], X.iloc[(i + 1):(i + time_steps + 1)]]).values  # data from i backward and forward the specified number of time steps
        Xs.append(v)
        ys.append(X.iloc[i].values)

    return np.array(Xs).astype(np.float32), np.array(ys)  # convert lists into numpy arrays and return

create_bidir_training_dataset(X, anomalies, training_samples='', time_steps=10)

create_bidir_training_dataset creates a training dataset based on random selection of a specified number of points within the dataset. Uses data before and after the point of interest (bidirectional). Reshapes data to temporalize it into (samples, timestamps, features). Ensures that no data that has been corrected as part of preprocessing will be used for training the model.

Parameters:
  • X (``) – data to be reshaped.

  • anomalies (``) – series of booleans where True (1) = anomalous data point corresponding to the results of preprocessing.

  • training_samples (``) – number of observations used for training.

  • time_steps (``) – number of time steps before and after the point of interest to consider for each sample/row.

Returns:
  • Xs – array of data reshaped for input into an LSTM model.

  • ys – array of data outputs corresponding to each Xs input.

Source code in PyHydroQC/modeling_utilities.py
def create_bidir_training_dataset(X, anomalies, training_samples="", time_steps=10):
    """
    create_bidir_training_dataset creates a training dataset based on random selection of a specified number of points within the dataset. Uses data before and after the point of interest (bidirectional).
    Reshapes data to temporalize it into (samples, timestamps, features). Ensures that no data that has been corrected as part of preprocessing will be used for training the model.
    Arguments:
        X: data to be reshaped.
        anomalies: series of booleans where True (1) = anomalous data point corresponding to the results of preprocessing.
        training_samples: number of observations used for training.
        time_steps: number of past time steps to consider for each sample/row.
    Returns:
        Xs: array of data reshaped for input into an LSTM model.
        ys: array of data outputs corresponding to each Xs input.
    """
    Xs, ys = [], []  # start empty list
    if training_samples == "":
        training_samples = int(len(X) * 0.10)

    # create sample sequences from a randomized subset of the data series for training
    j = sample(range(time_steps, len(X) - time_steps), len(X) - 2 * time_steps)
    i = 0
    while (training_samples > len(ys)) and (i < len(j)):
        if not np.any(anomalies.iloc[(j[i] - time_steps):(j[i] + time_steps + 1)]):
            v = pd.concat([X.iloc[(j[i] - time_steps):j[i]], X.iloc[(j[i] + 1):(j[i] + time_steps + 1)]]).values
            ys.append(X.iloc[j[i]])
            Xs.append(v)
        i += 1

    return np.array(Xs).astype(np.float32), np.array(ys)  # convert lists into numpy arrays and return

create_scaler(data)

create_scaler creates a scaler object based on input data that removes the mean and scales to unit variance.

Parameters:
  • data (``) – a series of values to be scaled.

Returns:
  • scaler – a StandardScaler fit to the input data

Source code in PyHydroQC/modeling_utilities.py
def create_scaler(data):
    """
    create_scaler creates a scaler object based on input data that removes mean and scales to unit vectors.
    Arguments:
        data: a series of values to be scaled.
    Returns:
        scaler: a StandardScaler fit to the input data
    """
    scaler = StandardScaler()
    scaler = scaler.fit(data)

    return scaler
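
A short sketch of fitting the scaler and applying it to a data frame of observations, mirroring how the LSTM wrapper functions use it (the column name and synthetic values are assumptions):

    import numpy as np
    import pandas as pd
    from PyHydroQC import modeling_utilities

    # synthetic stand-in for a data frame of observations
    df_observed = pd.DataFrame({'temp_obs': np.random.normal(12, 3, 500)})
    scaler = modeling_utilities.create_scaler(df_observed)
    df_scaled = pd.DataFrame(scaler.transform(df_observed),
                             index=df_observed.index, columns=df_observed.columns)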

create_sequenced_dataset(X, time_steps=10)

create_sequenced_dataset reshapes data to temporalize it into (samples, timestamps, features).

Parameters:
  • X (``) – series of data to be reshaped.

  • time_steps (``) – number of past time steps to consider for each sample/row.

Returns:
  • Xs – array of data reshaped for input into an LSTM model.

  • ys – array of data outputs corresponding to each Xs input.

Source code in PyHydroQC/modeling_utilities.py
def create_sequenced_dataset(X, time_steps=10):
    """
    create_sequenced_dataset reshapes data to temporalize it into (samples, timestamps, features).
    Arguments:
        X: series of data to be reshaped.
        time_steps: number of past time steps to consider for each sample/row.
    Returns:
        Xs: array of data reshaped for input into an LSTM model.
        ys: array of data outputs corresponding to each Xs input.
    """
    Xs, ys = [], []  # start empty list
    for i in range(len(X) - time_steps):  # loop within range of data frame minus the time steps
        v = X.iloc[i:(i + time_steps)].values  # data from i to end of the time step
        Xs.append(v)
        ys.append(X.iloc[i + time_steps].values)

    return np.array(Xs), np.array(ys)  # convert lists into numpy arrays and return
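
For example, a series of 100 scaled observations with time_steps=10 yields 90 samples of shape (10, 1). A minimal sketch with synthetic data:

    import numpy as np
    import pandas as pd
    from PyHydroQC import modeling_utilities

    series = pd.DataFrame({'obs_scaled': np.random.randn(100)})
    X, y = modeling_utilities.create_sequenced_dataset(series, time_steps=10)
    print(X.shape)  # (90, 10, 1)
    print(y.shape)  # (90, 1)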

create_training_dataset(X, anomalies, training_samples='', time_steps=10)

create_training_dataset creates a training dataset based on random selection of a specified number of points within the dataset. Reshapes data to temporalize it into (samples, timestamps, features). Ensures that no data that has been corrected as part of preprocessing will be used for training the model.

Parameters:
  • X (``) – series of data to be reshaped.

  • anomalies (``) – series of booleans where True (1) = anomalous data point corresponding to the results of preprocessing.

  • training_samples (``) – number of observations used for training.

  • time_steps (``) – number of past time steps to consider for each sample/row.

Returns:
  • Xs – array of data reshaped for input into an LSTM model.

  • ys – array of data outputs corresponding to each Xs input.

Source code in PyHydroQC/modeling_utilities.py
def create_training_dataset(X, anomalies, training_samples="", time_steps=10):
    """
    create_training_dataset creates a training dataset based on random selection of a specified number of points within the dataset.
    Reshapes data to temporalize it into (samples, timestamps, features). Ensures that no data that has been corrected as part of preprocessing will be used for training the model.
    Arguments:
        X: series of data to be reshaped.
        anomalies: series of booleans where True (1) = anomalous data point corresponding to the results of preprocessing.
        training_samples: number of observations used for training.
        time_steps: number of past time steps to consider for each sample/row.
    Returns:
        Xs: array of data reshaped for input into an LSTM model.
        ys: array of data output corresponding to each Xs input.
    """
    Xs, ys = [], []  # start empty list
    if training_samples == "":
        training_samples = int(len(X) * 0.10)

    # create sample sequences from a randomized subset of the data series for training
    j = sample(range(0, len(X) - time_steps - 2), len(X) - time_steps - 2)
    i = 0
    while (training_samples > len(ys)) and (i < len(j)):
        if not np.any(anomalies.iloc[j[i]:(j[i] + time_steps + 1)]):
            v = X.iloc[j[i]:(j[i] + time_steps)].values
            ys.append(X.iloc[j[i] + time_steps])
            Xs.append(v)
        i += 1

    return np.array(Xs), np.array(ys)  # convert lists into numpy arrays and return

create_vanilla_model(cells, time_steps, num_features, dropout, input_loss='mae', input_optimizer='adam')

Uses the sequential model class from keras. Adds a single (vanilla) LSTM layer and a dense output layer.

Parameters:
  • time_steps (``) – number of steps to consider for each point.

  • num_features (``) – number of variables being considered

  • cells (``) – number of cells or nodes to be used to construct the model.

  • dropout (``) – ratio of cells to ignore for model training.

  • input_loss (``) – metric to be minimized for training. Default is mean absolute error ('mae').

  • input_optimizer (``) – algorithm for training model. Default is 'adam'.

Returns:
  • model – keras model structure

Source code in PyHydroQC/modeling_utilities.py
def create_vanilla_model(cells, time_steps, num_features, dropout, input_loss='mae', input_optimizer='adam'):
    """
    Uses sequential model class from keras. Adds LSTM vanilla layer.
    Arguments:
        time_steps: number of steps to consider for each point.
        num_features: number of variables being considered
        cells: number of cells or nodes to be used to construct the model.
        dropout: ratio of cells to ignore for model training.
        input_loss: metric to be minimized for training. Default is mean average error ('mae').
        input_optimizer: algorithm for training model. Default is 'adam'.
    Returns:
        model: keras model structure
    """
    model = Sequential()
    model.add(LSTM(cells, input_shape=(time_steps, num_features), dropout=dropout))  # one LSTM layer with dropout regularization
    model.add(Dense(num_features))
    model.compile(loss=input_loss, optimizer=input_optimizer)

    return model
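
A minimal sketch of constructing a model for a univariate series (the hyperparameter values are illustrative):

    from PyHydroQC import modeling_utilities

    model = modeling_utilities.create_vanilla_model(
        cells=128, time_steps=10, num_features=1, dropout=0.2)
    model.summary()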

LSTM_multivar(df_observed, df_anomaly, df_raw, LSTM_params, summary, name, model_output=True, model_save=True)

LSTM_multivar builds, trains, and evaluates a vanilla LSTM model for multivariate data.

Source code in PyHydroQC/modeling_utilities.py
def LSTM_multivar(df_observed, df_anomaly, df_raw, LSTM_params, summary, name, model_output=True, model_save=True):
    """
    LSTM_multivar builds, trains, and evaluates a vanilla LSTM model for multivariate data.
    """
    scaler = create_scaler(df_observed)
    df_scaled = pd.DataFrame(scaler.transform(df_observed), index=df_observed.index, columns=df_observed.columns)

    X_train, y_train = create_training_dataset(df_scaled, df_anomaly, LSTM_params['samples'], LSTM_params['time_steps'])
    num_features = X_train.shape[2]

    if summary:
        print('X_train shape: ' + str(X_train.shape))
        print('y_train shape: ' + str(y_train.shape))
        print('Number of features: ' + str(num_features))

    model = create_vanilla_model(LSTM_params['cells'], LSTM_params['time_steps'], num_features, LSTM_params['dropout'])
    if summary:
        model.summary()
        verbose = 1
    else:
        verbose = 0
    history = train_model(X_train, y_train, model, LSTM_params['patience'], verbose=verbose)

    df_raw_scaled = pd.DataFrame(scaler.transform(df_raw), index=df_raw.index, columns=df_raw.columns)
    X_test, y_test = create_sequenced_dataset(df_raw_scaled, LSTM_params['time_steps'])

    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)
    model_eval = model.evaluate(X_test, y_test)

    train_predictions = pd.DataFrame(scaler.inverse_transform(train_pred))
    predictions = pd.DataFrame(scaler.inverse_transform(test_pred))
    y_train_unscaled = pd.DataFrame(scaler.inverse_transform(y_train))
    y_test_unscaled = pd.DataFrame(scaler.inverse_transform(y_test))

    train_residuals = pd.DataFrame(np.abs(train_predictions - y_train_unscaled))
    test_residuals = pd.DataFrame(np.abs(predictions - y_test_unscaled))

    if model_save:
        model.save('originalsavedoutput/models/LSTM_multivar_' + str(name))

    LSTM_multivar = LSTMModelContainer()
    if model_output:
        LSTM_multivar.model = model
        LSTM_multivar.history = history
    LSTM_multivar.X_train = X_train
    LSTM_multivar.y_train = y_train
    LSTM_multivar.X_test = X_test
    LSTM_multivar.y_test = y_test
    LSTM_multivar.model_eval = model_eval
    LSTM_multivar.predictions = predictions
    LSTM_multivar.train_residuals = train_residuals
    LSTM_multivar.test_residuals = test_residuals

    return LSTM_multivar

LSTM_multivar_bidir(df_observed, df_anomaly, df_raw, LSTM_params, summary, name, model_output=True, model_save=True)

LSTM_multivar_bidir builds, trains, and evaluates a bidirectional LSTM model for multivariate data.

Source code in PyHydroQC/modeling_utilities.py
def LSTM_multivar_bidir(df_observed, df_anomaly, df_raw, LSTM_params, summary, name, model_output=True, model_save=True):
    """
    LSTM_multivar_bidir builds, trains, and evaluates a bidirectional LSTM model for multivariate data.
    """
    scaler = create_scaler(df_observed)
    df_scaled = pd.DataFrame(scaler.transform(df_observed), index=df_observed.index, columns=df_observed.columns)

    X_train, y_train = create_bidir_training_dataset(df_scaled, df_anomaly, LSTM_params['samples'], LSTM_params['time_steps'])
    num_features = X_train.shape[2]

    if summary:
        print('X_train shape: ' + str(X_train.shape))
        print('y_train shape: ' + str(y_train.shape))
        print('Number of features: ' + str(num_features))

    model = create_bidir_model(LSTM_params['cells'], LSTM_params['time_steps'], num_features, LSTM_params['dropout'])
    if summary:
        model.summary()
        verbose = 1
    else:
        verbose = 0
    history = train_model(X_train, y_train, model, LSTM_params['patience'], verbose=verbose)

    df_raw_scaled = pd.DataFrame(scaler.transform(df_raw), index=df_raw.index, columns=df_raw.columns)
    X_test, y_test = create_bidir_sequenced_dataset(df_raw_scaled, LSTM_params['time_steps'])

    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)
    model_eval = model.evaluate(X_test, y_test)

    train_predictions = pd.DataFrame(scaler.inverse_transform(train_pred))
    predictions = pd.DataFrame(scaler.inverse_transform(test_pred))
    y_train_unscaled = pd.DataFrame(scaler.inverse_transform(y_train))
    y_test_unscaled = pd.DataFrame(scaler.inverse_transform(y_test))

    train_residuals = pd.DataFrame(np.abs(train_predictions - y_train_unscaled))
    test_residuals = pd.DataFrame(np.abs(predictions - y_test_unscaled))

    if model_save:
        model.save('originalsavedoutput/models/LSTM_multivar_bidir_' + str(name))

    LSTM_multivar_bidir = LSTMModelContainer()
    if model_output:
        LSTM_multivar_bidir.model = model
        LSTM_multivar_bidir.history = history
    LSTM_multivar_bidir.X_train = X_train
    LSTM_multivar_bidir.y_train = y_train
    LSTM_multivar_bidir.X_test = X_test
    LSTM_multivar_bidir.y_test = y_test
    LSTM_multivar_bidir.model_eval = model_eval
    LSTM_multivar_bidir.predictions = predictions
    LSTM_multivar_bidir.train_residuals = train_residuals
    LSTM_multivar_bidir.test_residuals = test_residuals

    return LSTM_multivar_bidir

LSTM_univar(df, LSTM_params, summary, name, model_output=True, model_save=True)

LSTM_univar builds, trains, and evaluates a vanilla LSTM model for univariate data.

Source code in PyHydroQC/modeling_utilities.py
def LSTM_univar(df, LSTM_params, summary, name, model_output=True, model_save=True):
    """
    LSTM_univar builds, trains, and evaluates a vanilla LSTM model for univariate data.
    """
    scaler = create_scaler(df[['observed']])
    df['obs_scaled'] = scaler.transform(df[['observed']])

    X_train, y_train = create_training_dataset(df[['obs_scaled']], df[['anomaly']], LSTM_params['samples'], LSTM_params['time_steps'])
    num_features = X_train.shape[2]

    if summary:
        print('X_train shape: ' + str(X_train.shape))
        print('y_train shape: ' + str(y_train.shape))
        print('Number of features: ' + str(num_features))

    model = create_vanilla_model(LSTM_params['cells'], LSTM_params['time_steps'], num_features, LSTM_params['dropout'])
    if summary:
        model.summary()
        verbose = 1
    else:
        verbose = 0
    history = train_model(X_train, y_train, model, LSTM_params['patience'], verbose=verbose)

    df['raw_scaled'] = scaler.transform(df[['raw']])
    X_test, y_test = create_sequenced_dataset(df[['raw_scaled']], LSTM_params['time_steps'])

    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)
    model_eval = model.evaluate(X_test, y_test)

    train_predictions = pd.DataFrame(scaler.inverse_transform(train_pred))
    predictions = pd.DataFrame(scaler.inverse_transform(test_pred))
    y_train_unscaled = pd.DataFrame(scaler.inverse_transform(y_train))
    y_test_unscaled = pd.DataFrame(scaler.inverse_transform(y_test))

    train_residuals = pd.DataFrame(np.abs(train_predictions - y_train_unscaled))
    test_residuals = pd.DataFrame(np.abs(predictions - y_test_unscaled))

    if model_save:
        model.save('originalsavedoutput/models/LSTM_univar_' + str(name))

    LSTM_univar = LSTMModelContainer()
    if model_output:
        LSTM_univar.model = model
        LSTM_univar.history = history
    LSTM_univar.X_train = X_train
    LSTM_univar.y_train = y_train
    LSTM_univar.X_test = X_test
    LSTM_univar.y_test = y_test
    LSTM_univar.model_eval = model_eval
    LSTM_univar.predictions = predictions
    LSTM_univar.train_residuals = train_residuals
    LSTM_univar.test_residuals = test_residuals

    return LSTM_univar
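
A usage sketch with a synthetic stand-in for a data frame containing the 'raw', 'observed', and 'anomaly' columns produced by the rules based functions, and an LSTM_params dictionary with the keys referenced in the source ('samples', 'time_steps', 'cells', 'dropout', 'patience'); all values are illustrative:

    import numpy as np
    import pandas as pd
    from PyHydroQC import modeling_utilities

    # synthetic stand-in for a sensor data frame prepared by the rules based functions
    n = 1000
    raw = np.sin(np.linspace(0, 50, n)) + np.random.normal(0, 0.05, n)
    df = pd.DataFrame({'raw': raw, 'observed': raw, 'anomaly': False})

    LSTM_params = {'samples': 200, 'time_steps': 10, 'cells': 64,
                   'dropout': 0.2, 'patience': 3}
    out = modeling_utilities.LSTM_univar(df, LSTM_params, summary=False,
                                         name='demo', model_save=False)
    print(out.predictions.head())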

LSTM_univar_bidir(df, LSTM_params, summary, name, model_output=True, model_save=True)

LSTM_univar_bidir builds, trains, and evaluates a bidirectional LSTM model for univariate data.

Source code in PyHydroQC/modeling_utilities.py
def LSTM_univar_bidir(df, LSTM_params, summary, name, model_output=True, model_save=True):
    """
    LSTM_univar_bidir builds, trains, and evaluates a bidirectional LSTM model for univariate data.
    """
    scaler = create_scaler(df[['observed']])
    df['obs_scaled'] = scaler.transform(df[['observed']])

    X_train, y_train = create_bidir_training_dataset(df[['obs_scaled']], df[['anomaly']], LSTM_params['samples'], LSTM_params['time_steps'])

    num_features = X_train.shape[2]

    if summary:
        print('X_train shape: ' + str(X_train.shape))
        print('y_train shape: ' + str(y_train.shape))
        print('Number of features: ' + str(num_features))

    model = create_bidir_model(LSTM_params['cells'], LSTM_params['time_steps'], num_features, LSTM_params['dropout'])
    if summary:
        model.summary()
        verbose = 1
    else:
        verbose = 0
    history = train_model(X_train, y_train, model, LSTM_params['patience'], verbose=verbose)

    df['raw_scaled'] = scaler.transform(df[['raw']])
    X_test, y_test = create_bidir_sequenced_dataset(df[['raw_scaled']], LSTM_params['time_steps'])

    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)
    model_eval = model.evaluate(X_test, y_test)

    train_predictions = pd.DataFrame(scaler.inverse_transform(train_pred))
    predictions = pd.DataFrame(scaler.inverse_transform(test_pred))
    y_train_unscaled = pd.DataFrame(scaler.inverse_transform(y_train))
    y_test_unscaled = pd.DataFrame(scaler.inverse_transform(y_test))

    train_residuals = pd.DataFrame(np.abs(train_predictions - y_train_unscaled))
    test_residuals = pd.DataFrame(np.abs(predictions - y_test_unscaled))

    if model_save:
        model.save('originalsavedoutput/models/LSTM_univar_bidir_' + str(name))

    LSTM_univar_bidir = LSTMModelContainer()
    if model_output:
        LSTM_univar_bidir.model = model
        LSTM_univar_bidir.history = history
    LSTM_univar_bidir.X_train = X_train
    LSTM_univar_bidir.y_train = y_train
    LSTM_univar_bidir.X_test = X_test
    LSTM_univar_bidir.y_test = y_test
    LSTM_univar_bidir.model_eval = model_eval
    LSTM_univar_bidir.predictions = predictions
    LSTM_univar_bidir.train_residuals = train_residuals
    LSTM_univar_bidir.test_residuals = test_residuals

    return LSTM_univar_bidir

pdq(data)

pdq is an automated function for finding the order (hyperparameters) of an ARIMA model.

Parameters:
  • data (``) – series of observations

Returns:
  • pdq – list of (p,d,q) ARIMA hyperparameters

Source code in PyHydroQC/modeling_utilities.py
def pdq(data):
    """
    pdq is an automated function for finding the order (hyperparameters) of an ARIMA model.
    Arguments:
        data: series of observations
    Returns:
        pdq: list of (p,d,q) ARIMA hyperparameters
    """
    model = pm.auto_arima(np.array(data), seasonal=False, suppress_warnings=True, error_action="ignore")
    (p, d, q) = model.order
    pdq = [p, d, q]

    return pdq
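
For example, the automatically selected order can be passed straight to build_arima_model (a sketch using synthetic data as a stand-in for a series of observations):

    import numpy as np
    import pandas as pd
    from PyHydroQC import modeling_utilities

    observed = pd.Series(np.sin(np.linspace(0, 20, 500)) + np.random.normal(0, 0.1, 500))
    p, d, q = modeling_utilities.pdq(observed)
    model_fit, residuals, predictions = modeling_utilities.build_arima_model(
        observed, p, d, q, summary=False)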

train_model(X_train, y_train, model, patience, monitor='val_loss', mode='min', epochs=100, verbose=1, batch_size=32, validation_split=0.1)

train_model fits the model to training data. Early stopping ensures that too many epochs of training are not used. Monitors the validation loss for improvements and stops training when improvement stops.

Parameters:
  • X_train (``) – training input data (output of one of the create_training_data functions)

  • y_train (``) – training output data (output of one of the create_training_data functions)

  • model (``) – existing LSTM model structure (output of one of the create_model functions)

  • patience (``) – number of epochs without improvement to wait before stopping training early.

  • epochs (``) – how many training epochs to use.

  • verbose (``) – what type of output to show. 0 is silent. 1 is progress bar. 2 is one line per epoch.

  • batch_size (``) – hyperparameter.

  • validation_split (``) – fraction of the training data held out for validation.

Returns:
  • history – keras model.fit object

Source code in PyHydroQC/modeling_utilities.py
def train_model(X_train, y_train, model, patience, monitor='val_loss', mode='min', epochs=100, verbose=1, batch_size=32,
                validation_split=0.1):
    """
    train_model fits the model to training data. Early stopping ensures that too many epochs of training are not used.
    Monitors the validation loss for improvements and stops training when improvement stops.
    Arguments:
        X_train: training input data (output of one of the create_training_data functions)
        y_train: training output data (output of one of the create_training_data functions)
        model: existing LSTM model structure (output of one of the create_model functions)
        patience: how many epochs to wait for early stopping.
        epochs: how many training epochs to use.
        verbose: what type of output to show. 0 is silent. 1 is progress bar. 2 is one line per epoch.
        batch_size: hyperparameter.
        validation_split: indicates how much data to use for internal training.
    Returns:
        history: keras model.fit object
    """
    es = tf.keras.callbacks.EarlyStopping(monitor=monitor, patience=patience, mode=mode)
    history = model.fit(
        X_train, y_train,
        epochs=epochs,  # just set to something high, early stopping will monitor.
        verbose=verbose, # how to give output. 0 is silent. 1 is progress bar. 2 is one line per epoch.
        batch_size=batch_size,  # this can be optimized later
        validation_split=validation_split,  # use 10% of data for validation, use 90% for training.
        callbacks=[es],  # early stopping similar to earlier
        shuffle=False,  # because order matters
        )

    return history
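
A sketch of the typical call, building the inputs with the dataset and model helpers documented above (synthetic data and hyperparameter values are assumptions):

    import numpy as np
    import pandas as pd
    from PyHydroQC import modeling_utilities

    series = pd.DataFrame({'obs_scaled': np.sin(np.linspace(0, 50, 500))})
    X_train, y_train = modeling_utilities.create_sequenced_dataset(series, time_steps=10)
    model = modeling_utilities.create_vanilla_model(cells=64, time_steps=10,
                                                    num_features=1, dropout=0.2)
    history = modeling_utilities.train_model(X_train, y_train, model,
                                             patience=3, epochs=5, verbose=0)
    # training and validation loss curves are available afterwards
    print(min(history.history['val_loss']))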

Workflow Functions for Model Based Anomaly Detection (model_workflow)

ModelType

ARIMA_detect(df, sensor, params, rules=False, plots=False, summary=True, compare=False, suppress_warnings=True)

Source code in PyHydroQC/model_workflow.py
def ARIMA_detect(df, sensor, params,
                 rules=False, plots=False, summary=True, compare=False, suppress_warnings=True):
    """
    """
    print('\nProcessing ARIMA detections.')
    # RULES BASED DETECTION #
    if rules:
        df = rules_detect.range_check(df, params['max_range'], params['min_range'])
        df = rules_detect.persistence(df, params['persist'])
        size = rules_detect.group_size(df)
        df = rules_detect.interpolate(df)
        print(sensor + ' rules based detection complete. Longest detected group = ' + str(size))

    # MODEL CREATION #
    [p, d, q] = params['pdq']
    model_fit, residuals, predictions = modeling_utilities.build_arima_model(df['observed'], p, d, q, summary, suppress_warnings)
    print(sensor + ' ARIMA model complete.')

    # DETERMINE THRESHOLD AND DETECT ANOMALIES #
    threshold = anomaly_utilities.set_dynamic_threshold(residuals[0], params['window_sz'], params['alpha'], params['threshold_min'])
    threshold.index = residuals.index
    if plots:
        plt.figure()
        anomaly_utilities.plt_threshold(residuals, threshold, sensor)
        plt.show()
    print('Threshold determination complete.')
    detections = anomaly_utilities.detect_anomalies(df['observed'], predictions, residuals, threshold, summary=True)

    # WIDEN AND NUMBER ANOMALOUS EVENTS #
    df['detected_anomaly'] = detections['anomaly']
    df['all_anomalies'] = df.eval('detected_anomaly or anomaly')
    df['detected_event'] = anomaly_utilities.anomaly_events(df['all_anomalies'], params['widen'])

    if compare:
        df['labeled_event'] = anomaly_utilities.anomaly_events(df['labeled_anomaly'], params['widen'])
        # DETERMINE METRICS #
        anomaly_utilities.compare_events(df, params['widen'])
        metrics = anomaly_utilities.metrics(df)
        e_metrics = anomaly_utilities.event_metrics(df)
        # OUTPUT RESULTS #
        print('Model type: ARIMA')
        print('Sensor: ' + sensor)
        anomaly_utilities.print_metrics(metrics)
        print('Event based calculations:')
        anomaly_utilities.print_metrics(e_metrics)
        print('Model report complete\n')

    # GENERATE PLOTS #
    if plots:
        plt.figure()
        anomaly_utilities.plt_results(
            raw=df['raw'],
            predictions=detections['prediction'],
            labels=df['labeled_event'],
            detections=df['detected_event'],
            sensor=sensor
        )
        plt.show()

    ARIMA_detect = ModelWorkflow()
    ARIMA_detect.df = df
    ARIMA_detect.model_fit = model_fit
    ARIMA_detect.threshold = threshold
    ARIMA_detect.detections = detections
    if compare:
        ARIMA_detect.metrics = metrics
        ARIMA_detect.e_metrics = e_metrics

    return ARIMA_detect
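
A workflow sketch with synthetic data standing in for a sensor data frame that already has 'raw', 'observed', and 'anomaly' columns (as produced by the rules based functions), and a params dictionary containing the keys referenced in the source; all parameter values are illustrative:

    import numpy as np
    import pandas as pd
    from PyHydroQC import model_workflow

    # synthetic stand-in for a sensor data frame after rules based preprocessing
    idx = pd.date_range('2021-01-01', periods=1000, freq='15min')
    obs = np.sin(np.linspace(0, 50, 1000)) + np.random.normal(0, 0.05, 1000)
    df = pd.DataFrame({'raw': obs, 'observed': obs, 'anomaly': False}, index=idx)

    params = {'pdq': [1, 1, 3], 'window_sz': 96, 'alpha': 0.0001,
              'threshold_min': 0.25, 'widen': 1}
    results = model_workflow.ARIMA_detect(df, 'temp', params,
                                          rules=False, plots=False,
                                          summary=False, compare=False)
    print(results.df['detected_event'].value_counts().head())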

LSTM_detect_multivar(sensor_array, sensors, params, LSTM_params, model_type, name='', rules=False, plots=False, summary=True, compare=False, model_output=True, model_save=True)

Source code in PyHydroQC/model_workflow.py
def LSTM_detect_multivar(sensor_array, sensors, params, LSTM_params, model_type, name='',
                rules=False, plots=False, summary=True, compare=False, model_output=True, model_save=True):
    """
    """
    print('\nProcessing LSTM multivariate ' + str(model_type) + ' detections.')
    # RULES BASED DETECTION #
    if rules:
        size = dict()
        for snsr in sensors:
            sensor_array[snsr], r_c = rules_detect.range_check(sensor_array[snsr], params[snsr].max_range, params[snsr].min_range)
            sensor_array[snsr], p_c = rules_detect.persistence(sensor_array[snsr], params[snsr].persist)
            size[snsr] = rules_detect.group_size(sensor_array[snsr])
            sensor_array[snsr] = rules_detect.interpolate(sensor_array[snsr])
            print(snsr + ' maximum detected group length = ' + str(size[snsr]))
        print('Rules based detection complete.\n')
    # Create new data frames with raw and observed (after applying rules) and preliminary anomaly detections for selected sensors
    df_raw = pd.DataFrame(index=sensor_array[sensors[0]].index)
    df_observed = pd.DataFrame(index=sensor_array[sensors[0]].index)
    df_anomaly = pd.DataFrame(index=sensor_array[sensors[0]].index)
    for snsr in sensors:
        df_raw[snsr + '_raw'] = sensor_array[snsr]['raw']
        df_observed[snsr + '_obs'] = sensor_array[snsr]['observed']
        df_anomaly[snsr + '_anom'] = sensor_array[snsr]['anomaly']
    print('Raw data shape: ' + str(df_raw.shape))
    print('Observed data shape: ' + str(df_observed.shape))
    print('Initial anomalies data shape: ' + str(df_anomaly.shape))

    # MODEL CREATION #
    if model_type == ModelType.VANILLA:
        model = modeling_utilities.LSTM_multivar(df_observed, df_anomaly, df_raw, LSTM_params, summary, name, model_output, model_save)
    elif model_type == ModelType.BIDIRECTIONAL:
        model = modeling_utilities.LSTM_multivar_bidir(df_observed, df_anomaly, df_raw, LSTM_params, summary, name, model_output, model_save)

    print('multivariate ' + str(model_type) + ' LSTM model complete.\n')
    # Plot Metrics and Evaluate the Model
    if plots:
        plt.figure()
        plt.plot(model.history.history['loss'], label='Training Loss')
        plt.plot(model.history.history['val_loss'], label='Validation Loss')
        plt.legend()
        plt.show()

    # DETERMINE THRESHOLD AND DETECT ANOMALIES #
    ts = LSTM_params['time_steps']
    residuals = pd.DataFrame(model.test_residuals)
    residuals.columns = sensors
    predictions = pd.DataFrame(model.predictions)
    predictions.columns = sensors
    if model_type == ModelType.VANILLA:
        residuals.index = df_observed[ts:].index
        predictions.index = df_observed[ts:].index
        observed = df_observed[ts:]
    elif model_type == ModelType.BIDIRECTIONAL:
        residuals.index = df_observed[ts:-ts].index
        predictions.index = df_observed[ts:-ts].index
        observed = df_observed[ts:-ts]

    threshold = dict()
    detections = dict()
    for snsr in sensors:
        threshold[snsr] = anomaly_utilities.set_dynamic_threshold(
            residuals[snsr], params[snsr]['window_sz'], params[snsr]['alpha'], params[snsr]['threshold_min'])
        threshold[snsr].index = residuals.index
        detections[snsr] = anomaly_utilities.detect_anomalies(
            observed[snsr+'_obs'], predictions[snsr], residuals[snsr], threshold[snsr], summary=True)
        if plots:
            plt.figure()
            anomaly_utilities.plt_threshold(residuals[snsr], threshold[snsr], snsr)
            plt.show()
    print('Threshold determination complete.')

    # WIDEN AND NUMBER ANOMALOUS EVENTS #
    all_data = dict()
    for snsr in sensors:
        if model_type == ModelType.VANILLA:
            all_data[snsr] = sensor_array[snsr].iloc[ts:]
        elif model_type == ModelType.BIDIRECTIONAL:
            all_data[snsr] = sensor_array[snsr].iloc[ts:-ts]
        all_data[snsr]['detected_anomaly'] = detections[snsr]['anomaly']
        all_data[snsr]['all_anomalies'] = all_data[snsr].eval('detected_anomaly or anomaly')
        all_data[snsr]['detected_event'] = anomaly_utilities.anomaly_events(all_data[snsr]['all_anomalies'], params[snsr]['widen'])

    # COMPARE AND DETERMINE METRICS #
    if compare:
        metrics = dict()
        e_metrics = dict()
        for snsr in sensors:
            all_data[snsr]['labeled_event'] = anomaly_utilities.anomaly_events(all_data[snsr]['labeled_anomaly'], params[snsr]['widen'])
            anomaly_utilities.compare_events(all_data[snsr], params[snsr]['widen'])
            metrics[snsr] = anomaly_utilities.metrics(all_data[snsr])
            e_metrics[snsr] = anomaly_utilities.event_metrics(all_data[snsr])
        # OUTPUT RESULTS #
            print('\nModel type: LSTM multivariate ' + str(model_type))
            print('Sensor: ' + snsr)
            anomaly_utilities.print_metrics(metrics[snsr])
            print('Event based calculations:')
            anomaly_utilities.print_metrics(e_metrics[snsr])
        print('Model report complete\n')

    # GENERATE PLOTS #
    if plots:
        for snsr in sensors:
            plt.figure()
            anomaly_utilities.plt_results(
                raw=sensor_array[snsr]['raw'],
                predictions=detections[snsr]['prediction'],
                labels=sensor_array[snsr]['labeled_event'],
                detections=all_data[snsr]['detected_event'],
                sensor=snsr
                )
            plt.show()

    LSTM_detect_multivar = ModelWorkflow()
    LSTM_detect_multivar.sensor_array = sensor_array
    LSTM_detect_multivar.df_observed = df_observed
    LSTM_detect_multivar.df_raw = df_raw
    LSTM_detect_multivar.df_anomaly = df_anomaly
    LSTM_detect_multivar.model = model
    LSTM_detect_multivar.threshold = threshold
    LSTM_detect_multivar.detections = detections
    LSTM_detect_multivar.all_data = all_data
    if compare:
        LSTM_detect_multivar.metrics = metrics
        LSTM_detect_multivar.e_metrics = e_metrics

    return LSTM_detect_multivar

LSTM_detect_univar(df, sensor, params, LSTM_params, model_type, name='', rules=False, plots=False, summary=True, compare=False, model_output=True, model_save=True)

Source code in PyHydroQC/model_workflow.py
def LSTM_detect_univar(df, sensor, params, LSTM_params, model_type, name='',
                rules=False, plots=False, summary=True, compare=False, model_output=True, model_save=True):
    """
    """
    print('\nProcessing LSTM univariate ' + str(model_type) + ' detections.')
    # RULES BASED DETECTION #
    if rules:
        df = rules_detect.range_check(df, params['max_range'], params['min_range'])
        df = rules_detect.persistence(df, params['persist'])
        size = rules_detect.group_size(df)
        df = rules_detect.interpolate(df)
        print(sensor + ' rules based detection complete. Maximum detected group length = '+str(size))

    # MODEL CREATION #
    if model_type == ModelType.VANILLA:
        model = modeling_utilities.LSTM_univar(df, LSTM_params, summary, name, model_output, model_save)
    elif model_type == ModelType.BIDIRECTIONAL:
        model = modeling_utilities.LSTM_univar_bidir(df, LSTM_params, summary, name, model_output, model_save)
    print(sensor + ' ' + str(model_type) + ' LSTM model complete.')
    if plots:
        plt.figure()
        plt.plot(model.history.history['loss'], label='Training Loss')
        plt.plot(model.history.history['val_loss'], label='Validation Loss')
        plt.legend()
        plt.show()

    # DETERMINE THRESHOLD AND DETECT ANOMALIES #
    ts = LSTM_params['time_steps']
    threshold = anomaly_utilities.set_dynamic_threshold(model.test_residuals[0], params['window_sz'], params['alpha'], params['threshold_min'])
    if model_type == ModelType.VANILLA:
        threshold.index = df[ts:].index
    elif model_type == ModelType.BIDIRECTIONAL:
        threshold.index = df[ts:-ts].index
    residuals = pd.DataFrame(model.test_residuals)
    residuals.index = threshold.index
    if plots:
        plt.figure()
        anomaly_utilities.plt_threshold(residuals, threshold, sensor)
        plt.show()
    if model_type == ModelType.VANILLA:
        observed = df[['observed']][ts:]
    elif model_type == ModelType.BIDIRECTIONAL:
        observed = df[['observed']][ts:-ts]
    print('Threshold determination complete.')
    detections = anomaly_utilities.detect_anomalies(observed, model.predictions, model.test_residuals,
                                                    threshold, summary=True)

    # WIDEN AND NUMBER ANOMALOUS EVENTS #
    if model_type == ModelType.VANILLA:
        df_anomalies = df.iloc[ts:]
    elif model_type == ModelType.BIDIRECTIONAL:
        df_anomalies = df.iloc[ts:-ts]
    df_anomalies['detected_anomaly'] = detections['anomaly']
    df_anomalies['all_anomalies'] = df_anomalies.eval('detected_anomaly or anomaly')
    df_anomalies['detected_event'] = anomaly_utilities.anomaly_events(df_anomalies['all_anomalies'], params['widen'])

    if compare:
        df_anomalies['labeled_event'] = anomaly_utilities.anomaly_events(df_anomalies['labeled_anomaly'], params['widen'])
        # DETERMINE METRICS #
        anomaly_utilities.compare_events(df_anomalies, params['widen'])
        metrics = anomaly_utilities.metrics(df_anomalies)
        e_metrics = anomaly_utilities.event_metrics(df_anomalies)
        # OUTPUT RESULTS #
        print('Model type: LSTM univariate ' + str(model_type))
        print('Sensor: ' + sensor)
        anomaly_utilities.print_metrics(metrics)
        print('Event based calculations:')
        anomaly_utilities.print_metrics(e_metrics)
        print('Model report complete\n')

    # GENERATE PLOTS #
    if plots:
        plt.figure()
        anomaly_utilities.plt_results(
            raw=df['raw'],
            predictions=detections['prediction'],
            labels=df['labeled_event'],
            detections=df_anomalies['detected_event'],
            sensor=sensor
            )
        plt.show()

    LSTM_detect_univar = ModelWorkflow()
    LSTM_detect_univar.df = df
    LSTM_detect_univar.model = model
    LSTM_detect_univar.threshold = threshold
    LSTM_detect_univar.detections = detections
    LSTM_detect_univar.df_anomalies = df_anomalies
    if compare:
        LSTM_detect_univar.metrics = metrics
        LSTM_detect_univar.e_metrics = e_metrics

    return LSTM_detect_univar
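
A usage sketch reusing the synthetic df and params dictionary from the ARIMA_detect sketch above (the 'window_sz', 'alpha', 'threshold_min', and 'widen' keys are required), adding an LSTM_params dictionary and selecting the architecture with ModelType, assumed importable from the model_workflow module; values are illustrative:

    from PyHydroQC import model_workflow
    from PyHydroQC.model_workflow import ModelType

    LSTM_params = {'samples': 200, 'time_steps': 10, 'cells': 64,
                   'dropout': 0.2, 'patience': 3}
    results = model_workflow.LSTM_detect_univar(
        df, 'temp', params, LSTM_params, model_type=ModelType.VANILLA,
        name='demo', rules=False, plots=False, summary=False,
        compare=False, model_save=False)
    print(results.df_anomalies['detected_event'].value_counts().head())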

Model Based Correction Functions (ARIMA_correct)

ARIMA_forecast(x, l, suppress_warnings=True)

ARIMA_forecast creates predictions of data where anomalies occur. Creates ARIMA model and outputs forecasts of specified length.

Parameters:
  • x (``) – array of values from which to predict corrections; corresponds to non-anomalous data.

  • l (``) – number of predicted data points to be forecasted/corrected.

  • suppress_warnings (``) – indicates whether warnings associated with ARIMA model development and fitting should be suppressed.

Returns:
  • y – array of length l of the corrected values as predicted by the model

Source code in PyHydroQC/ARIMA_correct.py
def ARIMA_forecast(x, l, suppress_warnings=True):
    """
    ARIMA_forecast creates predictions of data where anomalies occur. Creates ARIMA model and outputs forecasts of specified length.
    Arguments:
        x: array of values from which to predict corrections. corresponds to non-anomalous data.
        l: number of predicted data points to be forecasted/corrected.
        suppress_warnings: indicates whether warnings associated with ARIMA model development and fitting should be suppressed.
    Returns:
        y: array of length l of the corrected values as predicted by the model
    """
    model = pm.auto_arima(x, error_action='ignore', suppress_warnings=True)
    if suppress_warnings:
        warnings.filterwarnings('ignore', message='Non-stationary starting autoregressive parameters')
        warnings.filterwarnings('ignore', message='Non-invertible starting MA parameters found.')
        warnings.filterwarnings('ignore', message='ConvergenceWarning: Maximum Likelihood optimization failed to converge.')
    y = model.predict(l)
    return y
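
A minimal sketch forecasting 5 points from a short array of valid observations (the values are illustrative only):

    import numpy as np
    from PyHydroQC import ARIMA_correct

    # short illustrative series of valid observations
    x = np.array([10.1, 10.3, 10.2, 10.4, 10.5, 10.6, 10.4, 10.7, 10.8, 10.9])
    y = ARIMA_correct.ARIMA_forecast(x, 5)
    print(y)  # 5 forecasted values extending the series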

ARIMA_group(df, anomalies, group, min_group_len=20)

ARIMA_group examines detected events and performs conditional widening (marks some valid points as anomalous) to ensure that the widened event is sufficient for forecasting/backcasting.

Parameters:
  • df (``) – data frame with a column for detected events.

  • anomalies (``) – string of the column name for detected events.

  • group (``) – string of column name containing an ascending index for each group of valid or anomalous data points (output of the group_bools function).

  • min_group_len (``) – the minimum group length.

Returns:
  • df – data frame with new columns: 'ARIMA_event' and 'ARIMA_group'

Source code in PyHydroQC/ARIMA_correct.py
def ARIMA_group(df, anomalies, group, min_group_len=20):
    """
    ARIMA_group examines detected events and performs conditional widening (marks some valid points as anomalous) to ensure that widened event is sufficient for forecasting/backcasting.
    Arguments:
        df: data frame with a column for detected events.
        anomalies: string of the column name for detected events.
        group: string of column name containing an ascending index for each group of valid or anomolous data points (output of the group_bools function).
        min_group_len: the minimum group length.
    Returns:
        df: data frame with new columns: 'ARIMA_event' and 'ARIMA_group'
    """
    ARIMA_group = []
    df['ARIMA_event'] = df[anomalies]
    new_gi = 0
    # for each group
    for i in range(0, (max(df[group]) + 1)):
        # determine the length of this group
        group_len = len(df.loc[df[group] == i][group])
        # if this group is not an anomaly event and is too small to support an ARIMA model
        if ((df.loc[df[group] == i][anomalies][0] == 0) and (group_len < min_group_len)):
            # this group needs to be added to previous group
            df.loc[df[group] == i, 'ARIMA_event'] = True
            if (new_gi > 0):
                new_gi -= 1
            ARIMA_group.extend(np.full([1, group_len], new_gi, dtype=int)[0])

        else:  # this group does not need to be altered
            ARIMA_group.extend(np.full([1, group_len], new_gi, dtype=int)[0])
            new_gi += 1

    if (new_gi < (max(df[group])/2)):
        print("WARNING: more than half of the anomaly events have been merged!")
    df['ARIMA_group'] = ARIMA_group

    return df

generate_corrections(df, observed, anomalies, model_limit=6, savecasts=False, suppress_warnings=True)

generate_corrections passes through data with identified anomalies and determines corrections using ARIMA models. Corrections are determined by combining a forecast and a backcast in a weighted average informed by non-anomalous data before and after each anomaly. Corrections are generated for anomalous events in order from shortest to longest, and the corrected values from shorter events are used along with non-anomalous values to generate corrections for longer events.

Parameters:
  • df (``) – data frame with columns for observations and anomalies as defined by the user.

  • observed (``) – string that names the column in the data frame containing observed values.

  • anomalies (``) – string that names the column in the data frame containing booleans corresponding to anomalies where True = anomalous.

  • model_limit (``) – number of days of preceding/following data used to generate forecasts and backcasts.

  • savecasts (``) – boolean indicating whether to save the forecast and backcast data, which can be used for analysis or plotting.

  • suppress_warnings (``) – indicates whether warnings associated with ARIMA model development and fitting should be suppressed.

Returns:
  • df – data frame with additional columns:
    'det_cor' – determined correction
    'corrected' – boolean indicating whether the data was corrected
    'forecasts' – forecasted values used in correction (only created if savecasts=True)
    'backcasts' – backcasted values used in correction (only created if savecasts=True)

Source code in PyHydroQC/ARIMA_correct.py
def generate_corrections(df, observed, anomalies, model_limit=6, savecasts=False, suppress_warnings=True):
    """
    generate_corrections passes through data with identified anomalies and determines corrections using ARIMA models.
    Corrections are determined by combining both a forecast and a backcast in a weighted average that is informed by
    non-anamolous data before and after anomalies. Corrections are generated for anomalies by order of the shortest to
    longest and those corrected values from the shorter anomalies are used with non-anomalous values to generate
    corrections for longer anomalies.
    Arguments:
        df: data frame with columns for observations and anomalies as defined by the user.
        observed: string that names the column in the data frame containing observed values.
        anomalies: string that names the column in the data frame containing booleans corresponding to anomalies where True = anomalous.
        model_limit: int for number of days used to limit the amount of data from which to generate forecasts and backcasts
        savecasts: boolean used for saving the forecast as backcast data which can be used for analysis or plotting.
        suppress_warnings: indicates whether warnings associated with ARIMA model development and fitting should be suppressed.
    Returns:
        df with additional columns:
            'det_cor' - determined correction
            'corrected' - boolean indicating whether the data was corrected
            'forecasts' - forecasted values used in correction (only created if savecasts=True)
            'backcasts' - backcasted values used in correction (only created if savecasts=True)
    """

    # assign group index numbers to each set of consecutive True/False data points
    df = anomaly_utilities.group_bools(df, column_in=anomalies, column_out='group')
    df = ARIMA_group(df, anomalies, 'group')

    # create new output columns
    df['det_cor'] = df[observed]
    df['corrected'] = df['ARIMA_event']
    if (savecasts):
        df['forecasts'] = np.nan
        df['backcasts'] = np.nan

    # while there are anomalous groups of points left to correct
    while len(df[df['ARIMA_event'] != 0]) > 0:
        # find an index for an anomalous ARIMA_group having the smallest number of points
        i = df[df['ARIMA_event'] != 0]['ARIMA_group'].value_counts().index.values[-1]

        # reset the conditionals
        forecasted = False
        backcasted = False
        # perform forecasting to generate corrected data points
        if (i != 0):  # if not at the beginning
            # forecast in forward direction
            # create an array of corrected data for current anomalous group
            # i-1 is the index of the previous group being used to forecast
            pre_data = df.loc[df['ARIMA_group'] == (i - 1)][observed]  # save off data for modeling
            pre_data = pre_data[pre_data.index[-1] - pd.Timedelta(days=model_limit):pre_data.index[-1]]  # limit data
            # generate the forecast data
            yfor = ARIMA_forecast(np.array(pre_data),
                                  len(df.loc[df['ARIMA_group'] == i]),
                                  suppress_warnings)

            forecasted = True
            if (savecasts):
                df.loc[df['ARIMA_group'] == i, 'forecasts'] = yfor
                df.loc[df[df['ARIMA_group'] == i - 1].index[-1], 'forecasts'] = \
                    df.loc[df[df['ARIMA_group'] == i - 1].index[-1], 'observed']

        # perform backcasting to generate corrected data points
        if (i != max(df['ARIMA_group'])): # if not at the end
            # forecast in reverse direction
            # data associated with group i+1 gets flipped for making a forecast
            post_data = df.loc[df['ARIMA_group'] == (i + 1)][observed]  # save off data for modeling
            post_data = post_data[post_data.index[0]:post_data.index[0] + pd.Timedelta(days=model_limit)]  # limit data
            # create backcast
            yrev = ARIMA_forecast(np.flip(np.array(post_data)),
                                  len(df.loc[df['ARIMA_group'] == i]),
                                  suppress_warnings)
            # output is reversed, making what was forecast into a backcast
            ybac = np.flip(yrev)
            backcasted = True
            if (savecasts):
                df.loc[df['ARIMA_group'] == i, 'backcasts'] = ybac
                df.loc[df[df['ARIMA_group'] == i + 1].index[0], 'backcasts'] = \
                    df.loc[df[df['ARIMA_group'] == i + 1].index[0], 'observed']

        # fill the det_cor column using forecasted and backcasted conditionals
        if ((not forecasted) and (not backcasted)):
            print("ERROR: all data points are anomalous!")
        elif (not forecasted):  # if there is no forecast
            # add the correction to the detected event
            df.loc[df['ARIMA_group'] == i, 'det_cor'] = ybac

            # remove the ARIMA_event
            df.loc[df['ARIMA_group'] == i, 'ARIMA_event'] = 0

            # decrement the following ARIMA_groups (to merge 0 and 1)
            df.loc[df['ARIMA_group'] > i, 'ARIMA_group'] -= 1

        elif (not backcasted):  # if there is no backcast
            # add the correction to the detected event
            df.loc[df['ARIMA_group'] == i, 'det_cor'] = yfor

            # remove the ARIMA_event
            df.loc[df['ARIMA_group'] == i, 'ARIMA_event'] = 0

            # merge the last ARIMA_group after correction
            df.loc[df['ARIMA_group'] == i, 'ARIMA_group'] = i - 1

        else:  # both a forecast and a backcast exist
            # add the correction to the detected event
            df.loc[df['ARIMA_group'] == i, 'det_cor'] = anomaly_utilities.xfade(yfor, ybac)

            # remove the ARIMA_event
            df.loc[df['ARIMA_group'] == i, 'ARIMA_event'] = 0

            # merge the ARIMA_groups after correction
            df.loc[df['ARIMA_group'] == i, 'ARIMA_group'] = i - 1
            df.loc[df['ARIMA_group'] == i + 1, 'ARIMA_group'] = i - 1

            # decrement the following ARIMA_groups
            df.loc[df['ARIMA_group'] > i, 'ARIMA_group'] -= 2

    # delete unused columns
    df = df.drop(columns=['group', 'ARIMA_event', 'ARIMA_group'])

    return df
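
A correction sketch with a synthetic series containing one block of detected anomalies; the 'observed' and 'detected_event' column names match those produced by the detection workflows, and the data are assumptions for illustration:

    import numpy as np
    import pandas as pd
    from PyHydroQC import ARIMA_correct

    # synthetic stand-in: a series with one block of detected anomalies
    idx = pd.date_range('2021-01-01', periods=500, freq='15min')
    obs = np.sin(np.linspace(0, 25, 500))
    df = pd.DataFrame({'observed': obs, 'detected_event': False}, index=idx)
    df.iloc[200:220, df.columns.get_loc('detected_event')] = True

    corrected = ARIMA_correct.generate_corrections(df, observed='observed',
                                                   anomalies='detected_event')
    print(corrected[['observed', 'det_cor', 'corrected']].iloc[195:225])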