"""General filtering utilities."""
import warnings
import numpy as np
import pandas as pd
from annalist.annalist import Annalist
annalizer = Annalist()
[docs]
def clip(unclipped: pd.Series, low_clip: float, high_clip: float):
"""Clip values in a pandas Series within a specified range.
Parameters
----------
unclipped : pandas.Series
Input data to be clipped.
high_clip : float
Upper bound for clipping. Values greater than this will be set to NaN.
low_clip : float
Lower bound for clipping. Values less than this will be set to NaN.
Returns
-------
pandas.Series
A Series containing the clipped values with the same index as the
input Series.
"""
unclipped_arr = unclipped.to_numpy()
# np.nan gives warning
with np.errstate(invalid="ignore"):
# Create a boolean condition for values that need to be clipped
clip_cond = (unclipped_arr > high_clip) | (unclipped_arr < low_clip)
# Use pandas' where function to clip values to NaN where the condition is
# True
clipped_series = unclipped.where(~clip_cond, np.nan)
return clipped_series
# noinspection SpellCheckingInspection
[docs]
def fbewma(input_data, span: int):
"""Calculate the Forward-Backward Exponentially Weighted Moving Average (FBEWMA).
Parameters
----------
input_data : pandas.Series
Input time series data to calculate the FBEWMA on.
span : int
Span parameter for exponential weighting.
Returns
-------
pandas.Series
A Series containing the FBEWMA values with the same index as the
input Series.
"""
# Calculate the Forward EWMA.
fwd = input_data.ewm(span=span).mean()
# Calculate the Backward EWMA. (x[::-1] is the reverse of x)
bwd = input_data[::-1].ewm(span=span).mean()
# Stack fwd and the reverse of bwd on top of each other.
stacked_ewma = pd.concat([fwd, bwd[::-1]])
# Calculate the FB-EWMA by taking the mean between fwd and bwd.
return stacked_ewma.groupby(level=0).mean()
[docs]
def remove_outliers(input_data: pd.Series, span: int, delta: float):
"""Remove outliers.
Remove outliers from a time series by comparing it to the
Forward-Backward Exponentially Weighted Moving Average (FBEWMA).
Parameters
----------
input_data : pandas.Series
Input time series data.
span : int
Span parameter for exponential weighting used in the FBEWMA.
delta : float
Threshold for identifying outliers. Values greater than this
threshold will be set to NaN.
Returns
-------
pandas.Series
A Series containing the time series with outliers removed with
the same index as the input Series.
"""
# Calculate the FBEWMA of the time series
fbewma_series = fbewma(input_data, span)
# Create a condition to identify outliers based on the absolute difference
# between input_data and fbewma_series
delta_cond = np.abs(input_data - fbewma_series) > delta
# Set values to NaN where the condition is True
return input_data.where(~delta_cond, np.nan)
[docs]
def remove_spikes(
input_data: pd.Series, span: int, low_clip: float, high_clip: float, delta: float
) -> pd.Series:
"""Remove spikes.
Remove spikes from a time series data using a combination of clipping and
interpolation.
Parameters
----------
input_data : pandas.Series
Input time series data.
span : int
Span parameter for exponential weighting used in outlier detection.
low_clip : float
Lower bound for clipping. Values less than this will be set to NaN.
high_clip : float
Upper bound for clipping. Values greater than this will be set to NaN.
delta : float
Threshold for identifying outliers. Values greater than this threshold
will be considered spikes.
Returns
-------
pandas.Series
A Series containing the time series with spikes removed with the same
index as the input Series.
"""
# Clip values in the input data within the specified range
clipped = clip(input_data, low_clip, high_clip)
# Remove outliers using the remove_outliers function
gaps_series = remove_outliers(clipped, span, delta)
# Could use pandas' .interpolate() on the Series
# interp_series = gaps_series.interpolate()
return gaps_series
[docs]
def remove_one_spikes(
input_data: pd.Series, threshold_factor=3.0, window_size=5
) -> pd.Series:
"""
Detect and remove single-point spikes in a time series.
A one-point spike is defined as a data point that deviates significantly from
both its preceding and following points and the local trend. For the removal of more
complex multi-spikes, use the remove_spikes() function.
NOTE: This function only works when baseline data is fairly stable. If baseline data
is noisy or has high variability, use one_spike_filter_mad() instead.
Parameters
----------
input_data: pandas.Series
The input time series data.
threshold_factor: float
Multiplier for the standard deviation to define the spike threshold.
Default is 3.0.
window_size: int
The size of the rolling window to compute local statistics. Default is 5.
Returns
-------
filtered_data: pandas.Series
The time series with one-point spikes removed (set to NaN).
"""
data = input_data.to_numpy(dtype=float)
n = len(input_data)
filtered_data = data.copy()
# Need at least 3 points to identify a spike
if n < 3:
return input_data
# Check each point (except the first and last)
for i in range(1, n - 1):
prev_val = data[i - 1]
curr_val = data[i]
next_val = data[i + 1]
# Calculate expected value (average of neighbors)
expected = (prev_val + next_val) / 2
deviation = abs(curr_val - expected)
# Calulate local std for context window
start_idx = max(0, i - window_size // 2)
end_idx = min(n, i + window_size // 2 + 1)
local_window = np.concatenate([data[start_idx:i], data[i + 1 : end_idx]])
if len(local_window) > 0:
local_std = np.std(local_window)
local_mean = np.mean(local_window)
# Spike detection criteria:
# 1. Current point deviates from expected by more than threshold
# 2. Neighbors are close to each other (not spikes themselves)
# 3. Current point deviates significantly from local mean
neighbour_diff = abs(prev_val - next_val)
threshold = threshold_factor * local_std
is_spike = (
deviation > threshold
and neighbour_diff < threshold
and abs(curr_val - local_mean) > threshold
)
if is_spike:
# Replace spike with NaN
filtered_data[i] = np.nan
return pd.Series(filtered_data, index=input_data.index)
[docs]
def remove_one_spikes_mad(input_data: pd.Series, threshold_factor=2.5) -> pd.Series:
"""
Detect and remove single-point spikes using Median Absolute Deviation (MAD).
A one-point spike is defined as a data point that deviates significantly from
both its preceding and following points and the local trend. For the removal of
more complex multi-spikes, use the remove_spikes() function.
NOTE: This function is more robust to noisy or variable baseline data than
remove_one_spikes().
ALSO NOTE: This function is... not very good. I think I need to play with desmos
a bit more to get a better thresholding mechanism.
Parameters
----------
input_data: pandas.Series
The input time series data.
threshold_factor: float
Multiplier for the MAD to define the spike threshold.
Default is 2.5.
Returns
-------
filtered_data: pandas.Series
The time series with one-point spikes removed (set to NaN).
"""
data = input_data.to_numpy(dtype=float)
n = len(data)
filtered_data = data.copy()
# Need at least 3 points to identify a spike
if n < 3:
return input_data
# Check each point (except the first and last)
for i in range(1, n - 1):
prev_val = data[i - 1]
curr_val = data[i]
next_val = data[i + 1]
# Calculate expected value (average of neighbors)
expected = np.median([prev_val, next_val])
deviation = abs(curr_val - expected)
# Local MAD calculation
local_context = [prev_val, next_val]
if i > 1:
local_context.append(data[i - 2])
if i < n - 2:
local_context.append(data[i + 2])
mad = np.median(np.abs(local_context - np.median(local_context)))
threshold = threshold_factor * (
1.4826 * mad
) # Scale MAD to approximate std dev
# More conservative: neighbors must be close to each other
neighbour_similarity = abs(prev_val - next_val) / (
abs(prev_val) + abs(next_val) + 1e-10
)
similarity_threshold = np.exp(-deviation / (threshold + 1e-10))
if (
deviation > max(threshold, 1e-10)
and neighbour_similarity < similarity_threshold
):
filtered_data[i] = np.nan
return pd.Series(filtered_data, index=input_data.index)
[docs]
def remove_range(
input_series: pd.Series | pd.DataFrame,
from_date: str | None,
to_date: str | None,
min_gap_length: int = 1,
insert_gaps: str = "none",
):
"""
Remove data from series in given range.
Returns the input series without data between from_date and to_date
inclusive.
A None to_date will remove all data since the from_date (and vice versa).
A double None for to_date/from_date removes all data.
Inserts gaps or not depending on insert_gaps
Parameters
----------
input_series : pd.Series | pd.DataFrame
The series or dataframe to have a section removed
from_date : str | None
Start of removed section
to_date : str | None
End of removed section
min_gap_length : int
Will insert gaps based on insert_gaps strategy if missing more data points than
min_gap_length in a row.
insert_gaps : str
If "all" will insert np.nan at every missing point.
If "start" will insert np.nan only at from_date.
If "end" will insert np.nan only at to_date.
If "none" will insert no np.nan values, and remove all timestamps completely.
Returns
-------
pd.Series
The series with relevant slice removed
"""
input_series = input_series.copy()
slice_to_remove = input_series.loc[from_date:to_date]
if len(slice_to_remove) >= min_gap_length:
if insert_gaps == "all":
series_to_return = input_series.copy()
series_to_return.loc[from_date:to_date] = np.nan
else:
series_to_return = input_series.drop(slice_to_remove.index)
if insert_gaps == "start":
start_idx = slice_to_remove.index[0]
series_to_return[start_idx] = np.nan
elif insert_gaps == "end":
end_idx = slice_to_remove.index[-1]
series_to_return[end_idx] = np.nan
elif insert_gaps == "none":
pass
else:
raise ValueError(
f"Unknown value for argument {insert_gaps}. "
"Choose one of 'all', 'start', 'end', 'none'."
)
else:
series_to_return = input_series.drop(slice_to_remove.index)
return series_to_return.sort_index()
[docs]
def trim_series(
std_series: pd.Series, check_series: pd.Series | pd.Timestamp
) -> pd.Series:
"""
Remove end of std series to match check series.
All data after the last entry in check_series is presumed to be unchecked,
so that data is removed from the std_series
If check_series is empty, returns the entire std_series
Parameters
----------
std_series : pd.Series
The series to be trimmed
check_series : pd.Series | pd.DataFrame | pd.Timestamp
Indicates the end of the usable data
Returns
-------
pd.Series
std_series with the unchecked elements trimmed
"""
if isinstance(check_series, (pd.DataFrame | pd.Series)):
if check_series.empty:
return std_series
else:
last_check_date = check_series.index[-1]
return std_series.loc[:last_check_date]
elif isinstance(check_series, pd.Timestamp):
last_check_date = check_series
return std_series.loc[:last_check_date]
else:
warnings.warn("Invalid trim filter used, no filtering was done", stacklevel=2)
return std_series
[docs]
def flatline_value_remover(
series: pd.Series,
span: int = 3,
):
"""
Remove repeated (flatlined) values in a series.
Examines the data to see if any values are exactly repeated over a period.
Where values exactly repeat it probably indicates a broken instrument.
Replaces all values after the first with NaN.
Uses math.isclose() to measure float "equality"
Parameters
----------
series : pd.Series
Data to examine for flatlined values
span : int
Amount of allowed repeated values in a row before duplicates are removed
Returns
-------
pd.Series
Data with the flatlined values replaced with np.nan
"""
# pandas bad day
consecutive_values = (
series.groupby((series.ne(series.shift())).cumsum()).cumcount() + 1
)
working_step = consecutive_values.loc[~consecutive_values.between(2, span)]
length_filter = working_step.reindex(consecutive_values.index).bfill()
filtered_data = pd.Series(series[length_filter < span])
return filtered_data.reindex(series.index)