Source code for hydrobot.data_sources

"""Handling for different types of data sources."""

import numpy as np
import pandas as pd

DATA_FAMILY_DICT = {
    "dissolved_oxygen": {
        "QC_evaluator_type": "DO",
        "QC_evaluator_values": [6, 3, 0.1, 0.05],
        "depth_unit": "mm",
    },
    "water_temperature": {
        "QC_evaluator_type": "Base",
        "QC_evaluator_values": [1.2, 0.8],
        "depth_unit": "mm",
    },
    "atmospheric_pressure": {
        "QC_evaluator_type": "Base",
        "QC_evaluator_values": [5, 2.5],
    },
    "rainfall": {
        "QC_evaluator_type": "Base",
        "QC_evaluator_values": [20, 10],
    },
    "stage": {
        "QC_evaluator_type": "TwoLevel",
        "QC_evaluator_values": [10, 3, 0.5, 0.2, 2000],
    },
    "groundwater": {
        "QC_evaluator_type": "Base",
        "QC_evaluator_values": [20, 10],
    },
    "ph": {
        "QC_evaluator_type": "BaseWith200",
        "QC_evaluator_values": [0.5, 0.2, 0.8],
        "depth_unit": "mm",
    },
    "conductivity": {
        "QC_evaluator_type": "BaseWith200",
        "QC_evaluator_values": [10, 3, 15],
        "depth_unit": "mm",
    },
    "bg_algae": {
        "QC_evaluator_type": "Unchecked",
        "QC_evaluator_values": [],
        "depth_unit": "mm",
    },
    "soil_moisture": {
        "QC_evaluator_type": "Unchecked",
        "QC_evaluator_values": [],
        "depth_unit": "cm",
    },
    "soil_temperature": {
        "QC_evaluator_type": "Unchecked",
        "QC_evaluator_values": [],
        "depth_unit": "cm",
    },
    "orp": {
        "QC_evaluator_type": "Unchecked",
        "QC_evaluator_values": [],
        "depth_unit": "mm",
    },
    "unchecked": {
        "QC_evaluator_type": "Unchecked",
        "QC_evaluator_values": [],
    },
}


[docs] def depth_standard_measurement_name_by_data_family(data_family, depth): """ Return standard measurement name for the data family at depth. Many data sources have separate measurement name formats for lake sampling, so this maps the data_family/depth to the appropriate standard measurement name Parameters ---------- data_family : str data family to find standard measurement name for depth : int depth of the measurement, in mm Returns ------- str The standard measurement name """ match data_family: case "soil_moisture": return f"{str(depth)}cm VWC" case "soil_temperature": return f"{str(depth)}cm TS" case "ph": return f"pH (-{str(depth)} mm)" case "orp": return f"ORP (-{str(depth)} mm)" case "conductivity": return f"SP Conductivity (-{str(depth)} mm)" case "water_temperature": return f"Water Temperature (-{str(depth)} mm)" case "dissolved_oxygen": return f"Dissolved Oxygen Saturation (-{str(depth)} mm)" case _: raise ValueError(f"Unimplemented depth data family {data_family}. ")
[docs] def depth_check_measurement_name_by_data_family(data_family, depth): """ Return check measurement name for the data family at depth. Many data sources have separate measurement name formats for lake sampling, so this maps the data_family/depth to the appropriate check measurement name Parameters ---------- data_family : str data family to find check measurement name for depth : int depth of the measurement, in mm Returns ------- str The check measurement name """ match data_family: case "ph": return f"pH Check (-{str(depth)} mm) [pH (-{str(depth)} mm)]" case "orp": return f"ORP Check (-{str(depth)} mm) [ORP (-{str(depth)} mm)]" case "conductivity": return ( f"SP Cond Check (-{str(depth)} mm) [SP Conductivity (-{str(depth)} mm)]" ) case "water_temperature": return f"Water Temperature Check (-{str(depth)} mm) [Water Temperature (-{str(depth)} mm)]" case "dissolved_oxygen": return f"Sample Check [Dissolved Oxygen Saturation (-{str(depth)} mm)]" case _: raise ValueError( f"Unimplemented depth data family {data_family}. " f"Either remove depth as parameter or implement " )
[docs] class QualityCodeEvaluator: """Basic QualityCodeEvaluator only compares magnitude of differences.""" def __init__(self, qc_500_limit, qc_600_limit, constant_check_shift=0): """Initialize QualityCodeEvaluator. Parameters ---------- qc_500_limit : numerical Threshold between QC 400 and QC 500 qc_600_limit : numerical Threshold between QC 500 and QC 600 constant_check_shift : numerical Shifts the check data by a fixed amount """ self.qc_500_limit = qc_500_limit self.qc_600_limit = qc_600_limit self.constant_check_shift = constant_check_shift def __repr__(self): """Quality Code Evaluator representation.""" return repr( f"QualityCodeEvaluator or it's child: '{self.__class__.__name__}' " f"with attributes: {self.__dict__}" )
[docs] def find_qc(self, base_datum, check_datum): """ Find the base quality codes. Parameters ---------- base_datum : numerical Closest continuum datum point to the check check_datum : numerical The check data to verify the continuous data, shifted by any constant_check_shift Returns ------- int The Quality code """ check_datum = check_datum + self.constant_check_shift diff = np.abs(base_datum - check_datum) if diff < self.qc_600_limit: qc = 600 elif diff < self.qc_500_limit: qc = 500 else: qc = 400 return qc
[docs] class TwoLevelQualityCodeEvaluator(QualityCodeEvaluator): """QualityCodeEvaluator for standards such as water level. Fixed error up to given threshold, percentage error after that. """ def __init__( self, qc_500_limit, qc_600_limit, qc_500_percent, qc_600_percent, limit_percent_threshold, constant_check_shift=0, ): """ Initialize TwoLevelQualityCodeEvaluator. Parameters ---------- qc_500_limit : numerical Threshold between QC 400 and QC 500 for linear portion qc_600_limit : numerical Threshold between QC 500 and QC 600 for linear portion qc_500_percent : numerical Threshold between QC 400 and QC 500 for percentage portion qc_600_percent : numerical Threshold between QC 500 and QC 600 for percentage portion limit_percent_threshold Value at which the evaluator transitions between linear and percentage QC comparison constant_check_shift : numerical Shifts the check data by a fixed amount """ QualityCodeEvaluator.__init__( self, qc_500_limit, qc_600_limit, constant_check_shift ) self.qc_500_percent = qc_500_percent self.qc_600_percent = qc_600_percent self.limit_percent_threshold = limit_percent_threshold
[docs] def find_qc(self, base_datum, check_datum): """Find the base quality codes with two stages. The two stages are: a flat and percentage QC threshold. Parameters ---------- base_datum : numerical Closest continuum datum point to the check check_datum : numerical The check data to verify the continuous data, shifted by any constant_check_shift Returns ------- int The Quality code """ check_datum = check_datum + self.constant_check_shift if base_datum < self.limit_percent_threshold: # flat qc check diff = np.abs(base_datum - check_datum) if diff < self.qc_600_limit: qc = 600 elif diff < self.qc_500_limit: qc = 500 else: qc = 400 else: # percent qc check diff = np.abs(base_datum / check_datum - 1) * 100 if diff < self.qc_600_percent: qc = 600 elif diff < self.qc_500_percent: qc = 500 else: qc = 400 return qc
[docs] class With200QualityCodeEvaluator(QualityCodeEvaluator): """For standard quality code evaluators that also have QC200 data. Examples: pH and Conductivity. """ def __init__( self, qc_500_limit, qc_600_limit, qc_400_limit, constant_check_shift=0, ): """ Initialize TwoLevelQualityCodeEvaluator. Parameters ---------- qc_500_limit : numerical Threshold between QC 400 and QC 500 qc_600_limit : numerical Threshold between QC 500 and QC 600 qc_400_limit : numerical Threshold between QC 200 and QC 400 constant_check_shift : numerical Shifts the check data by a fixed amount """ QualityCodeEvaluator.__init__( self, qc_500_limit, qc_600_limit, constant_check_shift ) self.qc_400_limit = qc_400_limit
[docs] def find_qc(self, base_datum, check_datum): """ Find the base quality codes. Parameters ---------- base_datum : numerical Closest continuum datum point to the check check_datum : numerical The check data to verify the continuous data, shifted by any constant_check_shift Returns ------- int The Quality code """ check_datum = check_datum + self.constant_check_shift diff = np.abs(base_datum - check_datum) if diff < self.qc_600_limit: qc = 600 elif diff < self.qc_500_limit: qc = 500 elif diff < self.qc_400_limit: qc = 400 else: qc = 200 return qc
[docs] class UncheckedQualityCodeEvaluator(QualityCodeEvaluator): """QualityCodeEvaluator for data without checks. Returns 200 for QC. """ def __init__( self, ): """Initialize UncheckedQualityCodeEvaluator.""" QualityCodeEvaluator.__init__(self, -1, -2)
[docs] def find_qc(self, base_datum, check_datum): """ Return 200 quality code. Parameters ---------- base_datum : numerical Closest continuum datum point to the check check_datum : numerical The check data to verify the continuous data, shifted by any constant_check_shift Returns ------- int The Quality code 200 """ return 200
[docs] class DissolvedOxygenQualityCodeEvaluator(QualityCodeEvaluator): """QualityCodeEvaluator for DO NEMS. Constant error plus percentage error. """ def __init__( self, qc_500_limit, qc_600_limit, qc_500_percent, qc_600_percent, constant_check_shift=0, ): """ Initialize TwoLevelQualityCodeEvaluator. Parameters ---------- qc_500_limit : numerical Constant contribution to QC 500 limit qc_600_limit : numerical Constant contribution to QC 600 limit qc_500_percent : numerical Variable contribution to QC 500 limit qc_600_percent : numerical Variable contribution to QC 600 limit """ QualityCodeEvaluator.__init__( self, qc_500_limit, qc_600_limit, constant_check_shift ) self.qc_500_percent = qc_500_percent self.qc_600_percent = qc_600_percent
[docs] def find_qc(self, base_datum, check_datum): """Find the base quality codes for DO. Parameters ---------- base_datum : numerical Closest continuum datum point to the check check_datum : numerical The check data to verify the continuous data, shifted by any constant_check_shift Returns ------- int The Quality code """ check_datum = check_datum + self.constant_check_shift diff = np.abs(base_datum - check_datum) threshold_500 = self.qc_500_limit + self.qc_500_percent * base_datum threshold_600 = self.qc_600_limit + self.qc_600_percent * base_datum if diff < threshold_600: qc = 600 elif diff < threshold_500: qc = 500 else: qc = 400 return qc
[docs] def series_export_to_csv( file_location: str, series: list[pd.Series], ) -> None: """Export the 3 main series to csv. Parameters ---------- file_location : str Where the files are exported to series : pd.Series Pandas series to be exported Returns ------- None, but makes files """ export_df = pd.DataFrame(series).T export_df.to_csv(str(file_location))
[docs] def hilltop_export( file_location: str, site_name: str, std_series: pd.Series, check_series: pd.Series, qc_series: pd.Series, ): """ Export the 3 main series to csv files ready to import into hilltop. Parameters ---------- file_location : str Where the files are exported to site_name : str Site name std_series : pd.Series Standard series check_series : pd.Series Check series qc_series : pd.Series Quality code series Returns ------- None, but makes files """ qc_series = qc_series.reindex(std_series.index, method="ffill") std_series.name = "std" qc_series.name = "qual" export_df = std_series.to_frame().join(qc_series) export_df.to_csv(str(file_location) + "_std_qc.csv") keys = [ "Sitename", "Inspection_Date", "Inspection_Time", "External S.G.", "Recorder Time", "Internal S.G.", "Comment", ] export_check_df = pd.concat( [ pd.Series(site_name, index=check_series.index), pd.Series( [str(dt.date()) for dt in check_series.index], index=check_series.index ), pd.Series( [str(dt.time()) for dt in check_series.index], index=check_series.index ), check_series, pd.Series(check_series.index, index=check_series.index), pd.Series(-1, index=check_series.index), pd.Series("hydrobot comment", index=check_series.index), ], axis=1, keys=keys, ) export_check_df.to_csv(str(file_location) + "_check.csv")
[docs] def get_qc_evaluator(family: str): """Get QC evaluator from data family name.""" qc_string = DATA_FAMILY_DICT[family]["QC_evaluator_type"] match qc_string: case "Base": qc_evaluator = QualityCodeEvaluator case "BaseWith200": qc_evaluator = With200QualityCodeEvaluator case "TwoLevel": qc_evaluator = TwoLevelQualityCodeEvaluator case "DO": qc_evaluator = DissolvedOxygenQualityCodeEvaluator case "Unchecked": qc_evaluator = UncheckedQualityCodeEvaluator case _: raise KeyError(f"QC_evaluator: {qc_string} has not been implemented yet") qc_evaluator = qc_evaluator(*DATA_FAMILY_DICT[family]["QC_evaluator_values"]) return qc_evaluator