Source code for pyroSAR.S1.auxil

##############################################################
# general utilities for Sentinel-1
# John Truckenbrodt 2016-2019
##############################################################

import os
import re
import sys
import requests
import zipfile as zf
from datetime import datetime
import xml.etree.ElementTree as ET
import numpy as np
from osgeo import gdal
from osgeo.gdalconst import GA_Update
from . import linesimplify as ls
from pyroSAR.examine import ExamineSnap
import progressbar as pb

from spatialist.ancillary import finder, urlQueryParser

# workaround: if the argparse import fails, remove a locale.pyc file next to the
# executed script (which can shadow the standard library module) and retry the import
try:
    import argparse
except ImportError:
    try:
        os.remove(os.path.join(os.path.dirname(sys.argv[0]), 'locale.pyc'))
    finally:
        import argparse


def init_parser():
    """
    initialize argument parser for S1 processing utilities
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--transform', action='store_true', help='transform the final DEM to UTM coordinates')
    parser.add_argument('-l', '--logfiles', action='store_true', help='create logfiles of the executed GAMMA commands')
    parser.add_argument('-i', '--intermediates', action='store_true', help='keep intermediate files')
    parser.add_argument('-q', '--quiet', action='store_true', help='suppress standard console prints')
    parser.add_argument('-tr', '--targetresolution', default=20, help='the target resolution in meters for x and y',
                        type=int)
    parser.add_argument('-fg', '--func_geoback', default=2, help='backward geocoding interpolation function; '
                                                                 '0 - Nearest Neighbor, 1 - Bicubic Spline, 2 - Bicubic Spline-Log; '
                                                                 'method 1: negative values possible (e.g. in urban areas) - use method 2 to avoid this',
                        type=int)
    parser.add_argument('-fi', '--func_interp', default=0,
                        help='function for interpolation of layover/shadow/foreshortening/DEM gaps; '
                             '0 - set to 0, 1 - linear interpolation, 2 - actual value, 3 - nn-thinned', type=int)
    parser.add_argument('-poe', '--poedir', default=None,
                        help='directory containing aux_poeorb (precise orbit ephemerides) orbit state vector files')
    parser.add_argument('-res', '--resdir', default=None,
                        help='directory containing aux_resorb (restituted orbit) orbit state vector files')
    parser.add_argument('zipfile', help='S1 zipped scene archive to be used')
    parser.add_argument('tempdir', help='temporary directory for intermediate files')
    parser.add_argument('outdir', help='output directory')
    parser.add_argument('srtmdir', help='directory containing SRTM hgt tiles (subdirectories possible)')
    return parser
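

# A hedged usage sketch (not part of the original module): how the parser defined above could be
# exercised programmatically. The option values and file paths are placeholders for illustration only.
def _example_init_parser():
    parser = init_parser()
    args = parser.parse_args(['-t', '-tr', '10',
                              'S1A_scene.zip', '/tmp/proc', '/tmp/out', '/data/srtm'])
    print(args.transform, args.targetresolution, args.zipfile)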


# todo check existence not by file name but by start and stop time; files are sometimes re-published
class OSV(object):
    """
    interface for management of S1 Orbit State Vector (OSV) files

    The input is a directory which is supposed to contain, or already contains, OSV files.
    Two subdirectories are expected and created otherwise:
    one for Precise Orbit Ephemerides (POE) files named POEORB and one for Restituted Orbit (RES) files named RESORB.

    Using method :meth:`match` the corresponding POE (priority) or RES file is returned for a timestamp.
    Timestamps are always handled in the format YYYYmmddTHHMMSS.

    Parameters
    ----------
    osvdir: str
        the directory to write the orbit files to
    """
    
    def __init__(self, osvdir=None):
        if osvdir is None:
            try:
                auxdatapath = ExamineSnap().auxdatapath
            except AttributeError:
                auxdatapath = os.path.join(os.path.expanduser('~'), '.snap', 'auxdata')
            osvdir = os.path.join(auxdatapath, 'Orbits', 'Sentinel-1')
        self.url = 'https://qc.sentinel1.eo.esa.int/api/v1/'
        self.outdir_poe = os.path.join(osvdir, 'POEORB')
        self.outdir_res = os.path.join(osvdir, 'RESORB')
        self.pattern = r'S1[AB]_OPER_AUX_(?:POE|RES)ORB_OPOD_[0-9TV_]{48}\.EOF'
        self.pattern_fine = r'(?P<sensor>S1[AB])_OPER_AUX_' \
                            r'(?P<type>(?:POE|RES)ORB)_OPOD_' \
                            r'(?P<publish>[0-9]{8}T[0-9]{6})_V' \
                            r'(?P<start>[0-9]{8}T[0-9]{6})_' \
                            r'(?P<stop>[0-9]{8}T[0-9]{6})\.EOF'
        self._init_dir()
        self._reorganize()
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        return
    
    def _init_dir(self):
        """
        create the output directories if they don't exist yet
        """
        for directory in [self.outdir_poe, self.outdir_res]:
            if not os.path.isdir(directory):
                os.makedirs(directory)
    
    def _parse(self, file):
        basename = os.path.basename(file)
        groups = re.match(self.pattern_fine, basename).groupdict()
        return groups
    
    def _reorganize(self):
        """
        compress and move EOF files into their sensor/year/month subdirectories
        """
        message = True
        for subdir in [self.outdir_poe, self.outdir_res]:
            if not os.path.isdir(subdir):
                continue
            files = finder(subdir, [self.pattern], recursive=False, regex=True)
            for eof in files:
                base = os.path.basename(eof)
                target = os.path.join(self._subdir(eof), base + '.zip')
                os.makedirs(os.path.dirname(target), exist_ok=True)
                if not os.path.isfile(target):
                    if message:
                        print('compressing and reorganizing EOF files')
                        message = False
                    with zf.ZipFile(file=target, mode='w', compression=zf.ZIP_DEFLATED) as archive:
                        archive.write(filename=eof, arcname=base)
                os.remove(eof)
    
    def _typeEvaluate(self, osvtype):
        """
        evaluate the 'osvtype' method argument and return the corresponding local directory
        
        Parameters
        ----------
        osvtype: str
            the type of orbit files required; either 'POE' or 'RES'
        
        Returns
        -------
        str
            the local directory of the osv type
        """
        if osvtype not in ['POE', 'RES']:
            raise IOError('type must be either "POE" or "RES"')
        if osvtype == 'POE':
            return self.outdir_poe
        else:
            return self.outdir_res
    def catch(self, sensor, osvtype='POE', start=None, stop=None):
        """
        check a server for files
        
        Parameters
        ----------
        sensor: str or list
            The S1 mission(s):
             - 'S1A'
             - 'S1B'
             - ['S1A', 'S1B']
        osvtype: {'POE', 'RES'}
            the type of orbit files required
        start: str
            the date to start searching for files in format YYYYmmddTHHMMSS
        stop: str
            the date to stop searching for files in format YYYYmmddTHHMMSS
        
        Returns
        -------
        list
            the URLs of the remote OSV files
        """
        # a dictionary for storing the url arguments
        query = {}
        
        if osvtype == 'POE':
            query['product_type'] = 'AUX_POEORB'
        elif osvtype == 'RES':
            query['product_type'] = 'AUX_RESORB'
        else:
            raise RuntimeError("osvtype must be either 'POE' or 'RES'")
        
        if sensor in ['S1A', 'S1B']:
            query['sentinel1__mission'] = sensor
        elif sorted(sensor) == ['S1A', 'S1B']:
            pass
        else:
            raise RuntimeError('unsupported input for parameter sensor')
        
        # the collection of files to be returned
        collection = []
        
        # set the defined start date or the mission start date otherwise;
        # note that the online query only allows searching by validity start time, so a file
        # covering the requested start date may itself start earlier, e.g. a request starting at
        # 2018-01-01T000000 is covered by a file with validity V20171231T225942_20180102T005942
        if start is not None:
            date_start = datetime.strptime(start, '%Y%m%dT%H%M%S').strftime('%Y-%m-%dT%H:%M:%S')
        else:
            date_start = '2014-07-31'
        # set the defined stop date or the current date otherwise
        if stop is not None:
            date_stop = datetime.strptime(stop, '%Y%m%dT%H%M%S').strftime('%Y-%m-%dT%H:%M:%S')
        else:
            date_stop = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        
        # append the time frame to the query dictionary
        query['validity_start__gte'] = date_start
        query['validity_stop__lte'] = date_stop
        
        print('searching for new {} files'.format(osvtype))
        
        # page through the query results and collect the remote file URLs
        target = urlQueryParser(self.url, query).replace('%3A', ':')
        pbar = None
        while target is not None:
            response = requests.get(target).json()
            if pbar is None:
                print(target)
                pbar = pb.ProgressBar(max_value=response['count']).start()
            remotes = [item['remote_url'] for item in response['results']]
            collection += remotes
            pbar.update(len(collection))
            target = response['next']
        pbar.finish()
        
        # drop RES files whose validity is already covered by locally existing POE files
        if osvtype == 'RES' and self.maxdate('POE', 'stop') is not None:
            collection = [x for x in collection if self.date(x, 'start') > self.maxdate('POE', 'stop')]
        return collection
    def date(self, file, datetype):
        """
        extract a date from an OSV file name
        
        Parameters
        ----------
        file: str
            the OSV file
        datetype: {'publish', 'start', 'stop'}
            one of three possible date types contained in the OSV filename
        
        Returns
        -------
        str
            a time stamp in the format YYYYmmddTHHMMSS
        """
        return self._parse(file)[datetype]
    def clean_res(self):
        """
        delete all RES files for whose date a POE file exists
        """
        maxdate_poe = self.maxdate('POE', 'stop')
        if maxdate_poe is not None:
            deprecated = [x for x in self.getLocals('RES') if self.date(x, 'stop') < maxdate_poe]
            print('deleting {} RES file{}'.format(len(deprecated), '' if len(deprecated) == 1 else 's'))
            for item in deprecated:
                os.remove(item)
    def getLocals(self, osvtype='POE'):
        """
        get a list of local files of specific type
        
        Parameters
        ----------
        osvtype: {'POE', 'RES'}
            the type of orbit files required
        
        Returns
        -------
        list
            a selection of local OSV files
        """
        directory = self._typeEvaluate(osvtype)
        return finder(directory, [self.pattern], regex=True)
    def maxdate(self, osvtype='POE', datetype='stop'):
        """
        return the latest date of locally existing POE/RES files
        
        Parameters
        ----------
        osvtype: {'POE', 'RES'}
            the type of orbit files required
        datetype: {'publish', 'start', 'stop'}
            one of three possible date types contained in the OSV filename
        
        Returns
        -------
        str
            a timestamp in format YYYYmmddTHHMMSS
        """
        directory = self._typeEvaluate(osvtype)
        files = finder(directory, [self.pattern], regex=True)
        return max([self.date(x, datetype) for x in files]) if len(files) > 0 else None
    def mindate(self, osvtype='POE', datetype='start'):
        """
        return the earliest date of locally existing POE/RES files
        
        Parameters
        ----------
        osvtype: {'POE', 'RES'}
            the type of orbit files required
        datetype: {'publish', 'start', 'stop'}
            one of three possible date types contained in the OSV filename
        
        Returns
        -------
        str
            a timestamp in format YYYYmmddTHHMMSS
        """
        directory = self._typeEvaluate(osvtype)
        files = finder(directory, [self.pattern], regex=True)
        return min([self.date(x, datetype) for x in files]) if len(files) > 0 else None
    def match(self, sensor, timestamp, osvtype='POE'):
        """
        return the corresponding OSV file for the provided sensor and time stamp.
        The file returned is one which covers the acquisition time and, if multiple exist,
        the one which was published last.
        In case a list of options is provided as osvtype, the file of higher accuracy (i.e. POE over RES) is returned.
        
        Parameters
        ----------
        sensor: str
            The S1 mission:
             - 'S1A'
             - 'S1B'
        timestamp: str
            the time stamp in the format 'YYYYmmddTHHMMSS'
        osvtype: {'POE', 'RES'} or list
            the type of orbit files required; either 'POE', 'RES' or a list of both
        
        Returns
        -------
        str
            the best matching orbit file (overlapping time plus latest publication date)
        """
        # list all locally existing files of the defined type
        if osvtype in ['POE', 'RES']:
            local_files = self.getLocals(osvtype)
            # filter the files to those which contain data for the defined time stamp
            files = [x for x in local_files if self.date(x, 'start') <= timestamp <= self.date(x, 'stop')]
            files = [x for x in files if os.path.basename(x).startswith(sensor)]
            if len(files) > 0:
                # select the file which was published last
                best = self.sortByDate(files, 'publish')[-1]
                return best
            return None
        elif sorted(osvtype) == ['POE', 'RES']:
            best = self.match(sensor=sensor, timestamp=timestamp, osvtype='POE')
            if not best:
                best = self.match(sensor=sensor, timestamp=timestamp, osvtype='RES')
            return best
    def retrieve(self, files):
        """
        download a list of remote files into the respective subdirectories, i.e. POEORB or RESORB
        
        Parameters
        ----------
        files: list
            a list of remotely existing OSV files as returned by method :meth:`catch`
        """
        # collect the remote files which do not yet exist locally
        downloads = []
        for remote in files:
            outdir = self._subdir(remote)
            os.makedirs(outdir, exist_ok=True)
            basename = os.path.basename(remote)
            local = os.path.join(outdir, basename) + '.zip'
            if not os.path.isfile(local):
                downloads.append((remote, local, basename))
        if len(downloads) == 0:
            return
        print('downloading {} file{}'.format(len(downloads), '' if len(downloads) == 1 else 's'))
        # download each file and write it directly into a zip archive
        with pb.ProgressBar(max_value=len(downloads)) as pbar:
            for i, (remote, local, basename) in enumerate(downloads):
                infile = requests.get(remote)
                with zf.ZipFile(file=local, mode='w', compression=zf.ZIP_DEFLATED) as outfile:
                    outfile.writestr(zinfo_or_arcname=basename, data=infile.content)
                infile.close()
                pbar.update(i + 1)
        self.clean_res()
    def sortByDate(self, files, datetype='start'):
        """
        sort a list of OSV files by a specific date type
        
        Parameters
        ----------
        files: list
            some OSV files
        datetype: {'publish', 'start', 'stop'}
            one of three possible date types contained in the OSV filename
        
        Returns
        -------
        list
            the input OSV files sorted by the defined date
        """
        return sorted(files, key=lambda x: self.date(x, datetype))
    def _subdir(self, file):
        """
        | return the subdirectory in which to store the EOF file,
        | i.e. basedir/{type}ORB/{sensor}/{year}/{month}
        | e.g. basedir/POEORB/S1A/2018/12
        
        Parameters
        ----------
        file: str
            the EOF filename
        
        Returns
        -------
        str
            the target directory
        """
        attr = self._parse(file)
        outdir = self._typeEvaluate(attr['type'][:3])
        start = self.date(file, datetype='start')
        start = datetime.strptime(start, '%Y%m%dT%H%M%S')
        month = '{:02d}'.format(start.month)
        outdir = os.path.join(outdir, attr['sensor'], str(start.year), month)
        return outdir
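

# A hedged usage sketch (not part of the original module) of the OSV workflow suggested by the
# methods above: query the server for new files, download them and select the best match for a
# scene's acquisition time. The dates are placeholders; osvdir=None falls back to the SNAP
# auxdata directory as defined in __init__.
def _example_osv_workflow():
    with OSV(osvdir=None) as osv:
        remotes = osv.catch(sensor='S1A', osvtype='POE',
                            start='20180101T000000', stop='20180102T000000')
        osv.retrieve(remotes)
        best = osv.match(sensor='S1A', timestamp='20180101T120000', osvtype=['POE', 'RES'])
        print(best)

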
def removeGRDBorderNoise(scene, method='pyroSAR'):
    """
    Mask out Sentinel-1 image border noise.
    This function implements the method for removing GRD border noise as recommended by ESA and implemented in SNAP,
    and additionally adds further refinement of the result using an image border line simplification approach.
    In this approach the border between valid and invalid pixels is first simplified using the method by
    Visvalingam and Whyatt (see references below).
    The line segments of the new border are then shifted until all pixels considered invalid before the
    simplification are again on one side of the line.
    See the image below for further clarification.
    
    References:
        - 'Masking "No-value" Pixels on GRD Products generated by the Sentinel-1 ESA IPF' (issue 2.1 Jan 29 2018);
          available online under https://sentinel.esa.int/web/sentinel/user-guides/sentinel-1-sar/document-library
        - Visvalingam, M. and Whyatt, J.D. (1993): "Line Generalisation by Repeated Elimination of Points",
          Cartographic J., 30 (1), 46 - 51
    
    Parameters
    ----------
    scene: ~pyroSAR.drivers.SAFE
        the Sentinel-1 scene object
    method: str
        the border noise removal method to be applied; one of the following:
         - 'ESA': the pure implementation as described by ESA
         - 'pyroSAR': the ESA method plus the custom pyroSAR refinement
    
    .. figure:: figures/S1_bnr.png
        :scale: 30%
        
        Demonstration of the border noise removal for a vertical left image border.
        The area under the respective lines covers pixels considered valid, everything above will be masked out.
        The blue line is the result of the noise removal as recommended by ESA, in which a lot of noise is still present.
        The red line is the over-simplified result using the Visvalingam-Whyatt method of poly-line vertex reduction.
        The green line is the final result after further correcting the VW-simplified result.
""" if scene.compression is not None: raise RuntimeError('scene is not yet unpacked') if method not in ['pyroSAR', 'ESA']: raise AttributeError("parameter 'method' must be either 'pyroSAR' or 'ESA'") blocksize = 2000 # compute noise scaling factor if scene.meta['IPF_version'] >= 2.9: print('border noise removal not necessary for IPF version {}'.format(scene.meta['IPF_version'])) return elif scene.meta['IPF_version'] <= 2.5: knoise = {'IW': 75088.7, 'EW': 56065.87}[scene.acquisition_mode] cads = scene.getFileObj(scene.findfiles('calibration-s1[ab]-[ie]w-grd-(?:hh|vv)')[0]) caltree = ET.fromstring(cads.read()) cads.close() adn = float(caltree.find('.//calibrationVector/dn').text.split()[0]) if scene.meta['IPF_version'] < 2.34: scalingFactor = knoise * adn else: scalingFactor = knoise * adn * adn else: scalingFactor = 1 # read noise vectors from corresponding annotation xml noisefile = scene.getFileObj(scene.findfiles('noise-s1[ab]-[ie]w-grd-(?:hh|vv)')[0]) noisetree = ET.fromstring(noisefile.read()) noisefile.close() noiseVectors = noisetree.findall('.//noiseVector') # define boundaries of image subsets to be masked (4x the first lines/samples of the image boundaries) subsets = [(0, 0, blocksize, scene.lines), (0, 0, scene.samples, blocksize), (scene.samples - blocksize, 0, scene.samples, scene.lines), (0, scene.lines - blocksize, scene.samples, scene.lines)] # extract column indices of noise vectors yi = np.array([int(x.find('line').text) for x in noiseVectors]) # create links to the tif files for a master co-polarization and all other polarizations as slaves master = scene.findfiles('s1.*(?:vv|hh).*tiff')[0] ras_master = gdal.Open(master, GA_Update) ras_slaves = [gdal.Open(x, GA_Update) for x in scene.findfiles('s1.*tiff') if x != master] outband_master = ras_master.GetRasterBand(1) outband_slaves = [x.GetRasterBand(1) for x in ras_slaves] # iterate over the four image subsets for subset in subsets: print(subset) xmin, ymin, xmax, ymax = subset xdiff = xmax - xmin ydiff = ymax - ymin # linear interpolation of noise vectors to array noise_interp = np.empty((ydiff, xdiff), dtype=float) for i in range(0, len(noiseVectors)): if ymin <= yi[i] <= ymax: # extract row indices of noise vector xi = [int(x) for x in noiseVectors[i].find('pixel').text.split()] # extract noise values noise = [float(x) for x in noiseVectors[i].find('noiseLut').text.split()] # interpolate values along rows noise_interp[yi[i] - ymin, :] = np.interp(range(0, xdiff), xi, noise) for i in range(0, xdiff): yi_t = yi[(ymin <= yi) & (yi <= ymax)] - ymin # interpolate values along columns noise_interp[:, i] = np.interp(range(0, ydiff), yi_t, noise_interp[:, i][yi_t]) # read subset of image to array and subtract interpolated noise (denoising) mat_master = outband_master.ReadAsArray(*[xmin, ymin, xdiff, ydiff]) denoisedBlock = mat_master.astype(float) ** 2 - noise_interp * scalingFactor # mask out all pixels with a value below 0.5 in the denoised block or 30 in the original block denoisedBlock[(denoisedBlock < 0.5) | (mat_master < 30)] = 0 denoisedBlock = np.sqrt(denoisedBlock) if method == 'pyroSAR': # helper functions for masking out negative values def helper1(x): return len(x) - np.argmax(x > 0) def helper2(x): return len(x) - np.argmax(x[::-1] > 0) # mask out negative values and simplify borders (custom implementation) if subset == (0, 0, blocksize, scene.lines): border = np.apply_along_axis(helper1, 1, denoisedBlock) border = blocksize - ls.reduce(border) for j in range(0, ydiff): denoisedBlock[j, :border[j]] = 0 
denoisedBlock[j, border[j]:] = 1 elif subset == (0, scene.lines - blocksize, scene.samples, scene.lines): border = np.apply_along_axis(helper2, 0, denoisedBlock) border = ls.reduce(border) for j in range(0, xdiff): denoisedBlock[border[j]:, j] = 0 denoisedBlock[:border[j], j] = 1 elif subset == (scene.samples - blocksize, 0, scene.samples, scene.lines): border = np.apply_along_axis(helper2, 1, denoisedBlock) border = ls.reduce(border) for j in range(0, ydiff): denoisedBlock[j, border[j]:] = 0 denoisedBlock[j, :border[j]] = 1 elif subset == (0, 0, scene.samples, blocksize): border = np.apply_along_axis(helper1, 0, denoisedBlock) border = blocksize - ls.reduce(border) for j in range(0, xdiff): denoisedBlock[:border[j], j] = 0 denoisedBlock[border[j]:, j] = 1 mat_master[denoisedBlock == 0] = 0 # write modified array back to original file outband_master.WriteArray(mat_master, xmin, ymin) outband_master.FlushCache() # perform reading, masking and writing for all other polarizations for outband in outband_slaves: mat = outband.ReadAsArray(*[xmin, ymin, xdiff, ydiff]) mat[denoisedBlock == 0] = 0 outband.WriteArray(mat, xmin, ymin) outband.FlushCache() # detach file links outband_master = None ras_master = None for outband in outband_slaves: outband = None for ras in ras_slaves: ras = None
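

# A hedged usage sketch (not part of the original module): applying the border noise removal
# to a GRD scene. It assumes pyroSAR.identify returns a SAFE object and that SAFE.unpack
# extracts the archive so that scene.compression becomes None; the file paths are placeholders.
def _example_border_noise_removal():
    from pyroSAR import identify
    scene = identify('S1A_IW_GRDH_example.zip')
    if scene.compression is not None:
        scene.unpack('/tmp/unpacked')  # removeGRDBorderNoise requires an unpacked scene
    removeGRDBorderNoise(scene, method='pyroSAR')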