# Source code for tds2stac.creator

import importlib
import inspect

# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: CC0-1.0
import os
import sys

# import traceback
from datetime import datetime
from importlib.machinery import SourceFileLoader
from importlib.util import module_from_spec, spec_from_loader
from typing import Union

import pystac
from shapely import geometry

from . import assets, logger, utils
from .analysers.existence_validator import ExistenceValidator
from .extensions import common_metadata, datacube
from .extensions.extra_metadata import ExtraMetadata
from .statics import constants


class STACCreator(object):
    """
    A class for creating a STAC catalog, its collections and its items
    from TDS dataset catalogs.

    Every public method stores the logger configuration it receives on
    ``self.logger_properties`` and reports by writing ``logger_level`` and
    ``logger_msg`` into that dictionary before handing it to
    :class:`~tds2stac.logger.Logger`.
    """

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _use_logger_properties(self, logger_properties):
        """Store the logger configuration for the current call.

        A fresh dictionary is created when ``None`` is given, so the
        ``logger_level``/``logger_msg`` entries written by :meth:`_log`
        never end up in an object shared between calls or instances.
        A dictionary supplied by the caller is used as-is (not copied).
        """
        self.logger_properties = (
            {} if logger_properties is None else logger_properties
        )

    def _log(self, level, message):
        """Write ``level`` and ``message`` into the logger config and emit it."""
        self.logger_properties["logger_level"] = level
        self.logger_properties["logger_msg"] = message
        logger.Logger(self.logger_properties)

    @staticmethod
    def _placeholder_extents():
        """Return the (spatial, temporal) stand-in extents used when the
        real extent is not known yet: ``[0.0, 0.0]`` and "now to now"."""
        spatial = pystac.SpatialExtent(bboxes=[0.0, 0.0])
        temporal = pystac.TemporalExtent(
            intervals=[[datetime.utcnow(), datetime.utcnow()]]
        )
        return spatial, temporal

    def _new_collection(
        self,
        catalog,
        collection_id,
        collection_title,
        collection_description,
        extra_metadata,
    ):
        """Create a collection with placeholder extents, optionally enrich it
        with extra metadata, attach it to ``catalog`` and return it."""
        spatial, temporal = self._placeholder_extents()
        new_collection = pystac.Collection(
            id=collection_id,
            title=collection_title,
            extent=pystac.Extent(spatial=spatial, temporal=temporal),
            description=collection_description,
        )
        if extra_metadata.get("extra_metadata"):
            ExtraMetadata(logger_properties=self.logger_properties).collection(
                collection=new_collection,
                extra_metadata=extra_metadata,
            )
        else:
            # The message is only prepared, not emitted (the original code
            # had the Logger call commented out).
            self.logger_properties["logger_level"] = "INFO"
            self.logger_properties["logger_msg"] = (
                "The `extra_metadata` is not activated. So, it does not add "
                "any extra metadata to the STAC collection."
            )
        catalog.add_child(new_collection)
        return new_collection

    def _call_extension_target(
        self, imported_module, target, item, harvesting_vars
    ):
        """Resolve ``target`` inside ``imported_module`` and call it with
        ``(item, harvesting_vars)``.

        ``target`` is either ``"function"`` or ``"Class.method"``; in the
        latter case the class is instantiated without arguments. An
        unknown class or function name is logged as an error and skipped.
        """
        if "." in target:
            class_name, function_name = target.split(".")[:2]
            if not hasattr(imported_module, class_name):
                self._log(
                    "ERROR",
                    "Check your extension class name, second element of the tuple",
                )
                return
            instance = getattr(imported_module, class_name)()
            if not hasattr(instance, function_name):
                self._log(
                    "ERROR",
                    "Check your extension function name, second element of the tuple",
                )
                return
            getattr(instance, function_name)(item, harvesting_vars)
            return

        if not hasattr(imported_module, target):
            self._log(
                "ERROR",
                "Check your extension function name, second element of the tuple",
            )
            return
        getattr(imported_module, target)(item, harvesting_vars)

    def _apply_custom_extension(
        self, extension_property, caller_script, item, harvesting_vars
    ):
        """Load and run a user supplied extension described by a tuple.

        Args:
            extension_property: A 2- or 3-tuple. Element 1 names the
                function (``"func"`` or ``"Class.method"``); the optional
                element 2 is the path of the script that defines it.
            caller_script: Path of the calling script, used as the module
                source when the tuple has only two elements.
            item: The STAC item handed to the extension.
            harvesting_vars: The harvested data handed to the extension.

        Returns:
            ``False`` when item creation has to be aborted (wrong tuple
            length or the script could not be loaded), ``True`` otherwise.
        """
        size = len(extension_property)
        if size < 2 or size > 3:
            self._log(
                "ERROR",
                "The length of `extension_property` tuple is less than 2 or "
                "greater than 3, so you need to double check your input",
            )
            return False

        script_path = caller_script if size == 2 else extension_property[2]
        script_dir = os.path.dirname(script_path)
        custom_module = os.path.splitext(os.path.basename(script_path))[0]

        # Make the script's directory importable for the duration of the
        # call only, and never remove an entry that was already present.
        added_to_path = script_dir not in sys.path
        if added_to_path:
            sys.path.append(script_dir)
        try:
            if size == 2:
                imported_module = importlib.import_module(custom_module)
            else:
                try:
                    spec = spec_from_loader(
                        custom_module,
                        SourceFileLoader(custom_module, script_path),
                    )
                    imported_module = module_from_spec(spec)  # type: ignore
                    spec.loader.exec_module(imported_module)  # type: ignore
                except Exception:
                    self._log(
                        "ERROR",
                        "Check your extension script path, third element of the tuple",
                    )
                    return False
            self._call_extension_target(
                imported_module, extension_property[1], item, harvesting_vars
            )
        finally:
            if added_to_path and script_dir in sys.path:
                sys.path.remove(script_dir)
        return True

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def STACCatalog(
        self,
        url: str,
        stac_id: str,
        stac_title: Union[str, None],
        stac_desc: Union[str, None],
        stac_dir: str,
        stac_existence: bool = False,
        logger_properties: Union[dict, None] = None,
        requests_properties: Union[dict, None] = None,
    ):
        """
        A function for creating STAC catalog from TDS dataset catalog.

        Args:
            url: The URL of the TDS catalog.
            stac_id: The ID of the STAC catalog.
            stac_title: The title of the STAC catalog.
            stac_desc: The description of the STAC catalog. A default text
                is used when it is None.
            stac_dir: The directory of saving the STAC catalog.
            stac_existence: If it is True, the catalog stored in
                ``stac_dir`` (``catalog.json``) is loaded and returned when
                it exists, instead of creating a new one. False by default.
            logger_properties: The properties of the logger. For more
                information please check the
                :class:`~tds2stac.logger.Logger` class. None means an
                empty configuration.
            requests_properties: The properties of the requests. For more
                information please check the
                :class:`~tds2stac.TDS2STAC.requests_properties` class.
                None means an empty configuration.

        Returns:
            The loaded or newly created ``pystac.Catalog``, or None when
            ``stac_existence`` is True but ``stac_dir`` is None.
        """
        self._use_logger_properties(logger_properties)
        if requests_properties is None:
            requests_properties = {}

        # `xml_processing` yields the catalog URL with XML extension, the
        # catalog id and the XML content; only the URL is needed here.
        xml_url_catalog, _, _ = utils.xml_processing(url, requests_properties)
        if stac_desc is None:
            stac_desc = "This is a STAC catalog created by tds2stac"

        if stac_existence is True:
            if stac_dir is None:
                self._log(
                    "WARNING",
                    "You have turned on the `stac_existence`, so please "
                    "provide the directory of the existed STAC catalog",
                )
                return None
            if ExistenceValidator(stac_dir).existence is True:
                self._log(
                    "INFO", "The STAC catalog already exists in the directory"
                )
                return pystac.Catalog.from_file(stac_dir + "/catalog.json")
            self._log(
                "INFO", "The STAC catalog does not exist in the directory"
            )
        else:
            self._log("INFO", "It creates a new catalog in the directory")

        return pystac.Catalog(
            id=stac_id,
            title=stac_title,
            description="["
            + stac_desc
            + "]("
            + utils.xml2html(xml_url_catalog)
            + ")",
        )

    def STACCollection(
        self,
        catalog: pystac.Catalog,
        collection_id: str,
        collection_title: str,
        collection_description: str,
        stac_existence_collection: bool = False,
        logger_properties: Union[dict, None] = None,
        extra_metadata: Union[dict, None] = None,
    ):
        """
        This is a function for creating STAC collection from harvested
        information from TDS dataset catalog.

        Args:
            catalog: The STAC catalog.
            collection_id: The ID of the STAC collection.
            collection_title: The title of the STAC collection.
            collection_description: The description of the STAC collection.
            stac_existence_collection: If it is True and a collection with
                ``collection_id`` is already a child of ``catalog``, that
                collection is reused instead of creating a new one.
                False by default.
            logger_properties: The properties of the logger. For more
                information please check the
                :class:`~tds2stac.logger.Logger` class. None means an
                empty configuration.
            extra_metadata: Extra-metadata configuration. It is applied to
                a newly created collection only when its
                ``"extra_metadata"`` entry is truthy. None means empty.

        Returns:
            A dictionary with the keys ``collection`` (the STAC
            collection), ``existed_items_id_list`` (ids of the items
            already present in a reused collection, otherwise empty),
            ``collection_bbox_existed`` and
            ``collection_interval_time_final_existed`` (first bbox and
            first interval of a reused collection, otherwise None).
        """
        self._use_logger_properties(logger_properties)
        if extra_metadata is None:
            extra_metadata = {}

        stac_collection = None
        bbox_existed = None
        interval_existed = None
        existed_items_id_list = []

        if stac_existence_collection is True:
            known_ids = [child.id for child in catalog.get_collections()]
            if collection_id in known_ids:
                stac_collection = catalog.get_child(collection_id)

        if stac_collection is not None:
            bbox_existed = stac_collection.extent.spatial.bboxes[0]
            interval_existed = stac_collection.extent.temporal.intervals[0]
            existed_items_id_list = [
                existed_item.id for existed_item in stac_collection.get_items()
            ]
        else:
            # Spatial and temporal extents are not known yet, so the new
            # collection starts with placeholder values; STACItem replaces
            # them once real data has been harvested.
            stac_collection = self._new_collection(
                catalog,
                collection_id,
                collection_title,
                collection_description,
                extra_metadata,
            )

        return {
            "existed_items_id_list": existed_items_id_list,
            "collection": stac_collection,
            "collection_bbox_existed": bbox_existed,
            "collection_interval_time_final_existed": interval_existed,
        }

    def STACItem(
        self,
        url: str,
        catalog: pystac.Catalog,
        harvesting_vars: dict,
        Recognizer_output: Union[str, None],
        collection_id: str,
        aggregated_dataset_url: Union[str, None] = None,
        extension_properties: Union[dict, None] = None,
        asset_properties: Union[dict, None] = None,
        logger_properties: Union[dict, None] = None,
        extra_metadata: Union[dict, None] = None,
        stac_existence_collection: bool = False,
        collection_bbox_existed: Union[list, None] = None,
        collection_interval_time_final_existed: Union[list, None] = None,
    ):
        """
        This is a function for creating STAC item from harvested data in
        TDS dataset catalog.

        Args:
            url: The URL of the TDS catalog.
            catalog: The STAC catalog.
            harvesting_vars: The harvested data from TDS catalog. The keys
                ``item_id``, ``item_footprint``, ``item_bbox``,
                ``modified_date_time``, ``collection_bbox`` and
                ``collection_interval_time_final`` are read here.
            Recognizer_output: The output of the Recognizer class.
            collection_id: The ID of the STAC collection.
            aggregated_dataset_url: The URL of the aggregated dataset that
                whole of data is located there.
            extension_properties: The properties of the extensions. The
                keys ``item_common_metadata``, ``item_datacube`` and
                ``item_extensions`` are evaluated; entries of
                ``item_extensions`` are either known extension names or
                2-/3-tuples describing a custom extension.
            asset_properties: The properties of the assets. None is
                forwarded as an empty dictionary.
            logger_properties: The properties of the logger. For more
                information please check the
                :class:`~tds2stac.logger.Logger` class. None means an
                empty configuration.
            extra_metadata: Extra-metadata configuration, applied to the
                item only when its ``"extra_metadata"`` entry is truthy.
            stac_existence_collection: True when the collection already
                existed, so its previous extent has to be merged with the
                newly harvested one.
            collection_bbox_existed: The bbox of the existing collection.
            collection_interval_time_final_existed: The temporal interval
                of the existing collection.

        Returns:
            ``{"item": None}`` after the item has been processed, or None
            when mandatory item data is missing or a custom extension
            could not be loaded.
        """
        self._use_logger_properties(logger_properties)
        if asset_properties is None:
            asset_properties = {}
        if extra_metadata is None:
            extra_metadata = {}

        item_id = utils.replacement_func_collection_item_id(
            harvesting_vars["item_id"]
        )
        if (
            item_id is None
            or harvesting_vars["item_footprint"] is None
            or harvesting_vars["item_bbox"] is None
            or harvesting_vars["modified_date_time"] is None
        ):
            self._log(
                "CRITICAL",
                "One of `item_id`, `item_footprint`, `item_bbox` or "
                "`modified_date_time` is None, so you need to double check "
                "your dataset.",
            )
            return None

        item = pystac.Item(
            id=item_id,
            geometry=geometry.mapping(harvesting_vars["item_footprint"]),
            bbox=harvesting_vars["item_bbox"],
            datetime=harvesting_vars["modified_date_time"],
            properties={},
        )

        if (
            extension_properties is not None
            and extension_properties.get("item_common_metadata") is True
        ):
            common_metadata.CommonMetadata().item(
                item, harvesting_vars, self.logger_properties
            )

        # Adding web services as assets into items
        assets.Assets().item(
            harvesting_vars=harvesting_vars,
            item=item,
            Recognizer_output=Recognizer_output,
            url=url,
            aggregated_dataset_url=aggregated_dataset_url,
            asset_properties=asset_properties,
            logger_properties=self.logger_properties,
        )

        if (
            extension_properties is not None
            and extension_properties.get("item_extensions") is not None
        ):
            for extension_property in extension_properties["item_extensions"]:
                if isinstance(extension_property, str):
                    if extension_property == "item_datacube_extension":
                        datacube.Datacube().item_extension(
                            item, harvesting_vars, self.logger_properties
                        )
                    elif extension_property == "common_metadata":
                        common_metadata.CommonMetadata().item(
                            item, harvesting_vars, self.logger_properties
                        )
                elif isinstance(extension_property, tuple):
                    caller_script = None
                    if len(extension_property) == 2:
                        # Must be evaluated in this frame: index 2 is the
                        # caller of whoever called STACItem, whose script
                        # is expected to define the extension.
                        caller_script = inspect.stack()[2][1]
                    if not self._apply_custom_extension(
                        extension_property,
                        caller_script,
                        item,
                        harvesting_vars,
                    ):
                        return None

        if extra_metadata.get("extra_metadata"):
            ExtraMetadata(logger_properties=self.logger_properties).item(
                item=item,
                extra_metadata=extra_metadata,
                harvesting_vars=harvesting_vars,
            )
        else:
            # Prepared but not emitted, as in the collection case.
            self.logger_properties["logger_level"] = "INFO"
            self.logger_properties["logger_msg"] = (
                "The `extra_metadata` is not activated. So, it does not add "
                "any extra metadata to the STAC item."
            )

        # applying datacube extension to items
        if (
            extension_properties is not None
            and extension_properties.get("item_datacube") is True
        ):
            datacube.Datacube().item_extension(
                item, harvesting_vars, logger_properties=self.logger_properties
            )

        collection_bbox = harvesting_vars["collection_bbox"]
        collection_interval = harvesting_vars["collection_interval_time_final"]

        # A collection extent must not collapse to a point or a line, so a
        # degenerate bbox is widened by epsilon on every side.
        if collection_bbox is not None and (
            collection_bbox[0] == collection_bbox[2]
            or collection_bbox[1] == collection_bbox[3]
        ):
            collection_bbox = [
                collection_bbox[0] - constants.epilon,
                collection_bbox[1] - constants.epilon,
                collection_bbox[2] + constants.epilon,
                collection_bbox[3] + constants.epilon,
            ]
            harvesting_vars["collection_bbox"] = collection_bbox

        # Track explicitly whether real extent data is available instead of
        # trying to recognise the placeholder values afterwards.
        extent_known = (
            collection_bbox is not None and collection_interval is not None
        )
        if extent_known:
            spatial_extent = pystac.SpatialExtent(bboxes=[collection_bbox])
            temporal_extent = pystac.TemporalExtent(
                intervals=[collection_interval]
            )
        else:
            spatial_extent, temporal_extent = self._placeholder_extents()

        if (
            stac_existence_collection is True
            and collection_bbox_existed is not None
            and collection_interval_time_final_existed is not None
        ):
            if extent_known:
                spatial_extent = pystac.SpatialExtent(
                    bboxes=[
                        utils.merge_bboxes(
                            collection_bbox_existed, collection_bbox
                        )
                    ]
                )
                temporal_extent = pystac.TemporalExtent(
                    intervals=[
                        utils.merge_intervals(
                            collection_interval_time_final_existed,
                            collection_interval,
                        )
                    ]
                )
            else:
                # Nothing new to merge: keep the extent the collection had.
                spatial_extent = pystac.SpatialExtent(
                    bboxes=[collection_bbox_existed]
                )
                temporal_extent = pystac.TemporalExtent(
                    intervals=[collection_interval_time_final_existed]
                )

        target_collection = catalog.get_child(collection_id)
        if target_collection is not None:
            target_collection.extent = pystac.Extent(
                spatial=spatial_extent,
                temporal=temporal_extent,
            )
            target_collection.add_item(item)

        # NOTE(review): the item reference is dropped before returning, so
        # callers receive {"item": None}; the item itself lives on in the
        # collection it was added to. Confirm no caller expects the item.
        return {"item": None}

    def SaveCatalog(
        self,
        catalog,
        catalog_dir,
        logger_properties: Union[dict, None] = None,
    ):
        """
        Normalize the catalog's hrefs below ``<catalog_dir>/stac`` and save
        it as a self-contained catalog.

        Args:
            catalog: The STAC catalog to save.
            catalog_dir: The directory in which the ``stac`` folder is
                created.
            logger_properties: The properties of the logger. For more
                information please check the
                :class:`~tds2stac.logger.Logger` class. None means an
                empty configuration.

        Returns:
            True when the catalog was saved, False when saving raised an
            exception (which is logged as CRITICAL, not re-raised).
        """
        self._use_logger_properties(logger_properties)
        try:
            catalog.normalize_hrefs(os.path.join(catalog_dir, "stac"))
            catalog.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)
        except Exception as exc:
            self._log(
                "CRITICAL",
                "The Collection doesn't contain bounding box and/or temporal "
                "information. Therefore the STAC-Catalog cannot be created. "
                "Review the input values. %s : %s"
                % (type(exc).__name__, exc),
            )
            return False
        return True