Source code for hydrobot.filters

"""General filtering utilities."""

import warnings

import numpy as np
import pandas as pd
from annalist.annalist import Annalist

annalizer = Annalist()


[docs] def clip(unclipped: pd.Series, low_clip: float, high_clip: float): """Clip values in a pandas Series within a specified range. Parameters ---------- unclipped : pandas.Series Input data to be clipped. high_clip : float Upper bound for clipping. Values greater than this will be set to NaN. low_clip : float Lower bound for clipping. Values less than this will be set to NaN. Returns ------- pandas.Series A Series containing the clipped values with the same index as the input Series. """ unclipped_arr = unclipped.to_numpy() # np.nan gives warning with np.errstate(invalid="ignore"): # Create a boolean condition for values that need to be clipped clip_cond = (unclipped_arr > high_clip) | (unclipped_arr < low_clip) # Use pandas' where function to clip values to NaN where the condition is # True clipped_series = unclipped.where(~clip_cond, np.nan) return clipped_series
# noinspection SpellCheckingInspection
[docs] def fbewma(input_data, span: int): """Calculate the Forward-Backward Exponentially Weighted Moving Average (FBEWMA). Parameters ---------- input_data : pandas.Series Input time series data to calculate the FBEWMA on. span : int Span parameter for exponential weighting. Returns ------- pandas.Series A Series containing the FBEWMA values with the same index as the input Series. """ # Calculate the Forward EWMA. fwd = input_data.ewm(span=span).mean() # Calculate the Backward EWMA. (x[::-1] is the reverse of x) bwd = input_data[::-1].ewm(span=span).mean() # Stack fwd and the reverse of bwd on top of each other. stacked_ewma = pd.concat([fwd, bwd[::-1]]) # Calculate the FB-EWMA by taking the mean between fwd and bwd. return stacked_ewma.groupby(level=0).mean()
[docs] def remove_outliers(input_data: pd.Series, span: int, delta: float): """Remove outliers. Remove outliers from a time series by comparing it to the Forward-Backward Exponentially Weighted Moving Average (FBEWMA). Parameters ---------- input_data : pandas.Series Input time series data. span : int Span parameter for exponential weighting used in the FBEWMA. delta : float Threshold for identifying outliers. Values greater than this threshold will be set to NaN. Returns ------- pandas.Series A Series containing the time series with outliers removed with the same index as the input Series. """ # Calculate the FBEWMA of the time series fbewma_series = fbewma(input_data, span) # Create a condition to identify outliers based on the absolute difference # between input_data and fbewma_series delta_cond = np.abs(input_data - fbewma_series) > delta # Set values to NaN where the condition is True return input_data.where(~delta_cond, np.nan)
[docs] def remove_spikes( input_data: pd.Series, span: int, low_clip: float, high_clip: float, delta: float ) -> pd.Series: """Remove spikes. Remove spikes from a time series data using a combination of clipping and interpolation. Parameters ---------- input_data : pandas.Series Input time series data. span : int Span parameter for exponential weighting used in outlier detection. low_clip : float Lower bound for clipping. Values less than this will be set to NaN. high_clip : float Upper bound for clipping. Values greater than this will be set to NaN. delta : float Threshold for identifying outliers. Values greater than this threshold will be considered spikes. Returns ------- pandas.Series A Series containing the time series with spikes removed with the same index as the input Series. """ # Clip values in the input data within the specified range clipped = clip(input_data, low_clip, high_clip) # Remove outliers using the remove_outliers function gaps_series = remove_outliers(clipped, span, delta) # Could use pandas' .interpolate() on the Series # interp_series = gaps_series.interpolate() return gaps_series
[docs] def remove_one_spikes( input_data: pd.Series, threshold_factor=3.0, window_size=5 ) -> pd.Series: """ Detect and remove single-point spikes in a time series. A one-point spike is defined as a data point that deviates significantly from both its preceding and following points and the local trend. For the removal of more complex multi-spikes, use the remove_spikes() function. NOTE: This function only works when baseline data is fairly stable. If baseline data is noisy or has high variability, use one_spike_filter_mad() instead. Parameters ---------- input_data: pandas.Series The input time series data. threshold_factor: float Multiplier for the standard deviation to define the spike threshold. Default is 3.0. window_size: int The size of the rolling window to compute local statistics. Default is 5. Returns ------- filtered_data: pandas.Series The time series with one-point spikes removed (set to NaN). """ data = input_data.to_numpy(dtype=float) n = len(input_data) filtered_data = data.copy() # Need at least 3 points to identify a spike if n < 3: return input_data # Check each point (except the first and last) for i in range(1, n - 1): prev_val = data[i - 1] curr_val = data[i] next_val = data[i + 1] # Calculate expected value (average of neighbors) expected = (prev_val + next_val) / 2 deviation = abs(curr_val - expected) # Calulate local std for context window start_idx = max(0, i - window_size // 2) end_idx = min(n, i + window_size // 2 + 1) local_window = np.concatenate([data[start_idx:i], data[i + 1 : end_idx]]) if len(local_window) > 0: local_std = np.std(local_window) local_mean = np.mean(local_window) # Spike detection criteria: # 1. Current point deviates from expected by more than threshold # 2. Neighbors are close to each other (not spikes themselves) # 3. Current point deviates significantly from local mean neighbour_diff = abs(prev_val - next_val) threshold = threshold_factor * local_std is_spike = ( deviation > threshold and neighbour_diff < threshold and abs(curr_val - local_mean) > threshold ) if is_spike: # Replace spike with NaN filtered_data[i] = np.nan return pd.Series(filtered_data, index=input_data.index)
[docs] def remove_one_spikes_mad(input_data: pd.Series, threshold_factor=2.5) -> pd.Series: """ Detect and remove single-point spikes using Median Absolute Deviation (MAD). A one-point spike is defined as a data point that deviates significantly from both its preceding and following points and the local trend. For the removal of more complex multi-spikes, use the remove_spikes() function. NOTE: This function is more robust to noisy or variable baseline data than remove_one_spikes(). ALSO NOTE: This function is... not very good. I think I need to play with desmos a bit more to get a better thresholding mechanism. Parameters ---------- input_data: pandas.Series The input time series data. threshold_factor: float Multiplier for the MAD to define the spike threshold. Default is 2.5. Returns ------- filtered_data: pandas.Series The time series with one-point spikes removed (set to NaN). """ data = input_data.to_numpy(dtype=float) n = len(data) filtered_data = data.copy() # Need at least 3 points to identify a spike if n < 3: return input_data # Check each point (except the first and last) for i in range(1, n - 1): prev_val = data[i - 1] curr_val = data[i] next_val = data[i + 1] # Calculate expected value (average of neighbors) expected = np.median([prev_val, next_val]) deviation = abs(curr_val - expected) # Local MAD calculation local_context = [prev_val, next_val] if i > 1: local_context.append(data[i - 2]) if i < n - 2: local_context.append(data[i + 2]) mad = np.median(np.abs(local_context - np.median(local_context))) threshold = threshold_factor * ( 1.4826 * mad ) # Scale MAD to approximate std dev # More conservative: neighbors must be close to each other neighbour_similarity = abs(prev_val - next_val) / ( abs(prev_val) + abs(next_val) + 1e-10 ) similarity_threshold = np.exp(-deviation / (threshold + 1e-10)) if ( deviation > max(threshold, 1e-10) and neighbour_similarity < similarity_threshold ): filtered_data[i] = np.nan return pd.Series(filtered_data, index=input_data.index)
[docs] def remove_range( input_series: pd.Series | pd.DataFrame, from_date: str | None, to_date: str | None, min_gap_length: int = 1, insert_gaps: str = "none", ): """ Remove data from series in given range. Returns the input series without data between from_date and to_date inclusive. A None to_date will remove all data since the from_date (and vice versa). A double None for to_date/from_date removes all data. Inserts gaps or not depending on insert_gaps Parameters ---------- input_series : pd.Series | pd.DataFrame The series or dataframe to have a section removed from_date : str | None Start of removed section to_date : str | None End of removed section min_gap_length : int Will insert gaps based on insert_gaps strategy if missing more data points than min_gap_length in a row. insert_gaps : str If "all" will insert np.nan at every missing point. If "start" will insert np.nan only at from_date. If "end" will insert np.nan only at to_date. If "none" will insert no np.nan values, and remove all timestamps completely. Returns ------- pd.Series The series with relevant slice removed """ input_series = input_series.copy() slice_to_remove = input_series.loc[from_date:to_date] if len(slice_to_remove) >= min_gap_length: if insert_gaps == "all": series_to_return = input_series.copy() series_to_return.loc[from_date:to_date] = np.nan else: series_to_return = input_series.drop(slice_to_remove.index) if insert_gaps == "start": start_idx = slice_to_remove.index[0] series_to_return[start_idx] = np.nan elif insert_gaps == "end": end_idx = slice_to_remove.index[-1] series_to_return[end_idx] = np.nan elif insert_gaps == "none": pass else: raise ValueError( f"Unknown value for argument {insert_gaps}. " "Choose one of 'all', 'start', 'end', 'none'." ) else: series_to_return = input_series.drop(slice_to_remove.index) return series_to_return.sort_index()
[docs] def trim_series( std_series: pd.Series, check_series: pd.Series | pd.Timestamp ) -> pd.Series: """ Remove end of std series to match check series. All data after the last entry in check_series is presumed to be unchecked, so that data is removed from the std_series If check_series is empty, returns the entire std_series Parameters ---------- std_series : pd.Series The series to be trimmed check_series : pd.Series | pd.DataFrame | pd.Timestamp Indicates the end of the usable data Returns ------- pd.Series std_series with the unchecked elements trimmed """ if isinstance(check_series, (pd.DataFrame | pd.Series)): if check_series.empty: return std_series else: last_check_date = check_series.index[-1] return std_series.loc[:last_check_date] elif isinstance(check_series, pd.Timestamp): last_check_date = check_series return std_series.loc[:last_check_date] else: warnings.warn("Invalid trim filter used, no filtering was done", stacklevel=2) return std_series
[docs] def flatline_value_remover( series: pd.Series, span: int = 3, ): """ Remove repeated (flatlined) values in a series. Examines the data to see if any values are exactly repeated over a period. Where values exactly repeat it probably indicates a broken instrument. Replaces all values after the first with NaN. Uses math.isclose() to measure float "equality" Parameters ---------- series : pd.Series Data to examine for flatlined values span : int Amount of allowed repeated values in a row before duplicates are removed Returns ------- pd.Series Data with the flatlined values replaced with np.nan """ # pandas bad day consecutive_values = ( series.groupby((series.ne(series.shift())).cumsum()).cumcount() + 1 ) working_step = consecutive_values.loc[~consecutive_values.between(2, span)] length_filter = working_step.reindex(consecutive_values.index).bfill() filtered_data = pd.Series(series[length_filter < span]) return filtered_data.reindex(series.index)