import logging
import os
from typing import List, Tuple
import gdal
import keras
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.gridspec as gridspec
import numpy as np
import subprocess
from tqdm import tqdm
from bfgn.data_management import common_io, ooc_functions
from bfgn.data_management.data_core import DataContainer
# Select a non-interactive matplotlib backend before any figures are created,
# so this module works on headless machines.
plt.switch_backend('Agg') # Needed for remote server plotting
# Module-level logger, named after this module per standard logging convention.
_logger = logging.getLogger(__name__)
def apply_model_to_site(
        cnn: keras.Model,
        data_container: DataContainer,
        feature_files: List[str],
        destination_basename: str,
        output_format: str = 'GTiff',
        creation_options: List[str] = None,
        CNN_MODE: bool = False,
        exclude_feature_nodata: bool = False
) -> None:
    """ Apply a trained model to a raster file.

    Args:
        cnn: Pre-trained keras CNN model
        data_container: Holds info like scalers
        feature_files: Per-site feature files to apply the model to
        destination_basename: Base of the output file (will get appropriate extension)
        output_format: A viable gdal output data format ('GTiff' or 'ENVI').
        creation_options: GDAL creation options to pass for output file,
            e.g.: ['TILED=YES', 'COMPRESS=DEFLATE']. Defaults to no options.
        CNN_MODE: Should the model be applied in CNN mode? (Not yet supported.)
        exclude_feature_nodata: Flag to remove all pixels in features without data from applied model

    Returns:
        None
    """
    assert CNN_MODE is False, 'CNN mode application not yet supported'
    if creation_options is None:
        # Avoid the shared-mutable-default pitfall; None means "no extra options".
        creation_options = []
    config = data_container.config

    assert os.path.dirname(destination_basename), 'Output directory does not exist'
    assert type(feature_files) is list, 'Feature files for given site must be provided as a list'

    valid_output_formats = ['GTiff', 'ENVI']
    err_str = 'Not a viable output format, options are: {}'.format(valid_output_formats)
    assert output_format in valid_output_formats, err_str

    _logger.debug('Open feature datasets')
    feature_sets = [gdal.Open(f, gdal.GA_ReadOnly) for f in feature_files]

    _logger.debug('Check file validity')
    invalid_files = ''
    for _file, _set in zip(feature_files, feature_sets):
        if _set is None:
            invalid_files += 'Invalid file: {}\n'.format(_file)
    assert invalid_files == '', invalid_files

    _logger.debug('Get common feature file interior')
    [internal_ul_list], x_len, y_len = common_io.get_overlapping_extent([feature_sets])
    assert x_len > 0 and y_len > 0, 'No common feature file interior'

    n_classes = len(data_container.response_band_types)

    # Initialize output dataset.  Predictions are always written to an ENVI file
    # first so row strips can be filled incrementally via np.memmap; the file is
    # converted to the requested format at the very end.
    driver = gdal.GetDriverByName('ENVI')
    driver.Register()

    temporary_outname = destination_basename
    if (output_format != 'ENVI'):
        temporary_outname = temporary_outname + '_temporary_ENVI_file'
        # BUGFIX: the two log messages were swapped between the branches.
        _logger.debug('Creating temporary output envi file: {}'.format(temporary_outname))
    else:
        _logger.debug('Creating output envi file: {}'.format(temporary_outname))

    outDataset = driver.Create(temporary_outname, x_len, y_len, n_classes, gdal.GDT_Float32)

    # Shift the geotransform origin to the upper-left corner of the common interior.
    out_trans = list(feature_sets[0].GetGeoTransform())
    out_trans[0] = out_trans[0] + internal_ul_list[0][0]*out_trans[1]
    # BUGFIX: the y-origin must be offset by the y pixel offset (index 1) times
    # the y pixel size; the original used the x offset (index 0) here.
    out_trans[3] = out_trans[3] + internal_ul_list[0][1]*out_trans[5]
    outDataset.SetProjection(feature_sets[0].GetProjection())
    outDataset.SetGeoTransform(out_trans)
    del outDataset  # flush and close so the memmap below sees the file on disk

    for _row in tqdm(range(0, y_len, config.data_build.loss_window_radius*2), ncols=80):
        # Clamp the final strip so a full read window still fits inside the image.
        _row = min(_row, y_len - 2*config.data_build.window_radius)
        col_dat = common_io.read_chunk_by_row(feature_sets, internal_ul_list, x_len,
                                              config.data_build.window_radius*2, _row)
        _logger.debug('Read data chunk of shape: {}'.format(col_dat.shape))

        tile_dat, x_index = _convert_chunk_to_tiles(
            col_dat, config.data_build.loss_window_radius, config.data_build.window_radius)
        _logger.debug('Data tiled to shape: {}'.format(tile_dat.shape))

        tile_dat = ooc_functions.one_hot_encode_array(
            data_container.feature_raw_band_types, tile_dat,
            per_band_encoding=data_container.feature_per_band_encoded_values)
        _logger.debug('Data one_hot_encoded. New shape: {}'.format(tile_dat.shape))

        if (config.data_build.feature_mean_centering is True):
            # Center each tile on its own per-band mean, mirroring build-time preprocessing.
            tile_dat -= np.nanmean(tile_dat, axis=(1, 2))[:, np.newaxis, np.newaxis, :]

        if (data_container.feature_scaler is not None):
            tile_dat = data_container.feature_scaler.transform(tile_dat)

        if (exclude_feature_nodata):
            # BUGFIX: capture the all-NaN mask BEFORE NaNs are replaced with the
            # nodata encoding below; the original computed it afterwards, when
            # np.isnan(tile_dat) was already all-False, making the flag a no-op.
            nd_set = np.all(np.isnan(tile_dat), axis=-1)

        tile_dat[np.isnan(tile_dat)] = config.data_samples.feature_nodata_encoding
        pred_y = cnn.predict(tile_dat)
        if (data_container.response_scaler is not None):
            pred_y = data_container.response_scaler.inverse_transform(pred_y)

        if (exclude_feature_nodata):
            pred_y[nd_set, ...] = config.raw_files.response_nodata_value
        del tile_dat

        # Trim each prediction tile to its interior loss window.
        window_radius_difference = config.data_build.window_radius - config.data_build.loss_window_radius
        _logger.debug('Creating output dataset, using window_radius_difference {}'.format(window_radius_difference))
        if (window_radius_difference > 0):
            pred_y = pred_y[:, window_radius_difference:-window_radius_difference,
                            window_radius_difference:-window_radius_difference, :]

        # Re-assemble the trimmed tiles back into one full-width row strip.
        output = np.zeros((config.data_build.loss_window_radius*2, x_len, pred_y.shape[-1]))
        for _c in range(len(x_index)):
            output[:, x_index[_c]+window_radius_difference:x_index[_c]+window_radius_difference +
                   config.data_build.loss_window_radius*2, :] = pred_y[_c, ...]

        _logger.debug('Convert output shape from (y,x,b) to (b,y,x)')
        output = np.moveaxis(output, [0, 1, 2], [1, 2, 0])

        # Write the strip straight into the ENVI file (BSQ layout) via memmap.
        output_memmap = np.memmap(temporary_outname, mode='r+', shape=(n_classes, y_len, x_len), dtype=np.float32)
        outrow = _row + window_radius_difference
        output_memmap[:, outrow:outrow+config.data_build.loss_window_radius*2, :] = output
        del output_memmap

    common_io.convert_envi_file(temporary_outname, destination_basename, output_format, True, creation_options)
def maximum_likelihood_classification(
        likelihood_file: str,
        data_container: DataContainer,
        destination_basename: str,
        output_format: str = 'GTiff',
        creation_options: List[str] = None,
) -> None:
    """ Convert a n-band map of probabilities to a classified image using maximum likelihood.

    Args:
        likelihood_file: File with per-class likelihoods
        data_container: Holds info like scalers
        destination_basename: Base of the output file (will get appropriate extension)
        output_format: A viable gdal output data format ('GTiff' or 'ENVI').
        creation_options: GDAL creation options to pass for output file,
            e.g.: ['TILED=YES', 'COMPRESS=DEFLATE']. Defaults to no options.

    Returns:
        None
    """
    if creation_options is None:
        # Avoid the shared-mutable-default pitfall; None means "no extra options".
        creation_options = []

    dataset = gdal.Open(likelihood_file, gdal.GA_ReadOnly)
    # BUGFIX: assertion message had a typo ('liklihood_file').
    assert dataset is not None, 'Invalid likelihood_file'

    # Initialize output dataset.  Always write an ENVI file first so each row can
    # be filled incrementally via np.memmap; convert to the requested format last.
    driver = gdal.GetDriverByName('ENVI')
    driver.Register()

    temporary_outname = destination_basename
    if (output_format != 'ENVI'):
        temporary_outname = temporary_outname + '_temporary_ENVI_file'
        # BUGFIX: the two log messages were swapped between the branches.
        _logger.debug('Creating temporary output envi file: {}'.format(temporary_outname))
    else:
        _logger.debug('Creating output envi file: {}'.format(temporary_outname))

    outDataset = driver.Create(temporary_outname, dataset.RasterXSize, dataset.RasterYSize, 1, gdal.GDT_Int16)
    outDataset.SetProjection(dataset.GetProjection())
    outDataset.SetGeoTransform(dataset.GetGeoTransform())
    del outDataset  # flush and close so the memmap below sees the file on disk

    for _row in tqdm(range(0, dataset.RasterYSize), ncols=80):
        col_dat = common_io.read_chunk_by_row([dataset], [[0, 0]], dataset.RasterXSize, 1, _row)
        _logger.debug('Read data chunk of shape: {}'.format(col_dat.shape))

        # Winning class index per pixel = maximum-likelihood decision.
        col_dat = np.argmax(col_dat, axis=-1)

        # Map winning indices back to response values; unmatched pixels keep nodata.
        out_dat = np.zeros(col_dat.shape) + data_container.config.raw_files.response_nodata_value
        for _encoded_value in data_container.response_per_band_encoded_values[0]:
            # NOTE(review): this assumes the encoded values coincide with argmax
            # indices (classes encoded as 0..n-1) — confirm against the encoder.
            out_dat[col_dat == _encoded_value] = data_container.response_per_band_encoded_values[0][int(_encoded_value)]

        out_dat = np.reshape(out_dat, (out_dat.shape[0], out_dat.shape[1], 1))
        _logger.debug('Convert output shape from (y,x,b) to (b,y,x)')
        out_dat = np.moveaxis(out_dat, [0, 1, 2], [1, 2, 0])

        # Write this single row straight into the ENVI file (BSQ layout) via memmap.
        output_memmap = np.memmap(temporary_outname, mode='r+',
                                  shape=(1, dataset.RasterYSize, dataset.RasterXSize), dtype=np.int16)
        output_memmap[:, _row:_row+1, :] = out_dat
        del output_memmap

    common_io.convert_envi_file(temporary_outname, destination_basename, output_format, True, creation_options)
def _convert_chunk_to_tiles(feature_data: np.array, loss_window_radius: int, window_radius: int) -> \
Tuple[np.array, np.array]:
""" Convert a set of rows to tiles to run through keras. Assumes only one vertical layer is possible
Args:
feature_data:
loss_window_radius:
window_radius:
Returns:
output_array: array read to be passed through keras
col_index: array of the left-coordinate of each output_array tile
"""
output_array = []
col_index = []
for _col in range(0, feature_data.shape[1], loss_window_radius*2):
col_index.append(min(_col, feature_data.shape[1]-window_radius*2))
output_array.append(feature_data[:, col_index[-1]:col_index[-1]+window_radius*2, :])
output_array = np.stack(output_array)
output_array = np.reshape(
output_array, (output_array.shape[0], output_array.shape[1], output_array.shape[2], feature_data.shape[-1]))
col_index = np.array(col_index)
return output_array, col_index