Source code for pyroSAR.examine

###############################################################################
# Examination of SAR processing software
# Copyright (c) 2019-2026, the pyroSAR Developers.

# This file is part of the pyroSAR Project. It is subject to the
# license terms in the LICENSE.txt file found in the top-level
# directory of this distribution and at
# https://github.com/johntruckenbrodt/pyroSAR/blob/master/LICENSE.txt.
# No part of the pyroSAR project, including this file, may be
# copied, modified, propagated, or distributed except according
# to the terms contained in the LICENSE.txt file.
###############################################################################
import json
import os
import shutil
import re
import warnings
import platform
import subprocess as sp
import importlib.resources

from pyroSAR.config import ConfigHandler
from spatialist.ancillary import finder, run

import logging

log = logging.getLogger(__name__)

__config__ = ConfigHandler()


[docs] class ExamineSnap(object): """ Class to check if ESA SNAP is installed. Upon initialization, this class searches for relevant binaries and the accompanying relative directory structure, which uniquely identify an ESA SNAP installation on a system. First, all relevant file and folder names are read from the pyroSAR config file if it exists and their existence is verified. If this fails, a system check is performed to find relevant binaries in the system PATH variable and additional files and folders relative to them. In case SNAP is not installed, a default `snap.auxdata.properties` file delivered with pyroSAR will be copied to `$HOME/.snap/etc` so that SNAP download URLS and local directory structure can be adapted by other software. SNAP configuration can be read and modified via the attribute `snap_properties` of type :class:`~pyroSAR.examine.SnapProperties` or the properties :attr:`~pyroSAR.examine.ExamineSnap.userpath` and :attr:`~pyroSAR.examine.ExamineSnap.auxdatapath`. """ _version_dict = None def __init__(self): # update legacy config files if 'OUTPUT' in __config__.sections: __config__.remove_section('OUTPUT') if 'SNAP' in __config__.sections: snap_keys = __config__.keys('SNAP') for key in ['auxdata', 'auxdatapath', 'properties']: if key in snap_keys: __config__.remove_option(section='SNAP', key=key) # define some attributes which identify SNAP self.identifiers = ['path', 'gpt', 'etc'] # a list of relevant sections self.sections = ['SNAP', 'SNAP_SUFFIX'] # set attributes path, gpt, etc, __suffices self.__read_config() # if SNAP could not be identified from the config attributes, do a system search for it # sets attributes path, gpt, etc if not self.__is_identified(): log.debug('identifying SNAP') self.__identify_snap() # if SNAP cannot be identified, copy the snap.auxdata.properties file to $HOME/.snap/etc if not self.__is_identified(): self.etc = os.path.join(os.path.expanduser('~'), '.snap', 'etc') os.makedirs(self.etc, exist_ok=True) dst = os.path.join(self.etc, 'snap.auxdata.properties') if not os.path.isfile(dst): dir_data = importlib.resources.files('pyroSAR') / 'snap' / 'data' src = str(dir_data / 'snap.auxdata.properties') log.debug(f'creating {dst}') shutil.copyfile(src, dst) # if the SNAP suffices attribute was not yet identified, # point it to the default file delivered with pyroSAR if not hasattr(self, '__suffices'): dir_data = importlib.resources.files('pyroSAR') / 'snap' / 'data' fname_suffices = str(dir_data / 'snap.suffices.properties') with open(fname_suffices, 'r') as infile: content = infile.read().split('\n') self.__suffices = {k: v for k, v in [x.split('=') for x in content]} # SNAP property read/modification interface self.snap_properties = SnapProperties(path=os.path.dirname(self.etc)) # update the config file: this scans for config changes and re-writes the config file if any are found self.__update_config() if ExamineSnap._version_dict is None: ExamineSnap._version_dict = self.__read_version_dict() self.version_dict = ExamineSnap._version_dict def __getattr__(self, item): if item in ['path', 'gpt']: msg = ('SNAP could not be identified. If you have installed it ' 'please add the path to the SNAP executables (bin subdirectory) ' 'to the PATH environment. E.g. in the Linux .bashrc file add ' 'the following line:\nexport PATH=$PATH:path/to/snap/bin"') else: msg = "'ExamineSnap' object has no attribute '{}'".format(item) raise AttributeError(msg) def __is_identified(self): """ Check if SNAP has been properly identified, i.e. all paths in `self.identifiers` have been detected and confirmed. Returns ------- bool """ return sum([hasattr(self, x) for x in self.identifiers]) == len(self.identifiers) def __identify_snap(self): """ do a comprehensive search for an ESA SNAP installation Returns ------- bool has the SNAP properties file been changed? """ # create a list of possible SNAP executables defaults = ['snap64.exe', 'snap32.exe', 'snap.exe', 'snap'] paths = os.environ['PATH'].split(os.path.pathsep) options = [os.path.join(path, option) for path in paths for option in defaults] options = [x for x in options if os.path.isfile(x)] if not hasattr(self, 'path') or not os.path.isfile(self.path): executables = options else: executables = [self.path] + options if len(executables) == 0: log.debug("could not detect any potential 'snap' executables") # for each possible SNAP executable, check whether additional files and directories exist relative to it # to confirm whether it actually is an ESA SNAP installation or something else like e.g. the Ubuntu App Manager for path in executables: log.debug('checking candidate {}'.format(path)) if os.path.islink(path): path = os.path.realpath(path) # check whether a directory etc exists relative to the SNAP executable etc = os.path.join(os.path.dirname(os.path.dirname(path)), 'etc') if not os.path.isdir(etc): log.debug("could not find the 'etc' directory") continue # check the content of the etc directory config_files = os.listdir(etc) expected = ['snap.auxdata.properties', 'snap.clusters', 'snap.conf', 'snap.properties'] for name in expected: if name not in config_files: log.debug(f"could not find the '{name}' file") continue # identify the gpt executable gpt_candidates = finder(os.path.dirname(path), ['gpt', 'gpt.exe']) if len(gpt_candidates) == 0: log.debug("could not find the 'gpt' executable") continue else: gpt = gpt_candidates[0] self.path = path self.etc = etc self.gpt = gpt return def __read_config(self): """ This method reads the config.ini to examine the snap paths. If the snap paths are not in the config.ini or the paths are wrong they will be automatically created. Returns ------- """ for attr in self.identifiers: self.__read_config_attr(attr, section='SNAP') suffices = {} if 'SNAP_SUFFIX' in __config__.sections: suffices = __config__['SNAP_SUFFIX'] if len(suffices.keys()) > 0: self.__suffices = suffices def __read_config_attr(self, attr, section): """ read an attribute from the config file and set it as an object attribute Parameters ---------- attr: str the attribute name section: str the config section to read the attribute from Returns ------- """ if section in __config__.sections: if attr in __config__[section].keys(): val = __config__[section][attr] if os.path.exists(val): # log.info('setting attribute {}'.format(attr)) setattr(self, attr, val) def __read_version_dict(self): log.debug('reading SNAP version information') out = {} cmd = [self.path, '--nosplash', '--nogui', '--modules', '--list', '--refresh'] if platform.system() == 'Windows': cmd.extend(['--console', 'suppress']) proc = sp.Popen(args=cmd, stdout=sp.PIPE, stderr=sp.STDOUT, text=True, encoding='utf-8', bufsize=1) counter = 0 lines = [] lines_info = [] for line in proc.stdout: line = line.rstrip() lines.append(line) if line.startswith('---'): counter += 1 else: if counter == 1: lines_info.append(line) if counter == 2: proc.terminate() proc.wait() pattern = r'([a-z.]*)\s+([0-9.]+)\s+(.*)' for line in lines_info: code, version, state = re.search(pattern=pattern, string=line).groups() out[code] = {'version': version, 'state': state} if len(out) == 0: snap_msg = "\n".join(lines) raise RuntimeError(f'{snap_msg}\ncould not ' f'read SNAP version information') return out def __update_config(self): for section in self.sections: if section not in __config__.sections: # log.info('creating section {}..'.format(section)) __config__.add_section(section) for key in self.identifiers: if hasattr(self, key): self.__update_config_attr(key, getattr(self, key), 'SNAP') for key in sorted(self.__suffices.keys()): self.__update_config_attr(key, self.__suffices[key], 'SNAP_SUFFIX') @staticmethod def __update_config_attr(attr, value, section): if isinstance(value, list): value = json.dumps(value) if attr not in __config__[section].keys() or __config__[section][attr] != value: # log.info('updating attribute {0}:{1}..'.format(section, attr)) # log.info(' {0} -> {1}'.format(repr(config[section][attr]), repr(value))) __config__.set(section, key=attr, value=value, overwrite=True)
[docs] def get_suffix(self, operator): """ get the file name suffix for an operator Parameters ---------- operator: str the name of the operator Returns ------- str or None the file suffix or None if unknown Examples -------- >>> from pyroSAR.examine import ExamineSnap >>> config = ExamineSnap() >>> print(config.get_suffix('Terrain-Flattening')) 'TF' """ if operator in self.__suffices.keys(): return self.__suffices[operator] else: return None
[docs] def get_version(self, module: str) -> str: """ Read the version and date of different SNAP modules. The following SNAP command is called to get the information: .. code-block:: bash snap --nosplash --nogui --modules --list --refresh --console suppress Parameters ---------- module: one of the following - core - desktop - rstb - opttbx - microwavetbx Returns ------- the version number """ log.debug(f"reading version information for module '{module}'") patterns = {'core': 'org.esa.snap.snap.core', 'desktop': 'org.esa.snap.snap.ui', 'rstb': 'org.csa.rstb.rstb.kit', 'opttbx': 'eu.esa.opt.opttbx.kit', 'microwavetbx': 'eu.esa.microwavetbx.microwavetbx.kit'} if module not in patterns.keys(): raise ValueError(f"'{module}' is not a valid module name. " f"Supported options: {patterns.keys()}") for k, v in self.version_dict.items(): if patterns[module] == k: if v['state'] == 'Available': raise RuntimeError(f'{module} is not installed') log.debug(f'version is {v["version"]}') return v['version'] raise RuntimeError(f"Could not find version " f"information for module '{module}'.")
@property def auxdatapath(self): """ Get/set the SNAP configuration for `AuxDataPath` in `snap.auxdata.properties`. Example ------- >>> from pyroSAR.examine import ExamineSnap >>> config = ExamineSnap() >>> config.auxdatapath = '/path/to/snap/auxdata' # This is equivalent to >>> config.snap_properties['AuxDataPath'] = '/path/to/snap/auxdata' """ out = self.snap_properties['AuxDataPath'] if out is None: out = os.path.join(self.userpath, 'auxdata') return out @auxdatapath.setter def auxdatapath(self, value): self.snap_properties['AuxDataPath'] = value @property def userpath(self): """ Get/set the SNAP configuration for `snap.userdir` in `snap.properties`. Example ------- >>> from pyroSAR.examine import ExamineSnap >>> config = ExamineSnap() >>> config.userpath = '/path/to/snap/data' # This is equivalent to >>> config.snap_properties['snap.userdir'] = '/path/to/snap/data' """ return self.snap_properties.userpath @userpath.setter def userpath(self, value): self.snap_properties.userpath = value
[docs] class ExamineGamma(object): """ Class to check if GAMMA is installed. Examples -------- >>> from pyroSAR.examine import ExamineGamma >>> config = ExamineGamma() >>> print(config.home) >>> print(config.version) """ def __init__(self): home_sys = os.environ.get('GAMMA_HOME') if home_sys is not None and not os.path.isdir(home_sys): warnings.warn('found GAMMA_HOME environment variable, but directory does not exist') home_sys = None self.__read_config() if hasattr(self, 'home'): if home_sys is not None and self.home != home_sys: log.info('the value of GAMMA_HOME is different to that in the pyroSAR configuration;\n' ' was: {}\n' ' is : {}\n' 'resetting the configuration and deleting parsed modules' .format(self.home, home_sys)) parsed = os.path.join(os.path.dirname(self.fname), 'gammaparse') shutil.rmtree(parsed) self.home = home_sys if not hasattr(self, 'home'): if home_sys is not None: setattr(self, 'home', home_sys) else: raise RuntimeError('could not read GAMMA installation directory') self.version = re.search('GAMMA_SOFTWARE[-/](?P<version>[0-9]{8})', getattr(self, 'home')).group('version') try: out, err = run(['which', 'gdal-config'], void=False) gdal_config = out.strip('\n') self.gdal_config = gdal_config except sp.CalledProcessError: raise RuntimeError('could not find command gdal-config.') self.__update_config() def __read_config(self): self.fname = __config__.file if 'GAMMA' in __config__.sections: attr = __config__['GAMMA'] for key, value in attr.items(): setattr(self, key, value) def __update_config(self): if 'GAMMA' not in __config__.sections: __config__.add_section('GAMMA') for attr in ['home', 'version']: self.__update_config_attr(attr, getattr(self, attr), 'GAMMA') @staticmethod def __update_config_attr(attr, value, section): if isinstance(value, list): value = json.dumps(value) if attr not in __config__[section].keys() or __config__[section][attr] != value: __config__.set(section, key=attr, value=value, overwrite=True)
[docs] class SnapProperties(object): """ SNAP configuration interface. This class enables reading and modifying SNAP configuration in properties files. Modified properties are directly written to the files. Currently, the files `snap.properties`, `snap.auxdata.properties` and `snap.conf` are supported. These files can be found in two locations: - `<SNAP installation directory>/etc` - `<user directory>/.snap/etc` Configuration in the latter has higher priority, and modified properties will always be written there so that the installation directory is not modified. Parameters ---------- path: str SNAP installation directory path Examples -------- >>> from pyroSAR.examine import ExamineSnap, SnapProperties >>> path = ExamineSnap().path >>> config = SnapProperties(path=path) >>> config['snap.userdir'] = '/path/to/snap/auxdata' """ def __init__(self, path): self.pattern = r'^(?P<comment>#?)(?P<key>[\w\.]*)[ ]*=[ ]*"?(?P<value>[^"\n]*)"?\n*' self.pattern_key_replace = r'#?{}[ ]*=[ ]*(?P<value>.*)' self.properties_path = os.path.join(path, 'etc', 'snap.properties') log.debug(f"reading {self.properties_path}") self.properties = self._to_dict(self.properties_path) self.properties.update(self._to_dict(self.userpath_properties)) self.auxdata_properties_path = os.path.join(path, 'etc', 'snap.auxdata.properties') log.debug(f"reading {self.auxdata_properties_path}") self.auxdata_properties = self._to_dict(self.auxdata_properties_path) self.auxdata_properties.update(self._to_dict(self.userpath_auxdata_properties)) self.conf_path = os.path.join(path, 'etc', 'snap.conf') log.debug(f"reading {self.conf_path}") str_split = {'default_options': ' '} self.conf = self._to_dict(path=self.conf_path, str_split=str_split) self.conf.update(self._to_dict(self.userpath_conf, str_split=str_split)) self._dicts = [self.properties, self.auxdata_properties, self.conf] # removing this because of # "RuntimeError: OpenJDK 64-Bit Server VM warning: Options # -Xverify:none and -noverify were deprecated in JDK 13 and will # likely be removed in a future release." if '-J-Xverify:none' in self.conf['default_options']: opts = self.conf['default_options'].copy() opts.remove('-J-Xverify:none') self['default_options'] = opts # some properties need to be read from the default user path to # be visible to SNAP pairs = [(self.userpath_properties, self.properties_path), (self.userpath_auxdata_properties, self.auxdata_properties_path)] for default, defined in pairs: if default != defined: conf = self._to_dict(default) if len(conf.keys()) > 0: log.debug(f"updating keys {list(conf.keys())} from {default}") self.properties.update(conf) def __getitem__( self, key: str ) -> int | float | str | list[str]: for section in self._dicts: if key in section: return section[key].copy() \ if hasattr(section[key], 'copy') \ else section[key] raise KeyError(f'could not find key {key}') def __setitem__( self, key: str, value: int | float | str | list[str] | None ) -> None: if not (isinstance(value, (int, float, str, list)) or value is None): raise TypeError(f'invalid type for key {key}: {type(value)}') if value == self[key] and isinstance(value, type(self[key])): return if key in self.properties: self.properties[key] = value elif key in self.auxdata_properties: self.auxdata_properties[key] = value else: self.conf[key] = value if value is not None: if isinstance(value, list): value = ' '.join(value) value = str(value).encode('unicode-escape').decode() value = value.replace(':', '\\:') if key in self.properties: path = self.userpath_properties elif key in self.auxdata_properties: path = self.userpath_auxdata_properties elif key in self.conf: path = self.userpath_conf else: raise KeyError(f'unknown key {key}') if os.path.isfile(path): with open(path, 'r') as f: content = f.read() else: content = '' pattern = self.pattern_key_replace.format(key) match = re.search(pattern, content) if match: repl = f'#{key} =' if value is None else f'{key} = {value}' content = content.replace(match.group(), repl) else: content += f'\n{key} = {value}' os.makedirs(os.path.dirname(path), exist_ok=True) log.debug(f"writing key '{key}' to '{path}'") with open(path, 'w') as f: f.write(content) def _to_dict( self, path: str, str_split: dict[str, str] | None=None ) -> dict[str, int | float | str | None | list[str]]: """ Read a properties file into a dictionary. Converts values into basic python types Parameters ---------- path: the path to the properties file str_split: a dictionary with properties as keys and splitting characters as values to split a string into a list of strings Returns ------- the dictionary with the properties """ out = {} if os.path.isfile(path): with open(path, 'r') as f: for line in f: if re.search(self.pattern, line): match = re.match(re.compile(self.pattern), line) comment, key, value = match.groups() if comment == '': if str_split is not None and key in str_split.keys(): value = value.split(str_split[key]) else: value = self._string_convert(value) out[key] = value else: out[key] = None return out @staticmethod def _string_convert(string): if string.lower() == 'none': return None elif string.lower() == 'true': return True elif string.lower() == 'false': return False else: try: return int(string) except ValueError: try: return float(string) except ValueError: return string.replace('\\:', ':').replace('\\\\', '\\')
[docs] def keys(self): """ Returns ------- list[str] all known SNAP property keys """ keys = [] for item in self._dicts: keys.extend(list(item.keys())) return sorted(keys)
@property def userpath(self): key = 'snap.userdir' if key not in self.keys() or self[key] is None: return os.path.join(os.path.expanduser('~'), '.snap') else: return self[key] @userpath.setter def userpath(self, value): self['snap.userdir'] = value @property def userpath_auxdata_properties(self): return os.path.join(os.path.expanduser('~'), '.snap', 'etc', 'snap.auxdata.properties') @property def userpath_properties(self): return os.path.join(os.path.expanduser('~'), '.snap', 'etc', 'snap.properties') @property def userpath_conf(self): return os.path.join(os.path.expanduser('~'), '.snap', 'etc', 'snap.conf')