
###############################################################################
# ancillary routines for software pyroSAR

# Copyright (c) 2014-2024, the pyroSAR Developers.

# This file is part of the pyroSAR Project. It is subject to the
# license terms in the LICENSE.txt file found in the top-level
# directory of this distribution and at
# https://github.com/johntruckenbrodt/pyroSAR/blob/master/LICENSE.txt.
# No part of the pyroSAR project, including this file, may be
# copied, modified, propagated, or distributed except according
# to the terms contained in the LICENSE.txt file.
###############################################################################
"""
This module gathers central functions and classes for general pyroSAR applications.
"""
import os
import re
import time
import uuid
from pathlib import Path
from math import sin, radians
import inspect
from datetime import datetime
from . import patterns
from spatialist.ancillary import finder
import logging

log = logging.getLogger(__name__)


def groupby(images, attribute):
    """
    group a list of images by a metadata attribute

    Parameters
    ----------
    images: list[str]
        the names of the images to be sorted
    attribute: str
        the name of the attribute used for sorting; see :func:`parse_datasetname` for options

    Returns
    -------
    list[list[str]]
        a list of sub-lists containing the grouped images
    """
    images_sort = sorted(images, key=lambda x: re.search(patterns.pyrosar, x).group(attribute))
    out_meta = [[parse_datasetname(images_sort.pop(0))]]
    while len(images_sort) > 0:
        filename = images_sort.pop(0)
        meta = parse_datasetname(filename)
        if out_meta[-1][0][attribute] == meta[attribute]:
            out_meta[-1].append(meta)
        else:
            out_meta.append([meta])
    out = [[x['filename'] for x in y] for y in out_meta]
    return out
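

# Usage sketch for groupby (illustrative only; the file names below are
# hypothetical pyroSAR product names): grouping by the start time stamp
# puts the two same-date polarizations into one sub-list.
# >>> images = ['S1A__IW___A_20150309T173017_VV_grd_mli.tif',
# ...           'S1A__IW___A_20150309T173017_VH_grd_mli.tif',
# ...           'S1A__IW___A_20150412T173017_VV_grd_mli.tif']
# >>> groups = groupby(images, attribute='start')
# >>> [len(x) for x in groups]
# [2, 1]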


def groupbyTime(images, function, time):
    """
    function to group images by their acquisition time difference

    Parameters
    ----------
    images: list[str]
        a list of image names
    function: function
        a function to derive the time from the image names; see e.g. :func:`seconds`
    time: int or float
        a time difference in seconds by which to group the images

    Returns
    -------
    list[list[str] or str]
        a list of sub-lists containing the grouped images; groups containing
        a single image are returned as plain strings, not sub-lists
    """
    # sort images by time stamp
    srcfiles = sorted(images, key=function)
    groups = [[srcfiles[0]]]
    group = groups[0]
    for i in range(1, len(srcfiles)):
        item = srcfiles[i]
        timediff = abs(function(item) - function(group[-1]))
        if timediff <= time:
            group.append(item)
        else:
            groups.append([item])
            group = groups[-1]
    return [x[0] if len(x) == 1 else x for x in groups]
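

# Usage sketch for groupbyTime (hypothetical file names): with a tolerance of
# 30 seconds the first two acquisitions are grouped; the single remaining
# image is returned as a plain string rather than a one-element list.
# >>> images = ['S1A__IW___A_20150309T173017_VV_grd_mli.tif',
# ...           'S1A__IW___A_20150309T173040_VV_grd_mli.tif',
# ...           'S1A__IW___A_20150412T173017_VV_grd_mli.tif']
# >>> groups = groupbyTime(images, function=seconds, time=30)
# >>> [len(x) if isinstance(x, list) else 1 for x in groups]
# [2, 1]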


def multilook_factors(source_rg, source_az, target, geometry, incidence):
    """
    compute multi-looking factors to approximate a square pixel
    with defined target ground range pixel spacing.

    Parameters
    ----------
    source_rg: int or float
        the range pixel spacing
    source_az: int or float
        the azimuth pixel spacing
    target: int or float
        the target pixel spacing of an approximately square pixel
    geometry: str
        the imaging geometry; either 'SLANT_RANGE' or 'GROUND_RANGE'
    incidence: int or float
        the angle of incidence

    Returns
    -------
    tuple[int]
        the multi-looking factors as (range looks, azimuth looks)

    Examples
    --------
    >>> from pyroSAR.ancillary import multilook_factors
    >>> rlks, azlks = multilook_factors(source_rg=2, source_az=13, target=10,
    ...                                 geometry='SLANT_RANGE', incidence=39)
    >>> print(rlks, azlks)
    4 1
    """
    azlks = int(round(float(target) / source_az))
    azlks = azlks if azlks > 0 else 1
    if geometry == 'SLANT_RANGE':
        rlks = float(azlks) * source_az * sin(radians(incidence)) / source_rg
    elif geometry == 'GROUND_RANGE':
        rlks = float(azlks) * source_az / source_rg
    else:
        raise ValueError("parameter 'geometry' must be either 'SLANT_RANGE' or 'GROUND_RANGE'")
    rlks = int(round(rlks))
    return rlks, azlks
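

# Supplementary sketch for the GROUND_RANGE branch (values are made up):
# no incidence-angle projection is applied here, so the range looks follow
# directly from the ratio of azimuth looks times spacing over range spacing.
# >>> multilook_factors(source_rg=10, source_az=10, target=20,
# ...                   geometry='GROUND_RANGE', incidence=39)
# (2, 2)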


def seconds(filename):
    """
    function to extract the time from a file name in seconds.
    The time stamp must follow the fixed pattern YYYYmmddTHHMMSS.
    Images processed with pyroSAR functionalities via the modules snap or gamma contain this information.

    Parameters
    ----------
    filename: str
        the name of the file from which to extract the time

    Returns
    -------
    float
        the difference between the time stamp in filename and Jan 01 1900 in seconds
    """
    # return mktime(strptime(re.findall('[0-9T]{15}', filename)[0], '%Y%m%dT%H%M%S'))
    td = datetime.strptime(re.findall('[0-9T]{15}', filename)[0], '%Y%m%dT%H%M%S') - datetime(1900, 1, 1)
    return td.total_seconds()
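

# Usage sketch for seconds (hypothetical file names): the 15-character time
# stamp is extracted and converted to seconds since 1900-01-01; the absolute
# value is mainly useful for comparing acquisitions.
# >>> t1 = seconds('S1A__IW___A_20150309T173017_VV_grd_mli.tif')
# >>> t2 = seconds('S1A__IW___A_20150309T173040_VV_grd_mli.tif')
# >>> t2 - t1
# 23.0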


def parse_datasetname(name, parse_date=False):
    """
    Parse the name of a pyroSAR processing product and extract its metadata components as a dictionary.

    Parameters
    ----------
    name: str
        the name of the file to be parsed
    parse_date: bool
        parse the start date to a :class:`~datetime.datetime` object or just return the string?

    Returns
    -------
    dict
        the metadata attributes

    Examples
    --------
    >>> meta = parse_datasetname('S1A__IW___A_20150309T173017_VV_grd_mli_geo_norm_db.tif')
    >>> print(sorted(meta.keys()))
    ['acquisition_mode', 'extensions', 'filename', 'orbit', 'outname_base', 'polarization', 'proc_steps', 'sensor', 'start']
    """
    filename = os.path.abspath(name) if os.path.isfile(name) else name
    match = re.match(re.compile(patterns.pyrosar), filename)
    if not match:
        return
    out = match.groupdict()
    if out['extensions'] == '':
        out['extensions'] = None
    if out['proc_steps'] is not None:
        out['proc_steps'] = out['proc_steps'].split('_')
    if parse_date:
        out['start'] = datetime.strptime(out['start'], '%Y%m%dT%H%M%S')
    out['filename'] = filename
    out['outname_base'] = out['outname_base'].strip('_')
    return out
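

# Supplementary sketch: with parse_date=True the start time stamp is returned
# as a datetime object instead of a string.
# >>> meta = parse_datasetname('S1A__IW___A_20150309T173017_VV_grd_mli_geo_norm_db.tif',
# ...                          parse_date=True)
# >>> meta['start']
# datetime.datetime(2015, 3, 9, 17, 30, 17)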


def find_datasets(directory, recursive=False, **kwargs):
    """
    find pyroSAR datasets in a directory based on their metadata

    Parameters
    ----------
    directory: str
        the name of the directory to be searched
    recursive: bool
        search the directory recursively into subdirectories?
    kwargs:
        Metadata attributes for filtering the scene list supplied as `key=value`, e.g. `sensor='S1A'`.
        Multiple allowed options can be provided in tuples, e.g. `sensor=('S1A', 'S1B')`.
        Any types other than tuples require an exact match, e.g. `proc_steps=['grd', 'mli', 'geo', 'norm', 'db']`
        will be matched only if these processing steps are contained in the product name in this exact order.
        The special attributes `start` and `stop` can be used for time filtering where `start<=value<=stop`.
        See function :func:`parse_datasetname` for further options.

    Returns
    -------
    list[str]
        the file names found in the directory and filtered by metadata attributes

    Examples
    --------
    >>> selection = find_datasets('path/to/files', sensor=('S1A', 'S1B'), polarization='VV')
    """
    files = finder(directory, [patterns.pyrosar], regex=True, recursive=recursive)
    selection = []
    for file in files:
        meta = parse_datasetname(file)
        matches = []
        for key, val in kwargs.items():
            if key == 'start':
                match = val <= meta['start']
            elif key == 'stop':
                match = val >= meta['start']  # only the start time stamp is contained in the filename
            elif isinstance(val, tuple):
                match = meta[key] in val
            else:
                match = meta[key] == val
            matches.append(match)
        if all(matches):
            selection.append(file)
    return selection
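

# Supplementary sketch (hypothetical directory): time filtering compares the
# YYYYmmddTHHMMSS start stamp from the file name as a string, so the start
# and stop values must be passed in the same format.
# >>> selection = find_datasets('path/to/files', sensor=('S1A', 'S1B'),
# ...                           start='20150101T000000', stop='20151231T235959')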


def getargs(func):
    """
    get the arguments of a function

    Parameters
    ----------
    func: function
        the function to be checked

    Returns
    -------
    list[str]
        the argument names in alphabetical order
    """
    return sorted(inspect.getfullargspec(func).args)


def hasarg(func, arg):
    """
    simple check whether a function takes a parameter as input

    Parameters
    ----------
    func: function
        the function to be checked
    arg: str
        the argument name to be found

    Returns
    -------
    bool
        does the function take this argument?
    """
    return arg in getargs(func)
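

# Usage sketch for getargs/hasarg on a hypothetical function f:
# >>> def f(alpha, beta, gamma=1):
# ...     pass
# >>> getargs(f)
# ['alpha', 'beta', 'gamma']
# >>> hasarg(f, 'beta')
# True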


def windows_fileprefix(func, path, exc_info):
    """
    Helper function for :func:`shutil.rmtree` to work around Windows' default path length limit of 260 characters.
    See `here <https://stackoverflow.com/questions/36219317/pathname-too-long-to-open>`_ for details.

    Parameters
    ----------
    func: function
        the function to be executed, i.e. :func:`shutil.rmtree`
    path: str
        the path to be deleted
    exc_info: tuple
        execution info as returned by :func:`sys.exc_info`

    Returns
    -------

    Examples
    --------
    >>> import shutil
    >>> from pyroSAR.ancillary import windows_fileprefix
    >>> shutil.rmtree('/path', onerror=windows_fileprefix)
    """
    func(u'\\\\?\\' + path)


class Lock(object):
    """
    File and folder locking mechanism. This mechanism creates lock files indicating whether a file/folder

    1. is being modified (`target`.lock),
    2. is being used/read (`target`.used_<uuid.uuid4>) or
    3. was damaged during modification (`target`.error).

    Although these files will not prevent locking by other mechanisms (UNIX locks are generally only advisory),
    this mechanism is respected across any running instances: no process trying to acquire a lock using this
    class will succeed while a conflicting lock file exists. This was implemented because other existing
    solutions like `filelock <https://github.com/tox-dev/filelock>`_ or
    `fcntl <https://docs.python.org/3/library/fcntl.html>`_ do not offer effective solutions for parallel
    jobs in HPC systems.

    Hard locks prevent any usage of the data. Damage/error locks work like hard locks except that `timeout`
    is ignored and a `RuntimeError` is raised immediately. Error locks are created if an error occurs whilst
    a hard lock is acquired and `target` exists (by renaming the hard lock file). Any number of usage locks
    may exist simultaneously, each with a different random UUID. No hard lock may be acquired whilst usage
    locks exist. On error, usage locks are simply deleted.

    The class supports nested locks: one function might lock a file, and another function called inside it
    will reuse this lock if it tries to lock the same file.

    It may happen that lock files remain when a process is killed by HPC schedulers like Slurm because in
    this case the process is not ended by Python. Optimally, hard locks should then be renamed to error lock
    files and usage lock files should be deleted. This has to be done separately.

    Examples
    --------
    >>> from pyroSAR.ancillary import Lock
    >>> target = 'test.txt'
    >>> with Lock(target=target):
    ...     with open(target, 'w') as f:
    ...         f.write('Hello World!')
    >>> with Lock(target=target):  # initialize lock
    ...     with Lock(target=target):  # reuse lock
    ...         with open(target, 'w') as f:
    ...             f.write('Hello World!')

    Parameters
    ----------
    target: str
        the file/folder to lock
    soft: bool
        lock the file/folder only for reading (and not for modification)?
    timeout: int
        the time in seconds to retry acquiring a lock
    """
    _instances = {}
    _nesting_levels = {}

    def __new__(cls, target, soft=False, timeout=7200):
        target_abs = os.path.abspath(os.path.expanduser(target))
        if target_abs not in cls._instances:
            log.debug(f'creating lock instance for target {target_abs}')
            instance = super().__new__(cls)
            cls._instances[target_abs] = instance
            cls._nesting_levels[target_abs] = 0
        else:
            if soft != cls._instances[target_abs].soft:
                msg = 'cannot place nested {}-lock on existing {}-lock for target {}'
                vals = ['read', 'write'] if soft else ['write', 'read']
                vals.append(target_abs)
                raise RuntimeError(msg.format(*vals))
            log.debug(f'reusing lock instance for target {target_abs}')
        return cls._instances[target_abs]

    def __init__(self, target, soft=False, timeout=7200):
        if not hasattr(self, '_initialized'):
            self.target = os.path.abspath(os.path.expanduser(target))
            used_id = str(uuid.uuid4())
            self.lock = self.target + '.lock'
            self.error = self.target + '.error'
            self.used = self.target + f'.used_{used_id}'
            self.soft = soft
            if os.path.isfile(self.error):
                msg = 'cannot acquire lock on damaged target: {}'
                raise RuntimeError(msg.format(self.target))
            end = time.time() + timeout
            log.debug(f'trying to {"read" if self.soft else "write"}-lock {target}')
            while True:
                if time.time() > end:
                    msg = 'could not acquire lock due to timeout: {}'
                    raise RuntimeError(msg.format(self.target))
                try:
                    if self.soft and not os.path.isfile(self.lock):
                        Path(self.used).touch(exist_ok=False)
                        break
                    if not self.soft and not self.is_used():
                        Path(self.lock).touch(exist_ok=False)
                        break
                except FileExistsError:
                    pass
                time.sleep(1)
            log.debug(f'acquired {"read" if self.soft else "write"}-lock on {target}')
            self._initialized = True
        Lock._nesting_levels[self.target] += 1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.remove(exc_type)

    def is_used(self):
        """
        Does any usage lock exist?

        Returns
        -------
        bool
        """
        base = os.path.basename(self.target)
        folder = os.path.dirname(self.target)
        files = list(Path(folder).glob(base + '.used*'))
        return len(files) > 0

    def remove(self, exc_type=None):
        """
        Remove the acquired soft/hard lock

        Returns
        -------
        """
        Lock._nesting_levels[self.target] -= 1
        if Lock._nesting_levels[self.target] == 0:
            if not self.soft and exc_type is not None:
                if os.path.exists(self.target):
                    os.rename(self.lock, self.error)
                    log.debug(f'placed error-lock on {self.target}')
            else:
                if self.soft:
                    os.remove(self.used)
                else:
                    os.remove(self.lock)
                msg_sub = "read" if self.soft else "write"
                log.debug(f'removed {msg_sub}-lock on {self.target}')
            del Lock._instances[self.target]
            del Lock._nesting_levels[self.target]
        else:
            log.debug(f'decrementing lock level on {self.target}')
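

# Usage sketch for a soft (read) lock, assuming a file 'test.txt' exists:
# any number of concurrent soft locks may coexist, while a hard (write) lock
# has to wait until all of them are released.
# >>> with Lock('test.txt', soft=True):
# ...     with open('test.txt') as f:
# ...         content = f.read()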


class LockCollection(object):
    """
    Like :class:`Lock` but for multiple files/folders.

    Parameters
    ----------
    targets: list[str]
        the files/folders to lock
    soft: bool
        lock the files/folders only for reading (and not for modification)?
    timeout: int
        the time in seconds to retry acquiring a lock
    """

    def __init__(self, targets, soft=False, timeout=7200):
        self.locks = [Lock(x, soft=soft, timeout=timeout) for x in targets]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for lock in reversed(self.locks):
            lock.__exit__(exc_type, exc_value, traceback)
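

# Usage sketch for LockCollection (hypothetical file names): all targets are
# locked before the block is entered and released in reverse order on exit.
# >>> from pyroSAR.ancillary import LockCollection
# >>> with LockCollection(['a.tif', 'b.tif'], soft=True):
# ...     pass  # read both files here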