import importlib
import inspect
# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: CC0-1.0
import os
import sys
import traceback
from datetime import datetime
from typing import Union
import pystac
from shapely import geometry
from . import assets, logger, tds2stac, utils
from .extensions import common_metadata, datacube
from .statics import constants
[docs]
class STACCreator(object):
"""
A class for creating STAC catalog, -Collections
and its -Items from TDS datasets catalogs.
"""
[docs]
def STACCatalog(
self,
url: str,
stac_id: str,
stac_title: Union[str, None],
stac_desc: Union[str, None],
stac_dir: str,
stac_existance: bool = False,
auth: Union[tuple, None] = None,
logger_properties: dict = dict(),
):
"""
A function for creating STAC catalog from TDS dataset catalog.
Args:
url: The URL of the TDS catalog.
stac_id: The ID of the STAC catalog.
stac_title: The title of the STAC catalog.
stac_desc: The description of the STAC catalog.
stac_dir: The directory of saving the STAC catalog.
stac_existance: If it is True, it means that the STAC catalog
already exists in the directory and for the harvesting, there
is no need to create a new STAC-Catalog and import new collections
In the existed STAC-Catalog. False by default.
auth: The authentication of the TDS catalog.
logger_properties: The properties of the logger. For more information please check the :class:`~tds2stac.logger.Logger` class.
"""
# using 'xml_processing' we get the catalog URL with
# XML extension and catalog id and XML content of TDS catalog.
catalog: dict = dict()
if logger_properties is not None:
self.logger_properties = logger_properties
xml_url_catalog, id_catalog, xml = utils.xml_processing(url, auth)
if stac_desc is None:
stac_desc = "This is a STAC catalog created by tds2stac"
# In the following if condition we are going to create a new STAC catalog or use the existed one.
if stac_existance is True:
if stac_dir is None:
self.logger_properties["logger_level"] = "WARNING"
self.logger_properties[
"logger_message"
] = "You have turned on the `stac_existance`, so please provide the directory of the existed STAC catalog"
logger.Logger(self.logger_properties)
return
else:
if tds2stac.ExistenceValidator(stac_dir).existance is True:
self.logger_properties["logger_level"] = "INFO"
self.logger_properties[
"logger_message"
] = "The STAC catalog already exists in the directory"
logger.Logger(self.logger_properties)
id_catalog = pystac.Catalog.from_file(
stac_dir + "/stac/catalog.json"
).id
catalog[id_catalog] = pystac.Catalog.from_file(
stac_dir + "/stac/catalog.json"
)
else:
self.logger_properties["logger_level"] = "INFO"
self.logger_properties[
"logger_message"
] = "The STAC catalog does not exist in the directory"
logger.Logger(self.logger_properties)
id_catalog = id_catalog + " Catalog"
catalog[id_catalog] = pystac.Catalog(
id=stac_id,
title=stac_title,
description="["
+ stac_desc
+ "]("
+ utils.xml2html(xml_url_catalog)
+ ")",
)
else:
self.logger_properties["logger_level"] = "INFO"
self.logger_properties[
"logger_message"
] = "It creates a new catalog in the directory"
logger.Logger(self.logger_properties)
id_catalog = id_catalog + " Catalog"
catalog[id_catalog] = pystac.Catalog(
id=stac_id,
title=stac_title,
description="["
+ stac_desc
+ "]("
+ utils.xml2html(xml_url_catalog)
+ ")",
)
return catalog[id_catalog]
[docs]
def STACCollection(
self,
catalog: pystac.Catalog,
collection_id: str,
collection_title: str,
collection_description: str,
# collection_scientific: bool = False,
stac_existance_collection: bool = False,
logger_properties: dict = dict(),
):
"""
This is a function for creating STAC collection
from harvested information from TDS dataset catalog.
This functin returns a dictionary with two keys:
1. collection: The STAC collection
2. existed_items_id_list: The list of the items that already
exist in the STAC collection and it is going to be used for
the harvesting process.
Args:
catalog: The STAC catalog.
collection_id: The ID of the STAC collection.
collection_title: The title of the STAC collection.
collection_description: The description of the STAC collection.
collection_scientific: The scientific extension of the STAC collection.
stac_existance_collection: If it is True, it means that the STAC collection
already exists in the catalog and for the harvesting, there
is no need to create a new STAC-Collection and import new items
In the existed STAC-Collection. False by default.
logger_properties: The properties of the logger. For more information please check the :class:`~tds2stac.logger.Logger` class.
"""
collection: dict = dict()
existed_items_id_list = []
if logger_properties is not None:
self.logger_properties = logger_properties
if stac_existance_collection is True:
existed_collections_id_list = []
existed_collections_id_list = [
existence_collection.id
for existence_collection in list(catalog.get_collections())
]
if (
collection is not None
and collection_id in existed_collections_id_list
):
collection[collection_id] = catalog.get_child(collection_id)
existed_items_id_list = [
existed_item.id
for existed_item in list(
collection[collection_id].get_items()
)
]
else:
# Defining a None Spatial and Temporal extent for the collection
collection[collection_id] = pystac.Collection(
id=collection_id,
title=collection_title,
extent=pystac.Extent(
spatial=pystac.SpatialExtent(bboxes=[0.0, 0.0]),
temporal=pystac.TemporalExtent(
intervals=[[datetime.utcnow(), datetime.utcnow()]]
),
),
description=collection_description,
)
catalog.add_child(collection[collection_id])
else:
# When the STAC collection does not exist in the catalog
# Instead of None value for Spatial and Temporal extent, we define a default value
# as a list of [0.0, 0.0] for Spatial extent and [[datetime.utcnow(), datetime.utcnow()]] for Temporal extent
collection[collection_id] = pystac.Collection(
id=collection_id,
title=collection_title,
extent=pystac.Extent(
spatial=pystac.SpatialExtent(bboxes=[0.0, 0.0]),
temporal=pystac.TemporalExtent(
intervals=[[datetime.utcnow(), datetime.utcnow()]]
),
),
description=collection_description,
)
catalog.add_child(collection[collection_id])
# if collection_scientific is True:
# scientific_class = scientific.Scientific()
# scientific_class.collection(
# catalog.get_child(collection_id),
# collection_scientific,
# )
return {
"existed_items_id_list": existed_items_id_list,
"collection": collection[collection_id],
}
[docs]
def STACItem(
self,
url: str,
catalog: pystac.Catalog,
harvesting_vars: dict,
Recognizer_output: Union[str, None],
collection_id: str,
aggregated_dataset_url: Union[str, None] = None,
extension_properties: Union[dict, None] = None,
asset_properties: Union[dict, None] = dict(),
logger_properties: dict = dict(),
):
"""
This is a function for creating STAC item
from harvested data in TDS dataset catalog.
Args:
url: The URL of the TDS catalog.
catalog: The STAC catalog.
harvesting_vars: The harvested data from TDS catalog.
Recognizer_output: The output of the Recognizer class.
collection_id: The ID of the STAC collection.
aggregated_dataset_url: The URL of the aggregated dataset that
whole of data is located there.
extension_properties: The properties of the extensions.
asset_properties: The properties of the assets.
logger_properties: The properties of the logger. For more information please check the :class:`~tds2stac.logger.Logger` class.
"""
# if "Seventh Scenario" in str(Recognizer_output):
# service_url_html = utils.xml2html(url)
# else:
# service_url_html = (
# utils.xml2html(url)
# + "?dataset="
# + harvesting_vars["dataset"].get("ID")
# )
if logger_properties is not None:
self.logger_properties = logger_properties
if (
utils.replacement_func_collection_item_id(
harvesting_vars["item_id"]
)
is None
or harvesting_vars["item_footprint"] is None
or harvesting_vars["item_bbox"] is None
or harvesting_vars["modified_date_time"] is None
):
self.logger_properties["logger_level"] = "ERROR"
self.logger_properties[
"logger_message"
] = "One of `item_id`, `item_footprint`, `item_bbox` or `modified_date_time` is None, so you need to double check your dataset."
logger.Logger(self.logger_properties)
return
item = pystac.Item(
id=utils.replacement_func_collection_item_id(
harvesting_vars["item_id"]
),
geometry=geometry.mapping(harvesting_vars["item_footprint"]),
bbox=harvesting_vars["item_bbox"],
datetime=harvesting_vars["modified_date_time"],
properties={},
)
if (
extension_properties is not None
and extension_properties["item_common_metadata"] is True
):
common_metadata_class = common_metadata.CommonMetadata()
common_metadata_class.item(item, harvesting_vars)
#########################################
# Adding web services as assets into items
#########################################
asset = assets.Assets()
asset.item(
harvesting_vars=harvesting_vars,
item=item,
Recognizer_output=Recognizer_output,
url=url,
aggregated_dataset_url=aggregated_dataset_url,
asset_properties=asset_properties,
logger_properties=self.logger_properties,
)
if extension_properties is not None:
if extension_properties.get("item_extensions") is not None:
for extension_property in extension_properties[
"item_extensions"
]:
# if (
# isinstance(extension_property, str)
# and extension_property == "item_scientific"
# ):
# scientific_class = scientific.Scientific()
# scientific_class.item(
# item, extension_properties["item_scientific"]
# )
if (
isinstance(extension_property, str)
and extension_property == "item_datacube_extension"
):
datacube_class = datacube.Datacube()
datacube_class.item_extension(item, harvesting_vars)
if (
isinstance(extension_property, str)
and extension_property == "common_metadata"
):
common_metadata_class = (
common_metadata.CommonMetadata()
)
common_metadata_class.item(item, harvesting_vars)
if isinstance(extension_property, tuple):
if (
len(extension_property) < 2
or len(extension_property) > 3
):
self.logger_properties["logger_level"] = "ERROR"
self.logger_properties[
"logger_message"
] = "The length of `extension_property` tuple is less than 2 or greater than 3, so you need to double check your input"
logger.Logger(self.logger_properties)
return
elif len(extension_property) == 2:
caller_filename = inspect.stack()
script_path = caller_filename[2][1]
sys.path.append(script_path)
custom_module = os.path.splitext(
os.path.basename(caller_filename[2][1])
)[0]
imported_module = importlib.import_module(
custom_module
)
elif len(extension_property) == 3:
script_path = extension_property[2]
sys.path.append(script_path)
custom_module = os.path.splitext(
os.path.basename(extension_property[2])
)[0]
imported_module = importlib.import_module(
custom_module
)
if "." in extension_property[1]:
extension_property_class_str = extension_property[
1
].split(".")[0]
extension_property_function_str = (
extension_property[1].split(".")[1]
)
if hasattr(
imported_module, extension_property_class_str
):
extension_property_class = getattr(
imported_module,
extension_property_class_str,
)
instance = extension_property_class()
if hasattr(
instance, extension_property_function_str
):
getattr(
instance,
extension_property_function_str,
)(item, harvesting_vars)
else:
self.logger_properties[
"logger_level"
] = "ERROR"
self.logger_properties[
"logger_message"
] = "Check your extension function name, second element of the tuple"
logger.Logger(self.logger_properties)
else:
self.logger_properties[
"logger_level"
] = "ERROR"
self.logger_properties[
"logger_message"
] = "Check your extension class name, second element of the tuple"
logger.Logger(self.logger_properties)
else:
extension_property_function_str = (
extension_property[1]
)
if hasattr(
imported_module,
extension_property_function_str,
):
getattr(
imported_module,
extension_property_function_str,
)(item, harvesting_vars)
else:
self.logger_properties[
"logger_level"
] = "ERROR"
self.logger_properties[
"logger_message"
] = "Check your extension function name, second element of the tuple"
logger.Logger(self.logger_properties)
sys.path.remove(script_path)
# if (
# extension_properties is not None
# and extension_properties["item_scientific"] is True
# ):
# scientific_class = scientific.Scientific()
# scientific_class.item(
# item, extension_properties["item_scientific"]
# )
# applying datacube extension to items
if (
extension_properties is not None
and extension_properties["item_datacube"] is True
):
datacube_class = datacube.Datacube()
datacube_class.item_extension(item, harvesting_vars)
# Because Collection does not provide point coordination, this condition was applied.
# TODO: Should be checked by collection_footprint_point None ir not SHOULD BE REFACTORED .....
if (
harvesting_vars["collection_bbox"][0]
== harvesting_vars["collection_bbox"][2]
or harvesting_vars["collection_bbox"][1]
== harvesting_vars["collection_bbox"][3]
):
harvesting_vars["collection_bbox"] = [
harvesting_vars["collection_bbox"][0] - constants.epilon,
harvesting_vars["collection_bbox"][1] - constants.epilon,
harvesting_vars["collection_bbox"][2] + constants.epilon,
harvesting_vars["collection_bbox"][3] + constants.epilon,
]
spatial_extent = pystac.SpatialExtent(
bboxes=[harvesting_vars["collection_bbox"]]
)
temporal_extent = pystac.TemporalExtent(
intervals=[harvesting_vars["collection_interval_time_final"]]
)
# An empty condition for either Temporal or Spatial extent
# TODO: To be refactored
if (
harvesting_vars["collection_bbox"] is None
or harvesting_vars["collection_interval_time_final"] is None
):
spatial_extent = pystac.SpatialExtent(bboxes=[0.0, 0.0])
temporal_extent = pystac.TemporalExtent(
intervals=[[datetime.utcnow(), datetime.utcnow()]]
)
final_collection: pystac.Collection = catalog.get_child(collection_id) # type: ignore
if final_collection is not None:
final_collection.extent = pystac.Extent(
spatial=spatial_extent,
temporal=temporal_extent,
)
final_collection.add_item(item)
return {"item": item}
[docs]
def SaveCatalog(self, catalog, catalog_dir):
try:
catalog.normalize_hrefs(os.path.join(catalog_dir, "stac"))
catalog.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)
except Exception:
ex_type, ex_value, ex_traceback = sys.exc_info()
if ex_type is not None and ex_value is not None:
self.logger_properties["logger_level"] = "ERROR"
self.logger_properties["logger_message"] = (
"The Collection doesn't contain bounding box and temporal information. Therefore the STAC-Catalog cannot be created. Review the input values. %s : %s"
% (
ex_type.__name__,
ex_value,
)
)
logger.Logger(self.logger_properties)
print(traceback.format_exc())