Source code for bfgn.reporting.samples

from typing import List, Optional

import keras
import numpy as np

from bfgn.configuration import configs
from bfgn.data_management import scalers, sequences


class Samples(object):
    """Hold one sample drawn from a data sequence, plus metadata derived from it.

    On construction the object pulls sample ``0`` from ``data_sequence``, splits it
    into features, responses and weights, optionally runs ``model`` on the
    transformed features to obtain predictions, and computes per-band min/max
    ranges for every array.

    Every attribute defaults to ``None`` at class level. The prediction attributes
    (and their ranges) stay ``None`` when ``is_model_trained`` is falsy.
    """

    # Inputs supplied by the caller
    data_sequence = None
    data_sequence_label = None
    model = None
    is_model_trained = None
    config = None
    # Sample metadata read from the transformed arrays' shapes
    num_samples = None
    num_features = None
    num_responses = None
    # Whether the sequence's scalers are something other than scalers.NullScaler
    has_features_transform = None
    has_responses_transform = None
    # Sampled arrays and their (num_values, 2) [min, max] ranges
    raw_features = None
    raw_features_range = None
    trans_features = None
    trans_features_range = None
    raw_responses = None
    raw_responses_range = None
    trans_responses = None
    trans_responses_range = None
    raw_predictions = None
    raw_predictions_range = None
    trans_predictions = None
    trans_predictions_range = None
    weights = None
    weights_range = None
    # Band type labels, stored unchanged from the constructor arguments
    feature_band_types = None
    response_band_types = None

    def __init__(
        self,
        data_sequence: sequences.BaseSequence,
        model: keras.Model,
        config: configs.Config,
        is_model_trained: bool,
        feature_band_types: List[str],
        response_band_types: List[str],
        data_sequence_label: Optional[str] = None,
    ) -> None:
        """Sample the sequence, optionally predict, and compute all ranges.

        Args:
            data_sequence: Sequence the sample is drawn from; also provides the
                scalers and the NaN replacement value used below.
            model: Model used for ``predict`` when ``is_model_trained`` is truthy.
            config: Configuration object; stored unchanged.
            is_model_trained: When truthy, predictions are generated from the
                transformed features and inverse-transformed with the sequence's
                response scaler.
            feature_band_types: Stored unchanged.
            response_band_types: Stored unchanged.
            data_sequence_label: Optional label for the sequence; stored unchanged.
        """
        self.data_sequence = data_sequence
        self.model = model
        self.is_model_trained = is_model_trained
        self.config = config
        self.data_sequence_label = data_sequence_label
        self.feature_band_types = feature_band_types
        self.response_band_types = response_band_types

        self._get_sampled_features_responses_and_set_metadata_and_weights()
        if is_model_trained:
            self.trans_predictions = model.predict(self.trans_features)
            self.raw_predictions = self.data_sequence.response_scaler.inverse_transform(self.trans_predictions)
        # Ranges must come last: they read the predictions set just above.
        self._set_has_transforms()
        self._set_raw_and_transformed_ranges()

    def _get_sampled_features_responses_and_set_metadata_and_weights(self) -> None:
        """Pull sample 0 from the sequence and populate arrays, weights and counts."""
        (raw_features, raw_responses), (trans_features, trans_responses) = \
            self.data_sequence.get_raw_and_transformed_sample(0)

        # We expect weights to be the last element in the responses array
        self.weights = trans_responses[0][..., -1]

        # Only the first entry of each returned collection is used (presumably one
        # entry per model input/output -- TODO confirm). The trailing weights band
        # is stripped from both the raw and the transformed responses.
        self.raw_features = raw_features[0]
        self.trans_features = trans_features[0]
        self.raw_responses = raw_responses[0][..., :-1]
        self.trans_responses = trans_responses[0][..., :-1]

        # Set sample metadata
        self.num_samples = self.trans_features.shape[0]
        self.num_features = self.trans_features.shape[-1]
        self.num_responses = self.trans_responses.shape[-1]

    def _set_has_transforms(self) -> None:
        """Record whether each scaler is anything other than a ``NullScaler``."""
        self.has_features_transform = not isinstance(self.data_sequence.feature_scaler, scalers.NullScaler)
        self.has_responses_transform = not isinstance(self.data_sequence.response_scaler, scalers.NullScaler)

    def _set_raw_and_transformed_ranges(self) -> None:
        """Compute the per-band [min, max] range of every sampled array.

        Note: ranges are a (num_values, 2) array, e.g., raw_features_range with 3
        features is a (3, 2) array with raw_features_range[2, 0] being the minimum
        of the 3rd feature and raw_features_range[2, 1] being the maximum.

        Transformed arrays ignore entries equal to the sequence's
        ``nan_replacement_value``; weights ignore entries equal to 0.
        """
        nan_replacement_value = self.data_sequence.nan_replacement_value

        self.raw_features_range = self._get_range(self.raw_features)
        self.trans_features_range = self._get_masked_range(self.trans_features, nan_replacement_value)

        self.raw_responses_range = self._get_range(self.raw_responses)
        self.trans_responses_range = self._get_masked_range(self.trans_responses, nan_replacement_value)

        # Predictions only exist when the model was trained.
        if self.raw_predictions is not None and self.trans_predictions is not None:
            self.raw_predictions_range = self._get_range(self.raw_predictions)
            self.trans_predictions_range = self._get_masked_range(self.trans_predictions, nan_replacement_value)

        self.weights_range = self._get_masked_range(self.weights, 0)

    def _get_masked_range(self, data: np.ndarray, masked_value: float) -> Optional[np.ndarray]:
        """Return the range of ``data`` while ignoring entries equal to ``masked_value``.

        Works on a copy, so ``data`` itself is never modified.
        """
        masked = data.copy()
        masked[masked == masked_value] = np.nan
        return self._get_range(masked)

    def _get_range(self, data: Optional[np.ndarray]) -> Optional[np.ndarray]:
        """Return a (num_values, 2) array of NaN-ignoring [min, max] per band.

        Args:
            data: Array whose last axis indexes bands, or a 3-D array, which is
                treated as a single band (e.g. weights, once the band axis has been
                sliced away). ``None`` is passed through.

        Returns:
            ``None`` when ``data`` is ``None``; otherwise an array whose row ``i``
            holds the minimum and maximum of band ``i``.
        """
        if data is None:
            return None
        num_bands = 1 if data.ndim == 3 else data.shape[-1]
        flattened = data.reshape((-1, num_bands))
        mins = np.nanpercentile(flattened, 0, axis=0)
        maxs = np.nanpercentile(flattened, 100, axis=0)
        # dstack of two length-n vectors has shape (1, n, 2); drop the leading axis.
        return np.dstack([mins, maxs])[0]