Source code for IHEWAwapor.WPdownload.download.GIS_functions

# -*- coding: utf-8 -*-
"""
Created on Tue Jul 30 14:16:35 2019

@author: Bert Coerver
"""
import os
import psutil
import numpy as np

import gdal
import osr


[docs]def GetGeoInfo(fh, subdataset=0, print_job=False): """ Substract metadata from a geotiff, HDF4 or netCDF file. Parameters ---------- fh : str Filehandle to file to be scrutinized. subdataset : int, optional Layer to be used in case of HDF4 or netCDF format, default is 0. print_job: bool Print job details, default True. Returns ------- driver : str Driver of the fh. NDV : float No-data-value of the fh. xsize : int Amount of pixels in x direction. ysize : int Amount of pixels in y direction. GeoT : list List with geotransform values. Projection : str Projection of fh. """ print('WaPOR GIS: Getting Geo Information...') checkMemory('GetGeoInfo Start') SourceDS = gdal.Open(fh, gdal.GA_ReadOnly) Type = SourceDS.GetDriver().ShortName if Type == 'HDF4' or Type == 'netCDF': SourceDS = gdal.Open(SourceDS.GetSubDatasets()[subdataset][0]) driver = gdal.GetDriverByName(Type) meta = SourceDS.GetMetadata() xsize = SourceDS.RasterXSize ysize = SourceDS.RasterYSize GeoT = SourceDS.GetGeoTransform() Projection = osr.SpatialReference() Projection.ImportFromWkt(SourceDS.GetProjectionRef()) subNDV = SourceDS.GetRasterBand(1).GetNoDataValue() subMeta = SourceDS.GetRasterBand(1).GetMetadata() if print_job: print('WaPOR GIS: Metadata : {v}, {t}'.format( v=meta, t=type(meta))) print('WaPOR GIS: xsize : {v}, {t}'.format( v=xsize, t=type(xsize))) print('WaPOR GIS: ysize : {v}, {t}'.format( v=ysize, t=type(ysize))) print('WaPOR GIS: GeoT : {v}, {t}'.format( v=GeoT, t=type(GeoT))) print('WaPOR GIS: Projection : {v}, {t}'.format( v=Projection.GetAttrValue('AUTHORITY', 1), t=type(Projection))) print('WaPOR GIS: sub NoDataValue : {v}, {t}'.format( v=subNDV, t=type(subNDV))) print('WaPOR GIS: sub Metadata : {v}, {t}'.format( v=subMeta, t=type(subMeta))) SourceDS = None checkMemory('GetGeoInfo End') return driver, subNDV, xsize, ysize, GeoT, Projection
[docs]def OpenAsArray(fh, bandnumber=1, dtype='float32', nan_values=False, print_job=False): """ Open a map as an numpy array. Parameters ---------- fh: str Filehandle to map to open. bandnumber : int, optional Band or layer to open as array, default is 1. dtype : str, optional Datatype of output array, default is 'float32'. nan_values : boolean, optional Convert he no-data-values into np.nan values, note that dtype needs to be a float if True. Default is False. print_job: bool Print job details, default True. Returns ------- Array: :obj:`numpy.ndarray` Array with the pixel values. """ print('WaPOR GIS: Opening file...') checkMemory('OpenAsArray Start') datatypes = { "uint8": np.uint8, "int8": np.int8, "uint16": np.uint16, "int16": np.int16, "Int16": np.int16, "uint32": np.uint32, "int32": np.int32, "Int32": np.int32, "float32": np.float32, "float64": np.float64, "Float32": np.float32, "Float64": np.float64, "complex64": np.complex64, "complex128": np.complex128, "Complex64": np.complex64, "Complex128": np.complex128, } DataSet = gdal.Open(fh, gdal.GA_ReadOnly) checkMemory('OpenAsArray Opened') Type = DataSet.GetDriver().ShortName if Type == 'HDF4': Subdataset = gdal.Open(DataSet.GetSubDatasets()[bandnumber][0]) NDV = int(Subdataset.GetMetadata()['_FillValue']) else: Subdataset = DataSet.GetRasterBand(bandnumber) NDV = Subdataset.GetNoDataValue() if print_job: print('WaPOR GIS: Band DataType : {v}'.format( v=Subdataset.DataType)) print('WaPOR GIS: Band DataTypeName : {v}'.format( v=gdal.GetDataTypeName(Subdataset.DataType))) print('WaPOR GIS: NoDataValue : {v}, {t}'.format( v=NDV, t=type(NDV))) Array = Subdataset.ReadAsArray().astype(datatypes[dtype]) # Array = Subdataset.ReadAsArray() if print_job: print('WaPOR GIS: Band Array dtype : {v} {sp} {sz}'.format( v=Array.dtype.name, sp=Array.shape, sz=Array.size)) checkMemory('OpenAsArray Loaded') if nan_values: Array[Array == NDV] = np.nan DataSet = None checkMemory('OpenAsArray End') return Array
[docs]def CreateGeoTiff(fh, Array, driver, NDV, xsize, ysize, GeoT, Projection, explicit=True, compress=None, print_job=False): """ Creates a geotiff from a numpy array. Parameters ---------- fh : str Filehandle for output. Array: ndarray Array to convert to geotiff. driver : str Driver of the fh. NDV : float No-data-value of the fh. xsize : int Amount of pixels in x direction. ysize : int Amount of pixels in y direction. GeoT : list List with geotransform values. Projection : str Projection of fh. print_job: bool Print job details, default True. """ print('WaPOR GIS: Creating tiff file...') checkMemory('CreateGeoTiff Start') if print_job: print('WaPOR GIS: Array DataTypeName : {t}'.format( t=Array.dtype.name)) print('WaPOR GIS: No Data Value : {v} {t}'.format( v=NDV, t=NDV.dtype.name)) datatypes = { "uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "Int16": 3, "uint32": 4, "int32": 5, "Int32": 5, "float32": 6, "float64": 7, "Float32": 6, "Float64": 7, "complex64": 10, "complex128": 11, "Complex64": 10, "Complex128": 11, } if compress is not None: DataSet = driver.Create( fh, xsize, ysize, 1, datatypes[Array.dtype.name], [ 'COMPRESS={0}'.format(compress), ]) else: # 'COMPRESS=LZW', # 'BIGTIFF=YES', DataSet = driver.Create( fh, xsize, ysize, 1, datatypes[Array.dtype.name], [ 'BLOCKXSIZE=256', 'BLOCKYSIZE=256' ]) if NDV is None: NDV = -9999 if explicit: Array[np.isnan(Array)] = NDV DataSet.SetGeoTransform(GeoT) DataSet.SetProjection(Projection.ExportToWkt()) DataSet.GetRasterBand(1).SetNoDataValue(float(NDV)) DataSet.GetRasterBand(1).WriteArray(Array) if "nt" not in Array.dtype.name: Array[Array == NDV] = np.nan DataSet = None checkMemory('CreateGeoTiff End')
[docs]def MatchProjResNDV(source_file, target_fhs, output_dir, resample='near', dtype='float32', scale=None, ndv_to_zero=False, print_job=False): """ Matches the projection, resolution and no-data-value of a list of target-files with a source-file and saves the new maps in output_dir. Parameters ---------- source_file : str The file to match the projection, resolution and ndv with. target_fhs : list The files to be reprojected. output_dir : str Folder to store the output. resample : str, optional Resampling method to use, default is 'near' (nearest neighbour). dtype : str, optional Datatype of output, default is 'float32'. scale : int, optional Multiple all maps with this value, default is None. print_job: bool Print job details, default True. Returns ------- output_files: :obj:`numpy.ndarray` Filehandles of the created files. """ print('WaPOR GIS: Matching projection...') output_files = np.array([]) dst_info = gdal.Info(gdal.Open(source_file), format='json') if not os.path.exists(output_dir): os.makedirs(output_dir) for target_file in target_fhs: folder, fn = os.path.split(target_file) output_file = os.path.join(output_dir, fn) src_info = gdal.Info(gdal.Open(target_file), format='json') gdal.Warp(output_file, target_file, format='GTiff', srcSRS=src_info['coordinateSystem']['wkt'], dstSRS=dst_info['coordinateSystem']['wkt'], srcNodata=src_info['bands'][0]['noDataValue'], dstNodata=dst_info['bands'][0]['noDataValue'], width=dst_info['size'][0], height=dst_info['size'][1], outputBounds=(dst_info['cornerCoordinates']['lowerLeft'][0], dst_info['cornerCoordinates']['lowerLeft'][1], dst_info['cornerCoordinates']['upperRight'][0], dst_info['cornerCoordinates']['upperRight'][1]), outputBoundsSRS=dst_info['coordinateSystem']['wkt'], resampleAlg=resample) output_files = np.append(output_files, output_file) if not np.any([scale == 1.0, scale is None, scale == 1]): driver, NDV, xsize, ysize, GeoT, Projection = GetGeoInfo( output_file) DATA = OpenAsArray(output_file, nan_values=True) * scale CreateGeoTiff( output_file, DATA, driver, NDV, xsize, ysize, GeoT, Projection) if ndv_to_zero: driver, NDV, xsize, ysize, GeoT, Projection = GetGeoInfo( output_file) DATA = OpenAsArray(output_file, nan_values=False) DATA[DATA == NDV] = 0.0 CreateGeoTiff( output_file, DATA, driver, NDV, xsize, ysize, GeoT, Projection) return output_files
[docs]def checkMemory(txt='', print_job=False): """Check available memory. Parameters ---------- txt : str Prefix text. print_job: bool Print job details, default True. """ mem = psutil.virtual_memory() if print_job: print('WaPOR GIS: > Memory available : {t} {v:.2f} MB'.format( t=txt, v=mem.available / 1024 / 1024))