import os
import re
import shutil
import requests
import subprocess as sp
from xml.dom import minidom
import xml.etree.ElementTree as ET
from osgeo import gdal
from osgeo.gdalconst import GA_Update
from pyroSAR import identify
from pyroSAR._dev_config import LOOKUP
from pyroSAR.examine import ExamineSnap
from spatialist.auxil import gdal_translate
from spatialist.ancillary import finder
import logging
log = logging.getLogger(__name__)
[docs]def parse_recipe(name):
"""
parse a SNAP recipe
Parameters
----------
name: str
the name of the recipe; current options:
* `blank`: a workflow without any nodes
* `geocode`: a basic workflow containing Read, Apply-Orbit-File, Calibration, Terrain-Flattening and Write nodes
Returns
-------
Workflow
the parsed recipe
Examples
--------
>>> from pyroSAR.snap.auxil import parse_recipe
>>> workflow = parse_recipe('base')
"""
name = name if name.endswith('.xml') else name + '.xml'
absname = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'recipes', name)
return Workflow(absname)
[docs]def parse_node(name):
"""
parse a XML node recipe
Parameters
----------
name: str
the name of the processing node, e.g. Terrain-Correction
Returns
-------
Node
the parsed node
Examples
--------
>>> ml = parse_node('ThermalNoiseRemoval')
>>> print(ml.parameters)
{'selectedPolarisations': None, 'removeThermalNoise': 'true', 'reIntroduceThermalNoise': 'false'}
"""
name = name if name.endswith('.xml') else name + '.xml'
absname = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'recipes', 'nodes', name)
with open(absname, 'r') as workflow:
element = ET.fromstring(workflow.read())
return Node(element)
def execute(xmlfile, cleanup=True, gpt_exceptions=None, gpt_args=None, verbose=True):
"""
execute SNAP workflows via the Graph Processing Tool gpt.
This function merely calls gpt with some additional command
line arguments and raises a RuntimeError on fail. This
function is used internally by function :func:`gpt`, which
should be used instead.
Parameters
----------
xmlfile: str
the name of the workflow XML file
cleanup: bool
should all files written to the temporary directory during function execution be deleted after processing?
gpt_exceptions: dict
a dictionary to override the configured GPT executable for certain operators;
each (sub-)workflow containing this operator will be executed with the define executable;
- e.g. ``{'Terrain-Flattening': '/home/user/snap/bin/gpt'}``
gpt_args: list or None
a list of additional arguments to be passed to the gpt call
- e.g. ``['-x', '-c', '2048M']`` for increased tile cache size and intermediate clearing
verbose: bool
print out status messages?
Returns
-------
Raises
------
RuntimeError
"""
# read the file and extract some information
workflow = Workflow(xmlfile)
write = workflow['Write']
outname = write.parameters['file']
infile = workflow['Read'].parameters['file']
workers = [x.id for x in workflow if x.operator not in ['Read', 'Write']]
message = ' -> '.join(workers)
gpt_exec = None
if gpt_exceptions is not None:
for item, exec in gpt_exceptions.items():
if item in workers:
gpt_exec = exec
message += ' (using {})'.format(exec)
break
if verbose:
print(message)
# try to find the GPT executable
if gpt_exec is None:
try:
gpt_exec = ExamineSnap().gpt
except AttributeError:
raise RuntimeError('could not find SNAP GPT executable')
# create the list of arguments to be passed to the subprocess module calling GPT
cmd = [gpt_exec, '-e']
if isinstance(gpt_args, list):
cmd.extend(gpt_args)
if format == 'GeoTiff-BigTIFF':
cmd.extend([
# '-Dsnap.dataio.reader.tileWidth=*',
# '-Dsnap.dataio.reader.tileHeight=1',
'-Dsnap.dataio.bigtiff.tiling.width=256',
'-Dsnap.dataio.bigtiff.tiling.height=256',
# '-Dsnap.dataio.bigtiff.compression.type=LZW',
# '-Dsnap.dataio.bigtiff.compression.quality=0.75'
])
cmd.append(xmlfile)
print(cmd)
# execute the workflow
proc = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE)
out, err = proc.communicate()
out = out.decode('utf-8') if isinstance(out, bytes) else out
err = err.decode('utf-8') if isinstance(err, bytes) else err
# check for a message indicating an unknown parameter,
# which can easily be removed from the workflow
pattern = r"Error: \[NodeId: (?P<id>[a-zA-Z0-9-_]*)\] " \
r"Operator \'[a-zA-Z0-9-_]*\': " \
r"Unknown element \'(?P<par>[a-zA-Z]*)\'"
match = re.search(pattern, err)
if proc.returncode == 0:
return
# delete unknown parameters and run the modified workflow
elif proc.returncode == 1 and match is not None:
replace = match.groupdict()
with Workflow(xmlfile) as flow:
print(' removing parameter {id}:{par} and executing modified workflow'.format(**replace))
node = flow[replace['id']]
del node.parameters[replace['par']]
flow.write(xmlfile)
execute(xmlfile, cleanup=cleanup, gpt_exceptions=gpt_exceptions,
gpt_args=gpt_args, verbose=verbose)
# append additional information to the error message and raise an error
else:
if proc.returncode == -9:
submessage = '[{}] the process was killed by SNAP (process return code -9). ' \
'One possible cause is a lack of memory.'.format(os.path.basename(xmlfile))
else:
submessage = '{}{}\n[{}] failed with return code {}'
if cleanup:
if os.path.isfile(outname + '.tif'):
os.remove(outname + '.tif')
elif os.path.isdir(outname):
shutil.rmtree(outname)
raise RuntimeError(submessage.format(out, err, os.path.basename(xmlfile), proc.returncode))
[docs]def gpt(xmlfile, groups=None, cleanup=True,
gpt_exceptions=None, gpt_args=None,
removeS1BorderNoiseMethod='pyroSAR', basename_extensions=None):
"""
wrapper for ESA SNAP's Graph Processing Tool GPT.
Input is a readily formatted workflow XML file as
created by function :func:`~pyroSAR.snap.util.geocode`.
Additional to calling GPT, this function will
* execute the workflow in groups as defined by `groups`
* encode a nodata value into the output file if the format is GeoTiff-BigTIFF
* convert output files to GeoTiff if the output format is ENVI
Parameters
----------
xmlfile: str
the name of the workflow XML file
groups: list
a list of lists each containing IDs for individual nodes
cleanup: bool
should all files written to the temporary directory during function execution be deleted after processing?
gpt_exceptions: dict
a dictionary to override the configured GPT executable for certain operators;
each (sub-)workflow containing this operator will be executed with the define executable;
- e.g. ``{'Terrain-Flattening': '/home/user/snap/bin/gpt'}``
gpt_args: list or None
a list of additional arguments to be passed to the gpt call
- e.g. ``['-x', '-c', '2048M']`` for increased tile cache size and intermediate clearing
removeS1BorderNoiseMethod: str
the border noise removal method to be applied, See :func:`pyroSAR.S1.removeGRDBorderNoise` for details; one of the following:
- 'ESA': the pure implementation as described by ESA
- 'pyroSAR': the ESA method plus the custom pyroSAR refinement
basename_extensions: list of str
names of additional parameters to append to the basename, e.g. ['orbitNumber_rel']
Returns
-------
Raises
------
RuntimeError
"""
workflow = Workflow(xmlfile)
write = workflow['Write']
outname = write.parameters['file']
suffix = workflow.suffix
format = write.parameters['formatName']
dem_name = workflow.tree.find('.//demName').text
if dem_name == 'External DEM':
dem_nodata = float(workflow.tree.find('.//externalDEMNoDataValue').text)
else:
dem_nodata = 0
if 'Remove-GRD-Border-Noise' in workflow.ids and removeS1BorderNoiseMethod == 'pyroSAR':
if 'SliceAssembly' in workflow.operators:
raise RuntimeError("pyroSAR's custom border noise removal is not yet implemented for multiple scene inputs")
xmlfile = os.path.join(outname,
os.path.basename(xmlfile.replace('_bnr', '')))
if not os.path.isdir(outname):
os.makedirs(outname)
# border noise removal is done outside of SNAP and the node is thus removed from the workflow
del workflow['Remove-GRD-Border-Noise']
# remove the node name from the groups
i = 0
while i < len(groups) - 1:
if 'Remove-GRD-Border-Noise' in groups[i]:
del groups[i][groups[i].index('Remove-GRD-Border-Noise')]
if len(groups[i]) == 0:
del groups[i]
else:
i += 1
# identify the input scene, unpack it and perform the custom border noise removal
read = workflow['Read']
scene = identify(read.parameters['file'])
print('unpacking scene')
scene.unpack(outname)
print('removing border noise..')
scene.removeGRDBorderNoise(method=removeS1BorderNoiseMethod)
# change the name of the input file to that of the unpacked archive
read.parameters['file'] = scene.scene
# write a new workflow file
workflow.write(xmlfile)
print('executing node sequence{}..'.format('s' if groups is not None else ''))
try:
if groups is not None:
subs = split(xmlfile, groups)
for sub in subs:
execute(sub, cleanup=cleanup, gpt_exceptions=gpt_exceptions, gpt_args=gpt_args)
else:
execute(xmlfile, cleanup=cleanup, gpt_exceptions=gpt_exceptions, gpt_args=gpt_args)
except RuntimeError as e:
if cleanup:
shutil.rmtree(outname)
raise RuntimeError(str(e) + '\nfailed: {}'.format(xmlfile))
if format == 'ENVI':
print('converting to GTiff')
translateoptions = {'options': ['-q', '-co', 'INTERLEAVE=BAND', '-co', 'TILED=YES'],
'format': 'GTiff'}
for item in finder(outname, ['*.img'], recursive=False):
if re.search('[HV]{2}', item):
pol = re.search('[HV]{2}', item).group()
name_new = outname.replace(suffix, '{0}_{1}.tif'.format(pol, suffix))
else:
base = os.path.splitext(os.path.basename(item))[0] \
.replace('elevation', 'DEM')
name_new = outname.replace(suffix, '{0}.tif'.format(base))
nodata = dem_nodata if re.search('elevation', item) else 0
translateoptions['noData'] = nodata
gdal_translate(item, name_new, translateoptions)
# by default the nodata value is not registered in the GTiff metadata
elif format == 'GeoTiff-BigTIFF':
ras = gdal.Open(outname + '.tif', GA_Update)
for i in range(1, ras.RasterCount + 1):
ras.GetRasterBand(i).SetNoDataValue(0)
ras = None
###########################################################################
# write the Sentinel-1 manifest.safe file as addition to the actual product
readers = workflow['operator=Read']
for reader in readers:
infile = reader.parameters['file']
try:
id = identify(infile)
if id.sensor in ['S1A', 'S1B']:
manifest = id.getFileObj(id.findfiles('manifest.safe')[0])
basename = id.outname_base(basename_extensions)
basename = '{0}_{1}_manifest.safe'.format(basename, suffix)
outdir = os.path.dirname(outname)
outname_manifest = os.path.join(outdir, basename)
with open(outname_manifest, 'wb') as out:
out.write(manifest.read())
except RuntimeError:
continue
###########################################################################
if cleanup:
shutil.rmtree(outname)
print('done')
def is_consistent(workflow):
"""
check whether all nodes take either no source node or one that is in the list
Parameters
----------
workflow: Workflow
the workflow to be analyzed
Returns
-------
bool
is the list of nodes consistent?
"""
ids = workflow.ids
check = []
for node in workflow:
source = node.source
if source is None or source in ids or all([x in ids for x in source]):
check.append(True)
else:
check.append(False)
for node in workflow:
successors = workflow.successors(node.id, recursive=True)
operators = [workflow[x].operator for x in successors]
if node.operator == 'Write' or 'Write' in operators:
check.append(True)
else:
log.debug('node {} does not have a Write successor'.format(node.id))
check.append(False)
return all(check)
def split(xmlfile, groups):
"""
split a workflow file into groups and write them to separate workflows including source and write target linking.
The new workflows are written to a sub-directory 'temp' of the target directory defined in the input's 'Write' node.
Each new workflow is parametrized with a Read and Write node if they don't already exist. Temporary outputs are
written to BEAM-DIMAP files named after the workflow suffix sequence.
Parameters
----------
xmlfile: str
the workflow to be split
groups: list
a list of lists each containing IDs for individual nodes
Returns
-------
list of str
the names of the newly written temporary workflows
Raises
------
RuntimeError
"""
workflow = Workflow(xmlfile)
write = workflow['Write']
out = write.parameters['file']
tmp = os.path.join(out, 'temp')
if not os.path.isdir(tmp):
os.makedirs(tmp)
# the temporary XML files
outlist = []
# the names and format of temporary products
prod_tmp = {}
prod_tmp_format = {}
for position, group in enumerate(groups):
node_lookup = {}
log.debug('creating new workflow for group {}'.format(group))
new = parse_recipe('blank')
nodes = [workflow[x] for x in group]
for node in nodes:
id_old = node.id
sources = node.source
if sources is None:
sources = []
resetSuccessorSource = False
elif isinstance(sources, list):
resetSuccessorSource = False
else:
resetSuccessorSource = True
sources = [sources]
reset = []
for source in sources:
if source not in group:
read = new.insert_node(parse_node('Read'), void=False,
resetSuccessorSource=resetSuccessorSource)
reset.append(read.id)
read.parameters['file'] = prod_tmp[source]
read.parameters['formatName'] = prod_tmp_format[source]
node_lookup[read.id] = source
else:
reset.append(source)
if isinstance(sources, list):
sources_new_pos = [list(node_lookup.values()).index(x) for x in sources]
sources_new = [list(node_lookup.keys())[x] for x in sources_new_pos]
newnode = new.insert_node(node, before=sources_new, void=False)
else:
newnode = new.insert_node(node, void=False)
node_lookup[newnode.id] = id_old
if not resetSuccessorSource:
newnode.source = reset
# if possible, read the name of the SAR product for parsing names of temporary files
# this was found necessary for SliceAssembly, which expects the names in a specific format
products = [x.parameters['file'] for x in new['operator=Read']]
try:
id = identify(products[0])
filename = os.path.basename(id.scene)
except (RuntimeError, OSError):
filename = os.path.basename(products[0])
basename = os.path.splitext(filename)[0]
basename = re.sub(r'_tmp[0-9]+', '', basename)
# add a Write node to all dangling nodes
counter = 0
for node in new:
if new.successors(node.id) == [] and node.id != 'Write':
write = parse_node('Write')
new.insert_node(write, before=node.id)
id = str(position) if counter == 0 else '{}-{}'.format(position, counter)
tmp_out = os.path.join(tmp, '{}_tmp{}.dim'.format(basename, id))
prod_tmp[node_lookup[node.id]] = tmp_out
prod_tmp_format[node_lookup[node.id]] = 'BEAM-DIMAP'
write.parameters['file'] = tmp_out
write.parameters['formatName'] = 'BEAM-DIMAP'
counter += 1
if not is_consistent(new):
message = 'inconsistent group:\n {}'.format(' -> '.join(group))
raise RuntimeError(message)
outname = os.path.join(tmp, '{}_tmp{}.xml'.format(basename, position))
new.write(outname)
outlist.append(outname)
return outlist
def groupbyWorkers(xmlfile, n=2):
"""
split SNAP workflow into groups containing a maximum defined number of operators
Parameters
----------
xmlfile: str
the SNAP xml workflow
n: int
the maximum number of worker nodes in each group; Read and Write are excluded
Returns
-------
list
a list of lists each containing the IDs of all nodes belonging to the groups including Read and Write nodes;
this list can e.g. be passed to function :func:`split` to split the workflow into new sub-workflow files based
on the newly created groups
"""
workflow = Workflow(xmlfile)
workers_id = [x.id for x in workflow if x.operator not in ['Read', 'Write']]
readers_id = [x.id for x in workflow['operator=Read']]
writers_id = [x.id for x in workflow['operator=Write']]
workers_groups = [workers_id[i:i + n] for i in range(0, len(workers_id), n)]
nodes_groups = []
for group in workers_groups:
newgroup = []
for worker in group:
newgroup.append(worker)
source = workflow[worker].source
if not isinstance(source, list):
source = [source]
for item in source:
if item in readers_id:
newgroup.insert(newgroup.index(worker), item)
for writer in writers_id:
if workflow[writer].source == worker:
newgroup.append(writer)
nodes_groups.append(newgroup)
return nodes_groups
[docs]class Workflow(object):
"""
Class for convenient handling of SNAP XML workflows
Parameters
----------
xmlfile: str
the workflow XML file
"""
def __init__(self, xmlfile):
with open(xmlfile, 'r') as infile:
self.tree = ET.fromstring(infile.read())
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def __getitem__(self, item):
pattern = '(?P<key>[a-zA-Z-_]*)=(?P<value>[a-zA-Z-_]*)'
if isinstance(item, int):
return self.nodes()[item]
elif isinstance(item, str):
if re.search(pattern, item):
key, value = re.search(pattern, item).groups()
return [x for x in self if getattr(x, key) == value]
else:
try:
return Node(self.tree.find('.//node[@id="{}"]'.format(item)))
except TypeError:
raise KeyError('unknown key: {}'.format(item))
else:
raise TypeError('item must be of type int or str')
def __len__(self):
return len(self.tree.findall('node'))
def __delitem__(self, key):
if not isinstance(key, str):
raise TypeError('key must be of type str')
element = self.tree.find('.//node[@id="{}"]'.format(key))
node = Node(element)
source = node.source
successors = [x for x in self if x.source == key]
for node in successors:
node.source = source
self.tree.remove(element)
def __str__(self):
self.__optimize_appearance()
rough_string = ET.tostring(self.tree, 'utf-8')
reparsed = minidom.parseString(rough_string)
return reparsed.toprettyxml(indent='\t', newl='')
def __iter__(self):
return iter(self.nodes())
[docs] def successors(self, id, recursive=False):
"""
find the succeeding node(s) of a node
Parameters
----------
id: str
the ID of node
recursive: bool
find successors recursively?
Returns
-------
list of str
the ID(s) of the successors
"""
if not isinstance(id, str):
raise TypeError("'id' must be of type 'str', is {}".format(type(id)))
successors = []
for node in self:
if node.source == id or (isinstance(node.source, list) and id in node.source):
successors.append(node.id)
if recursive:
for item in successors:
new = self.successors(item, recursive=True)
successors.extend(new)
successors = list(set(successors))
return successors
def __reset_successor_source(self, id):
"""
reset the sources of nodes to that of a newly inserted one
Parameters
----------
id: str
the ID of the newly inserted node
Returns
-------
"""
def reset(id, source):
if isinstance(source, list):
for item in source:
reset(id, item)
else:
try:
# find the source nodes of the current node
if source is not None:
successors = self.successors(source)
else:
return # nothing to reset
# delete the ID of the current node from the successors
if id in successors:
del successors[successors.index(id)]
for successor in successors:
successor_source = self[successor].source
if isinstance(successor_source, list):
successor_source[successor_source.index(source)] = id
self[successor].source = successor_source
else:
self[successor].source = id
except IndexError:
# case where no successor exists because the new node
# is the new last node in the graph
pass
except RuntimeError:
# case where the successor node is of type Read
pass
reset(id, self[id].source)
def __optimize_appearance(self):
"""
assign grid coordinates to the nodes for display in the SNAP GraphBuilder GUI
This method is applied by :meth:`__str__` for the final formatting of the XML text representation
Returns
-------
"""
layout = self.tree.find('.//applicationData[@id="Presentation"]')
counter = 0
x = 5
for id in self.ids:
pres = layout.find('.//node[@id="{}"]'.format(id))
y = 20. if counter % 2 == 0 else 160.
if pres is None:
pres = ET.SubElement(layout, 'node', {'id': id})
pos = ET.SubElement(pres, 'displayPosition',
{'x': "{}".format(x), 'y': "{}".format(y)})
else:
pres.find('displayPosition').attrib['x'] = "{}".format(x)
pres.find('displayPosition').attrib['y'] = "{}".format(y)
counter += 1
x += len(id) * 8
@property
def ids(self):
"""
Returns
-------
list
the IDs of all nodes
"""
return [node.id for node in self]
[docs] def index(self, node):
"""
Parameters
----------
node: Node
a node in the workflow
Returns
-------
int
the index position of the node in the workflow
"""
return list(self.tree).index(node.element)
[docs] def insert_node(self, node, before=None, after=None, resetSuccessorSource=True, void=True):
"""
insert a node into the workflow including setting its source to its predecessor
and setting its ID as source of the successor.
Parameters
----------
node: Node
the node to be inserted
before: str or list
the ID(s) of the node(s) before the newly inserted node; a list of node IDs is intended for nodes that
require multiple sources, e.g. sliceAssembly
after: str
the ID of the node after the newly inserted node
resetSuccessorSource: bool
reset the source of the successor node to the ID of the newly inserted node?
void: bool
if false, the function returns the node
Returns
-------
Node or None
the new node or None, depending on arguement `void`
"""
if before is None and after is None and len(self) > 0:
before = self[len(self) - 1].id
if before and not after:
if isinstance(before, list):
indices = [self.index(self[x]) for x in before]
predecessor = self[before[indices.index(max(indices))]]
else:
predecessor = self[before]
log.debug('inserting node {} after {}'.format(node.id, predecessor.id))
position = self.index(predecessor) + 1
self.tree.insert(position, node.element)
newnode = Node(self.tree[position])
self.refresh_ids()
####################################################
# set the source product for the new node
if newnode.operator != 'Read':
newnode.source = before
####################################################
# set the source product for the node after the new node
if resetSuccessorSource:
self.__reset_successor_source(newnode.id)
########################################################
elif after and not before:
successor = self[after]
log.debug('inserting node {} before {}'.format(node.id, successor.id))
position = self.index(successor)
self.tree.insert(position, node.element)
newnode = Node(self.tree[position])
####################################################
# set the source product for the new node
if newnode.operator != 'Read':
source = successor.source
newnode.source = source
####################################################
# set the source product for the node after the new node
if resetSuccessorSource:
self[after].source = newnode.id
else:
log.debug('inserting node {}'.format(node.id))
self.tree.insert(len(self.tree) - 1, node.element)
self.refresh_ids()
if not void:
return node
[docs] def nodes(self):
"""
Returns
-------
list
the list of :class:`Node` objects in the workflow
"""
return [Node(x) for x in self.tree.findall('node')]
@property
def operators(self):
"""
Returns
-------
list
the names of the unique operators in the workflow
"""
return sorted(list(set([node.operator for node in self])))
[docs] def refresh_ids(self):
counter = {}
for node in self:
operator = node.operator
if operator not in counter.keys():
counter[operator] = 1
else:
counter[operator] += 1
if counter[operator] > 1:
new = '{} ({})'.format(operator, counter[operator])
else:
new = operator
if node.id != new:
log.debug('renaming node {} to {}'.format(node.id, new))
node.id = new
[docs] def set_par(self, key, value):
"""
set a parameter for all nodes in the workflow
Parameters
----------
key: str
the parameter name
value
Returns
-------
"""
for node in self:
if key in node.parameters.keys():
node.parameters[key] = value2str(value)
@property
def suffix(self):
"""
Returns
-------
str
a file suffix created from the order of which the nodes will be executed
"""
nodes = self.tree.findall('node')
names = [re.sub(r'[ ]*\([0-9]+\)', '', y.attrib['id']) for y in nodes]
names_unique = []
for name in names:
if name not in names_unique:
names_unique.append(name)
suffix = '_'.join(filter(None, [LOOKUP.snap.suffix[x] for x in names_unique]))
return suffix
[docs] def write(self, outfile):
"""
write the workflow to an XML file
Parameters
----------
outfile: str
the name of the file to write
Returns
-------
"""
outfile = outfile if outfile.endswith('.xml') else outfile + '.xml'
with open(outfile, 'w') as out:
out.write(self.__str__())
[docs]class Node(object):
"""
class for handling of SNAP workflow processing nodes
Parameters
----------
element: ~xml.etree.ElementTree.Element
the node XML element
"""
def __init__(self, element):
if not isinstance(element, ET.Element):
raise TypeError('element must be of type xml.etree.ElementTree.Element')
self.element = element
def __repr__(self):
return "pyroSAR Node object '{}'".format(self.id)
def __str__(self):
rough_string = ET.tostring(self.element, 'utf-8')
reparsed = minidom.parseString(rough_string)
return reparsed.toprettyxml(indent='\t', newl='')
def __set_source(self, key, value):
source = self.element.find('.//sources/{}'.format(key))
if source is not None:
source.attrib['refid'] = value
else:
raise RuntimeError('cannot set source on {} node'.format(self.operator))
@property
def id(self):
"""
Returns
-------
str
the node ID
"""
return self.element.attrib['id']
@id.setter
def id(self, value):
self.element.attrib['id'] = value
@property
def operator(self):
"""
Returns
-------
str
the name of the node's processing operator
"""
return self.element.find('.//operator').text
@property
def parameters(self):
"""
Returns
-------
Par
the processing parameters of the node
"""
params = self.element.find('.//parameters')
return Par(params)
@property
def source(self):
"""
Returns
-------
str or list
the ID(s) of the source node(s)
"""
sources = []
elements = self.element.findall('.//sources/')
for element in elements:
sources.append(element.attrib['refid'])
if len(sources) == 0:
return None
elif len(sources) == 1:
return sources[0]
else:
return sources
@source.setter
def source(self, value):
"""
reset the source of the node by ID
Parameters
----------
value: str or list
the ID(s) of the new source node(s)
Returns
-------
Raises
------
RuntimeError
"""
log.debug('setting the source of node {} to {}'.format(self.id, value))
if isinstance(value, str):
if isinstance(self.source, list):
raise TypeError(
'node {} has multiple sources, which must be reset using a list, not str'.format(self.id))
self.__set_source('sourceProduct', value)
elif isinstance(value, list):
key = 'sourceProduct'
for i, item in enumerate(value):
self.__set_source(key, item)
key = 'sourceProduct.{}'.format(i + 1)
[docs]class Par(object):
"""
class for handling processing node parameters
Parameters
----------
element: ~xml.etree.ElementTree.Element
the node parameter XML element
"""
def __init__(self, element):
self.__element = element
def __delitem__(self, key):
par = self.__element.find('.//{}'.format(key))
self.__element.remove(par)
def __getitem__(self, item):
if item not in self.keys():
raise KeyError('key {} does not exist'.format(item))
return self.__element.find('.//{}'.format(item)).text
def __setitem__(self, key, value):
if key not in self.keys():
raise KeyError('key {} does not exist'.format(key))
strval = value2str(value)
self.__element.find('.//{}'.format(key)).text = strval
def __repr__(self):
return str(self.dict())
[docs] def dict(self):
"""
Returns
-------
dict
the parameters as a dictionary
"""
return dict(self.items())
[docs] def items(self):
"""
Returns
-------
list
the parameters as (key, value) as from :meth:`dict.items()`
"""
return list(zip(self.keys(), self.values()))
[docs] def keys(self):
"""
Returns
-------
list
the parameter names as from :meth:`dict.keys()`
"""
return [x.tag for x in self.__element.findall('./')]
[docs] def values(self):
"""
Returns
-------
list
the parameter values as from :meth:`dict.values()`
"""
return [x.text for x in self.__element.findall('./')]
def value2str(value):
"""
format a parameter value to string to be inserted into a workflow
Parameters
----------
value: bool, int, float, list
Returns
-------
str
the string representation of the value
"""
if isinstance(value, bool):
strval = str(value).lower()
elif isinstance(value, list):
strval = ','.join(map(str, value))
elif value is None:
strval = value
else:
strval = str(value)
return strval
[docs]def get_egm96_lookup():
"""
If not found, download SNAP's lookup table for converting EGM96 geoid heights to WGS84 ellipsoid heights
Returns
-------
"""
try:
auxdatapath = ExamineSnap().auxdatapath
except AttributeError:
auxdatapath = os.path.join(os.path.expanduser('~'), '.snap', 'auxdata')
local = os.path.join(auxdatapath, 'dem', 'egm96', 'ww15mgh_b.zip')
os.makedirs(os.path.dirname(local), exist_ok=True)
if not os.path.isfile(local):
remote = 'http://step.esa.int/auxdata/dem/egm96/ww15mgh_b.zip'
print('{} <<-- {}'.format(local, remote))
r = requests.get(remote)
with open(local, 'wb')as out:
out.write(r.content)