Source code for tds2stac.extensions.datacube

# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: CC0-1.0
# TODO: make the datacube configurable from outside
# TODO: make the datacube based on two other webservices: iso and wms

from pystac.extensions.datacube import (
    AdditionalDimension,
    DatacubeExtension,
    DimensionType,
    HorizontalSpatialDimension,
    TemporalDimension,
    Variable,
    VariableType,
    VerticalSpatialDimension,
)


class Datacube(object):
    """Fill the STAC Datacube extension of an item from harvested metadata.

    The harvested metadata is a flat dictionary (``harvesting_vars``) whose
    keys describe the variables and the horizontal, vertical, temporal and
    additional dimensions of a dataset.
    """

    # Printed whenever one of the dimension groups lacks a required key.
    _MISSING_ATTRIBUTES_MESSAGE = (
        "Required attributes are not involved in the output dictionary. "
        "Check your dataset in Thredds or config json file. "
        "WE SHOULD EXPLAIN THIS ERROR IN THE DOCUMENTATION"
    )

    # Names that are never registered as an additional dimension, compared
    # case-insensitively against the harvested additional ids.
    _RESERVED_DIMENSION_NAMES = (
        "x",
        "y",
        "time",
        "longitude",
        "latitude",
        "t",
        "z",
        "height",
    )

    def item_extension(self, item, harvesting_vars):
        """Apply the Datacube extension to ``item``.

        Args:
            item: object accepted by ``DatacubeExtension.ext``; the extension
                is added to it when missing.
            harvesting_vars: dictionary of harvested attributes. The keys
                ``variable_ids`` and ``variable_dimensions`` are required;
                ``variable_description``, ``variable_unit`` and
                ``additional_ids`` may be missing or ``None``. Each dimension
                group (horizontal, vertical, temporal) is only emitted when
                all of its required keys hold a value.

        Raises:
            ValueError: if ``variable_ids`` and ``variable_dimensions`` do
                not have the same length.
        """
        cube = DatacubeExtension.ext(item, add_if_missing=True)

        variables = self._build_variables(harvesting_vars)

        # Insertion order matters for the emitted item: longitude, latitude,
        # vertical, temporal, then any additional dimensions.
        dimensions = {}
        self._add_horizontal_dimensions(dimensions, harvesting_vars)
        self._add_vertical_dimension(dimensions, harvesting_vars)
        self._add_temporal_dimension(dimensions, harvesting_vars)
        self._add_additional_dimensions(dimensions, harvesting_vars)

        cube.apply(dimensions=dimensions, variables=variables)

    @staticmethod
    def _aligned_or_none(values, expected_length):
        """Return ``values`` if it has ``expected_length`` entries, else None."""
        if values is None or len(values) != expected_length:
            return None
        return values

    @staticmethod
    def _has_all(harvesting_vars, keys):
        """Tell whether every key in ``keys`` maps to a non-None value."""
        return all(harvesting_vars.get(key) is not None for key in keys)

    def _build_variables(self, harvesting_vars):
        """Build the ``{variable id: Variable}`` mapping."""
        variable_ids = harvesting_vars["variable_ids"]
        # A dimension entry such as "time lat lon" is split on blanks (and on
        # commas once a blank is present); an entry without blanks is kept
        # as a single dimension name.
        variable_dimensions = [
            entry.replace(" ", ",").split(",") if " " in entry else [entry]
            for entry in harvesting_vars["variable_dimensions"]
        ]
        if len(variable_ids) != len(variable_dimensions):
            # Another solution for this is to index each output element and
            # find the None values to decide about them later
            raise ValueError(
                "The length of the variable list and the dimension list are not equal. Check your dataset in Thredds. WE SHOULD EXPLAIN THIS ERROR IN THE DOCUMENTATION"
            )

        # Descriptions and units are optional: they are used only when the
        # list lines up one-to-one with the variable ids.
        descriptions = self._aligned_or_none(
            harvesting_vars.get("variable_description"), len(variable_ids)
        )
        units = self._aligned_or_none(
            harvesting_vars.get("variable_unit"), len(variable_ids)
        )

        variables = {}
        for index, variable_id in enumerate(variable_ids):
            properties = {
                "dimensions": variable_dimensions[index],
                "type": VariableType.DATA.value,
            }
            if descriptions is not None and descriptions[index] is not None:
                properties["description"] = descriptions[index]
            if units is not None and units[index] is not None:
                # NOTE(review): the STAC datacube specification names this
                # field "unit"; the key is kept unchanged here to preserve
                # the emitted properties - confirm before renaming.
                properties["units"] = units[index]
            variables[variable_id] = Variable(properties)
        return variables

    def _add_horizontal_dimensions(self, dimensions, harvesting_vars):
        """Add the longitude and latitude dimensions when fully described."""
        required_keys = [
            "horizontal_ids_lat",
            "horizontal_ids_lon",
            "horizontal_axis_x",
            "horizontal_axis_y",
            "horizontal_extent_lon_min",
            "horizontal_extent_lon_max",
            "horizontal_extent_lat_min",
            "horizontal_extent_lat_max",
        ]
        # TODO: check the possibility of making the string values as float
        if not self._has_all(harvesting_vars, required_keys):
            print(self._MISSING_ATTRIBUTES_MESSAGE)
            return

        dimensions[
            harvesting_vars["horizontal_ids_lon"]
        ] = HorizontalSpatialDimension(
            {
                "type": DimensionType.SPATIAL.value,
                "axis": harvesting_vars["horizontal_axis_x"],
                "description": harvesting_vars["horizontal_description_lon"],
                "extent": [
                    float(harvesting_vars["horizontal_extent_lon_min"]),
                    float(harvesting_vars["horizontal_extent_lon_max"]),
                ],
                "reference_system": harvesting_vars[
                    "horizontal_reference_system"
                ],
            }
        )
        dimensions[
            harvesting_vars["horizontal_ids_lat"]
        ] = HorizontalSpatialDimension(
            {
                "type": DimensionType.SPATIAL.value,
                "axis": harvesting_vars["horizontal_axis_y"],
                "description": harvesting_vars["horizontal_description_lat"],
                "extent": [
                    float(harvesting_vars["horizontal_extent_lat_min"]),
                    float(harvesting_vars["horizontal_extent_lat_max"]),
                ],
                "reference_system": harvesting_vars[
                    "horizontal_reference_system"
                ],
            }
        )

    def _add_vertical_dimension(self, dimensions, harvesting_vars):
        """Add the vertical dimension when fully described."""
        required_keys = [
            "vertical_id",
            "vertical_axis",
            "vertical_extent_upper",
            "vertical_extent_lower",
        ]
        # Unlike the other groups, an empty list also counts as "missing".
        if not all(
            harvesting_vars.get(key) is not None
            and harvesting_vars[key] != []
            for key in required_keys
        ):
            print(self._MISSING_ATTRIBUTES_MESSAGE)
            return

        dimensions[harvesting_vars["vertical_id"]] = VerticalSpatialDimension(
            {
                "type": DimensionType.SPATIAL.value,
                "axis": harvesting_vars["vertical_axis"],
                "description": harvesting_vars["vertical_description"],
                "extent": [
                    float(harvesting_vars["vertical_extent_lower"]),
                    float(harvesting_vars["vertical_extent_upper"]),
                ],
            }
        )

    def _add_temporal_dimension(self, dimensions, harvesting_vars):
        """Add the temporal dimension when fully described."""
        required_keys = [
            "temporal_id",
            "temporal_extent_start_datetime",
            "temporal_extent_end_datetime",
        ]
        if not self._has_all(harvesting_vars, required_keys):
            print(self._MISSING_ATTRIBUTES_MESSAGE)
            return

        dimensions[harvesting_vars["temporal_id"]] = TemporalDimension(
            {
                "type": DimensionType.TEMPORAL.value,
                "description": harvesting_vars["temporal_description"],
                "extent": [
                    harvesting_vars["temporal_extent_start_datetime"],
                    harvesting_vars["temporal_extent_end_datetime"],
                ],
            }
        )

    def _add_additional_dimensions(self, dimensions, harvesting_vars):
        """Add every additional id that is not already a known dimension."""
        additional_ids = harvesting_vars.get("additional_ids")
        if not additional_ids:
            return

        # Case-insensitive names that are already taken: the dimensions
        # registered so far plus the reserved coordinate names.
        taken = {name.lower() for name in dimensions}
        taken.update(self._RESERVED_DIMENSION_NAMES)

        for dimension_id in additional_ids:
            lowered = dimension_id.lower()
            if lowered in taken:
                continue
            dimensions[dimension_id] = AdditionalDimension(
                {
                    "type": harvesting_vars["additional_type"],
                    "extent": [None, None],
                }
            )
            # Later ids differing only in case must be skipped as well.
            taken.add(lowered)
# def item(item, harvesting_vars): # var = [] # dim = [] # variables = {} # variables in a STAC-Item # dimensions = {} # dimensions in a STAC-Item # cube = DatacubeExtension.ext(item, add_if_missing=True) # # Creating dimension and varibles to datacube extension # # TODO: temporary we add the following line to avoid error in STAC validation but we need to find a better solution # # ISSUE #81 # # print('harvesting_vars["keyword"]', harvesting_vars["keyword"]) # # print('harvesting_vars["var_lists"]', harvesting_vars["var_lists"]) # # print('harvesting_vars["var_descs"]', harvesting_vars["var_descs"]) # # print('harvesting_vars["var_dims"]', harvesting_vars["var_dims"]) # DatacubeExtension.remove_from(item) # for i, v in enumerate(harvesting_vars["var_lists"]): # if harvesting_vars["keyword"] == []: # var.append( # Variable( # dict( # type="data", # description=harvesting_vars["var_descs"][i], # dimensions=harvesting_vars["keyword"], # ) # ) # ) # else: # var.append( # Variable( # dict( # type="data", # description=harvesting_vars["var_descs"][i], # dimensions=harvesting_vars["keyword"][i], # ) # ) # ) # for i, v in enumerate(harvesting_vars["var_lists"]): # if harvesting_vars["var_lists"][i] not in harvesting_vars["var_dims"] and harvesting_vars["var_lists"][i] not in harvesting_vars["keyword"]: # variables[v] = var[i] # for i, v in enumerate(harvesting_vars["var_dims"]): # if harvesting_vars["var_dims"][i] == "row" or "lon" in harvesting_vars["var_dims"][i]: # extend_ = [ # harvesting_vars["long_min"].replace(",", "."), # harvesting_vars["long_max"].replace(",", "."), # ] # type_ = DimensionType.SPATIAL.value # description_ = "longitude" # axis_ = "x" # elif harvesting_vars["var_dims"][i] == "column" or "lat" in harvesting_vars["var_dims"][i]: # extend_ = [ # harvesting_vars["lat_min"].replace(",", "."), # harvesting_vars["lat_max"].replace(",", "."), # ] # type_ = DimensionType.SPATIAL.value # description_ = "latitude" # axis_ = "y" # elif 
harvesting_vars["var_dims"][i] == "temporal" or "time" in harvesting_vars["var_dims"][i] or "t" in harvesting_vars["var_dims"][i]: # extend_ = [ # harvesting_vars["time_start"], # harvesting_vars["time_end"], # ] # type_ = DimensionType.TEMPORAL.value # description_ = "time" # axis_ = "time" # else: # extend_ = None # type_ = DimensionType.SPATIAL.value # description_ = harvesting_vars["var_dims"][i] # axis_ = harvesting_vars["var_dims"][i] # dim.append( # HorizontalSpatialDimension( # properties=dict( # axis=axis_, # extent=extend_, # description=description_, # reference_system="epsg:4326", # type=type_, # ) # ) # ) # for i, v in enumerate(harvesting_vars["var_dims"]): # dimensions[v] = dim[i] # cube.apply(dimensions=dimensions, variables=variables)