SINGLE SITE ANOMALY DETECTION AND CORRECTION
This script performs anomaly detection and correction for multiple sensors at a single monitoring site for one year. The script imports data, performs initial anomaly detection based on rules, uses models (ARIMA and four variants of LSTM) and associated thresholds to detect anomalies, aggregates model results for overall anomaly detection, and performs correction based on ARIMA.
Site: Logan River at Main Street
Sensors: temperature, specific conductance, pH, dissolved oxygen
Year: 2017
Created: Amber Jones, March 2021
Import Libraries and Functions
import pandas as pd
import matplotlib.pyplot as plt
import math
from PyHydroQC import anomaly_utilities
from PyHydroQC import calibration
from PyHydroQC import model_workflow
from PyHydroQC import rules_detect
from PyHydroQC import ARIMA_correct
from PyHydroQC import modeling_utilities
from PyHydroQC.model_workflow import ModelType
Retrieve data
Creates a dictionary of data frames, one per sensor.
Functions called:
- get_data
site = 'MainStreet'
sensors = ['temp', 'cond', 'ph', 'do']
sensor_array = anomaly_utilities.get_data(sensors=sensors, filename='MS2017.csv', path='../LRO_data/')
for snsr in sensor_array:
    print(snsr + str(sensor_array[snsr]))
Import Parameters
# Parameters may be specified in a parameters file or in this script
from PyHydroQC.parameters import site_params, LSTM_params, calib_params
for snsr in sensors:
    print(snsr + str(site_params[site][snsr]))
print('LSTM' + str(LSTM_params))
print('calib' + str(calib_params))
Rules Based Anomaly Detection
Performs checks for range and persistence. Minimum/maximum range and persistence duration are defined in the parameters. Data outside the range or persisting longer than the duration are flagged as anomalous and corrected by linear interpolation. The output is a column 'observed' of intermediate results used for subsequent modeling.
Functions called:
- range_check
- persistence
- interpolate
range_count = dict()
persist_count = dict()
rules_metrics = dict()
for snsr in sensor_array:
    sensor_array[snsr], range_count[snsr] = rules_detect.range_check(df=sensor_array[snsr],
                                                                     maximum=site_params[site][snsr]['max_range'],
                                                                     minimum=site_params[site][snsr]['min_range'])
    sensor_array[snsr], persist_count[snsr] = rules_detect.persistence(df=sensor_array[snsr],
                                                                       length=site_params[site][snsr]['persist'],
                                                                       output_grp=True)
    sensor_array[snsr] = rules_detect.interpolate(df=sensor_array[snsr])
print('Rules based detection complete.\n')
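The range and persistence rules can be illustrated on a toy series with plain pandas. This sketch is independent of the PyHydroQC implementation; the limits and persistence length below are made up for the example:

```python
import pandas as pd

# Toy series: one value out of range, one value 'stuck' for several steps
obs = pd.Series([12.1, 12.3, 55.0, 12.4, 12.4, 12.4, 12.4, 12.5])

min_range, max_range = 0.0, 30.0   # hypothetical sensor limits
persist_length = 3                 # hypothetical maximum allowed repeats

# Range rule: flag values outside [min_range, max_range]
range_anom = (obs < min_range) | (obs > max_range)

# Persistence rule: flag runs of identical values longer than persist_length
run_id = (obs != obs.shift()).cumsum()
run_size = obs.groupby(run_id).transform('size')
persist_anom = run_size > persist_length

anomalous = range_anom | persist_anom

# Correct flagged points by linear interpolation, as the rules step does
corrected = obs.mask(anomalous).interpolate()
print(anomalous.tolist())
```

Here the out-of-range spike and the four-point stuck run are both flagged, and the flagged points are replaced by values interpolated between the surrounding good data.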
Calibration and Drift Correction
Detect Calibration Events
Two separate rules based approaches attempt to identify calibration events. Calibration events detected here should be reviewed, compared to field records, and organized as input to the following step: drift correction. A subset of sensors is selected (1:4) because temperature is not calibrated. Calibration approaches:
- Concurrent persistence: where persistence occurs within a certain window (e.g., after a sensor is returned to the water, it is 'stuck' and reports the same value for several time steps), at a time of day within a certain window, and these conditions overlap for all sensors, the points are identified as an event. Hours and durations are defined in the parameters.
- Edge detection: edges are determined using the differences in data before and after each point. The width may be a single point (width = 1) or greater. If the difference exceeds a threshold (a parameter setting), the point is identified as an event.
Functions called:
- calib_overlap
- calib_detect
- calib_edge_detect
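The idea behind edge detection can be sketched as a windowed difference compared to a threshold. This is illustrative only; the actual calib_edge_detect implementation and its parameters may differ:

```python
import pandas as pd

# Toy conductance-like series with an abrupt shift (a calibration-style jump)
obs = pd.Series([310.0, 311.0, 310.5, 311.0, 390.0, 391.0, 390.5])

width = 1          # compare each point to the previous one
threshold = 60.0   # hypothetical: minimum jump treated as a candidate event

# Difference of the means of the 'width' points before and after each position
diff = obs.rolling(width).mean() - obs.rolling(width).mean().shift(width)
candidates = diff.abs() > threshold
print(candidates[candidates].index.tolist())
```

With width = 1 this reduces to a first difference; larger widths smooth out noise by comparing window means instead of single points.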
# Concurrent persistence
calib_sensors = sensors[1:4]
input_array = dict()
for snsr in calib_sensors:
    input_array[snsr] = sensor_array[snsr]
all_calib, all_calib_dates, df_all_calib, calib_dates_overlap = calibration.calib_overlap(sensor_names=calib_sensors,
input_array=input_array,
calib_params=calib_params)
print('Possible calibration events identified by concurrent persistence.')
print(calib_dates_overlap)
# Edge detection
calib_candidates = dict()
edge_diff = dict()
threshold = dict()
# Set threshold for each variable - the level of change for a difference to be identified as a calibration event.
# This can be an iterative process.
threshold['cond'] = 60
threshold['ph'] = 0.1
threshold['do'] = 0.4
print('Possible calibration events identified by edge detection.')
for snsr in calib_sensors:
    # Width is the window of time to consider in the edge detect difference.
    # 1 determines the difference between each point independently.
    # Higher numbers use the difference over a longer window.
    calib_candidates[snsr], edge_diff[snsr] = calibration.calib_edge_detect(observed=sensor_array[snsr]['observed'],
                                                                            width=1,
                                                                            calib_params=calib_params,
                                                                            threshold=threshold[snsr])
    print(snsr)
    print(calib_candidates[snsr])
Find Gap Values and Perform Linear Drift Correction
Drift correction is used when data are shifted due to calibration or cleaning of a sensor. To perform drift correction, the routine requires a start date, an end date, and a gap value to shift the data for each event. A separate function determines the gap value for each calibration. To do so requires dates of calibration determined either by the above calibration detection functions (after review) or compiled from field records. The determined gap values should be reviewed and adjusted.
Functions called:
- find_gap
- lin_drift_cor
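Linear drift correction applies the gap value proportionally over the correction window: no adjustment at the start, the full gap at the end. A minimal sketch of the idea, with a hypothetical helper (the actual lin_drift_cor signature and behavior may differ):

```python
import numpy as np
import pandas as pd

def linear_drift_correct(observed, start, end, gap):
    """Ramp a correction from 0 at `start` to `gap` at `end` (illustrative)."""
    corrected = observed.copy()
    window = corrected.loc[start:end]
    ramp = np.linspace(0, gap, len(window))
    corrected.loc[start:end] = window + ramp
    return corrected

idx = pd.date_range('2017-06-01', periods=5, freq='15min')
obs = pd.Series([100.0, 100.0, 100.0, 100.0, 100.0], index=idx)
# Hypothetical gap of -5 determined at the calibration at the window's end
fixed = linear_drift_correct(obs, idx[0], idx[-1], gap=-5.0)
print(fixed.tolist())
```

The flat toy series becomes a linear ramp ending 5 units lower, matching the post-calibration value at the event time.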
# Subset of sensors that are calibrated
calib_sensors = sensors[1:4]
# Initialize data structures
calib_dates = dict()
gaps = dict()
shifts = dict()
tech_shifts = dict()
for snsr in calib_sensors:
    # Import calibration dates
    calib_dates[snsr] = pd.read_csv('../LRO_data/' + site + '_' + snsr + '_calib_dates.csv',
                                    header=1, parse_dates=True, infer_datetime_format=True)
    calib_dates[snsr]['start'] = pd.to_datetime(calib_dates[snsr]['start'])
    calib_dates[snsr]['end'] = pd.to_datetime(calib_dates[snsr]['end'])
    # Ensure the date range of calibrations corresponds to the imported data
    calib_dates[snsr] = calib_dates[snsr].loc[(calib_dates[snsr]['start'] > min(sensor_array[snsr].index)) &
                                              (calib_dates[snsr]['end'] < max(sensor_array[snsr].index))]
    # Initialize dataframe to store determined gap values and associated dates
    gaps[snsr] = pd.DataFrame(columns=['end', 'gap'],
                              index=range(min(calib_dates[snsr].index), max(calib_dates[snsr].index) + 1))
    if len(calib_dates[snsr]) > 0:
        # Initialize data structures
        shifts[snsr] = []
        tech_shifts[snsr] = []
        # Loop through each calibration event date.
        for i in range(min(calib_dates[snsr].index), max(calib_dates[snsr].index) + 1):
            # Apply find_gap routine, add to dataframe, add output of shifts to list.
            gap, end, shifted = calibration.find_gap(observed=sensor_array[snsr]['observed'],
                                                     calib_date=calib_dates[snsr]['end'][i],
                                                     hours=2,
                                                     show_shift=True)
            gaps[snsr].loc[i, 'end'] = end
            gaps[snsr].loc[i, 'gap'] = gap
            shifts[snsr].append(shifted)
        # Plotting to examine gap values
        l = len(calib_dates[snsr])
        ncol = math.ceil(math.sqrt(l))
        nrow = math.ceil(l / ncol)
        hours = 6
        fig, ax = plt.subplots(nrows=nrow, ncols=ncol, figsize=(nrow, ncol + 1), facecolor='w')
        for i, axi in enumerate(ax.flat):
            if i < l:
                axi.plot(sensor_array[snsr]['observed'].loc[
                    pd.to_datetime(calib_dates[snsr]['end'].iloc[i]) - pd.Timedelta(hours=hours):
                    pd.to_datetime(calib_dates[snsr]['end'].iloc[i]) + pd.Timedelta(hours=hours)])
                axi.plot(shifts[snsr][i], 'c')
print('Gap value determination complete.')
# Review gaps and make adjustments as needed before performing drift correction
# For example:
gaps['cond'].loc[gaps['cond'].index[5], 'gap'] = -5
# Perform Linear Drift Correction
calib_sensors = sensors[1:4]
for snsr in calib_sensors:
    # Set start dates for drift correction at the previously identified calibrations.
    gaps[snsr]['start'] = gaps[snsr]['end'].shift(1)
    gaps[snsr].loc[gaps[snsr].index[0], 'start'] = sensor_array[snsr].index[0]
    if len(gaps[snsr]) > 0:
        for i in range(min(gaps[snsr].index), max(gaps[snsr].index) + 1):
            result, sensor_array[snsr]['observed'] = calibration.lin_drift_cor(observed=sensor_array[snsr]['observed'],
                                                                               start=gaps[snsr]['start'][i],
                                                                               end=gaps[snsr]['end'][i],
                                                                               gap=gaps[snsr]['gap'][i],
                                                                               replace=True)
print('Linear drift correction complete.')
Model Based Anomaly Detection
Generates five models. Each model predicts one step ahead using immediately adjacent data. Anomalies are detected by comparing the model residuals (predictions minus observations) to a threshold. Dynamic thresholds are based on model variability. Settings for ARIMA, LSTM, and threshold determination are in the parameters. Results from all models are aggregated so that a detection by any model results in an anomaly.
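The detection step shared by all five models can be sketched as comparing residuals to a threshold band. For simplicity this sketch uses a fixed band; the script itself uses dynamic thresholds that vary with model variability:

```python
import pandas as pd

observations = pd.Series([10.0, 10.2, 15.0, 10.1, 9.9])
predictions = pd.Series([10.1, 10.1, 10.2, 10.0, 10.0])

# Residuals are predictions minus observations
residuals = predictions - observations

threshold = 1.0   # hypothetical fixed band; dynamic in the actual workflow
detected = residuals.abs() > threshold
print(detected.tolist())
```

Only the point where the observation departs sharply from the prediction exceeds the band and is flagged.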
ARIMA Detection
ARIMA models use a combination of past data in a linear form to predict the next value. These models are univariate. Each ARIMA model requires the parameters p, d, q, which can be defined automatically as shown here.
Functions called:
- pdq
- ARIMA_detect:
- build_ARIMA_model
- set_dynamic_threshold
- detect_anomalies
- anomaly_events
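The "combination of past data in a linear form" can be illustrated with the simplest case, an AR(1) one-step predictor fit by least squares. This is a conceptual sketch only; the script's build_ARIMA_model fits a full ARIMA with the automatically determined (p, d, q):

```python
import numpy as np

# Toy AR(1) data: x_t = 0.8 * x_{t-1} + noise
rng = np.random.default_rng(42)
x = np.zeros(200)
for t in range(1, 200):
    x[t] = 0.8 * x[t - 1] + rng.normal(0, 0.1)

# Fit the coefficient phi by least squares on lagged pairs: x_t ~ phi * x_{t-1}
phi = np.dot(x[:-1], x[1:]) / np.dot(x[:-1], x[:-1])

# One-step-ahead predictions and the residuals used for detection
pred = phi * x[:-1]
resid = x[1:] - pred
print(round(phi, 2))
```

The fitted coefficient recovers a value close to the true 0.8, and the residuals are what get compared to the dynamic threshold in the detection step.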
all_pdq = dict()
for snsr in sensors:
    all_pdq[snsr] = modeling_utilities.pdq(data=sensor_array[snsr]['observed'])
    print(snsr + ' (p, d, q) = ' + str(all_pdq[snsr]))
    site_params[site][snsr]['pdq'] = all_pdq[snsr]
ARIMA = dict()
for snsr in sensors:
    ARIMA[snsr] = model_workflow.ARIMA_detect(df=sensor_array[snsr], sensor=snsr, params=site_params[site][snsr],
                                              rules=False, plots=False, summary=False, compare=False)
print('\nARIMA detection complete.\n')
LSTM Detection
LSTM models create a neural network that uses a sequence of values to make a prediction. Settings and hyperparameters are defined in the parameters.
DATA: univariate, MODEL: vanilla
Uses a single sensor and data prior to predict the next point.
Functions called:
- LSTM_detect_univar:
- LSTM_univar:
- create_scaler
- create_sequenced_dataset
- create_training_dataset
- create_vanilla_model
- train_model
- set_dynamic_threshold
- detect_anomalies
- anomaly_events
- LSTM_univar:
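The sequencing these functions perform can be sketched as sliding a window over the series: each sample is the previous time_steps values and the target is the next value. The window length here is a hypothetical choice for illustration:

```python
import numpy as np

def create_sequences(values, time_steps):
    """Split a 1-D series into (samples, time_steps) inputs and next-value targets."""
    X, y = [], []
    for i in range(len(values) - time_steps):
        X.append(values[i:i + time_steps])
        y.append(values[i + time_steps])
    return np.array(X), np.array(y)

series = np.arange(10, 20, dtype=float)   # toy observed series
X, y = create_sequences(series, time_steps=3)
print(X.shape, y.shape)
```

Each row of X holds three consecutive values and the corresponding entry of y is the value immediately following that window, which the LSTM learns to predict.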
LSTM_univar = dict()
for snsr in sensors:
LSTM_univar[snsr] = model_workflow.LSTM_detect_univar(df=sensor_array[snsr], sensor=snsr,
params=site_params[site][snsr], LSTM_params=LSTM_params,
model_type=ModelType.VANILLA, rules=False, plots=False,
summary=False, compare=False, model_output=False,
model_save=False)
print('\nLSTM univariate detection complete.\n')
DATA: univariate, MODEL: bidirectional
Uses a single sensor and data before and after to predict a point.
Functions called:
- LSTM_detect_univar:
- LSTM_univar_bidir:
- create_scaler
- create_bidir_sequenced_dataset
- create_bidir_training_dataset
- create_bidir_model
- train_model
- set_dynamic_threshold
- detect_anomalies
- anomaly_events
- LSTM_univar_bidir:
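The bidirectional variant predicts an interior point from data on both sides rather than only from data before it. A sketch of that windowing (the half-width here is a hypothetical choice):

```python
import numpy as np

def create_bidir_sequences(values, half_width):
    """Each sample is half_width points before and after a target point."""
    X, y = [], []
    for i in range(half_width, len(values) - half_width):
        before = values[i - half_width:i]
        after = values[i + 1:i + 1 + half_width]
        X.append(np.concatenate([before, after]))
        y.append(values[i])
    return np.array(X), np.array(y)

series = np.arange(10, 20, dtype=float)   # toy observed series
X, y = create_bidir_sequences(series, half_width=2)
print(X.shape, y.shape)
```

The target value itself is excluded from the input window, so the model must reconstruct it from its neighbors on both sides.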
LSTM_univar_bidir = dict()
for snsr in sensors:
    LSTM_univar_bidir[snsr] = model_workflow.LSTM_detect_univar(df=sensor_array[snsr], sensor=snsr,
                                                                params=site_params[site][snsr], LSTM_params=LSTM_params,
                                                                model_type=ModelType.BIDIRECTIONAL, rules=False,
                                                                plots=False, summary=False, compare=False,
                                                                model_output=False, model_save=False)
print('\nLSTM univariate bidirectional detection complete.\n')
DATA: multivariate, MODEL: vanilla
Uses multiple sensors as inputs and outputs and data prior to predict the next point.
Functions called:
- LSTM_detect_multivar:
- LSTM_multivar:
- create_scaler
- create_sequenced_dataset
- create_training_dataset
- create_vanilla_model
- train_model
- set_dynamic_threshold
- detect_anomalies
- anomaly_events
- LSTM_multivar:
LSTM_multivar = model_workflow.LSTM_detect_multivar(sensor_array=sensor_array, sensors=sensors,
params=site_params[site], LSTM_params=LSTM_params,
model_type=ModelType.VANILLA, rules=False, plots=False,
summary=False, compare=False, model_output=False, model_save=False)
print('\nLSTM multivariate detection complete.\n')
DATA: multivariate, MODEL: bidirectional
Uses multiple sensors as inputs and outputs and data before and after to predict a point.
Functions called:
- LSTM_detect_multivar:
- LSTM_multivar_bidir:
- create_scaler
- create_bidir_sequenced_dataset
- create_bidir_training_dataset
- create_bidir_model
- train_model
- set_dynamic_threshold
- detect_anomalies
- anomaly_events
- LSTM_multivar_bidir:
LSTM_multivar_bidir = model_workflow.LSTM_detect_multivar(sensor_array=sensor_array, sensors=sensors,
params=site_params[site], LSTM_params=LSTM_params,
model_type=ModelType.BIDIRECTIONAL, rules=False, plots=False,
summary=False, compare=False, model_output=False,
model_save=False)
print('\nLSTM multivariate bidirectional detection complete.\n')
Aggregate Detections for All Models
Aggregates the results from all models.
Functions called:
- aggregate_results
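Aggregation amounts to a logical OR across the per-model detections: a point flagged by any model is flagged overall. A minimal sketch (the model names and column layout here are hypothetical, not the aggregate_results output format):

```python
import pandas as pd

# Hypothetical per-model detection columns for one sensor
detections = pd.DataFrame({
    'ARIMA':         [False, True,  False, False],
    'LSTM_univar':   [False, True,  True,  False],
    'LSTM_multivar': [False, False, True,  False],
})

# A detection by any model results in an overall anomaly
detections['detected_anomaly'] = detections.any(axis=1)
print(detections['detected_anomaly'].tolist())
```

This maximizes sensitivity at the cost of more false positives, which is why the aggregated events still feed a review and correction step rather than being dropped outright.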
results_all = dict()
for snsr in sensors:
    models = dict()
    models['ARIMA'] = ARIMA[snsr].df
    models['LSTM_univar'] = LSTM_univar[snsr].df_anomalies
    models['LSTM_univar_bidir'] = LSTM_univar_bidir[snsr].df_anomalies
    models['LSTM_multivar'] = LSTM_multivar.all_data[snsr]
    models['LSTM_multivar_bidir'] = LSTM_multivar_bidir.all_data[snsr]
    results_all[snsr] = anomaly_utilities.aggregate_results(df=sensor_array[snsr],
                                                            models=models,
                                                            verbose=True,
                                                            compare=False)
Correction
Correction is performed using piecewise ARIMA: small models are determined for each period of anomalous data, along with a forecast and a backcast that consider data before and after the anomalous period and are merged together to ensure there are no discontinuities.
Functions called:
- generate_corrections
- group_bools
- ARIMA_group
- xfade
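The forecast/backcast merge can be sketched as a cross-fade: weights shift linearly from the forecast (anchored on the data before the gap) to the backcast (anchored on the data after), so the correction meets the valid data smoothly at both ends. Illustrative only; the actual xfade implementation may differ:

```python
import numpy as np

def crossfade(forecast, backcast):
    """Blend two equal-length casts with linearly shifting weights."""
    n = len(forecast)
    w = np.linspace(0, 1, n)          # 0 at the start, 1 at the end
    return (1 - w) * forecast + w * backcast

forecast = np.array([10.0, 10.0, 10.0, 10.0, 10.0])   # cast forward into the gap
backcast = np.array([12.0, 12.0, 12.0, 12.0, 12.0])   # cast backward into the gap
blended = crossfade(forecast, backcast)
print(blended.tolist())
```

The blended series starts at the forecast's value and ends at the backcast's, removing the step that would appear if either cast were used alone.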
corrections = dict()
for snsr in sensors:
    corrections[snsr] = ARIMA_correct.generate_corrections(df=results_all[snsr],
                                                           observed='observed',
                                                           anomalies='detected_event',
                                                           savecasts=True)
    print(snsr + ' correction complete.')