import logging
import os
from typing import List, Tuple

import fiona
import numpy as np
import rasterio.features
from osgeo import gdal

from bfgn.configuration import configs

_logger = logging.getLogger(__name__)

def upper_left_pixel(trans, interior_x, interior_y):
    """ Convert a map-space coordinate to a pixel-space upper-left offset, clamped at 0. """
    x_ul = max((interior_x - trans[0]) / trans[1], 0)
    y_ul = max((interior_y - trans[3]) / trans[5], 0)
    return x_ul, y_ul

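# Worked example with an illustrative north-up geotransform (x_origin=1000, 30 m
# pixels, y_origin=5000): a point at map coordinates (1090, 4940) lies 3 pixels
# right of and 2 pixels down from the raster origin.
#
#   assert upper_left_pixel((1000.0, 30.0, 0.0, 5000.0, 0.0, -30.0), 1090.0, 4940.0) == (3.0, 2.0)
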
def get_overlapping_extent(dataset_list_of_lists: List[List[gdal.Dataset]]):
    # Flatten the list of lists for interior convenience
    dataset_list = [item for sublist in dataset_list_of_lists for item in sublist]

    # Get list of all gdal geotransforms
    trans_list = []
    for _d in range(len(dataset_list)):
        trans_list.append(dataset_list[_d].GetGeoTransform())

    # Find the interior (UL) x,y coordinates in map-space
    interior_x = np.nanmax([x[0] for x in trans_list])
    interior_y = np.nanmin([x[3] for x in trans_list])

    # Calculate the UL coordinates in pixel-space
    ul_list = []
    for _d in range(len(dataset_list)):
        ul_list.append(list(upper_left_pixel(trans_list[_d], interior_x, interior_y)))

    # Calculate the size of the matched interior extent
    x_len = int(np.floor(np.min([dataset_list[_d].RasterXSize - ul_list[_d][0]
                                 for _d in range(len(dataset_list))])))
    y_len = int(np.floor(np.min([dataset_list[_d].RasterYSize - ul_list[_d][1]
                                 for _d in range(len(dataset_list))])))

    # Separate back out into a list of lists for return
    return_ul_list = []
    idx = 0
    for _l in range(len(dataset_list_of_lists)):
        local_list = []
        for _d in range(len(dataset_list_of_lists[_l])):
            local_list.append(ul_list[idx])
            idx += 1

        # Special case of no boundary file: append a None if no value
        if len(local_list) == 0:
            local_list.append(None)
        local_list = np.array(local_list)
        return_ul_list.append(local_list)
    return return_ul_list, x_len, y_len

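# Example sketch (hypothetical filenames, assuming the rasters share a projection
# and pixel size): find the shared interior extent of two co-registered rasters.
#
#   sets = [[gdal.Open('feature.tif', gdal.GA_ReadOnly)],
#           [gdal.Open('response.tif', gdal.GA_ReadOnly)]]
#   upper_lefts, x_len, y_len = get_overlapping_extent(sets)
#   # upper_lefts[0][0] is the (x, y) pixel offset into feature.tif, and
#   # (x_len, y_len) is the size of the shared window in pixels.
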
def get_overlapping_extent_coordinates(dataset_list_of_lists: List[List[gdal.Dataset]]):
    # Flatten the list of lists for interior convenience
    dataset_list = [item for sublist in dataset_list_of_lists for item in sublist]

    # Get list of all gdal geotransforms
    trans_list = []
    for _d in range(len(dataset_list)):
        trans_list.append(dataset_list[_d].GetGeoTransform())

    # Find the interior (UL) and exterior (LR) x,y coordinates in map-space
    interior_x = np.nanmax([x[0] for x in trans_list])
    interior_y = np.nanmin([x[3] for x in trans_list])
    exterior_x = np.nanmin([trans_list[x][0] + dataset_list[x].RasterXSize * trans_list[x][1]
                            for x in range(len(trans_list))])
    exterior_y = np.nanmax([trans_list[x][3] + dataset_list[x].RasterYSize * trans_list[x][5]
                            for x in range(len(trans_list))])
    return [interior_x, interior_y], [exterior_x, exterior_y]

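# Example sketch (hypothetical datasets): this helper returns the overlap corners
# in map units, complementing get_overlapping_extent, which returns pixel offsets.
#
#   ul_xy, lr_xy = get_overlapping_extent_coordinates([[ds_a], [ds_b]])
#   # ul_xy is the upper-left corner of the overlap, lr_xy the lower-right corner.
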
def get_all_interior_extent_subset_pixel_locations(gdal_datasets: List[List[gdal.Dataset]], window_radius: int,
                                                   inner_window_radius: int = None, shuffle: bool = True,
                                                   return_xy_size: bool = False):
    if inner_window_radius is None:
        inner_window_radius = window_radius

    all_upper_lefts, x_px_size, y_px_size = get_overlapping_extent(gdal_datasets)

    _logger.debug('Calculating pixel-based interior offsets for data acquisition')
    x_sample_locations = [x for x in range(0,
                                           int(x_px_size - 2 * window_radius) - 1,
                                           int(inner_window_radius * 2))]
    y_sample_locations = [y for y in range(0,
                                           int(y_px_size - 2 * window_radius) - 1,
                                           int(inner_window_radius * 2))]

    # Build the cartesian product of x and y offsets: each x is paired with every y.
    # np.repeat/np.tile replace the deprecated np.matlib.repmat calls with identical results.
    xy_sample_locations = np.zeros((len(x_sample_locations) * len(y_sample_locations), 2)).astype(int)
    xy_sample_locations[:, 0] = np.repeat(np.array(x_sample_locations), len(y_sample_locations))
    xy_sample_locations[:, 1] = np.tile(np.array(y_sample_locations), len(x_sample_locations))
    del x_sample_locations, y_sample_locations

    if shuffle:
        xy_sample_locations = xy_sample_locations[np.random.permutation(xy_sample_locations.shape[0]), :]

    if return_xy_size:
        return all_upper_lefts, xy_sample_locations, [x_px_size, y_px_size]
    else:
        return all_upper_lefts, xy_sample_locations

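# Example sketch (hypothetical datasets): tile the shared extent with 256 px wide
# windows (radius 128) whose origins are spaced one inner window (2 * 64 px) apart.
#
#   uls, xy_locs = get_all_interior_extent_subset_pixel_locations(
#       [[feature_ds], [response_ds]], window_radius=128, inner_window_radius=64)
#   # Each row of xy_locs is an (x, y) offset relative to the per-dataset upper
#   # lefts in uls; add the two together before reading a window.
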
def read_map_subset(datafiles: List[str], upper_lefts: List[List[int]], window_diameter: int, mask=None,
                    nodata_value: float = None, lower_bound: float = None, upper_bound: float = None,
                    reference_geotransform=None):
    """ Read a window from a set of raster and/or vector files into a single array.
    Args:
        datafiles: Files to read; vector files are rasterized on the fly
        upper_lefts: Upper-left pixel offset into each raster file
        window_diameter: Edge length of the window to read, in pixels
        mask: Optional boolean array of pixels to ignore
        nodata_value: Value to encode to np.nan
        lower_bound: Values below this become np.nan
        upper_bound: Values above this become np.nan
        reference_geotransform: Geotransform used to rasterize vector files
    Returns:
        (local_array, mask), or (None, None) if the window is entirely masked
    """
    raster_count = 0
    datasets = []
    for _f in range(len(datafiles)):
        if datafiles[_f].split('.')[-1] not in configs.sections.VECTORIZED_FILENAMES:
            datasets.append(gdal.Open(datafiles[_f], gdal.GA_ReadOnly))
            raster_count += datasets[-1].RasterCount
        else:
            # Vector files are rasterized on the fly below and contribute one band
            datasets.append(None)
            raster_count += 1
    local_array = np.zeros((window_diameter, window_diameter, raster_count))

    if mask is None:
        mask = np.zeros((window_diameter, window_diameter)).astype(bool)

    idx = 0
    for _file in range(len(datasets)):
        if datasets[_file] is not None:
            file_set = datasets[_file]
            file_upper_left = upper_lefts[_file]

            file_array = np.zeros((window_diameter, window_diameter, file_set.RasterCount))
            for _b in range(file_set.RasterCount):
                file_array[:, :, _b] = file_set.GetRasterBand(_b + 1).ReadAsArray(
                    int(file_upper_left[0]), int(file_upper_left[1]), window_diameter, window_diameter)

            if nodata_value is not None:
                file_array[file_array == nodata_value] = np.nan
            file_array[~np.isfinite(file_array)] = np.nan
            file_array[mask, :] = np.nan
            with np.errstate(invalid='ignore'):
                if lower_bound is not None:
                    file_array[file_array < lower_bound] = np.nan
                if upper_bound is not None:
                    file_array[file_array > upper_bound] = np.nan
            mask[np.any(np.isnan(file_array), axis=-1)] = True
        else:
            assert reference_geotransform is not None
            file_array = rasterize_vector(datafiles[_file], reference_geotransform,
                                          (window_diameter, window_diameter, 1))

        if np.all(mask):
            return None, None
        local_array[..., idx:idx + file_array.shape[-1]] = file_array
        idx += file_array.shape[-1]

    return local_array, mask

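# Example sketch (hypothetical file and offsets): read a 256 x 256 window starting
# at the file's interior upper-left offset. A (None, None) return means the window
# was entirely masked and can be skipped.
#
#   data, mask = read_map_subset(['features.tif'], [(3, 2)], window_diameter=256,
#                                nodata_value=-9999)
#   # data has shape (256, 256, n_bands); mask flags pixels to ignore downstream.
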
def get_boundary_sets_from_boundary_files(config: configs.Config) -> List[gdal.Dataset]:
    if not config.raw_files.boundary_files:
        boundary_sets = [None] * len(config.raw_files.feature_files)
    else:
        boundary_sets = [noerror_open(loc_file) if loc_file is not None else None
                         for loc_file in config.raw_files.boundary_files]
    return boundary_sets

def get_site_boundary_set(config: configs.Config, _site) -> gdal.Dataset:
    if not config.raw_files.boundary_files:
        boundary_set = None
    else:
        boundary_set = noerror_open(config.raw_files.boundary_files[_site])
    return boundary_set

def get_site_boundary_vector_file(config: configs.Config, _site) -> str:
    if not config.raw_files.boundary_files:
        boundary_file = None
    else:
        boundary_file = config.raw_files.boundary_files[_site]
        if boundary_file.split('.')[-1] not in configs.sections.VECTORIZED_FILENAMES:
            boundary_file = None
    return boundary_file

def rasterize_vector(vector_file: str, geotransform: List[float], output_shape: Tuple) -> np.ndarray:
    """ Rasterizes an input vector directly into a numpy array.
    Args:
        vector_file: Input vector file to be rasterized
        geotransform: A gdal-style geotransform
        output_shape: The shape of the output array to be generated
    Returns:
        mask: A rasterized numpy array with the requested shape
    """
    ds = fiona.open(vector_file, 'r')
    # Reorder the gdal-style geotransform (c, a, b, f, d, e) into the
    # (a, b, c, d, e, f) affine transform expected by rasterio
    trans = rasterio.Affine(geotransform[1], geotransform[2], geotransform[0],
                            geotransform[4], geotransform[5], geotransform[3])
    # rasterio requires a 2-d output array; reshape to the requested shape at the end
    mask = np.zeros(output_shape[:2])
    for feature in ds:
        rasterio.features.rasterize([feature['geometry']], transform=trans, default_value=1, out=mask)
    ds.close()
    return mask.reshape(output_shape)

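# Example sketch (hypothetical inputs): burn boundary.shp into a 256 x 256 grid
# aligned with a reference raster. Pixels covered by any geometry receive the
# default_value of 1; all others remain 0.
#
#   mask = rasterize_vector('boundary.shp', reference_ds.GetGeoTransform(), (256, 256))
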
def read_mask_chunk(
        _site: int,
        upper_left: List[int],
        window_diameter: int,
        reference_geotransform: List[float],
        config: configs.Config
) -> np.ndarray:
    """ Read the boundary mask for one site at the given window; True marks pixels to exclude. """
    mask = None
    boundary_set = get_site_boundary_set(config, _site)
    if boundary_set is not None:
        mask = boundary_set.ReadAsArray(int(upper_left[0]), int(upper_left[1]), window_diameter, window_diameter)
    else:
        boundary_vector_file = get_site_boundary_vector_file(config, _site)
        if boundary_vector_file is not None:
            mask = rasterize_vector(boundary_vector_file, reference_geotransform,
                                    (window_diameter, window_diameter))

    if mask is None:
        mask = np.zeros((window_diameter, window_diameter)).astype(bool)
    else:
        mask = mask == config.raw_files.boundary_bad_value
    return mask

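# Example sketch (hypothetical config and offsets): fetch the boundary mask for
# site 0 at the same window used for feature reads, whether the boundary source
# is a raster or a vector file.
#
#   bad_pixels = read_mask_chunk(0, (3, 2), window_diameter=256,
#                                reference_geotransform=ref_ds.GetGeoTransform(),
#                                config=config)
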
def noerror_open(filename: str, file_handle=gdal.GA_ReadOnly) -> gdal.Dataset:
    """ Open a gdal dataset with error reporting suppressed, returning None on failure. """
    gdal.PushErrorHandler('CPLQuietErrorHandler')
    dataset = gdal.Open(filename, file_handle)
    gdal.PopErrorHandler()
    return dataset

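# Example sketch (hypothetical path): probe for an optional raster without gdal
# printing errors to stderr; a None return simply means the open failed.
#
#   ds = noerror_open('maybe_missing.tif')
#   if ds is None:
#       _logger.debug('No raster found at maybe_missing.tif')
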
def convert_envi_file(original_file: str, destination_basename: str, output_format: str, cleanup: bool = False,
                      creation_options: List = None) -> None:
    """
    Convert an ENVI file to another output format with a gdal_translate call
    Args:
        original_file: Source ENVI file
        destination_basename: Base of the output file (will get the appropriate extension)
        output_format: A viable gdal output data format
        cleanup: Whether or not to clean up the original ENVI files
        creation_options: GDAL creation options to pass for the output file, e.g.: ['TILED=YES', 'COMPRESS=DEFLATE']
    Returns:
        None
    """
    # Avoid a mutable default argument
    if creation_options is None:
        creation_options = []

    final_outname = destination_basename
    if output_format == 'GTiff':
        final_outname += '.tif'
    elif output_format == 'JPEG':
        final_outname += '.jpg'
    elif output_format == 'PNG':
        final_outname += '.png'

    _logger.debug('Converting ENVI file to output format {} with creation options {}'.
                  format(output_format, creation_options))
    options = ''
    for co in creation_options:
        options += ' -co {}'.format(co)
    gdal.Translate(final_outname, gdal.Open(original_file, gdal.GA_ReadOnly), options=options)

    test_outdataset = gdal.Open(final_outname, gdal.GA_ReadOnly)
    if cleanup:
        if test_outdataset is not None:
            _logger.debug('Format conversion successful, cleaning up ENVI files {}, {}, {}'.
                          format(original_file, original_file + '.hdr', original_file + '.aux.xml'))
            os.remove(original_file)
            os.remove(original_file + '.hdr')
            try:
                os.remove(original_file + '.aux.xml')
            except OSError:
                pass
        else:
            _logger.error('Failed to convert ENVI file to {}. ENVI file available at: {}'.
                          format(output_format, original_file))

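# Example sketch (hypothetical paths): convert an ENVI raster to a tiled,
# compressed GeoTIFF, removing the ENVI originals once the output verifies.
#
#   convert_envi_file('/tmp/output_envi', '/tmp/output', 'GTiff', cleanup=True,
#                     creation_options=['TILED=YES', 'COMPRESS=DEFLATE'])
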
def read_chunk_by_row(datasets: List[gdal.Dataset], pixel_upper_lefts: List[List[int]], x_size: int, y_size: int,
                      line_offset: int, nodata_value: float = None) -> np.ndarray:
    """
    Read a chunk of multiple datasets line-by-line.
    Args:
        datasets: each feature dataset to read
        pixel_upper_lefts: upper left hand pixel of each dataset
        x_size: size of x data to read
        y_size: size of y data to read
        line_offset: line offset from UL of each set to start reading at
        nodata_value: value to encode to np.nan
    Returns:
        feature_array: feature array
    """
    total_features = np.sum([f.RasterCount for f in datasets])
    output_array = np.zeros((y_size, x_size, total_features))
    feat_ind = 0
    for _feat in range(len(datasets)):
        dat = np.zeros((datasets[_feat].RasterCount, y_size, x_size))
        for _line in range(y_size):
            dat[:, _line:_line + 1, :] = datasets[_feat].ReadAsArray(
                int(pixel_upper_lefts[_feat][0]), int(pixel_upper_lefts[_feat][1] + line_offset + _line),
                x_size, 1).astype(np.float32)
        # Swap dat from (b, y, x) to (y, x, b)
        dat = np.moveaxis(dat, [0, 1, 2], [2, 0, 1])
        output_array[..., feat_ind:feat_ind + dat.shape[-1]] = dat
        feat_ind += dat.shape[-1]

    if nodata_value is not None:
        output_array[output_array == nodata_value] = np.nan
    return output_array

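# Example sketch (hypothetical datasets and offsets): read 256 rows starting 100
# lines below each dataset's upper left, stacking all bands from both datasets
# into one (256, 256, total_bands) array.
#
#   chunk = read_chunk_by_row([ds_a, ds_b], [(0, 0), (3, 2)], x_size=256,
#                             y_size=256, line_offset=100, nodata_value=-9999)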