Source code for hydrobot.processor

"""Processor class."""

import warnings
from datetime import datetime

import numpy as np
import pandas as pd
import ruamel.yaml
from annalist.annalist import Annalist
from annalist.decorators import ClassLogger
from hilltoppy import Hilltop

import hydrobot
from hydrobot import (
    data_acquisition,
    data_sources,
    data_structure,
    evaluator,
    filters,
    plotter,
    utils,
)

# Module-level Annalist handle, created once at import time.
# NOTE(review): not referenced in the visible part of this file - confirm it is
# still needed (from_processing_parameters_dict builds its own Annalist).
annalizer = Annalist()

# Empty, "Time"-indexed templates. They are always used through ``.copy()`` so
# that every import starts from a fresh frame with the expected columns.

# Template for the standard (continuous) series.
EMPTY_STANDARD_DATA = pd.DataFrame(
    columns=["Time", "Raw", "Value", "Changes", "Remove"]
).set_index("Time")

# Template for the check series.
EMPTY_CHECK_DATA = pd.DataFrame(
    columns=[
        "Time", "Raw", "Value", "Changes",
        "Recorder Time", "Comment", "Source", "QC",
    ]
).set_index("Time")

# Template for the quality series.
EMPTY_QUALITY_DATA = pd.DataFrame(
    columns=["Time", "Raw", "Value", "Code", "Details"]
).set_index("Time")


class Processor:
    """
    Processor class for handling data processing.

    Attributes
    ----------
    _defaults : dict or None
        The default settings.
    _site : str
        The site to be processed.
    _standard_measurement_name : str
        The standard measurement to be processed.
    _check_measurement_name : str
        The measurement to be checked.
    _base_url : str
        The base URL of the Hilltop server.
    _standard_hts_filename : str
        The standard Hilltop service.
    _check_hts_filename : str or None
        The Hilltop service to be checked.
    _frequency : str or None
        The frequency of the data.
    _from_date : pd.Timestamp
        The start date of the data.
    _to_date : pd.Timestamp
        The end date of the data.
    _quality_code_evaluator : QualityCodeEvaluator
        The quality code evaluator.
    _interval_dict : dict
        Determines how data with old checks is downgraded.
    _standard_data : pd.DataFrame
        The standard series data.
    _check_data : pd.DataFrame
        The series containing check data.
    _quality_data : pd.DataFrame
        The quality series data.
    processing_issues : pd.DataFrame
        Log of issues reported during processing (all columns stored as str).
    standard_item_name : str
        The name of the standard item.
    standard_data_source_name : str
        The name of the standard data source.
    check_item_name : str
        The name of the check item.
    check_data_source_name : str
        The name of the check data source.
    export_file_name : str or None
        Where the data is exported to. Used as default when exporting without
        a specified filename.
    """

    @ClassLogger  # type:ignore
    def __init__(
        self,
        base_url: str,
        site: str,
        standard_hts_filename: str,
        standard_measurement_name: str,
        frequency: str | None,
        data_family: str,
        from_date: str | None = None,
        to_date: str | None = None,
        check_hts_filename: str | None = None,
        check_measurement_name: str | None = None,
        defaults: dict | None = None,
        interval_dict: dict | None = None,
        constant_check_shift: float = 0,
        fetch_quality: bool = False,
        export_file_name: str | None = None,
        archive_base_url: str | None = None,
        archive_standard_hts_filename: str | None = None,
        archive_check_hts_filename: str | None = None,
        provisional_wq_filename: str | None = None,
        archive_standard_measurement_name: str | None = None,
        depth: float | None = None,
        infer_frequency: bool = True,
        **kwargs,
    ):
        """
        Construct all the necessary attributes for the Processor object.

        Besides storing the configuration, construction contacts the Hilltop
        server and performs the first data import via ``import_data``.

        Parameters
        ----------
        base_url : str
            The base URL of the Hilltop server.
        site : str
            The site to be processed.
        standard_hts_filename : str
            The standard Hilltop service.
        standard_measurement_name : str
            The standard measurement to be processed.
        frequency : str or None
            The frequency of the data.
        data_family : str
            The type of data processing to be done. Used to select the quality
            code evaluator.
        from_date : str, optional
            The start date of the data (default is None). If missing, it is
            taken from ``utils.find_last_time`` on the archive settings.
        to_date : str, optional
            The end date of the data (default is None). If missing, the current
            time rounded to the second is used.
        check_hts_filename : str, optional
            The Hilltop service to be checked (default is None).
        check_measurement_name : str, optional
            The measurement to be checked (default is None, which falls back to
            standard_measurement_name).
        defaults : dict, optional
            The default settings (default is None).
        interval_dict : dict, optional
            Determines how data with old checks is downgraded. None becomes an
            empty dict.
        constant_check_shift : float, optional
            Value assigned to ``constant_check_shift`` on the quality code
            evaluator (default is 0).
        fetch_quality : bool, optional
            Whether the initial import also fetches quality data (default is
            False).
        export_file_name : string, optional
            Where the data is exported to. Used as default when exporting
            without specified filename.
        archive_base_url : str, optional
            Base URL passed to ``utils.find_last_time`` when from_date is
            missing.
        archive_standard_hts_filename : str, optional
            Hilltop file passed to ``utils.find_last_time`` when from_date is
            missing.
        archive_check_hts_filename : str, optional
            Stored on the instance; not otherwise used by the constructor.
        provisional_wq_filename : str, optional
            Filename for provisional WQ data to be converted to check
        archive_standard_measurement_name : str, optional
            standard_measurement_name used in the archive file
            used to find last processed time and for final exported data.
            Defaults to standard_measurement_name.
        depth : numeric, optional
            Depth of measurement - used for lake buoys. Number in positive mm.
            When a depth is given, check data is not fetched on construction.
        infer_frequency : bool, optional
            Stored on the instance and consulted by ``import_standard`` when no
            frequency is available (default is True).
        kwargs : dict
            Additional keyword arguments. Accepted but not used by the
            constructor.
        """
        # Processing issues reporting setup
        # This frame must exist before the first report_processing_issue call.
        self.processing_issues = pd.DataFrame(
            {
                "start_time": [],
                "end_time": [],
                "code": [],
                "comment": [],
                "series_type": [],
                "message_type": [],
            }
        ).astype(str)
        self.report_processing_issue(
            comment=f"Hydrobot Version: {hydrobot.__version__}",
            message_type="info",
            start_time=datetime.now().strftime("%Y-%m-%d %H:%M"),
        )

        # replacements
        if check_measurement_name is None:
            check_measurement_name = standard_measurement_name
        if interval_dict is None:
            interval_dict = {}
        if archive_standard_measurement_name is None:
            archive_standard_measurement_name = standard_measurement_name
        # pd.isna is used so that None and NaN/NaT both count as "missing".
        if pd.isna(from_date):
            from_date = utils.find_last_time(
                archive_base_url,
                archive_standard_hts_filename,
                site,
                archive_standard_measurement_name,
            )
            self.report_processing_issue(
                comment=f"from_date inferred as: {str(from_date)}",
                message_type="info",
            )
        if pd.isna(to_date):
            to_date = pd.Timestamp.now().round("s")
            self.report_processing_issue(
                comment=f"to_date inferred as: {str(to_date)}",
                message_type="info",
            )

        self.data_family = data_family

        # set input values
        self._base_url = base_url
        self._site = site
        self._standard_hts_filename = standard_hts_filename
        self._standard_measurement_name = standard_measurement_name
        self._frequency = frequency
        self._from_date = pd.Timestamp(from_date)
        self._to_date = pd.Timestamp(to_date)
        self._check_hts_filename = check_hts_filename
        self._check_measurement_name = check_measurement_name
        self._defaults = defaults
        self._interval_dict = interval_dict
        self.export_file_name = export_file_name
        self.archive_base_url = archive_base_url
        self.archive_standard_hts_filename = archive_standard_hts_filename
        self.archive_check_hts_filename = archive_check_hts_filename
        self.archive_standard_measurement_name = archive_standard_measurement_name
        self.provisional_wq_filename = provisional_wq_filename
        self.depth = depth
        self.infer_frequency = infer_frequency

        # Set other value initial values
        self._standard_data = EMPTY_STANDARD_DATA.copy()
        self._check_data = EMPTY_CHECK_DATA.copy()
        self._quality_data = EMPTY_QUALITY_DATA.copy()

        # standard hilltop
        standard_hilltop = Hilltop(base_url, standard_hts_filename)
        data_acquisition.enforce_site_in_hts(standard_hilltop, self.site)
        self.enforce_measurement_at_site(standard_measurement_name, standard_hilltop)
        (
            self.standard_item_name,
            self.standard_data_source_name,
        ) = utils.measurement_datasource_splitter(standard_measurement_name)
        (
            self.archive_standard_item_name,
            self.archive_standard_data_source_name,
        ) = utils.measurement_datasource_splitter(archive_standard_measurement_name)

        # check hilltop
        if check_hts_filename is not None:
            check_hilltop = Hilltop(base_url, check_hts_filename)
            data_acquisition.enforce_site_in_hts(check_hilltop, self.site)
            self.enforce_measurement_at_site(check_measurement_name, check_hilltop)
        # The check names are derived even without a check file, because
        # check_item_info below reads self.check_item_name unconditionally.
        (
            self.check_item_name,
            self.check_data_source_name,
        ) = utils.measurement_datasource_splitter(check_measurement_name)

        # Item / data source descriptions. The import_* methods overwrite the
        # item info entries with what the server reports.
        self.standard_item_info = {
            "item_name": self.standard_item_name,
            "item_format": "F",
            "divisor": 1,
            "units": "",
            "number_format": "###.##",
        }
        self.archive_standard_item_info = {
            "item_name": self.archive_standard_item_name,
            "item_format": "F",
            "divisor": 1,
            "units": "",
            "number_format": "###.##",
        }
        self.check_item_info = {
            "item_name": self.check_item_name,
            "item_format": "F",
            "divisor": 1,
            "units": "",
            "number_format": "$$$",
        }
        self.standard_data_source_info = {
            "ts_type": "StdSeries",
            "data_type": "SimpleTimeSeries",
            "interpolation": "Instant",
            "item_format": "1",
        }
        self.check_data_source_info = {
            "ts_type": "CheckSeries",
            "data_type": "SimpleTimeSeries",
            "interpolation": "Discrete",
            "item_format": "45",
        }
        self._quality_code_evaluator = data_sources.get_qc_evaluator(self.data_family)
        self._quality_code_evaluator.constant_check_shift = constant_check_shift

        # Load data for the first time
        # Check data is only fetched when a check file is configured and no
        # depth has been given.
        get_check = (self.check_hts_filename is not None) and self.depth is None
        self.import_data(
            from_date=self.from_date,
            to_date=self.to_date,
            check=get_check,
            quality=fetch_quality,
        )
def enforce_measurement_at_site(self, measurement_name, hilltop):
    """
    Placeholder check that a measurement exists in a given hilltop.

    The check is currently disabled: nothing is validated, neither argument
    is inspected, and None is always returned.

    Parameters
    ----------
    measurement_name : str
        Name of the measurement that would be looked up.
    hilltop : Hilltop
        Hilltop connection that would be queried.

    Returns
    -------
    None
    """
    # Disabled implementation, kept for reference:
    #
    # available_measurements = hilltop.get_measurement_list(self.site)
    # if measurement_name not in list(available_measurements.MeasurementName):
    #     raise ValueError(
    #         f"Measurement name '{measurement_name}' not found at"
    #         f" site '{self.site}'. "
    #         "Available measurements are "
    #         f"{list(available_measurements.MeasurementName)}"
    #     )
    return None
@classmethod
def from_processing_parameters_dict(
    cls, processing_parameters, fetch_quality=False
):
    """
    Build a Processor (and its Annalist logger) from a parameter dictionary.

    Parameters
    ----------
    processing_parameters : dict
        Dictionary of processing parameters. It is forwarded as keyword
        arguments to the constructor. It must contain "analyst_name" and a
        "format" mapping; "logfile" and the "stream"/"file" format entries
        are optional.
    fetch_quality : bool, optional
        Whether to fetch any existing quality data, default false

    Returns
    -------
    Processor, Annalist
        The new processor and the configured Annalist instance.
    """
    # --- Logging: configure Annalist from the same parameter dictionary ---
    annalist_logger = Annalist()
    logfile = processing_parameters.get("logfile", None)
    analyst_name = processing_parameters["analyst_name"]
    stream_format = processing_parameters["format"].get("stream", None)
    file_format = processing_parameters["format"].get("file", None)
    annalist_logger.configure(
        logfile=logfile,
        analyst_name=analyst_name,
        stream_format_str=stream_format,
        file_format_str=file_format,
    )

    # --- Processor: holds the data that is going to be processed ---
    processor = cls(**processing_parameters, fetch_quality=fetch_quality)
    return processor, annalist_logger
@classmethod
def from_config_yaml(cls, config_path, fetch_quality=False):
    """
    Build a Processor (and its Annalist logger) from a YAML config file.

    The file is first completed in place by ``complete_yaml_parameters``,
    then imported and passed through
    ``data_acquisition.convert_inspection_expiry`` before the processor is
    constructed.

    Parameters
    ----------
    config_path : string
        Path to config.yaml.
    fetch_quality : bool, optional
        Whether to fetch any existing quality data, default false

    Returns
    -------
    Processor, Annalist
    """
    cls.complete_yaml_parameters(config_path)
    imported_parameters = data_acquisition.config_yaml_import(config_path)
    return cls.from_processing_parameters_dict(
        data_acquisition.convert_inspection_expiry(imported_parameters),
        fetch_quality,
    )
@staticmethod def _keys_to_be_set_to_none_if_missing(): keys = [ "frequency", "from_date", "check_hts_filename", "check_measurement_name", "export_file_name", "archive_base_url", "archive_standard_hts_filename", "archive_check_hts_filename", ] return keys
@classmethod
def complete_yaml_parameters(cls, config_path):
    """
    Ensure a yaml holds all relevant parameters, filling in missing from/to dates.

    The file at ``config_path`` is loaded, completed and written back in
    place:

    * a missing or null ``to_date`` becomes the current time;
    * a missing or null ``from_date`` is looked up with
      ``utils.find_last_time``, preferring the ``archive_*`` settings and
      falling back to the standard ones; if that raises ValueError a warning
      is issued and ``from_date`` stays unset;
    * when a ``depth`` key is present, the standard measurement name (and
      the check measurement name, unless the data family's QC evaluator
      type is "Unchecked") is replaced by the depth-specific name.

    Finally ``utils.enforce_config_values_not_missing`` is run on the
    rewritten file.

    Parameters
    ----------
    config_path : string
        Path to the yaml config file, which is rewritten.
    """
    timestamp_format = "%Y-%m-%d %H:%M"

    round_trip_yaml = ruamel.yaml.YAML()
    with open(config_path) as config_file:
        parameters = round_trip_yaml.load(config_file.read())

    def is_unset(name):
        """Tell whether ``name`` is absent from the config or explicitly null."""
        return name not in parameters or parameters[name] is None

    def preferred_or_fallback(preferred, fallback):
        """Return the preferred setting when it is set, else the fallback one."""
        if is_unset(preferred):
            return parameters[fallback]
        return parameters[preferred]

    # End of the processing window defaults to "now".
    if is_unset("to_date"):
        parameters["to_date"] = datetime.now().strftime(timestamp_format)

    # Start of the processing window defaults to the last archived time.
    if is_unset("from_date"):
        try:
            last_archived_time = utils.find_last_time(
                base_url=preferred_or_fallback("archive_base_url", "base_url"),
                hts=preferred_or_fallback(
                    "archive_standard_hts_filename", "standard_hts_filename"
                ),
                site=parameters["site"],
                measurement=preferred_or_fallback(
                    "archive_standard_measurement_name",
                    "standard_measurement_name",
                ),
            )
            parameters["from_date"] = last_archived_time.strftime(timestamp_format)
        except ValueError as error:
            warnings.warn(
                f"Could not infer from_date from archive data: {str(error)}. "
                "Please check your archive settings are correct.",
                stacklevel=1,
            )

    # Depth-profiled sites use depth-specific measurement names.
    if "depth" in parameters:
        data_family = parameters["data_family"]
        depth = parameters["depth"]
        parameters["standard_measurement_name"] = (
            data_sources.depth_standard_measurement_name_by_data_family(
                data_family, depth
            )
        )
        evaluator_type = data_sources.DATA_FAMILY_DICT[data_family][
            "QC_evaluator_type"
        ]
        if evaluator_type != "Unchecked":
            parameters["check_measurement_name"] = (
                data_sources.depth_check_measurement_name_by_data_family(
                    data_family, depth
                )
            )

    with open(config_path, "w") as config_file:
        round_trip_yaml.dump(parameters, config_file)

    # Ensure these keys are not missing - raises error if it is
    required_keys = cls._keys_to_be_set_to_none_if_missing()
    utils.enforce_config_values_not_missing(config_path, required_keys)
# Read-only views of the constructor configuration, plus logged setters for
# the evaluator and the three data frames.


@property
def standard_measurement_name(self):  # type: ignore
    """str: The standard measurement to be processed."""
    return self._standard_measurement_name


@property
def site(self):  # type: ignore
    """str: The site to be processed."""
    return self._site


@property
def from_date(self):  # type: ignore
    """pd.Timestamp: The start date of the data."""
    return self._from_date


@property
def to_date(self):  # type: ignore
    """pd.Timestamp: The end date of the data."""
    return self._to_date


@property
def frequency(self):  # type: ignore
    """str or None: The frequency of the data."""
    return self._frequency


@property
def base_url(self):  # type: ignore
    """str: The base URL of the Hilltop server."""
    return self._base_url


@property
def standard_hts_filename(self):  # type: ignore
    """str: The standard Hilltop service."""
    return self._standard_hts_filename


@property
def check_hts_filename(self):  # type: ignore
    """str or None: The Hilltop service to be checked."""
    return self._check_hts_filename


@property
def quality_code_evaluator(self):  # type: ignore
    """The quality code evaluator used by this processor."""
    return self._quality_code_evaluator


@ClassLogger
@quality_code_evaluator.setter
def quality_code_evaluator(self, value):
    # Replaces the evaluator; the assignment goes through ClassLogger.
    self._quality_code_evaluator = value


@property
def defaults(self):  # type: ignore
    """dict or None: The default settings."""
    return self._defaults


@property  # type: ignore
def standard_data(self) -> pd.DataFrame:  # type: ignore
    """pd.DataFrame: The standard series data."""
    return self._standard_data


@ClassLogger  # type: ignore
@standard_data.setter
def standard_data(self, value):
    # Replaces the standard data; the assignment goes through ClassLogger.
    self._standard_data = value


@property  # type: ignore
def check_data(self) -> pd.DataFrame:  # type: ignore
    """pd.DataFrame: The frame containing check data."""
    return self._check_data


@ClassLogger  # type: ignore
@check_data.setter
def check_data(self, value):
    # Replaces the check data; the assignment goes through ClassLogger.
    self._check_data = value


@property  # type: ignore
def quality_data(self) -> pd.DataFrame:  # type: ignore
    """pd.DataFrame: The quality series data."""
    return self._quality_data


@ClassLogger  # type: ignore
@quality_data.setter
def quality_data(self, value):
    # Replaces the quality data; the assignment goes through ClassLogger.
    self._quality_data = value
@ClassLogger
def import_standard(
    self,
    standard_hts_filename: str | None = None,
    site: str | None = None,
    standard_measurement_name: str | None = None,
    standard_data_source_name: str | None = None,
    standard_item_info: dict | None = None,
    standard_data: pd.DataFrame | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    frequency: str | None = None,
    base_url: str | None = None,
    infer_frequency: bool | None = None,
):
    """
    Import standard data.

    Parameters
    ----------
    standard_hts_filename : str or None, optional
        The standard Hilltop service. If None, defaults to the standard HTS.
    site : str or None, optional
        The site to be processed. If None, defaults to the site on the processor object.
    standard_measurement_name : str or None, optional
        The standard measurement to be processed. If None, defaults to the standard
        measurement name on the processor object.
    standard_data_source_name : str or None, optional
        The name of the standard data source. If None, defaults to the standard data
        source name on the processor object.
    standard_item_info : dict or None, optional
        The item information for the standard data. If None, defaults to the
        standard item info on the processor object. The dict is updated in place
        with the item info reported by the server.
    standard_data : pd.DataFrame or None, optional
        The frame to populate; its "Raw" and "Value" columns are overwritten.
        If None, makes an empty standard_data object
    from_date : str or None, optional
        The start date for data retrieval. If None, defaults to the from_date
        on the processor object.
    to_date : str or None, optional
        The end date for data retrieval. If None, defaults to the to_date on
        the processor object.
    frequency : str or None, optional
        The frequency of the data. If None and infer_frequency, defaults to the
        frequency on the processor object. If that is also None, the frequency
        is inferred from the data. If None and not infer_frequency, the data is
        treated as irregular.
    base_url : str or None, optional
        URL to look for hilltop server. Will use self.base_url if None.
    infer_frequency : bool or None, optional.
        Whether to look for frequency. Uses self.infer_frequency if None.
        If True and frequency is provided will issue a warning.

    Returns
    -------
    pd.DataFrame
        The standard data

    Raises
    ------
    ValueError
        - If the server returned data but no StdSeries with the requested
          data source name.
        - If more than one matching StdSeries is found.
        - If the item format is not one of "I", "F" or "D".
    TypeError
        If the parsed Standard data is not a pandas.DataFrame.

    Notes
    -----
    This method imports Standard data from the specified server based on the
    provided parameters. It retrieves data using the `data_acquisition.get_data`
    function and returns the populated frame; it does not assign it to the
    instance. If the server returns nothing for the date range, an "MSD"
    processing issue is reported instead of raising. When a frequency is
    inferred it is stored on the instance as ``_frequency``.
    The data is parsed and formatted according to the item_info in the data source.

    Examples
    --------
    >>> processor = Processor(...)  # initialize processor instance
    >>> processor.import_standard(
    ...     from_date='2022-01-01', to_date='2022-01-10'
    ... )
    """
    # Fall back to the processor's own configuration for anything not given.
    if standard_hts_filename is None:
        standard_hts_filename = self._standard_hts_filename
    if site is None:
        site = self._site
    if standard_measurement_name is None:
        standard_measurement_name = self._standard_measurement_name
    if standard_data_source_name is None:
        standard_data_source_name = self.standard_data_source_name
    if standard_item_info is None:
        standard_item_info = self.standard_item_info
    if from_date is None:
        from_date = self.from_date
    if to_date is None:
        to_date = self.to_date
    if standard_data is None:
        standard_data = EMPTY_STANDARD_DATA.copy()
    if base_url is None:
        base_url = self._base_url
    if infer_frequency is None:
        infer_frequency = self.infer_frequency
    if frequency is None and infer_frequency:
        frequency = self._frequency

    xml_tree, blob_list = data_acquisition.get_data(
        base_url,
        standard_hts_filename,
        site,
        standard_measurement_name,
        from_date,
        to_date,
        tstype="Standard",
    )

    blob_found = False

    date_format = "Calendar"
    data_source_list = []
    raw_standard_data = EMPTY_STANDARD_DATA.copy()

    raw_standard_blob = None
    if blob_list is None or len(blob_list) == 0:
        # Nothing came back: record the problem and carry on with empty data.
        self.report_processing_issue(
            start_time=from_date,
            end_time=to_date,
            series_type="Standard",
            message_type="error",
            comment="No standard data found within specified date range.",
            code="MSD",
        )
    else:
        for blob in blob_list:
            data_source_list += [blob.data_source.name]
            if (
                (blob.data_source.name == standard_data_source_name)
                and (blob.data_source.ts_type == "StdSeries")
                and (blob.data.timeseries is not None)
            ):
                if blob_found:
                    # Already found something, duplicated StdSeries
                    raise ValueError(
                        f"Multiple StdSeries found. Already found: {raw_standard_data}, "
                        f"also found: {blob.data.timeseries}."
                    )
                blob_found = True
                raw_standard_data = blob.data.timeseries
                date_format = blob.data.date_format
                raw_standard_blob = blob
                # Copy the server's item description into the (shared) dict.
                standard_item_info["item_name"] = blob.data_source.item_info[
                    0
                ].item_name
                standard_item_info["item_format"] = blob.data_source.item_info[
                    0
                ].item_format
                standard_item_info["divisor"] = blob.data_source.item_info[
                    0
                ].divisor
                standard_item_info["units"] = blob.data_source.item_info[0].units
                standard_item_info["number_format"] = blob.data_source.item_info[
                    0
                ].number_format
        if not blob_found:
            raise ValueError(
                f"Standard Data Not Found under name "
                f"{standard_measurement_name}. "
                f"Available data sources are: {data_source_list}"
            )

        if not isinstance(raw_standard_data, pd.DataFrame):
            raise TypeError(
                "Expecting pd.DataFrame for Standard data, "
                f"but got {type(raw_standard_data)} from parser."
            )

    if not raw_standard_data.empty:
        # Normalise the index to datetimes before any resampling.
        if date_format == "mowsecs":
            raw_standard_data.index = utils.mowsecs_to_datetime_index(
                raw_standard_data.index
            )
        else:
            raw_standard_data.index = pd.to_datetime(raw_standard_data.index)
        if frequency is not None:
            # Frequency is provided
            raw_standard_data = raw_standard_data.asfreq(
                frequency, fill_value=np.nan
            )
            if infer_frequency:
                warnings.warn(
                    f"infer_frequency is true, but frequency has been provided as {frequency}. Will not "
                    f"attempt to find frequency from data.",
                    stacklevel=1,
                )
        else:
            if infer_frequency:
                # We have been asked to infer the frequency
                frequency = utils.infer_frequency(
                    raw_standard_data.index, method="mode"
                )
                raw_standard_data = raw_standard_data.asfreq(
                    frequency, fill_value=np.nan
                )
                self.report_processing_issue(
                    code="IRR",
                    comment=f"frequency inferred as {frequency}",
                    message_type="info",
                )
                # Remember the inferred frequency on the processor.
                self._frequency = frequency
            else:
                # infer_frequency is explicitly set to false and frequency is None
                # Assuming irregular data
                self.report_processing_issue(
                    code="IRR",
                    comment=f"No frequency provided and infer_frequency"
                    f" is set to False. Assuming irregular data for {standard_measurement_name}.",
                    message_type="info",
                )

    if raw_standard_blob is not None:
        fmt = standard_item_info["item_format"]
        div = standard_item_info["divisor"]
    else:
        self.report_processing_issue(
            code="HXD",
            comment="Could not extract standard data format from data source. "
            "Defaulting to float format.",
            series_type="standard",
            message_type="error",
        )
        fmt = "F"
        div = 1
    # A missing divisor (None or the string "None") means "no scaling".
    if div is None or div == "None":
        div = 1
    if fmt == "I":
        raw_standard_data.iloc[:, 0] = raw_standard_data.iloc[:, 0].astype(
            int
        ) / int(div)
    elif fmt == "F":
        raw_standard_data.iloc[:, 0] = raw_standard_data.iloc[:, 0].astype(
            np.float32
        ) / float(div)
    elif fmt == "D":  # Not sure if this would ever really happen, but...
        raw_standard_data.iloc[:, 0] = utils.mowsecs_to_datetime_index(
            raw_standard_data.iloc[:, 0]
        )
    else:
        raise ValueError(f"Unknown Format Spec: {fmt}")

    # Only the first column of the parsed data is used.
    standard_data["Raw"] = raw_standard_data.iloc[:, 0]
    standard_data["Value"] = standard_data["Raw"]

    return standard_data
@ClassLogger
def import_quality(
    self,
    standard_hts_filename: str | None = None,
    site: str | None = None,
    standard_measurement_name: str | None = None,
    standard_data_source_name: str | None = None,
    quality_data: pd.DataFrame | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    base_url: str | None = None,
):
    """
    Import quality data.

    Parameters
    ----------
    standard_hts_filename : str or None, optional
        Where to get quality data from. Defaults to the processor's standard
        Hilltop file.
    site : str or None, optional
        Which site to get data from. Defaults to the processor's site.
    standard_measurement_name : str or None, optional
        Name for measurement to get. Defaults to the processor's standard
        measurement name.
    standard_data_source_name : str or None, optional
        Name for data source to get. Defaults to the processor's standard
        data source name.
    quality_data : pd.DataFrame or None, optional
        Frame to populate; its "Raw" and "Value" columns are overwritten.
        If None, an empty quality frame is used.
    from_date : str or None, optional
        The start date for data retrieval. If None, defaults to the
        processor's from_date.
    to_date : str or None, optional
        The end date for data retrieval. If None, defaults to the
        processor's to_date.
    base_url : str, optional
        Base of the url to use for the hilltop server request. Defaults to the
        Processor value.

    Returns
    -------
    pd.DataFrame
        ``quality_data`` with "Raw" and "Value" taken from the first column
        of the retrieved series.

    Raises
    ------
    ValueError
        If more than one StdQualSeries matches the data source name.
    TypeError
        If the parsed Quality data is not a pandas.DataFrame.

    Notes
    -----
    This method imports Quality data from the specified server based on the
    provided parameters. It retrieves data using the `data_acquisition.get_data`
    function and returns the populated frame; it does not assign it to the
    instance. Missing quality data is not fatal: an "MQD" processing issue is
    reported and an empty result is returned.

    Examples
    --------
    >>> processor = Processor(...)  # initialize processor instance
    >>> processor.import_quality(
    ...     from_date='2022-01-01', to_date='2022-01-10'
    ... )
    """
    # Fall back to the processor's own configuration for anything not given.
    if standard_hts_filename is None:
        standard_hts_filename = self._standard_hts_filename
    if site is None:
        site = self.site
    if standard_measurement_name is None:
        standard_measurement_name = self._standard_measurement_name
    if standard_data_source_name is None:
        standard_data_source_name = self.standard_data_source_name
    if from_date is None:
        from_date = self.from_date
    if to_date is None:
        to_date = self.to_date
    if quality_data is None:
        quality_data = EMPTY_QUALITY_DATA.copy()
    if base_url is None:
        base_url = self._base_url

    # Only the parsed blobs are needed; the raw XML tree is discarded.
    _, blob_list = data_acquisition.get_data(
        base_url,
        standard_hts_filename,
        site,
        standard_measurement_name,
        from_date,
        to_date,
        tstype="Quality",
    )

    blob_found = False
    # Bound up front so it is defined on every path that reaches its use below.
    date_format = "Calendar"
    raw_quality_data = EMPTY_QUALITY_DATA.copy()

    if blob_list is None or len(blob_list) == 0:
        self.report_processing_issue(
            start_time=from_date,
            end_time=to_date,
            series_type="Quality",
            message_type="error",
            comment="No quality data found within specified date range, len0",
            code="MQD",
        )
    else:
        data_source_options = []
        for blob in blob_list:
            if blob.data_source.ts_type == "StdQualSeries":
                data_source_options += [blob.data_source.name]
                if blob.data_source.name == standard_data_source_name:
                    if blob_found:
                        # Already found something, duplicated StdQualSeries
                        raise ValueError(
                            f"Multiple StdQualSeries found. Just found: {blob}, "
                            f"all candidates are: {blob_list}."
                        )
                    # Found it. Now we extract it.
                    blob_found = True
                    raw_quality_data = blob.data.timeseries
                    date_format = blob.data.date_format
        if not blob_found:
            # The sentence separator after "name" was missing, which ran the
            # two sentences of the message together.
            self.report_processing_issue(
                start_time=from_date,
                end_time=to_date,
                series_type="Quality",
                message_type="error",
                comment="No quality data found within specified date range "
                "and with correct standard data source name. "
                f"Quality data {standard_data_source_name} not found in server "
                f"response. Available options are {data_source_options}",
                code="MQD",
            )

        if not isinstance(raw_quality_data, pd.DataFrame):
            raise TypeError(
                f"Expecting pd.DataFrame for Quality data, but got "
                f"{type(raw_quality_data)} from parser."
            )
    if not raw_quality_data.empty:
        # Normalise the index to datetimes.
        if date_format == "mowsecs":
            raw_quality_data.index = utils.mowsecs_to_datetime_index(
                raw_quality_data.index
            )
        else:
            raw_quality_data.index = pd.to_datetime(raw_quality_data.index)
    # Best-effort integer conversion; values that cannot convert are left as-is.
    raw_quality_data.iloc[:, 0] = raw_quality_data.iloc[:, 0].astype(
        int, errors="ignore"
    )

    # Only the first column of the parsed data is used.
    quality_data["Raw"] = raw_quality_data.iloc[:, 0]
    quality_data["Value"] = quality_data["Raw"]
    return quality_data
@ClassLogger
def import_check(
    self,
    check_hts_filename: str | None = None,
    site: str | None = None,
    check_measurement_name: str | None = None,
    check_data_source_name: str | None = None,
    check_item_info: dict | None = None,
    check_item_name: str | None = None,
    check_data: pd.DataFrame | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    base_url: str | None = None,
):
    """
    Import Check data.

    Parameters
    ----------
    check_hts_filename : str or None, optional
        Where to get check data from. Defaults to the processor's check
        Hilltop file.
    site : str or None, optional
        Which site to get data from. Defaults to the processor's site.
    check_measurement_name : str or None, optional
        Name for measurement to get. Defaults to the processor's check
        measurement name.
    check_data_source_name : str or None, optional
        Name for data source to get. Defaults to the processor's check data
        source name. A CheckSeries named after the processor's standard data
        source is accepted as well.
    check_item_info : dict or None, optional
        ItemInfo to be used in hilltop xml. Defaults to the processor's check
        item info. The dict is updated in place with the item info reported
        by the server.
    check_item_name : str or None, optional
        ItemName to be used in hilltop xml; also the column of the retrieved
        data that becomes "Raw". Defaults to the processor's check item name.
    check_data : pd.DataFrame or None, optional
        Frame to populate; its columns are overwritten when data is found.
        If None, an empty check frame is used.
    from_date : str or None, optional
        The start date for data retrieval. If None, defaults to the
        processor's from_date.
    to_date : str or None, optional
        The end date for data retrieval. If None, defaults to the
        processor's to_date.
    base_url : str, optional
        Base of the url to use for the hilltop server request. Defaults to the
        Processor value.

    Returns
    -------
    check_data: pd.DataFrame
        With "Raw", "Value", "Recorder Time" and "Comment" taken from the
        retrieved series, "Source" set to "HTP" and "QC" set to True. Returned
        unchanged when no check data was retrieved.

    Raises
    ------
    ValueError
        If more than one CheckSeries matches the accepted data source names.
    TypeError
        If the parsed Check data is not a pandas.DataFrame.

    Notes
    -----
    This method imports Check data from the specified server based on the
    provided parameters. It retrieves data using the `data_acquisition.get_data`
    function. The data is parsed and formatted according to the item_info in
    the data source. Missing check data is not fatal: an "MCD" processing
    issue is reported instead.

    Examples
    --------
    >>> processor = Processor(...)  # initialize processor instance
    >>> processor.import_check(
    ...     from_date='2022-01-01', to_date='2022-01-10'
    ... )
    """
    # Fall back to the processor's own configuration for anything not given.
    if check_hts_filename is None:
        check_hts_filename = self.check_hts_filename
    if site is None:
        site = self._site
    if check_measurement_name is None:
        check_measurement_name = self._check_measurement_name
    if check_data_source_name is None:
        check_data_source_name = self.check_data_source_name
    if check_item_info is None:
        check_item_info = self.check_item_info
    if check_item_name is None:
        check_item_name = self.check_item_name
    if check_data is None:
        check_data = EMPTY_CHECK_DATA.copy()
    if from_date is None:
        from_date = self._from_date
    if to_date is None:
        to_date = self._to_date
    if base_url is None:
        base_url = self._base_url

    # Only the parsed blobs are needed; the raw XML tree is discarded.
    _, blob_list = data_acquisition.get_data(
        base_url,
        check_hts_filename,
        site,
        check_measurement_name,
        from_date,
        to_date,
        tstype="Check",
    )
    raw_check_data = EMPTY_CHECK_DATA.copy()
    raw_check_blob = None
    blob_found = False
    date_format = "Calendar"
    if blob_list is None or len(blob_list) == 0:
        self.report_processing_issue(
            start_time=from_date,
            end_time=to_date,
            series_type="Check",
            message_type="error",
            comment="No check data found within specified date range.",
            code="MCD",
        )
    else:
        data_source_options = []
        accepted_names = [check_data_source_name, self.standard_data_source_name]
        for blob in blob_list:
            data_source_options += [blob.data_source.name]
            if (blob.data_source.name in accepted_names) and (
                blob.data_source.ts_type == "CheckSeries"
            ):
                if blob_found:
                    # Already found something, duplicated CheckSeries
                    raise ValueError(
                        f"Multiple CheckSeries found. Just found: {blob}, "
                        f"all candidates are: {blob_list}."
                    )
                # Found it. Now we extract it.
                blob_found = True

                date_format = blob.data.date_format

                # This could be a pd.Series
                if blob.data.timeseries is not None:
                    raw_check_blob = blob
                    raw_check_data = blob.data.timeseries
                    # Copy the server's item description into the (shared) dict.
                    first_item = blob.data_source.item_info[0]
                    check_item_info["item_name"] = first_item.item_name
                    check_item_info["item_format"] = first_item.item_format
                    check_item_info["divisor"] = first_item.divisor
                    check_item_info["units"] = first_item.units
                    check_item_info["number_format"] = first_item.number_format
        if not blob_found:
            self.report_processing_issue(
                start_time=from_date,
                end_time=to_date,
                series_type="Check",
                message_type="error",
                comment=f"Check data {check_data_source_name} not found in server "
                f"response. Available options are {data_source_options}",
                code="MCD",
            )

    if not isinstance(raw_check_data, pd.DataFrame):
        # A space was missing between the type and "from parser.".
        raise TypeError(
            f"Expecting pd.DataFrame for Check data, but got {type(raw_check_data)} "
            "from parser."
        )
    if not raw_check_data.empty:
        # Normalise the index to datetimes.
        if date_format == "mowsecs":
            raw_check_data.index = utils.mowsecs_to_datetime_index(
                raw_check_data.index
            )
        else:
            raw_check_data.index = pd.to_datetime(raw_check_data.index)

    if not raw_check_data.empty and raw_check_blob is not None:
        # TODO: Maybe this should happen in the parser?
        for i, item in enumerate(raw_check_blob.data_source.item_info):
            fmt = item.item_format
            div = item.divisor
            # Same rule as import_standard: a missing divisor (None or the
            # string "None") means "no scaling" rather than a conversion error.
            if div is None or div == "None":
                div = 1
            col = raw_check_data.iloc[:, i]
            if fmt == "I":
                raw_check_data.iloc[:, i] = col.astype(int) / int(div)
            elif fmt == "F":
                raw_check_data.iloc[:, i] = col.astype(np.float32) / float(div)
            elif fmt == "D":
                # NOTE(review): this compares a dtype with the pd.Timestamp
                # class - confirm it selects the columns it is meant to.
                if raw_check_data.iloc[:, i].dtype != pd.Timestamp:
                    if date_format == "mowsecs":
                        raw_check_data.iloc[
                            :, i
                        ] = utils.mowsecs_to_datetime_index(col)
                    else:
                        raw_check_data.iloc[:, i] = col.astype(pd.Timestamp)
            elif fmt == "S":
                raw_check_data.iloc[:, i] = col.astype(str)

    if not raw_check_data.empty:
        check_data["Raw"] = raw_check_data[check_item_name]
        check_data["Value"] = check_data["Raw"]
        check_data["Recorder Time"] = raw_check_data["Recorder Time"]
        check_data["Comment"] = raw_check_data["Comment"]
        check_data["Source"] = "HTP"
        check_data["QC"] = True

    return check_data
[docs] def import_data( self, from_date: pd.Timestamp | str | None = None, to_date: pd.Timestamp | str | None = None, standard: bool = True, check: bool = True, quality: bool = True, ): """ Import data using the class parameter range. Parameters ---------- from_date : str or None, optional start of data to be imported, if None will use defaults to_date : str or None, optional end of data to be imported, if None will use defaults standard : bool, optional Whether to import standard data, by default True. check : bool, optional Whether to import check data, by default True. quality : bool, optional Whether to import quality data, by default False. Returns ------- None Notes ----- This method imports data for the specified date range, using the class parameters `_from_date` and `_to_date`. It updates the internal series data in the Processor instance for standard, check, and quality measurements separately. Examples -------- >>> processor = Processor(base_url="https://hilltop-server.com", site="Site1") >>> processor.import_data("2022-01-01", "2022-12-31",standard=True, check=True) False """ if standard: self._standard_data = self.import_standard( standard_hts_filename=self.standard_hts_filename, site=self.site, standard_measurement_name=self._standard_measurement_name, standard_data_source_name=self.standard_data_source_name, standard_item_info=self.standard_item_info, standard_data=self._standard_data, from_date=from_date, to_date=to_date, frequency=self._frequency, ) if quality: self._quality_data = self.import_quality( standard_hts_filename=self.standard_hts_filename, site=self._site, standard_measurement_name=self._standard_measurement_name, standard_data_source_name=self.standard_data_source_name, quality_data=self.quality_data, from_date=from_date, to_date=to_date, ) if check: self._check_data = self.import_check( check_hts_filename=self.check_hts_filename, site=self._site, check_measurement_name=self._check_measurement_name, 
check_data_source_name=self.check_data_source_name, check_item_info=self.check_item_info, check_item_name=self.check_item_name, check_data=self.check_data, from_date=from_date, to_date=to_date, )
@ClassLogger
def add_standard(self, extra_standard):
    """
    Merge extra standard values into the standard "Value" column.

    Parameters
    ----------
    extra_standard
        Extra standard data, combined with the existing values through
        utils.merge_series.

    Returns
    -------
    None
        The merged result is written to ``self.standard_data["Value"]``.
    """
    self.standard_data["Value"] = utils.merge_series(
        self.standard_data["Value"], extra_standard
    )
@ClassLogger
def add_check(self, extra_check):
    """
    Merge extra check values into the check "Value" column.

    Parameters
    ----------
    extra_check
        Extra check data, combined with the existing values through
        utils.merge_series.

    Returns
    -------
    None
        The merged result is written to ``self.check_data["Value"]``.
    """
    self.check_data["Value"] = utils.merge_series(
        self.check_data["Value"], extra_check
    )
@ClassLogger
def add_quality(self, extra_quality):
    """
    Merge extra quality values into the quality "Value" column.

    Parameters
    ----------
    extra_quality
        Extra quality data, combined with the existing values through
        utils.merge_series.

    Returns
    -------
    None
        The merged result is written to ``self.quality_data["Value"]``.
    """
    self.quality_data["Value"] = utils.merge_series(
        self.quality_data["Value"], extra_quality
    )
@ClassLogger
def gap_closer(self, gap_limit: int | None = None):
    """
    Drop the rows of small gaps from the standard data.

    DEPRECATED: this method removes rows from the standard dataframe
    outright, and emits a DeprecationWarning every time it is called.

    Parameters
    ----------
    gap_limit : int or None, optional
        Gap size passed to evaluator.small_gap_closer. If None, the
        "gap_limit" entry of the class defaults is used.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If gap_limit is None and the defaults hold no "gap_limit" entry.

    Notes
    -----
    The standard dataframe is reduced to the rows whose index survives
    evaluator.small_gap_closer applied to the "Value" column.
    """
    warnings.warn(
        "DEPRECATED: The use of gap_closer is discouraged as it completely "
        "removes rows from the dataframes.",
        category=DeprecationWarning,
        stacklevel=1,
    )

    if gap_limit is None:
        if "gap_limit" not in self._defaults:
            raise ValueError("gap_limit value required, no value found in defaults")
        gap_limit = int(self._defaults["gap_limit"])

    values = self._standard_data["Value"].squeeze()
    surviving = evaluator.small_gap_closer(values, gap_limit=gap_limit)
    # Keep only the rows that the gap closer retained.
    self._standard_data = self._standard_data.loc[surviving.index]
@ClassLogger
def quality_encoder(
    self,
    gap_limit: int | None = None,
    max_qc: int | float | None = None,
    interval_dict: dict | None = None,
):
    """
    Derive the quality data from the standard and check data.

    Parameters
    ----------
    gap_limit : int or None, optional
        Gap size handed to evaluator.missing_data_quality_code. If None, the
        "gap_limit" entry of the class defaults is used.
    max_qc : numeric or None, optional
        Upper quality code handed to evaluator.max_qc_limiter. If None, the
        "max_qc" entry of the class defaults is used, or NaN when absent.
    interval_dict : dict or None, optional
        Passed to evaluator.bulk_downgrade_out_of_validation. If None, the
        processor's own interval dictionary is used.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If gap_limit is None and the defaults hold no "gap_limit" entry.

    Notes
    -----
    With empty check data, only two rows are written: quality 200 at
    ``from_date`` and quality 0 at ``to_date``, both tagged "EMT".
    Otherwise four evaluator passes run in sequence. The first replaces the
    quality data, and each later pass sees the result of the previous one.
    """
    if gap_limit is None:
        if "gap_limit" not in self._defaults:
            raise ValueError("gap_limit value required, no value found in defaults")
        gap_limit = int(self._defaults["gap_limit"])
    if max_qc is None:
        max_qc = self._defaults["max_qc"] if "max_qc" in self._defaults else np.nan
    if interval_dict is None:
        interval_dict = self._interval_dict

    # Only checks flagged as QC-relevant take part in the encoding.
    relevant_checks = self.check_data[self.check_data["QC"]]
    if "Value" in relevant_checks:
        qc_series = relevant_checks["Value"]
    else:
        qc_series = pd.Series({})

    if self.check_data.empty:
        # Same cell-by-cell write order as always: Value, Code, then Details.
        placeholder_cells = (
            (self.from_date, "Value", 200),
            (self.to_date, "Value", 0),
            (self.from_date, "Code", "EMT"),
            (self.to_date, "Code", "EMT, END"),
            (self.from_date, "Details", "Empty data, start time set to qc200"),
            (self.to_date, "Details", "Empty data, qc0 at end"),
        )
        for moment, column, content in placeholder_cells:
            self.quality_data.loc[pd.Timestamp(moment), column] = content
        return

    # Pass 1: quality codes from comparing standard data against the checks.
    self._apply_quality(
        evaluator.check_data_quality_code(
            self.standard_data["Value"],
            qc_series,
            self._quality_code_evaluator,
        ),
        replace=True,
    )
    # Pass 2: downgrade data according to interval_dict.
    self._apply_quality(
        evaluator.bulk_downgrade_out_of_validation(
            self.quality_data, qc_series, interval_dict
        )
    )
    # Pass 3: quality codes for missing data.
    self._apply_quality(
        evaluator.missing_data_quality_code(
            self.standard_data["Value"],
            self.quality_data,
            gap_limit=gap_limit,
        )
    )
    # Pass 4: apply the max_qc limit.
    self._apply_quality(evaluator.max_qc_limiter(self.quality_data, max_qc))
def _apply_quality( self, changed_data, replace=False, ): if replace: self.quality_data = changed_data else: # Step 1: Merge the dataframes using an outer join merged_df = self.quality_data.merge( changed_data, how="outer", left_index=True, right_index=True, suffixes=("_old", "_new"), ) # Step 2: Replace NaN values in df1 with corresponding values from df2 with pd.option_context("future.no_silent_downcasting", True): # This context + infer_objects protects against pandas deprecation + warning merged_df["Value"] = ( merged_df["Value_old"] .fillna(merged_df["Value_new"]) .infer_objects(copy=False) ) merged_df["Code"] = ( merged_df["Code_old"] .fillna(merged_df["Code_new"]) .infer_objects(copy=False) ) merged_df["Details"] = ( merged_df["Details_old"] .fillna(merged_df["Details_new"]) .infer_objects(copy=False) ) # Step 3: Combine the two dataframes, prioritizing non-null values from df2 self.quality_data = merged_df[["Value", "Code", "Details"]].combine_first( self.quality_data )
def clip(self, low_clip: float | None = None, high_clip: float | None = None):
    """
    Clip the standard data to a low/high range.

    Parameters
    ----------
    low_clip : float or None, optional
        Lower bound. If None, the "low_clip" entry of the class defaults is
        used, or NaN when the defaults hold no such entry.
    high_clip : float or None, optional
        Upper bound. If None, the "high_clip" entry of the class defaults is
        used, or NaN when the defaults hold no such entry.

    Returns
    -------
    None

    Notes
    -----
    Only the standard "Value" column is passed to filters.clip. Rows whose
    value changes get the change code "CLP" and are marked for removal.
    """
    defaults = self._defaults

    if low_clip is None:
        if "low_clip" in defaults:
            low_clip = float(defaults["low_clip"])
        else:
            low_clip = np.nan

    if high_clip is None:
        if "high_clip" in defaults:
            high_clip = float(defaults["high_clip"])
        else:
            high_clip = np.nan

    clipped_values = filters.clip(self._standard_data["Value"], low_clip, high_clip)
    self._standard_data = self._apply_changes(
        self._standard_data,
        clipped_values,
        "CLP",
        mark_remove=True,
    )
@staticmethod def _apply_changes( dataframe, changed_values, change_code, mark_remove=False, ): both_none_mask = pd.isna(dataframe["Value"]) & pd.isna(changed_values) # Create a mask for cases where values are different excluding both being None-like diffs_mask = (dataframe["Value"] != changed_values) & ~both_none_mask if mark_remove: dataframe.loc[diffs_mask, "Remove"] = mark_remove dataframe.loc[diffs_mask, "Changes"] = change_code dataframe["Value"] = changed_values return dataframe
@ClassLogger
def remove_outliers(self, span: int | None = None, delta: float | None = None):
    """
    Filter outliers out of the standard data.

    Parameters
    ----------
    span : int or None, optional
        Span handed to filters.remove_outliers. If None, the "span" entry of
        the class defaults is used.
    delta : float or None, optional
        Delta handed to filters.remove_outliers. If None, the "delta" entry
        of the class defaults is used.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If span or delta is None and the defaults hold no matching entry.

    Notes
    -----
    Rows whose value changes get the change code "OUT" and are marked for
    removal.
    """
    if span is None:
        if "span" not in self._defaults:
            raise ValueError("span value required, no value found in defaults")
        span = int(self._defaults["span"])

    if delta is None:
        if "delta" not in self._defaults:
            raise ValueError("delta value required, no value found in defaults")
        delta = float(self._defaults["delta"])

    values = self._standard_data["Value"].squeeze()
    filtered = filters.remove_outliers(values, span, delta)
    self._standard_data = self._apply_changes(
        self._standard_data,
        filtered,
        "OUT",
        mark_remove=True,
    )
@ClassLogger
def remove_spikes(
    self,
    low_clip: float | None = None,
    high_clip: float | None = None,
    span: int | None = None,
    delta: float | None = None,
):
    """
    Remove spikes from the standard data.

    Parameters
    ----------
    low_clip : float or None, optional
        Lower clipping threshold. If None, the "low_clip" entry of the class
        defaults is used, or NaN when the defaults hold no such entry.
    high_clip : float or None, optional
        Upper clipping threshold. If None, the "high_clip" entry of the class
        defaults is used, or NaN when the defaults hold no such entry.
    span : int or None, optional
        Span handed to filters.remove_spikes. If None, the "span" entry of
        the class defaults is used.
    delta : float or None, optional
        Delta handed to filters.remove_spikes. If None, the "delta" entry of
        the class defaults is used.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If span or delta is None and the defaults hold no matching entry.

    Notes
    -----
    Rows whose value changes get the change code "SPK" and are marked for
    removal.
    """
    defaults = self._defaults

    if low_clip is None:
        low_clip = float(defaults["low_clip"]) if "low_clip" in defaults else np.nan
    if high_clip is None:
        # Guard on "high_clip" itself. Guarding on "low_clip" raised KeyError
        # when only low_clip was configured, and silently ignored a
        # configured high_clip when low_clip was absent.
        high_clip = (
            float(defaults["high_clip"]) if "high_clip" in defaults else np.nan
        )

    if span is None:
        if "span" not in defaults:
            raise ValueError("span value required, no value found in defaults")
        span = int(defaults["span"])
    if delta is None:
        if "delta" not in defaults:
            raise ValueError("delta value required, no value found in defaults")
        delta = float(defaults["delta"])

    rm_spikes = filters.remove_spikes(
        self._standard_data["Value"].squeeze(),
        span,
        low_clip,
        high_clip,
        delta,
    )
    self._standard_data = self._apply_changes(
        self._standard_data, rm_spikes, "SPK", mark_remove=True
    )
@ClassLogger
def remove_one_spikes(
    self,
    threshold_factor: float = 3.0,
    window_size: int = 5,
):
    """
    Remove one-point spikes from the standard data.

    A one-point spike is a data point that deviates significantly from both
    its preceding and following points and from the local trend. For more
    complex multi-point spikes, use remove_spikes().

    NOTE: This approach only works when the baseline data is fairly stable.
    For noisy or highly variable baselines, use remove_one_spikes_mad().

    Parameters
    ----------
    threshold_factor : float, optional
        Multiplier for the standard deviation that defines the spike
        threshold, by default 3.0. Larger values make detection less
        sensitive.
    window_size : int, optional
        Size of the rolling window used for local statistics, by default 5.
        Larger values make detection less sensitive.

    Returns
    -------
    None

    Notes
    -----
    The filtering itself is done by filters.remove_one_spikes. Rows whose
    value changes get the change code "OSK" and are marked for removal.
    """
    values = self._standard_data["Value"].squeeze()
    despiked = filters.remove_one_spikes(
        values,
        threshold_factor=threshold_factor,
        window_size=window_size,
    )
    self._standard_data = self._apply_changes(
        self._standard_data,
        despiked,
        "OSK",
        mark_remove=True,
    )
@ClassLogger
def remove_one_spikes_mad(
    self,
    threshold_factor: float = 2.5,
):
    """
    Remove one-point spikes using the Median Absolute Deviation (MAD).

    A one-point spike is a data point that deviates significantly from both
    its preceding and following points and from the local trend. For more
    complex multi-point spikes, use remove_spikes().

    NOTE: This variant is more robust to noisy or variable baseline data
    than remove_one_spikes().

    Parameters
    ----------
    threshold_factor : float, optional
        Multiplier for the MAD that defines the spike threshold, by default
        2.5.

    Returns
    -------
    None

    Notes
    -----
    The filtering itself is done by filters.remove_one_spikes_mad. Rows
    whose value changes get the change code "OSK" and are marked for removal.
    """
    values = self._standard_data["Value"].squeeze()
    despiked = filters.remove_one_spikes_mad(
        values,
        threshold_factor=threshold_factor,
    )
    self._standard_data = self._apply_changes(
        self._standard_data,
        despiked,
        "OSK",
        mark_remove=True,
    )
@ClassLogger
def remove_flatlined_values(self, span: int = 3):
    """
    Remove repeated values in the standard data.

    Parameters
    ----------
    span : int, optional
        Span handed to filters.flatline_value_remover, by default 3.

    Returns
    -------
    None

    Notes
    -----
    Rows whose value changes get the change code "FLN" and are marked for
    removal.
    """
    values = self._standard_data["Value"]
    unflattened = filters.flatline_value_remover(values, span=span)
    self._standard_data = self._apply_changes(
        self._standard_data,
        unflattened,
        "FLN",
        mark_remove=True,
    )
@ClassLogger
def remove_range(
    self,
    from_date,
    to_date,
):
    """
    Mark a range of the standard data for removal.

    Parameters
    ----------
    from_date : str
        The start date of the range to remove.
    to_date : str
        The end date of the range to remove.

    Returns
    -------
    None

    Notes
    -----
    Only the standard "Value" column is passed to filters.remove_range
    (with ``insert_gaps="all"``). Rows whose value changes get the change
    code "MAN" and are marked for removal.
    """
    values = self._standard_data["Value"]
    with_range_removed = filters.remove_range(
        values,
        from_date,
        to_date,
        insert_gaps="all",
    )
    self.standard_data = self._apply_changes(
        self._standard_data,
        with_range_removed,
        "MAN",
        mark_remove=True,
    )
@ClassLogger
def delete_range(
    self,
    from_date,
    to_date,
    tstype_standard=True,
    tstype_check=False,
    tstype_quality=False,
    gap_limit=None,
):
    """
    Delete a range of data from the selected time series types.

    DEPRECATED: this method removes rows from the dataframes outright and
    emits a DeprecationWarning every time it is called. Prefer
    'remove_range', which marks rows for removal but keeps the timestamp
    together with the raw value, reason for removal, etc.

    Parameters
    ----------
    from_date : str
        The start date of the range to delete.
    to_date : str
        The end date of the range to delete.
    tstype_standard : bool, optional
        Delete from the standard data, by default True.
    tstype_check : bool, optional
        Delete from the check data, by default False.
    tstype_quality : bool, optional
        Delete from the quality data, by default False.
    gap_limit : int, optional
        Passed to filters.remove_range as ``min_gap_length``. If None, the
        "gap_limit" entry of the class defaults is used.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If gap_limit is None and the defaults hold no "gap_limit" entry.
    """
    warnings.warn(
        message="DEPRECATED: The use of delete_range is discouraged as it completely "
        "removes rows from the dataframes. User is encouraged to use "
        "'remove_range' which marks rows for removal, but retains the timestamp "
        "to be associated with the other values "
        "in the row such as the raw value, reason for removal, etc.",
        category=DeprecationWarning,
        stacklevel=1,
    )

    if gap_limit is None:
        if "gap_limit" not in self._defaults:
            raise ValueError("gap_limit value required, no value found in defaults")
        gap_limit = self._defaults["gap_limit"]

    # Every selected series is filtered with the same gap settings.
    gap_settings = {"min_gap_length": gap_limit, "insert_gaps": "start"}

    if tstype_standard:
        self.standard_data = filters.remove_range(
            self._standard_data, from_date, to_date, **gap_settings
        )
    if tstype_check:
        self.check_data = filters.remove_range(
            self._check_data, from_date, to_date, **gap_settings
        )
    if tstype_quality:
        self.quality_data = filters.remove_range(
            self._quality_data, from_date, to_date, **gap_settings
        )
@ClassLogger
def pad_data_with_nan_to_set_freq(self):
    """
    Re-index the standard data to the processor's frequency.

    Returns
    -------
    None

    Notes
    -----
    The standard dataframe is replaced by the result of its ``asfreq`` call
    with the processor's frequency, which leaves NaN in any newly created
    rows.
    """
    target_frequency = self._frequency
    self.standard_data = self._standard_data.asfreq(target_frequency)
[docs] @ClassLogger def data_exporter( self, file_location=None, ftype="xml", standard: bool = True, quality: bool = True, check: bool = True, trimmed=True, ): """ Export data to file. Parameters ---------- file_location : str | None The file path where the file will be saved. If 'ftype' is "csv" or "xml", this should be a full file path including extension. If 'ftype' is "hilltop_csv", multiple files will be created, so 'file_location' should be a prefix that will be appended with "_std_qc.csv" for the file containing the standard and quality data, and "_check.csv" for the check data file. If None, uses self.export_file_name ftype : str, optional Avalable options are "xml", "hilltop_csv", "csv", "check". standard : bool, optional Whether standard data is exported, default true check : bool, optional Whether check data is exported, default true quality : bool, optional Whether quality data is exported, default true trimmed : bool, optional If True, export trimmed data; otherwise, export the full data. Default is True. Returns ------- None Raises ------ ValueError - If ftype is not a recognised string Notes ----- This method exports data to a CSV file. 
Examples -------- >>> processor = Processor(base_url="https://hilltop-server.com", site="Site1") >>> processor.data_exporter("output.xml", trimmed=True) >>> # Check the generated XML file at 'output.xml' """ if file_location is None: file_location = self.export_file_name export_selections = [standard, quality, check] if trimmed: std_data = filters.trim_series( self._standard_data["Value"], self._check_data["Value"], ) else: std_data = self._standard_data match ftype: case "xml": if self.check_data.empty or self.check_data.Value.isna().all(): check = False blob_list = self.to_xml_data_structure( standard=standard, quality=quality, check=check ) data_structure.write_hilltop_xml(blob_list, file_location) case "csv": all_data = [ self._standard_data["Value"], self._quality_data["Value"], self._check_data["Value"], ] columns = ["Standard", "Quality", "Check"] for data, col in zip(all_data, columns, strict=True): data.name = col export_list = [ i for (i, v) in zip(all_data, export_selections, strict=True) if v ] data_sources.series_export_to_csv(file_location, series=export_list) case "hilltop_csv": data_sources.hilltop_export( file_location, self._site, std_data, self._check_data["Value"], self._quality_data["Value"], ) case _: raise ValueError("Invalid ftype (filetype)")
def diagnosis(self):
    """
    Run the evaluator's diagnosis over the processor's data.

    Returns
    -------
    None

    Notes
    -----
    The "Value" columns of the standard, check and quality data are handed
    to evaluator.diagnose_data together with the processor's frequency.
    """
    value_series = [
        frame["Value"]
        for frame in (self._standard_data, self._check_data, self._quality_data)
    ]
    evaluator.diagnose_data(*value_series, self._frequency)
def plot_raw_data(self, fig=None, **kwargs):
    """Plot the standard "Raw" column via plotter.plot_raw_data."""
    raw_series = self.standard_data["Raw"]
    return plotter.plot_raw_data(raw_series, fig=fig, **kwargs)
def plot_qc_codes(self, fig=None, **kwargs):
    """Plot standard values with their quality codes via plotter.plot_qc_codes."""
    standard_values = self.standard_data["Value"]
    quality_values = self.quality_data["Value"]
    return plotter.plot_qc_codes(
        standard_values,
        quality_values,
        fig=fig,
        **kwargs,
    )
def add_qc_limit_bars(self, fig=None, **kwargs):
    """Add the QC 500 / QC 600 limit bars via plotter.add_qc_limit_bars."""
    limit_500 = self.quality_code_evaluator.qc_500_limit
    limit_600 = self.quality_code_evaluator.qc_600_limit
    return plotter.add_qc_limit_bars(limit_500, limit_600, fig=fig, **kwargs)
def plot_check_data(
    self,
    tag_list=None,
    check_names=None,
    ghosts=False,
    diffs=False,
    align_checks=False,
    fig=None,
    **kwargs,
):
    """Implement plotting.plot_check_data."""
    # Options that are forwarded to the plotter untouched.
    passthrough = {
        "tag_list": tag_list,
        "check_names": check_names,
        "ghosts": ghosts,
        "diffs": diffs,
        "align_checks": align_checks,
        "fig": fig,
    }
    # NOTE(review): the second positional argument is the quality data, not
    # the check data. Confirm against plotter.plot_check_data that this is
    # intended.
    return plotter.plot_check_data(
        self.standard_data["Value"],
        self.quality_data,
        self.quality_code_evaluator.constant_check_shift,
        **passthrough,
        **kwargs,
    )
def plot_processing_overview_chart(self, fig=None, **kwargs):
    """
    Plot a processing overview chart.

    Parameters
    ----------
    fig : plotly.graph_objects.Figure, optional
        The figure to plot on, by default None.
    kwargs : dict
        Additional keyword arguments forwarded to
        plotter.plot_processing_overview_chart.

    Returns
    -------
    plotly.graph_objects.Figure
        The figure returned by plotter.plot_processing_overview_chart.
    """
    # Source tag -> display name, in the order they are passed to the plotter.
    tag_to_name = {
        "HTP": "Check data",
        "INS": "Inspections",
        "SOE": "SOE checks",
        "DPF": "Depth profile",
    }
    qc_evaluator = self.quality_code_evaluator
    return plotter.plot_processing_overview_chart(
        self.standard_data,
        self.quality_data,
        self.check_data,
        qc_evaluator.constant_check_shift,
        qc_evaluator.qc_500_limit,
        qc_evaluator.qc_600_limit,
        tag_list=list(tag_to_name),
        check_names=list(tag_to_name.values()),
        fig=fig,
        **kwargs,
    )
def to_xml_data_structure(self, standard=True, quality=True, check=True):
    """
    Convert the processor's data to a list of XML data structures.

    Parameters
    ----------
    standard : bool, optional
        Whether to include a blob for the standard data, by default True.
    quality : bool, optional
        Whether to include a blob for the quality data, by default True.
    check : bool, optional
        Whether to include a blob for the check data, by default True.

    Returns
    -------
    list
        The blobs produced by the data_structure ``*_to_xml_structure``
        helpers, ordered standard, check, quality. Empty when every flag is
        False.
    """
    blobs = []

    if standard:
        blobs.append(
            data_structure.standard_to_xml_structure(
                self.archive_standard_item_info,
                self.archive_standard_data_source_name,
                self.standard_data_source_info,
                self.standard_data["Value"],
                self.site,
                self._defaults.get("gap_limit"),
            )
        )

    if check:
        # Item descriptions for the two extra columns exported with the check
        # value; they differ only in name and format.
        recorder_time_item_info, comment_item_info = (
            {
                "item_name": item_name,
                "item_format": item_format,
                "divisor": "1",
                "units": "",
                "number_format": "###",
            }
            for item_name, item_format in (("Recorder Time", "D"), ("Comment", "S"))
        )
        blobs.append(
            data_structure.check_to_xml_structure(
                item_info_dicts=[
                    self.check_item_info,
                    recorder_time_item_info,
                    comment_item_info,
                ],
                check_data_source_name=self.check_data_source_name,
                check_data_source_info=self.check_data_source_info,
                check_item_info=self.check_item_info,
                check_data=self.check_data,
                site=self.site,
                check_data_selector=["Value", "Recorder Time", "Comment"],
            )
        )

    if quality:
        blobs.append(
            data_structure.quality_to_xml_structure(
                data_source_name=self.archive_standard_data_source_name,
                quality_series=self.quality_data["Value"],
                site=self.site,
            )
        )

    return blobs
def report_processing_issue(
    self,
    start_time=None,
    end_time=None,
    code=None,
    comment=None,
    series_type=None,
    message_type=None,
):
    """
    Record an issue in the ``processing_issues`` DataFrame.

    The issue becomes the first row of the frame and the index is renumbered.

    Parameters
    ----------
    start_time : str | None
        The start time of the issue.
    end_time : str | None
        The end time of the issue.
    code : str | None
        The code of the issue.
    comment : str | None
        The comment of the issue.
    series_type : str | None
        The type of the series the issue is related to.
    message_type : str | None
        Should be one of: ["debug", "info", "warning", "error"]
    """
    # Values are matched to the existing columns by position.
    issue_fields = [start_time, end_time, code, comment, series_type, message_type]
    issue_row = pd.DataFrame(
        [issue_fields],
        columns=self.processing_issues.columns,
        dtype=object,
    )
    self.processing_issues = pd.concat(
        [issue_row, self.processing_issues],
        ignore_index=True,
    )
[docs] def get_measurement_dataframe(self, measurement, hts_type): """Get a dataframe of a given measurement for other processor parameters.""" if hts_type == "standard": hts = self._standard_hts_filename elif hts_type == "check": hts = self._check_hts_filename else: raise ValueError(f"Invalid hts_type {hts_type}") try: frame = data_acquisition.get_server_dataframe( self._base_url, hts, self.site, measurement, self.from_date, self.to_date, ) except KeyError: frame = pd.DataFrame( columns=[ "Time", "Raw", "Value", "Changes", "Comment", "Source", "QC", ] ) return frame
def interpolate_depth_profiles(
    self,
    depth: int | float,
    measurement: str,
    site: str | None = None,
    from_date: str | None | pd.Timestamp = None,
    to_date: str | None | pd.Timestamp = None,
):
    """
    Look up depth profiles and interpolate each one at the given depth.

    Parameters
    ----------
    depth : numeric
        what depth to interpolate to, in meters
    measurement : str
        measurement + data source name e.g. "Water Temperature (Depth Profile)"
    site : str | None
        site to use to look for depth profiles, if None will use self.site
    from_date : str | pd.Timestamp | None
        start of period to look for, if None will use self.from_date
    to_date : str | pd.Timestamp | None
        end of period to look for, if None will use self.to_date

    Returns
    -------
    pd.Series
        One interpolated value per profile sample, keyed by sample.

    Notes
    -----
    For each profile the nearest sampled depths at or above and at or below
    ``depth`` are located:

    - no sampled depth <= ``depth``: the sample is skipped;
    - no sampled depth >= ``depth``: the deepest available value is used;
    - ``depth`` sampled exactly: that value is used;
    - otherwise the two neighbours are linearly interpolated.
    """
    if site is None:
        site = self.site
    if from_date is None:
        from_date = pd.Timestamp(self.from_date)
    if to_date is None:
        to_date = pd.Timestamp(self.to_date)

    profiles = data_acquisition.get_depth_profiles(
        self._base_url,
        "HydrobotCheckData.hts",
        site,
        measurement,
        from_date,
        to_date,
    )

    interpolated_data = {}
    for sample in profiles:
        series = profiles[sample]
        # max()/min() of an empty selection is NaN, which flags a missing side.
        lower_index = series.index[series.index <= depth].max()
        higher_index = series.index[series.index >= depth].min()

        if pd.isna(lower_index):
            # Nothing sampled at or above the requested depth: skip.
            continue

        if pd.isna(higher_index):
            weighted_average = series[lower_index]
        elif lower_index == higher_index:
            weighted_average = (series[lower_index] + series[higher_index]) / 2.0
        else:
            # Linear interpolation: each neighbour is weighted by the distance
            # from `depth` to the OTHER neighbour, so the nearer sample
            # dominates. These weights used to be swapped.
            weighted_average = (
                series[lower_index] * (higher_index - depth)
                + series[higher_index] * (depth - lower_index)
            ) / (higher_index - lower_index)
        interpolated_data[sample] = weighted_average

    return pd.Series(interpolated_data)