Source code for pyroSAR.datacube_util

###############################################################################
# Convenience tools for Open Data Cube ingestion

# Copyright (c) 2018-2019, the pyroSAR Developers.

# This file is part of the pyroSAR Project. It is subject to the
# license terms in the LICENSE.txt file found in the top-level
# directory of this distribution and at
# https://github.com/johntruckenbrodt/pyroSAR/blob/master/LICENSE.txt.
# No part of the pyroSAR project, including this file, may be
# copied, modified, propagated, or distributed except according
# to the terms contained in the LICENSE.txt file.
###############################################################################
"""
This (still experimental) module is intended to easily prepare SAR scenes processed
by pyroSAR for ingestion into an Open Data Cube.

.. code-block:: python

    from pyroSAR.datacube_util import Product, Dataset
    from pyroSAR.ancillary import find_datasets, groupby
    
    # find pyroSAR files by metadata attributes
    archive_s1 = '/.../sentinel1/GRD/processed'
    scenes_s1 = find_datasets(archive_s1, sensor=('S1A', 'S1B'), acquisition_mode='IW')
    
    # group the found files by their file basenames
    # files with the same basename are considered to belong to the same dataset
    grouped = groupby(scenes_s1, 'outname_base')
    
    # define the polarization units describing the data sets
    units = {'VV': 'backscatter VV', 'VH': 'backscatter VH'}
    
    # create a new product
    with Product(name='S1_GRD_index',
                 product_type='gamma0',
                 description='Gamma Naught RTC backscatter') as prod:
        
        for dataset in grouped:
            with Dataset(dataset, units=units) as ds:
                
                # add the dataset to the product
                prod.add(ds)
                
                # write a datacube indexing YML from the product and dataset metadata
                prod.export_indexing_yml(ds, 'yml_index_outdir')
        
        # write the product YML
        prod.write('yml_product')
        
        # print the product metadata which is written to the product YML
        print(prod)
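
The product definition can additionally be converted into an ingestion YML for creating
NetCDF storage units in the datacube via :meth:`Product.export_ingestion_yml`. A minimal
sketch continuing the example above; the output file name, target product name, ingestion
location and chunking values are placeholders, not fixed conventions:

.. code-block:: python

    # write an ingestion YML referencing the product defined above;
    # the target product name must differ from the source product name
    prod.export_ingestion_yml(outname='yml_ingest',
                              product_name='S1_GRD_index_ingest',
                              ingest_location='/.../ingest_location',
                              chunking={'x': 512, 'y': 512, 'time': 1})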
"""

import os
import re
import yaml
import uuid
from time import strftime, strptime
from spatialist.raster import Raster, Dtype
from spatialist.ancillary import union
from .ancillary import parse_datasetname

import logging
log = logging.getLogger(__name__)


class Dataset(object):
    """
    A general class describing dataset information required for creating ODC YML files

    Parameters
    ----------
    filename: str, list, Dataset
        the product to be used; either an existing :class:`Dataset` object or a (list of) file(s)
        matching the pyroSAR naming pattern, i.e. that can be parsed by
        :func:`pyroSAR.ancillary.parse_datasetname`
    units: str or dict
        the units of the product measurement
    """
    
    def __init__(self, filename, units='DN'):
        if isinstance(filename, list):
            combined = sum([Dataset(x, units) for x in filename])
            self.__init__(combined)
        elif isinstance(filename, Dataset):
            for attr, value in vars(filename).items():
                setattr(self, attr, value)
        elif isinstance(filename, str):
            # map pyroSAR sensor identifiers to platform and instrument codes
            sensor_lookup = {'ASAR': ('ENVISAT', 'ASAR'),
                             'ERS1': ('ERS-1', 'SAR'),
                             'ERS2': ('ERS-2', 'SAR'),
                             'PSR1': ('ALOS-1', 'PALSAR'),
                             'PSR2': ('ALOS-2', 'PALSAR-2'),
                             'S1A': ('SENTINEL-1', 'C-SAR'),
                             'S1B': ('SENTINEL-1', 'C-SAR'),
                             'TSX1': ('TERRASAR-X_1', 'SAR'),
                             'TDX1': ('TANDEM-X_1', 'SAR')}
            
            # extract basic metadata attributes from the filename and register them to the object
            meta = parse_datasetname(filename)
            
            if meta is None:
                raise ValueError('could not identify dataset: {}'.format(filename))
            
            for key, val in meta.items():
                setattr(self, key, val)
            
            # define acquisition start and end time; currently both are set to the acquisition start time,
            # which is contained in the filename.
            # The time will only be correct if the full scene was processed; the start and end times of a
            # subset will differ. Thus, setting both accurately is not considered essential here.
            self.from_dt = strftime('%Y-%m-%dT%H:%M:%S', strptime(self.start, '%Y%m%dT%H%M%S'))
            self.to_dt = strftime('%Y-%m-%dT%H:%M:%S', strptime(self.start, '%Y%m%dT%H%M%S'))
            
            # match the sensor ID from the filename to a platform and instrument
            if self.sensor not in sensor_lookup.keys():
                raise ValueError('unknown sensor: {}'.format(self.sensor))
            self.platform, self.instrument = sensor_lookup[self.sensor]
            
            # extract general geo metadata from the GTiff information
            with Raster(filename) as ras:
                self.dtype = Dtype(ras.dtype).numpystr
                self.nodata = ras.nodata
                self.format = ras.format
                self.xres, self.yres = ras.res
                self.crs = 'EPSG:{}'.format(ras.epsg)
                self.is_projected = ras.projcs is not None
                self.extent = self.__extent_convert(ras.geo, 'x', 'y')
                # reproject the raster bounding box to EPSG 4326 and store its extent
                with ras.bbox() as bbox:
                    bbox.reproject(4326)
                    self.extent_4326 = self.__extent_convert(bbox.extent, 'lon', 'lat')
            
            # create a dictionary for resolution metadata depending on CRS characteristics
            resolution_keys = ('x', 'y') if self.is_projected else ('longitude', 'latitude')
            self.resolution = dict(zip(resolution_keys, (self.xres, self.yres)))
            
            # check whether the data type is supported
            pattern = '(?:(?:u|)int(?:8|16|32|64)|float(?:32|64))'
            if not re.search(pattern, self.dtype):
                raise ValueError('unsupported data type {}'.format(self.dtype))
            
            # determine the dataset units
            if isinstance(units, str):
                units = units
            elif isinstance(units, dict):
                try:
                    units = units[self.polarization]
                except KeyError:
                    raise KeyError("parameter 'units' does not contain key '{}'".format(self.polarization))
            else:
                raise TypeError("parameter 'units' must be of type str or dict")
            
            # create the measurement entry from the collected metadata;
            # this is intended for easy access by class Product
            self.measurements = {self.polarization: {'dtype': self.dtype,
                                                     'name': self.polarization,
                                                     'nodata': self.nodata,
                                                     'filename': filename,
                                                     'units': units}}
        else:
            raise TypeError('filename must be of type str, list or Dataset')

    def __add__(self, dataset):
        """
        Override the + operator. This is intended to easily combine two Dataset objects, which were created
        from different files belonging to the same acquisition, e.g. two GeoTIFFs with one polarization each.

        Parameters
        ----------
        dataset: Dataset
            the dataset to add to the current one

        Returns
        -------
        Dataset
            the combination of the two
        """
        for attr in ['extent', 'crs', 'sensor', 'acquisition_mode', 'proc_steps', 'outname_base']:
            if getattr(self, attr) != getattr(dataset, attr):
                raise ValueError('value mismatch: {}'.format(attr))
        # self.filename.append(dataset.filename)
        for key in dataset.measurements.keys():
            if key in self.measurements.keys():
                raise RuntimeError('only different measurements can be combined to one dataset')
        self.measurements.update(dataset.measurements)
        return self

    def __radd__(self, dataset):
        """
        similar to :meth:`Dataset.__add__` but for function :func:`sum`, e.g. :code:`sum([Dataset1, Dataset2])`

        Parameters
        ----------
        dataset: Dataset
            the dataset to add to the current one

        Returns
        -------
        Dataset
            the combination of the two
        """
        if dataset == 0:
            return self
        else:
            return self.__add__(dataset)

    @staticmethod
    def __extent_convert(extent, xkey, ykey):
        """
        convert the extent of a :class:`~spatialist.raster.Raster` object to a datacube-compliant dictionary.

        Parameters
        ----------
        extent: dict
            the extent as returned by a :class:`~spatialist.raster.Raster` object
        xkey: {'longitude', 'x'}
            the key of the x dimension
        ykey: {'latitude', 'y'}
            the key of the y dimension

        Returns
        -------
        dict
            a dictionary with keys `ll`, `lr`, `ul` and `ur`
        """
        return {'ll': {xkey: extent['xmin'],
                       ykey: extent['ymin']},
                'lr': {xkey: extent['xmax'],
                       ykey: extent['ymin']},
                'ul': {xkey: extent['xmin'],
                       ykey: extent['ymax']},
                'ur': {xkey: extent['xmax'],
                       ykey: extent['ymax']}}
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
    
    def __get_measurement_attr(self, attr):
        """
        get a certain measurement attribute from all measurements

        Parameters
        ----------
        attr: str
            the attribute to get

        Returns
        -------
        dict
            a dictionary with the measurement names as keys and the respective attribute as value
        """
        return dict([(key, self.measurements[key][attr]) for key in self.measurements.keys()])
    
    @property
    def filenames(self):
        """
        Returns
        -------
        dict
            all file names registered in the dataset
        """
        return self.__get_measurement_attr('filename')
    
    @property
    def identifier(self):
        """
        Returns
        -------
        str
            a unique dataset identifier
        """
        return '{}_{}'.format(self.outname_base, '_'.join(self.proc_steps))
    
    @property
    def units(self):
        """
        Returns
        -------
        dict
            all measurement unit names registered in the dataset
        """
        return self.__get_measurement_attr('units')
    
    @units.setter
    def units(self, value):
        """
        (re)set the units of all measurements

        Parameters
        ----------
        value: str or dict
            the unit(s) to be set; if multiple measurements are present, a dictionary with measurement
            names as keys needs to be defined

        Returns
        -------
        """
        keys = list(self.measurements.keys())
        if isinstance(value, str):
            if len(keys) == 1:
                self.measurements[keys[0]]['units'] = value
            else:
                raise TypeError('the dataset contains multiple measurements; '
                                'in this case a dictionary is needed for setting the measurement units')
        elif isinstance(value, dict):
            for name, unit in value.items():
                if name in keys:
                    self.measurements[name]['units'] = unit
                else:
                    raise KeyError("the dataset does not contain a measurement '{}'".format(name))

    def close(self):
        return


class Product(object):
    """
    A class for describing an ODC product definition

    Parameters
    ----------
    definition: str, list, None
        the source of the product definition; either an existing product YML, a list of :class:`Dataset`
        objects, or None. In the latter case the product is defined using the parameters `name`,
        `product_type` and `description`.
    name: str
        the name of the product in the data cube
    product_type: str
        the type of measurement defined in the product, e.g. `gamma0`
    description: str
        the description of the product and its measurements
    """
    
    def __init__(self, definition=None, name=None, product_type=None, description=None):
        
        missing_message = "when initializing {}, parameters " \
                          "'name', 'product_type' and 'description' must be defined"
        
        if isinstance(definition, str):
            if os.path.isfile(definition):
                with open(definition, 'r') as yml:
                    try:
                        self.meta = yaml.load(yml, Loader=yaml.FullLoader)
                    except yaml.YAMLError:
                        raise RuntimeError('the provided file does not seem to be a YAML file')
            else:
                raise RuntimeError('definition file does not exist')
        elif isinstance(definition, list):
            if None in [name, product_type, description]:
                raise ValueError(missing_message.format('a product from a list'))
            self.__initialize(name, product_type, description)
            for dataset in definition:
                with Dataset(dataset) as DS:
                    self.add(DS)
        elif definition is None:
            if None in [name, product_type, description]:
                raise ValueError(missing_message.format('a blank product'))
            self.__initialize(name, product_type, description)
        else:
            raise TypeError('type of parameter definition must be either str, list or None')
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
    
    def __str__(self):
        return yaml.dump(self.meta, default_flow_style=False)
    
    def __getattr__(self, item):
        if item in self.__fixture_storage:
            return self.meta['storage'][item]
        elif item in self.__fixture_metadata:
            subkey = 'code' if item == 'platform' else 'name'
            return self.meta['metadata'][item][subkey]
        elif item == 'product_type':
            return self.meta['metadata']['product_type']
        else:
            return object.__getattribute__(self, item)
    
    def __setattr__(self, key, value):
        if key in self.__fixture_storage:
            self.meta['storage'][key] = value
        elif key in self.__fixture_metadata:
            subkey = 'code' if key == 'platform' else 'name'
            self.meta['metadata'][key][subkey] = value
        elif key == 'product_type':
            self.meta['metadata']['product_type'] = value
        else:
            super(Product, self).__setattr__(key, value)

    def close(self):
        return

    def __add_measurement(self, name, dtype, nodata, units):
        """
        create a new measurement entry

        Parameters
        ----------
        name: str
            the measurement name
        dtype: str
            the data type, e.g. float32
        nodata: int or float
            the nodata value of the data
        units: str
            the measurement units

        Returns
        -------
        """
        if name in self.measurements.keys():
            raise IndexError('measurement {} already exists'.format(name))
        self.meta['measurements'].append({'name': name,
                                          'dtype': dtype,
                                          'units': units,
                                          'nodata': nodata})
    
    def __initialize(self, name, product_type, description):
        """
        create a new blank product

        Parameters
        ----------
        name: str
            the name of the product
        product_type: str
            the product type, e.g. `gamma0`
        description: str
            a description of the product content/purpose

        Returns
        -------
        """
        self.meta = {'description': description,
                     'measurements': [],
                     'metadata': {'platform': {'code': None},
                                  'instrument': {'name': None},
                                  'format': {'name': None},
                                  'product_type': product_type},
                     'metadata_type': 'eo',
                     'name': name,
                     'storage': {'crs': None,
                                 'resolution': None}}
    
    @staticmethod
    def __check_dict_keys(keys, reference):
        return len(union(keys, reference)) == len(keys)
    
    @property
    def __fixture_fields(self):
        """
        Returns
        -------
        list
            the names of the top-level metadata fields, which must be defined
        """
        return ['description', 'measurements', 'metadata', 'metadata_type', 'name', 'storage']
    
    @property
    def __fixture_measurement(self):
        """
        Returns
        -------
        list
            the names of the metadata fields, which must be defined for all measurements
        """
        return ['dtype', 'nodata', 'units']
    
    @property
    def __fixture_metadata(self):
        """
        Returns
        -------
        list
            the names of the metadata fields, which must be defined in the general metadata section
        """
        return ['format', 'instrument', 'platform']
    
    @property
    def __fixture_storage(self):
        """
        Returns
        -------
        list
            the names of the metadata fields, which must be defined for the storage section
        """
        return ['crs', 'resolution']
    
    def __validate(self):
        """
        assert whether the Product is valid

        Returns
        -------

        Raises
        ------
        RuntimeError
        """
        try:
            assert isinstance(self.meta, dict)
            assert self.__check_dict_keys(self.__fixture_fields, self.meta.keys())
            assert 'product_type' in self.meta['metadata'].keys()
            for measurement in self.meta['measurements']:
                assert self.__check_dict_keys(self.__fixture_measurement, measurement.keys())
        except AssertionError as e:
            log.info(e)
            raise RuntimeError('product invalid')

    def add(self, dataset):
        """
        Add a dataset to the abstracted product description. This first performs a check whether the dataset
        is compatible with the product and its already existing measurements. If a measurement in the dataset
        does not yet exist in the product description it is added.

        Parameters
        ----------
        dataset: Dataset
            the dataset whose description is to be added

        Returns
        -------
        """
        if not isinstance(dataset, Dataset):
            raise TypeError('input must be of type pyroSAR.datacube_util.Dataset')
        
        self.check_integrity(dataset, allow_new_measurements=True)
        
        # set the general product definition attributes if they are None
        for attr in self.__fixture_metadata + self.__fixture_storage:
            if getattr(self, attr) is None:
                setattr(self, attr, getattr(dataset, attr))
        
        # if it is not yet present, add the dataset measurement definition to that of the product
        for measurement, content in dataset.measurements.items():
            if measurement not in self.measurements.keys():
                self.__add_measurement(dtype=content['dtype'],
                                       name=content['name'],
                                       nodata=content['nodata'],
                                       units=content['units'])

    def check_integrity(self, dataset, allow_new_measurements=False):
        """
        check if a dataset is compatible with the product definition.

        Parameters
        ----------
        dataset: Dataset
            the dataset to be checked
        allow_new_measurements: bool
            allow new measurements to be added to the product definition?
            If not, and the dataset contains measurements that are not defined in the product,
            an error is raised.

        Returns
        -------

        Raises
        ------
        RuntimeError
        """
        # check general metadata and storage fields
        for attr in self.__fixture_metadata + self.__fixture_storage:
            val_ds = getattr(dataset, attr)
            val_prod = getattr(self, attr)
            if val_prod is not None and val_ds != val_prod:
                raise RuntimeError("mismatch of attribute '{0}': {1}, {2}".format(attr, val_ds, val_prod))
        
        # check measurement fields
        for measurement, content in dataset.measurements.items():
            if measurement not in self.measurements.keys():
                if not allow_new_measurements:
                    raise RuntimeError("measurement '{}' is not present in the product definition "
                                       "and allow_new_measurements is set to False".format(measurement))
            else:
                match = self.measurements[measurement]
                for attr in self.__fixture_measurement:
                    if match[attr] != content[attr]:
                        raise RuntimeError("mismatch of measurement '{0}', "
                                           "attribute '{1}': {2}, {3}".format(measurement, attr,
                                                                              match[attr], content[attr]))

    def export_indexing_yml(self, dataset, outdir):
        """
        Write a YML file named {:meth:`Dataset.identifier`}_dcindex.yml, which can be used for indexing a
        dataset in an Open Data Cube. The file will contain information from both the product and the dataset;
        a test is first performed to check whether the dataset matches the product definition.
        A unique ID is issued using :func:`uuid.uuid4`.

        Parameters
        ----------
        dataset: Dataset
            the dataset for which to export the file
        outdir: str
            the directory to write the file to

        Returns
        -------
        """
        self.__validate()
        outname = os.path.join(outdir, dataset.identifier + '_dcindex.yml')
        if os.path.isfile(outname):
            raise RuntimeError('indexing YML already exists: \n {}'.format(outname))
        if not os.path.isdir(outdir):
            os.makedirs(outdir)
        self.check_integrity(dataset)
        out = {'id': str(uuid.uuid4()),
               'image': {'bands': {}},
               'grid_spatial': {'projection': {}},
               'extent': {'coord': {}},
               'lineage': {'source_datasets': {}}}
        for measurement, content in dataset.measurements.items():
            out['image']['bands'][measurement] = {'path': content['filename']}
        for attr in self.__fixture_metadata:
            subkey = 'code' if attr == 'platform' else 'name'
            out[attr] = {subkey: getattr(dataset, attr)}
        out['grid_spatial']['projection']['geo_ref_points'] = dataset.extent
        out['grid_spatial']['projection']['spatial_reference'] = dataset.crs
        out['extent']['coord'] = dataset.extent_4326
        out['extent']['from_dt'] = dataset.from_dt
        out['extent']['to_dt'] = dataset.to_dt
        out['product_type'] = self.meta['metadata']['product_type']
        with open(outname, 'w') as yml:
            yaml.dump(out, yml, default_flow_style=False)

    def export_ingestion_yml(self, outname, product_name, ingest_location, chunking):
        """
        Write a YML file, which can be used for ingesting indexed datasets into an Open Data Cube.

        Parameters
        ----------
        outname: str
            the name of the YML file to write
        product_name: str
            the name of the product in the ODC
        ingest_location: str
            the location of the ingested NetCDF files
        chunking: dict
            a dictionary with keys 'x', 'y' and 'time'; determines the size of the netCDF files
            ingested into the datacube; e.g. {'x': 512, 'y': 512, 'time': 1}

        Returns
        -------
        """
        if os.path.isfile(outname):
            raise RuntimeError('ingestion YML already exists: \n {}'.format(outname))
        self.__validate()
        if product_name == self.meta['name']:
            raise ValueError('source and target product names must be different')
        outdir = os.path.dirname(outname)
        if not os.path.isdir(outdir):
            os.makedirs(outdir)
        file_path_template = '{0}/{1}_{2}_{3}_{4}_' \
                             '{{tile_index[0]}}_' \
                             '{{tile_index[1]}}_' \
                             '{{start_time}}.nc'.format(product_name,
                                                        self.platform,
                                                        self.instrument,
                                                        self.product_type,
                                                        self.crs.replace('EPSG:', ''))
        global_attributes = {'instrument': self.instrument,
                             'platform': self.platform,
                             'institution': 'ESA',
                             'acknowledgment': 'Sentinel-1 data is provided by the European Space Agency '
                                               'on behalf of the European Commission via download.'}
        storage = self.meta['storage']
        storage['driver'] = 'NetCDF CF'
        storage['tile_size'] = {}
        storage['tile_size']['x'] = storage['resolution']['x'] * chunking['x']
        storage['tile_size']['y'] = storage['resolution']['y'] * chunking['y']
        storage['chunking'] = chunking
        storage['dimension_order'] = ['time', 'y', 'x']
        measurements = self.meta['measurements']
        for measurement in measurements:
            measurement['resampling_method'] = 'nearest'
            measurement['src_varname'] = measurement['name']
        out = {'source_type': self.meta['name'],
               'output_type': product_name,
               'description': self.meta['description'],
               'location': ingest_location,
               'file_path_template': file_path_template,
               'storage': self.meta['storage'],
               'measurements': self.meta['measurements'],
               'global_attributes': global_attributes}
        with open(outname, 'w') as yml:
            yaml.dump(out, yml, default_flow_style=False)

    @property
    def measurements(self):
        """
        Returns
        -------
        dict of dict
            a dictionary with measurement names as keys
        """
        return dict([(x['name'], x) for x in self.meta['measurements']])

    def write(self, ymlfile):
        """
        write the product definition to a YML file

        Parameters
        ----------
        ymlfile: str
            the file to write to

        Returns
        -------
        """
        if os.path.isfile(ymlfile):
            raise RuntimeError('product definition YML already exists: \n {}'.format(ymlfile))
        self.__validate()
        with open(ymlfile, 'w') as yml:
            yaml.dump(self.meta, yml, default_flow_style=False)