"""Tools for checking quality and finding problems in the data."""
import numpy as np
import pandas as pd
from annalist.annalist import Annalist
import hydrobot.utils as utils
from hydrobot.data_sources import QualityCodeEvaluator
annalizer = Annalist()
[docs]
def gap_finder(data: pd.Series) -> list:
"""
Find the indices and lengths of gaps (sequences of NaN values) in a pandas Series.
Parameters
----------
data : pd.Series
Input Series containing NaN values.
Returns
-------
list :
List of tuples, each containing the index of a NaN value, the length of the gap
containing it, and True for strictness.
"""
# Find the indices where NaN values start and end
idx0 = np.flatnonzero(np.r_[True, np.diff(pd.isna(data)) != 0, True])
# Calculate the length of each gap
count = np.diff(idx0)
# Mask for the gaps that contain NaN values
valid_mask = pd.isna(data.iloc[idx0[:-1]])
# Select indices of gaps that contain NaN values
out_idx = idx0[:-1][valid_mask]
# Select lengths of gaps that contain NaN values
out_count = count[valid_mask]
# Select indices of NaN values in the original Series
indices = data.iloc[out_idx].index
# Create a list of tuples containing index, gap length, and strictness
out = list(zip(indices, out_count, [True] * len(indices), strict=True))
return out
[docs]
def small_gap_closer(series: pd.Series, gap_limit: int) -> pd.Series:
"""
Remove small gaps from a series.
Gaps are defined by a sequential number of np.nan values
Small gaps are defined as gaps of length gap_length or less.
Will return series with the nan values in the short gaps removed, and the
long gaps untouched.
Parameters
----------
series : pandas.Series
Data which has gaps to be closed
gap_limit : integer
Maximum length of gaps removed, will remove all np.nan's in consecutive runs
of gap_length or less
Returns
-------
pandas.Series
Data with any short gaps removed
"""
gaps = gap_finder(series)
for gap in gaps:
if gap[1] <= gap_limit:
# Determine the range of rows to remove
mask = ~series.index.isin(
series.index[
series.index.get_loc(gap[0]) : series.index.get_loc(gap[0]) + gap[1]
]
)
# Remove the bad rows
series = pd.Series(series[mask])
return series
[docs]
def check_data_quality_code(
series: pd.Series,
check_series: pd.Series,
qc_evaluator: QualityCodeEvaluator,
gap_limit=10800,
) -> pd.DataFrame:
"""
Quality Code Check Data.
Quality codes data based on the difference between the standard series and
the check data
Parameters
----------
series : pd.Series
Data to be quality coded
check_series : pd.Series
Check data - must not be empty
qc_evaluator : data_sources.QualityCodeEvaluator
Handler for QC comparisons
gap_limit : integer (seconds)
If the nearest real data point is more than this many seconds away, return 200
Returns
-------
pd.Series
The QC values of the series, indexed by the END time of the QC period
"""
first_data_date = series.index[0]
last_data_date = series.index[-1]
if check_series.empty:
raise ValueError("No check data")
first_check_date = check_series.index[0]
last_check_date = check_series.index[-1]
if (
isinstance(first_data_date, pd.Timestamp)
and isinstance(last_data_date, pd.Timestamp)
and isinstance(first_check_date, pd.Timestamp)
and isinstance(last_check_date, pd.Timestamp)
):
# qc_series = pd.Series({first_data_date: np.nan})
qc_frame = pd.DataFrame(
columns=["Value", "Code", "Details"],
index=[first_data_date],
)
if first_check_date < first_data_date or last_check_date > last_data_date:
# Can't check something that's not there
raise KeyError(
"Error: check data out of range. "
f"First check date: {first_check_date}. "
f"First data date: {first_data_date}. "
f"Last check date: {last_check_date}. "
f"Last data date: {last_data_date}. "
)
else:
# Stuff actually working (hopefully)
for check_time, check_value in check_series.items():
if isinstance(check_time, pd.Timestamp):
adjusted_time = find_nearest_valid_time(series, check_time)
if abs((adjusted_time - check_time).total_seconds()) < gap_limit:
qc_value = qc_evaluator.find_qc(
series[adjusted_time], check_value
)
else:
qc_value = 200
qc_frame.loc[check_time, "Value"] = qc_value
qc_frame.loc[check_time, "Code"] = "CHK"
qc_frame.loc[check_time, "Details"] = (
f"Check value at {check_time} used to validate "
f"data value at {adjusted_time}."
)
else:
raise KeyError("Series indices should be pandas.Timestamp.")
qc_frame = qc_frame.shift(periods=-1)
qc_frame.loc[qc_frame.index[-1], "Value"] = 0
return qc_frame
else:
raise KeyError("Series indices should be pandas.Timestamp.")
[docs]
def missing_data_quality_code(std_series, qc_data, gap_limit):
"""
Make sure that missing data is QC100.
Returns qc_frame with QC100 values added where std_series is NaN
Parameters
----------
std_series : pd.Series
Base series which may contain NaNs
qc_data
QC series for base std_series without QC100 values
gap_limit
Maximum size of gaps which will be ignored
Returns
-------
pd.Series
The modified QC series, indexed by the start time of the QC period
"""
for gap in gap_finder(std_series):
if gap[1] > gap_limit:
end_idx = std_series.index.get_loc(gap[0]) + gap[1]
# end of gap should recover the value from previous
if end_idx < len(std_series):
prev_qc_data = qc_data[qc_data.index <= std_series.index[end_idx]]
prev_qc_data = prev_qc_data[prev_qc_data["Value"] > 100]
prev_qc_data = prev_qc_data.sort_index()
qc_data.loc[std_series.index[end_idx]] = prev_qc_data.iloc[-1]
if std_series.index[end_idx] in prev_qc_data.index:
qc_data.loc[std_series.index[end_idx], "Details"] = (
qc_data.loc[std_series.index[end_idx], "Details"]
+ f" [End of gap which started at {gap[0]}]"
)
else:
qc_data.loc[std_series.index[end_idx], "Details"] = (
f"End of gap which started at {gap[0]}. "
f"Returning to QC code first assigned at {prev_qc_data.index[-1]}"
)
qc_data = qc_data.sort_index()
# getting rid of any stray QC codes in the middle
drop_series = qc_data["Value"]
drop_series = drop_series[drop_series.index > gap[0]]
drop_series = drop_series[
drop_series.index <= std_series.index[end_idx - 1]
]
qc_data = qc_data.drop(drop_series.index)
# start of gap
if std_series.index.get_loc(gap[0]) == 0:
start_gap = std_series.index[std_series.index.get_loc(gap[0])]
else:
start_gap = std_series.index[std_series.index.get_loc(gap[0]) - 1]
qc_data.loc[start_gap, "Value"] = 100
qc_data.loc[start_gap, "Code"] = "GAP"
if end_idx >= len(std_series):
gap_end = std_series.index[-1]
else:
gap_end = std_series.index[end_idx]
qc_data.loc[
start_gap, "Details"
] = f"Missing data amounting to {(gap_end - gap[0])}"
qc_data = qc_data.sort_index()
qc_data = qc_data.sort_index()
qc_data = qc_data.loc[
(qc_data.Code != "GAP") | (qc_data.Value.shift(1) != qc_data.Value)
]
return qc_data
[docs]
def find_nearest_time(series, dt):
"""
Find the time in the series that is closest to dt.
For example for the series::
pd.Timestamp("2021-01-01 02:00"): 0.0,
pd.Timestamp("2021-01-01 02:15"): 0.0,
with dt::
pd.Timestamp("2021-01-01 02:13"): 0.0,
the result should be the closer ``pd.Timestamp("2021-01-01 02:15")`` value
Parameters
----------
series : pd.Series
The series indexed by time
dt : Datetime
Time that may or may nor exactly line up with the series
Returns
-------
Datetime
The value of dt rounded to the nearest timestamp of the series
"""
# Make sure it is in the range
first_timestamp = series.index[0]
last_timestamp = series.index[-1]
if dt < first_timestamp or dt > last_timestamp:
raise KeyError("Timestamp not within data range")
output_index = series.index.get_indexer([dt], method="nearest")
return series.index[output_index][0]
[docs]
def find_nearest_valid_time(series, dt) -> pd.Timestamp:
"""
Find the time in the series that is closest to dt, but ignoring NaN values (gaps).
Parameters
----------
series : pd.Series
The series indexed by time
dt : Datetime
Time that may or may nor exactly line up with the series
Returns
-------
Datetime
The value of dt rounded to the nearest timestamp of the series
"""
# Make sure it is in the range
first_timestamp = series.index[0]
last_timestamp = series.index[-1]
if dt < first_timestamp or dt > last_timestamp:
raise KeyError("Timestamp not within data range")
series = series.dropna()
output_index = series.index.get_indexer([dt], method="nearest")
return series.index[output_index][0]
[docs]
def base_data_qc_filter(std_series, qc_filter):
"""
Filter out data based on quality code filter.
Return only the base series data for which the next date in the qc_filter
is 'true'
Parameters
----------
std_series : pandas.Series
Data to be filtered
qc_filter : pandas.Series of booleans
Dates for which some condition is met or not
Returns
-------
pandas.Series
The filtered data
"""
base_filter = qc_filter.reindex(std_series.index, method="ffill").fillna(False)
return std_series[base_filter]
[docs]
def base_data_meets_qc(std_series, qc_series, target_qc):
"""
Find all data where QC targets are met.
Returns only the base series data for which the next date in the qc_filter is
equal to target_qc
Parameters
----------
std_series: pandas.Series
Data to be filtered
qc_series: pandas.Series
quality code data series, some of which are presumably target_qc
target_qc: int
target quality code
Returns
-------
pandas.Series
Filtered data
"""
return base_data_qc_filter(std_series, qc_series.eq(target_qc))
[docs]
def diagnose_data(std_series, check_series, qc_series, frequency):
"""
Return description of how much missing data, how much for each QC, etc.
This function feels like a mess, I'm sorry.
The good news is that it is only a diagnostic, so feel free to change the hell
out of it
Parameters
----------
std_series : pandas.Series
processed base time series data
check_series : pandas.Series
Check datatime series
qc_series : pandas.Series
QC time series
frequency : DateOffset or str
Frequency to which the data gets set to
Returns
-------
None
Prints statements that describe the state of the data
"""
# total time
first_timestamp = std_series.index[0]
last_timestamp = std_series.index[-1]
total_time = last_timestamp - first_timestamp
print(f"Time examined is {total_time} from {first_timestamp} to {last_timestamp}")
print(
f"Have check data for {check_series.index[-1] - first_timestamp} "
f"(last check {check_series.index[-1]})"
)
# periods
ave_period = pd.to_timedelta(frequency) # total_time / (len(raw_data) - 1)
gap_time = ave_period * (len(std_series) - len(std_series.dropna()) + 1)
print(f"Missing {gap_time} of data, that's {gap_time/total_time*100}%")
# QCs
split_data = splitter(std_series, qc_series)
for qc in split_data:
print(
f"Data that is QC{qc} makes up "
f"{len(split_data[qc].dropna()) / len(std_series.dropna()) * 100:.2f}% "
"of the workable data and "
f"{len(split_data[qc].dropna()) / len(std_series) * 100:.2f}% "
"of the time period"
)
[docs]
def splitter(std_series, qc_series):
"""
Split the data up by QC code.
Selects all data which meets a given QC code, pads the rest with NaN values
Does this for all current NEMs values ([0, 100, 200, 300, 400, 500, 600])
Parameters
----------
std_series : pd.Series
Time series data to be split up
qc_series : pd.Series
QC values to split the data by
Returns
-------
dict of int:pd.Series pairs
Keys are the QC values as ints, values are series of data that fits
"""
qc_list = [0, 100, 200, 300, 400, 500, 600]
return_dict = {}
for qc in qc_list:
if qc == 100:
return_dict[qc] = (
base_data_meets_qc(std_series, qc_series, qc)
.astype(np.float64)
.fillna(std_series.median())
)
else:
return_dict[qc] = base_data_meets_qc(std_series, qc_series, qc)
return_dict[qc] = return_dict[qc].reindex(std_series.index)
return return_dict
[docs]
def max_qc_limiter(qc_frame: pd.DataFrame, max_qc) -> pd.DataFrame:
"""
Enforce max_qc on a QC series.
Replaces all values with QCs above max_qc with max_qc
Parameters
----------
qc_frame : pd.DataFrame
The series to be limited.
max_qc : numerical
maximum allowed value. None imposes no limit.
Returns
-------
pd.DataFrame
qc_frame with too high QCs limited to max_qc
"""
clipped_data = qc_frame["Value"].clip(np.nan, max_qc)
diff_idxs = qc_frame[
(qc_frame["Value"] != clipped_data) & ~clipped_data.isna()
].index
if not diff_idxs.empty:
qc_frame.loc[diff_idxs, "Code"] = qc_frame.loc[diff_idxs, "Code"] + ", LIM"
qc_frame.loc[diff_idxs, "Details"] = (
qc_frame.loc[diff_idxs, "Details"]
+ f" [Site QC limit applies to a maximum of {max_qc}.]"
)
qc_frame["Value"] = clipped_data
return qc_frame
[docs]
def bulk_downgrade_out_of_validation(
qc_frame: pd.DataFrame,
check_series: pd.Series,
interval_dict: dict,
day_end_rounding: bool = True,
):
"""
Applies caps on quality codes for any data that has gaps between check data that is too large.
Utilises single_downgrade_out_of_validation multiple times for different time periods.
Parameters
----------
qc_frame : pd.DataFrame
Quality series that potentially needs downgrading
check_series : pd.Series
Check series to check for frequency of checks
interval_dict : dict
Key:Value pairs of max_interval:downgraded_qc for single_downgrade_out_of_validation
day_end_rounding : bool
Whether to round to the day end. If true, downgraded data starts at midnight
Returns
-------
pd.DataFrame
The qc_frame with any downgraded QCs added in
"""
if not qc_frame.empty and not check_series.empty:
for key in interval_dict:
qc_frame = single_downgrade_out_of_validation(
qc_frame, check_series, key, interval_dict[key], day_end_rounding
)
return qc_frame
[docs]
def single_downgrade_out_of_validation(
qc_frame: pd.DataFrame,
check_series: pd.Series,
max_interval: pd.DateOffset,
downgraded_qc: int = 200,
day_end_rounding: bool = True,
):
"""
Applies a cap on quality codes for any data that has gaps between check data that is too large.
Only applies a single cap quality code, see bulk_downgrade_out_of_validation for multiple steps.
Parameters
----------
qc_frame : pd.DataFrame
Quality series that potentially needs downgrading
check_series : pd.Series
Check series to check for frequency of checks
max_interval : pd.DateOffset
How long of a gap between checks before the data gets downgraded
downgraded_qc : int
Which code the quality data gets downgraded to
day_end_rounding : bool
Whether to round to the day end. If true, downgraded data starts at midnight
Returns
-------
pd.DataFrame
The qc_frame with any downgraded QCs added in
"""
if qc_frame.empty or check_series.empty:
raise ValueError(
"Cannot have empty qc series or check series. qc.empty = {}, check.empty = {}".format(
qc_frame.empty, check_series.empty
)
)
# When they should have their next check by
due_date = check_series.index + max_interval
due_date = due_date[:-1]
if day_end_rounding:
due_date = due_date.ceil("D")
# Whether there has been a check since then
overdue = (due_date < check_series.index[1:]) & (
qc_frame.loc[check_series.index[:-1], "Value"] > downgraded_qc
)
# Select overdue times
unvalidated = due_date[overdue]
downgraded_times = pd.DataFrame(
{
"Value": [downgraded_qc for _ in unvalidated],
"Code": ["OOV" for _ in unvalidated],
"Details": [
"Site inspection overdue. Last inspection at "
f"{check_series.index[idx]}. Data downgraded to QC{downgraded_qc} "
"until next inspection."
for idx in range(len(unvalidated))
],
},
index=unvalidated,
)
# combine and sort
if not downgraded_times.empty:
qc_frame = pd.concat([qc_frame, downgraded_times]).sort_index()
# qc_frame.loc[qc_frame.index[-1], "Value"] = 0
return qc_frame
[docs]
def cap_qc_where_std_high(std_frame, qc_frame, cap_qc, cap_threshold):
"""
Cap the quality code of data where the standard series exceeds some value.
Parameters
----------
std_frame : pd.DataFrame
qc_frame : pd.DataFrame
cap_qc : numeric
cap_threshold : numeric
Returns
-------
pd.DataFrame
the qc series to return
"""
if qc_frame.empty or std_frame.empty:
raise ValueError("qc series can't be empty for function cap_qc_where_std_high")
std_series = std_frame["Value"]
capped_data = std_series > cap_threshold
capped_qc_changes = capped_data.loc[capped_data.shift() != capped_data] # noqa
with pd.option_context("future.no_silent_downcasting", True):
potential_new_qc = (
capped_qc_changes.replace(True, cap_qc)
.replace(False, np.nan)
.infer_objects(copy=False)
)
new_qc = utils.compare_two_qc_take_min(potential_new_qc, qc_frame["Value"])
with pd.option_context("future.no_silent_downcasting", True):
qc_frame = qc_frame.reindex(new_qc.index, method="ffill").infer_objects(
copy=False
)
diff_idxs = qc_frame[qc_frame["Value"] != new_qc].index
# replacing this code with a wordier version to avoid a pandas error
# qc_frame.loc[diff_idxs, "Code"] = qc_frame.loc[diff_idxs, "Code"] + ", CAP"
code_series = qc_frame["Code"].copy()
code_series[diff_idxs] += ", CAP"
qc_frame["Code"] = code_series
# replacing this
# qc_frame.loc[diff_idxs, "Details"] = (
# qc_frame.loc[diff_idxs, "Details"]
# + f" [DO above {cap_qc} means apply a maximum qc of {cap_threshold}.]"
# )
detail_series = qc_frame["Code"].copy()
detail_series[
diff_idxs
] += f" [DO above {cap_threshold} means apply a maximum qc of {cap_qc}.]"
qc_frame["Details"] = detail_series
qc_frame["Value"] = new_qc
return qc_frame