Source code for pyroSAR.examine

###############################################################################
# Examination of SAR processing software
# Copyright (c) 2019-2024, the pyroSAR Developers.

# This file is part of the pyroSAR Project. It is subject to the
# license terms in the LICENSE.txt file found in the top-level
# directory of this distribution and at
# https://github.com/johntruckenbrodt/pyroSAR/blob/master/LICENSE.txt.
# No part of the pyroSAR project, including this file, may be
# copied, modified, propagated, or distributed except according
# to the terms contained in the LICENSE.txt file.
###############################################################################
import json
import os
import shutil
import platform
import re
import warnings
import subprocess as sp
import importlib.resources

from pyroSAR.config import ConfigHandler
from spatialist.ancillary import finder, run

import logging

# module-level logger, named after this module
log = logging.getLogger(__name__)

# single configuration handler instance shared by all classes in this module
# NOTE(review): presumably this loads the pyroSAR config file on creation - confirm in pyroSAR.config
__config__ = ConfigHandler()


class ExamineSnap(object):
    """
    Class to check if ESA SNAP is installed.
    Upon initialization, this class searches for relevant binaries and the accompanying
    relative directory structure, which uniquely identify an ESA SNAP installation on a system.
    First, all relevant file and folder names are read from the pyroSAR config file if it exists
    and their existence is verified.
    If this fails, a system check is performed to find relevant binaries in the system PATH variable and
    additional files and folders relative to them.
    In case SNAP is not installed, a default `snap.auxdata.properties` file delivered with pyroSAR will be copied to
    `$HOME/.snap/etc` so that SNAP download URLs and local directory structure can be adapted by other software.

    SNAP configuration can be read and modified via the attribute `snap_properties` of type
    :class:`~pyroSAR.examine.SnapProperties` or the properties :attr:`~pyroSAR.examine.ExamineSnap.userpath` and
    :attr:`~pyroSAR.examine.ExamineSnap.auxdatapath`.
    """
    
    def __init__(self):
        # update legacy config files
        if 'OUTPUT' in __config__.sections:
            __config__.remove_section('OUTPUT')
        if 'SNAP' in __config__.sections:
            snap_keys = __config__.keys('SNAP')
            for key in ['auxdata', 'auxdatapath', 'properties']:
                if key in snap_keys:
                    __config__.remove_option(section='SNAP', key=key)
        
        # define some attributes which identify SNAP
        self.identifiers = ['path', 'gpt', 'etc']
        
        # a list of relevant sections
        self.sections = ['SNAP', 'SNAP_SUFFIX']
        
        # set attributes path, gpt, etc, __suffices
        self.__read_config()
        
        # if SNAP could not be identified from the config attributes, do a system search for it
        # sets attributes path, gpt, etc
        if not self.__is_identified():
            log.debug('identifying SNAP')
            self.__identify_snap()
        
        # if SNAP cannot be identified, copy the snap.auxdata.properties file to $HOME/.snap/etc
        if not self.__is_identified():
            self.etc = os.path.join(os.path.expanduser('~'), '.snap', 'etc')
            os.makedirs(self.etc, exist_ok=True)
            dst = os.path.join(self.etc, 'snap.auxdata.properties')
            if not os.path.isfile(dst):
                dir_data = importlib.resources.files('pyroSAR') / 'snap' / 'data'
                src = str(dir_data / 'snap.auxdata.properties')
                log.debug(f'creating {dst}')
                shutil.copyfile(src, dst)
        
        # Start from the default suffices delivered with pyroSAR and let the values
        # read from the config file (if any) take precedence.
        # The attribute is stored under its name-mangled form, so a plain
        # hasattr(self, '__suffices') can never detect it; access it directly instead.
        dir_data = importlib.resources.files('pyroSAR') / 'snap' / 'data'
        fname_suffices = str(dir_data / 'snap.suffices.properties')
        with open(fname_suffices, 'r') as infile:
            suffices = self._parse_suffices(infile.read())
        try:
            configured = dict(self.__suffices)
        except AttributeError:
            configured = {}
        suffices.update(configured)
        self.__suffices = suffices
        
        # SNAP property read/modification interface
        self.snap_properties = SnapProperties(path=os.path.dirname(self.etc))
        
        # update the config file: this scans for config changes and re-writes the config file if any are found
        self.__update_config()
    
    def __getattr__(self, item):
        """
        Raise an informative error for attributes which have not been set.
        This is only called if regular attribute lookup fails.

        Parameters
        ----------
        item: str
            the attribute name

        Raises
        ------
        AttributeError
        """
        if item in ['path', 'gpt']:
            msg = ('SNAP could not be identified. If you have installed it '
                   'please add the path to the SNAP executables (bin subdirectory) '
                   'to the PATH environment. E.g. in the Linux .bashrc file add '
                   'the following line:\nexport PATH=$PATH:path/to/snap/bin')
        else:
            msg = "'ExamineSnap' object has no attribute '{}'".format(item)
        raise AttributeError(msg)
    
    def __is_identified(self):
        """
        Check if SNAP has been properly identified, i.e. all paths in `self.identifiers`
        have been detected and confirmed.

        Returns
        -------
        bool
        """
        return all(hasattr(self, x) for x in self.identifiers)
    
    @staticmethod
    def _parse_suffices(content):
        """
        Parse the content of a `key=value` properties file.
        Blank lines, comment lines starting with `#` and lines without `=` are skipped.

        Parameters
        ----------
        content: str
            the file content

        Returns
        -------
        dict[str, str]
            the key-value pairs
        """
        out = {}
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            out[key.strip()] = value.strip()
        return out
    
    @staticmethod
    def _missing_config_files(etc):
        """
        List the SNAP configuration files that are expected but not found in an `etc` directory.

        Parameters
        ----------
        etc: str
            the directory to check

        Returns
        -------
        list[str]
            the names of the missing files; empty if all were found
        """
        expected = ['snap.auxdata.properties', 'snap.clusters',
                    'snap.conf', 'snap.properties']
        present = set(os.listdir(etc))
        return [name for name in expected if name not in present]
    
    def __identify_snap(self):
        """
        Do a comprehensive search for an ESA SNAP installation.
        On success, the attributes `path`, `etc` and `gpt` are set.
        If no candidate qualifies, no attribute is set.

        Returns
        -------

        """
        # create a list of possible SNAP executables
        defaults = ['snap64.exe', 'snap32.exe', 'snap.exe', 'snap']
        # an unset PATH variable yields no candidates instead of a KeyError
        env_path = os.environ.get('PATH')
        paths = env_path.split(os.path.pathsep) if env_path else []
        options = [os.path.join(path, option) for path in paths for option in defaults]
        options = [x for x in options if os.path.isfile(x)]
        
        if not hasattr(self, 'path') or not os.path.isfile(self.path):
            executables = options
        else:
            executables = [self.path] + options
        
        if len(executables) == 0:
            log.debug("could not detect any potential 'snap' executables")
        
        # for each possible SNAP executable, check whether additional files and directories exist relative to it
        # to confirm whether it actually is an ESA SNAP installation or something else like e.g. the Ubuntu App Manager
        for path in executables:
            log.debug('checking candidate {}'.format(path))
            if os.path.islink(path):
                path = os.path.realpath(path)
            
            # check whether a directory etc exists relative to the SNAP executable
            etc = os.path.join(os.path.dirname(os.path.dirname(path)), 'etc')
            if not os.path.isdir(etc):
                log.debug("could not find the 'etc' directory")
                continue
            
            # check the content of the etc directory and skip the whole
            # candidate (not just the current file name) if anything is missing
            missing = self._missing_config_files(etc)
            if missing:
                for name in missing:
                    log.debug(f"could not find the '{name}' file")
                continue
            
            # identify the gpt executable
            gpt_candidates = finder(os.path.dirname(path), ['gpt', 'gpt.exe'])
            if len(gpt_candidates) == 0:
                log.debug("could not find the 'gpt' executable")
                continue
            
            self.path = path
            self.etc = etc
            self.gpt = gpt_candidates[0]
            return
    
    def __read_config(self):
        """
        Read the SNAP paths and operator suffices from the pyroSAR config file.
        Only paths that exist are set as attributes. The suffices are only
        set if the section `SNAP_SUFFIX` exists and is not empty.

        Returns
        -------

        """
        for attr in self.identifiers:
            self.__read_config_attr(attr, section='SNAP')
        
        suffices = {}
        if 'SNAP_SUFFIX' in __config__.sections:
            suffices = __config__['SNAP_SUFFIX']
        if len(suffices.keys()) > 0:
            self.__suffices = suffices
    
    def __read_config_attr(self, attr, section):
        """
        read an attribute from the config file and set it as an object attribute

        Parameters
        ----------
        attr: str
            the attribute name
        section: str
            the config section to read the attribute from

        Returns
        -------

        """
        if section in __config__.sections:
            if attr in __config__[section].keys():
                val = __config__[section][attr]
                # only accept paths which actually exist
                if os.path.exists(val):
                    setattr(self, attr, val)
    
    def __update_config(self):
        """
        Write the identified SNAP paths and the operator suffices to the config file.
        Missing sections are created first.

        Returns
        -------

        """
        for section in self.sections:
            if section not in __config__.sections:
                __config__.add_section(section)
        
        for key in self.identifiers:
            if hasattr(self, key):
                self.__update_config_attr(key, getattr(self, key), 'SNAP')
        
        for key in sorted(self.__suffices.keys()):
            self.__update_config_attr(key, self.__suffices[key], 'SNAP_SUFFIX')
    
    @staticmethod
    def __update_config_attr(attr, value, section):
        """
        Write a single attribute to the config file if it is new or its value has changed.

        Parameters
        ----------
        attr: str
            the attribute name
        value: Any
            the attribute value; lists are converted to a JSON string
        section: str
            the config section to write to

        Returns
        -------

        """
        if isinstance(value, list):
            value = json.dumps(value)
        
        if attr not in __config__[section].keys() or __config__[section][attr] != value:
            __config__.set(section, key=attr, value=value, overwrite=True)
    
    def get_suffix(self, operator):
        """
        get the file name suffix for an operator

        Parameters
        ----------
        operator: str
            the name of the operator

        Returns
        -------
        str or None
            the file suffix or None if unknown

        Examples
        --------
        >>> from pyroSAR.examine import ExamineSnap
        >>> config = ExamineSnap()
        >>> print(config.get_suffix('Terrain-Flattening'))
        'TF'
        """
        if operator in self.__suffices.keys():
            return self.__suffices[operator]
        return None
    
    def get_version(self, module):
        """
        Read the version and date of different SNAP modules.
        This scans a file 'messages.log', which is re-written every time SNAP is started.

        Parameters
        ----------
        module: str
            one of the following

            - core
            - desktop
            - rstb
            - opttbx
            - microwavetbx

        Returns
        -------
        dict
            a dictionary with keys 'version' and 'date'

        Raises
        ------
        RuntimeError
            if the module or the operating system is not supported, the file
            'messages.log' cannot be found or no version information can be read from it
        """
        # base search patterns for finding the right lines
        patterns = {'core': r'org\.esa\.snap\.snap\.core',
                    'desktop': r'org\.esa\.snap\.snap\.ui',
                    'rstb': r'org\.csa\.rstb\.rstb\.kit',
                    'opttbx': r'eu\.esa\.opt\.opttbx\.kit',
                    'microwavetbx': r'eu\.esa\.microwavetbx\.microwavetbx\.kit'}
        
        if module not in patterns:
            raise RuntimeError('module not supported')
        pattern = patterns[module]
        pattern += r' \[(?P<version>[0-9.]+) [0-9.]+ (?P<date>[0-9]{12})'
        
        system = platform.system()
        if system in ['Linux', 'Darwin']:
            path = os.path.join(os.path.expanduser('~'), '.snap', 'system')
        elif system == 'Windows':
            path = os.path.join(os.environ['APPDATA'], 'SNAP')
        else:
            raise RuntimeError('operating system not supported')
        
        # a SNAP installation inside the active conda environment keeps its log file there
        conda_env_path = os.environ.get('CONDA_PREFIX')
        if conda_env_path is not None and conda_env_path in self.gpt:
            fname = os.path.join(conda_env_path, 'snap', '.snap', 'system', 'var', 'log', 'messages.log')
        else:
            fname = os.path.join(path, 'var', 'log', 'messages.log')
        
        if not os.path.isfile(fname):
            try:
                # This will start SNAP and immediately stop it because of the invalid argument.
                # Currently, this seems to be the only way to create the messages.log file if it does not exist.
                sp.check_call([self.path, '--nosplash', '--dummytest', '--console', 'suppress'])
            except sp.CalledProcessError:
                # a non-zero exit code is expected here; the file check below decides on success
                pass
        
        if not os.path.isfile(fname):
            raise RuntimeError("cannot find 'messages.log' to read SNAP module versions from.")
        
        with open(fname, 'r') as m:
            content = m.read()
        match = re.search(pattern, content)
        if match is None:
            raise RuntimeError('cannot read version information from {}.\nPlease restart SNAP.'.format(fname))
        return match.groupdict()
    
    @property
    def auxdatapath(self):
        """
        Get/set the SNAP configuration for `AuxDataPath` in `snap.auxdata.properties`.
        If the property is not set, the subdirectory `auxdata` of
        :attr:`~pyroSAR.examine.ExamineSnap.userpath` is returned.

        Example
        -------
        >>> from pyroSAR.examine import ExamineSnap
        >>> config = ExamineSnap()
        >>> config.auxdatapath = '/path/to/snap/auxdata'
        # This is equivalent to
        >>> config.snap_properties['AuxDataPath'] = '/path/to/snap/auxdata'
        """
        out = self.snap_properties['AuxDataPath']
        if out is None:
            out = os.path.join(self.userpath, 'auxdata')
        return out
    
    @auxdatapath.setter
    def auxdatapath(self, value):
        self.snap_properties['AuxDataPath'] = value
    
    @property
    def userpath(self):
        """
        Get/set the SNAP configuration for `snap.userdir` in `snap.properties`.

        Example
        -------
        >>> from pyroSAR.examine import ExamineSnap
        >>> config = ExamineSnap()
        >>> config.userpath = '/path/to/snap/data'
        # This is equivalent to
        >>> config.snap_properties['snap.userdir'] = '/path/to/snap/data'
        """
        return self.snap_properties.userpath
    
    @userpath.setter
    def userpath(self, value):
        self.snap_properties.userpath = value
class ExamineGamma(object):
    """
    Class to check if GAMMA is installed.
    The installation directory is read from the pyroSAR config file and the
    environment variable `GAMMA_HOME`; the latter takes precedence if both differ.

    Raises
    ------
    RuntimeError
        if the GAMMA installation directory or its version cannot be determined
        or the command `gdal-config` cannot be found

    Examples
    --------
    >>> from pyroSAR.examine import ExamineGamma
    >>> config = ExamineGamma()
    >>> print(config.home)
    >>> print(config.version)

    """
    
    def __init__(self):
        home_sys = os.environ.get('GAMMA_HOME')
        if home_sys is not None and not os.path.isdir(home_sys):
            warnings.warn('found GAMMA_HOME environment variable, but directory does not exist')
            home_sys = None
        
        # sets attribute fname and one attribute per key of the config section GAMMA
        self.__read_config()
        
        if hasattr(self, 'home'):
            if home_sys is not None and self.home != home_sys:
                log.info('the value of GAMMA_HOME is different to that in the pyroSAR configuration;\n'
                         ' was: {}\n'
                         ' is : {}\n'
                         'resetting the configuration and deleting parsed modules'
                         .format(self.home, home_sys))
                parsed = os.path.join(os.path.dirname(self.fname), 'gammaparse')
                # the parsed modules might never have been created
                if os.path.isdir(parsed):
                    shutil.rmtree(parsed)
                self.home = home_sys
        if not hasattr(self, 'home'):
            if home_sys is not None:
                self.home = home_sys
            else:
                raise RuntimeError('could not read GAMMA installation directory')
        
        self.version = self._parse_version(self.home)
        
        # look up gdal-config on the PATH without spawning the external 'which' command
        gdal_config = shutil.which('gdal-config')
        if gdal_config is None:
            raise RuntimeError('could not find command gdal-config.')
        self.gdal_config = gdal_config
        
        self.__update_config()
    
    @staticmethod
    def _parse_version(home):
        """
        Extract the GAMMA version from the installation directory path.

        Parameters
        ----------
        home: str
            the GAMMA installation directory; expected to contain
            `GAMMA_SOFTWARE-<version>` or `GAMMA_SOFTWARE/<version>` with an eight-digit version

        Returns
        -------
        str
            the eight-digit version string

        Raises
        ------
        RuntimeError
            if the path does not contain the expected pattern
        """
        match = re.search(r'GAMMA_SOFTWARE[-/](?P<version>[0-9]{8})', home)
        if match is None:
            raise RuntimeError("could not read the GAMMA version from the installation "
                               "directory '{}'".format(home))
        return match.group('version')
    
    def __read_config(self):
        """
        Store the config file name in attribute `fname` and set every key of the
        config section `GAMMA` (if it exists) as an object attribute.

        Returns
        -------

        """
        self.fname = __config__.file
        if 'GAMMA' in __config__.sections:
            attr = __config__['GAMMA']
            for key, value in attr.items():
                setattr(self, key, value)
    
    def __update_config(self):
        """
        Write the attributes `home` and `version` to the config section `GAMMA`.
        The section is created if it does not exist.

        Returns
        -------

        """
        if 'GAMMA' not in __config__.sections:
            __config__.add_section('GAMMA')
        
        for attr in ['home', 'version']:
            self.__update_config_attr(attr, getattr(self, attr), 'GAMMA')
    
    @staticmethod
    def __update_config_attr(attr, value, section):
        """
        Write a single attribute to the config file if it is new or its value has changed.

        Parameters
        ----------
        attr: str
            the attribute name
        value: Any
            the attribute value; lists are converted to a JSON string
        section: str
            the config section to write to

        Returns
        -------

        """
        if isinstance(value, list):
            value = json.dumps(value)
        
        if attr not in __config__[section].keys() or __config__[section][attr] != value:
            __config__.set(section, key=attr, value=value, overwrite=True)
class SnapProperties(object):
    """
    SNAP configuration interface. This class enables reading and modifying
    SNAP configuration in properties files. Modified properties are directly
    written to the files.
    Currently, the files `snap.properties` and `snap.auxdata.properties` are
    supported. These files can be found in two locations:

    - `<SNAP installation directory>/etc`
    - `<user directory>/.snap/etc`

    Configuration in the latter has higher priority and modified properties will
    always be written there so that the installation directory is not modified.

    Parameters
    ----------
    path: str
        SNAP installation directory path

    Examples
    --------
    >>> from pyroSAR.examine import SnapProperties
    >>> config = SnapProperties(path='/path/to/snap')
    >>> config['snap.userdir'] = '/path/to/snap/userdir'
    """
    
    def __init__(self, path):
        # one property per line: optional leading '#' (commented), key, '=', value
        self.pattern = r'^(?P<comment>#?)(?P<key>[\w\.]*)[ ]*=[ ]*(?P<value>.*)\n*'
        self.properties_path = os.path.join(path, 'etc', 'snap.properties')
        self.auxdata_properties_path = os.path.join(path, 'etc', 'snap.auxdata.properties')
        
        log.debug(f"reading {self.properties_path}")
        self.properties = self._to_dict(self.properties_path)
        self.auxdata_properties = self._to_dict(self.auxdata_properties_path)
        
        # lookup order for __getitem__ and keys
        self._dicts = [self.properties, self.auxdata_properties]
        
        # some properties need to be read from the default user path to
        # be visible to SNAP
        # each user file is merged into the dictionary of its own file type so that
        # modified keys are later written back to the matching user file
        overlays = [(self.userpath_properties, self.properties_path,
                     self.properties),
                    (self.userpath_auxdata_properties, self.auxdata_properties_path,
                     self.auxdata_properties)]
        for default, defined, target in overlays:
            if default != defined:
                conf = self._to_dict(default)
                if len(conf.keys()) > 0:
                    log.debug(f"updating keys {list(conf.keys())} from {default}")
                    target.update(conf)
    
    def __getitem__(self, key):
        """
        Get the value of a property. `snap.properties` is searched before
        `snap.auxdata.properties`.

        Parameters
        ----------
        key: str
            the property name

        Returns
        -------
        Any
            the property value; None for properties which are commented out

        Raises
        ------
        KeyError
            if the key is not known
        """
        for section in self._dicts:
            if key in section:
                return section[key]
        raise KeyError(f'could not find key {key}')
    
    def __setitem__(self, key, value):
        """
        Set the value of a known property and write it to the corresponding
        properties file in the user directory. Nothing is written if value
        and type are unchanged.

        Parameters
        ----------
        key: str
            the property name
        value: Any
            the new value; None comments out the property in the file

        Returns
        -------

        Raises
        ------
        KeyError
            if the key is not known
        """
        # raises a KeyError for unknown keys
        current = self[key]
        if value == current and isinstance(value, type(current)):
            return
        
        if key in self.properties:
            self.properties[key] = value
        else:
            self.auxdata_properties[key] = value
        
        if value is not None:
            # escape backslashes, e.g. in Windows paths
            value = str(value).encode('unicode-escape').decode()
        
        # the key is guaranteed to exist in one of the two dictionaries at this point
        if key in ['snap.home', 'snap.userdir']:
            path = os.path.join(os.path.expanduser('~'), '.snap', 'etc', 'snap.properties')
        elif key in self.properties:
            path = self.userpath_properties
        else:
            path = self.userpath_auxdata_properties
        
        if os.path.isfile(path):
            with open(path, 'r') as f:
                content = f.read()
        else:
            content = ''
        
        content = self._replace_property(content, key, value)
        
        os.makedirs(os.path.dirname(path), exist_ok=True)
        log.debug(f"writing key '{key}' with value '{value}' to '{path}'")
        with open(path, 'w') as f:
            f.write(content)
    
    @staticmethod
    def _replace_property(content, key, value):
        """
        Set a property in the text content of a properties file.
        The first line defining the key (commented or not) is replaced;
        if there is none, a new line is appended.

        Parameters
        ----------
        content: str
            the file content
        key: str
            the property name
        value: str or None
            the value to write; None writes the property commented out and without value

        Returns
        -------
        str
            the modified file content
        """
        repl = f'#{key} =' if value is None else f'{key} = {value}'
        # escape the key (it usually contains dots) and anchor it to the line start
        # so that keys ending on the same characters are not matched
        pattern = r'^#?{}[ ]*=[ ]*(?P<value>.*)'.format(re.escape(key))
        match = re.search(pattern, content, flags=re.MULTILINE)
        if match:
            # replace exactly the matched line and not identical text elsewhere
            return content[:match.start()] + repl + content[match.end():]
        return content + f'\n{repl}'
    
    def _to_dict(self, path):
        """
        Read a properties file into a dictionary.
        Lines not matching `self.pattern` are skipped.

        Parameters
        ----------
        path: str
            the name of the file; may not exist

        Returns
        -------
        dict
            the properties with converted values; properties which are
            commented out have value None. Empty if the file does not exist.
        """
        out = {}
        if os.path.isfile(path):
            regex = re.compile(self.pattern)
            with open(path, 'r') as f:
                for line in f:
                    match = regex.match(line)
                    if match is None:
                        continue
                    comment, key, value = match.groups()
                    value = self._string_convert(value)
                    out[key] = value if comment == '' else None
        return out
    
    @staticmethod
    def _string_convert(string):
        """
        Convert a property value string to a Python object.

        Parameters
        ----------
        string: str
            the value as read from the file

        Returns
        -------
        None or bool or int or float or str
            None/True/False for the corresponding words (case-insensitive),
            a number if the string can be converted to int or float and
            otherwise the string with double backslashes replaced by single ones
        """
        lowered = string.lower()
        if lowered == 'none':
            return None
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        for converter in (int, float):
            try:
                return converter(string)
            except ValueError:
                continue
        return string.replace('\\\\', '\\')
    
    def keys(self):
        """

        Returns
        -------
        list[str]
            all known SNAP property keys, sorted and without duplicates
        """
        keys = set()
        for item in self._dicts:
            keys.update(item.keys())
        return sorted(keys)
    
    @property
    def userpath(self):
        """
        Get/set the SNAP user directory (property `snap.userdir`).
        Defaults to `<user directory>/.snap` if the property is not set.
        """
        key = 'snap.userdir'
        if key not in self.keys() or self[key] is None:
            return os.path.join(os.path.expanduser('~'), '.snap')
        return self[key]
    
    @userpath.setter
    def userpath(self, value):
        self['snap.userdir'] = value
    
    @property
    def userpath_auxdata_properties(self):
        """
        the name of the file `snap.auxdata.properties` in the default user directory
        """
        return os.path.join(os.path.expanduser('~'), '.snap', 'etc', 'snap.auxdata.properties')
    
    @property
    def userpath_properties(self):
        """
        the name of the file `snap.properties` in the default user directory
        """
        return os.path.join(os.path.expanduser('~'), '.snap', 'etc', 'snap.properties')