# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: CC0-1.0
import os
import sys
import traceback
from datetime import datetime
from typing import Literal, Tuple, Union
import pytz
import urllib3
from lxml import etree
from tqdm import tqdm
from . import assets, creator, harvester, logger, utils
from .analysers.nested_collections import NestedCollectionInspector
from .analysers.properties_verifier import Verifier
from .analysers.recognizer import Recognizer
from .statics import constants
from .webservices import core
##################################################
# Silence urllib3's InsecureRequestWarning so that
# harvesting TDS servers without a valid SSL
# certificate does not flood the output.
##################################################
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class ExistenceValidator(object):
    """
    A class for verifying the main STAC catalog's existence.
    This class is implemented in :class:`~tds2stac.STACCreator`.

    After construction, ``existance`` is ``True`` when the file
    ``<stac_dir>/stac/catalog.json`` exists and ``False`` otherwise.

    Args:
        stac_dir (str): Directory of the main STAC catalog (*)
        logger_properties (dict, optional): A dictionary of properties for logger.
            Pass `None` to disable logging of the result. default is an empty dict.
    """

    stac_dir: str
    """
    Path of the checked catalog file, i.e. the given directory joined with
    ``stac/catalog.json``. The directory can be a relative or absolute path.
    """
    logger_properties: Union[dict, None]
    """
    A dictionary of properties for logger. `None` disables logging.
    You can look at keys in :class:`~tds2stac.logger.Logger` class.
    """

    def __init__(
        self,
        stac_dir: str,
        logger_properties: Union[dict, None] = dict(),
    ):
        # The attribute keeps the full path of the catalog file, not the
        # bare directory that was passed in.
        self.stac_dir = os.path.join(stac_dir, "stac/catalog.json")
        self.existance = os.path.exists(self.stac_dir)
        if logger_properties is not None:
            # Log from a private copy: writing `logger_msg` into the
            # argument would mutate the caller's dict and, worse, the
            # shared default dict of the signature.
            log_record = dict(logger_properties)
            log_record["logger_msg"] = self.existance
            logger.Logger(log_record)

    def __repr__(self):
        return "<TDS2STACExistanceChecker existance: %s>" % (self.existance)


class TDS2STACIntegrator(object):
    """
    This class is the central component of the TDS2STAC. It harvests the TDS catalog
    and then generates the STAC-Catalog, -Collections, and -Items through the TDS catalogs,
    based on the user's input.
    This class mainly defines all configurations related to harvesting and STAC creation.
    In the first step, it recognizes the scenario of the TDS catalog using :class:`~tds2stac.Recognizer`.
    If it is recognized as a nested collection, :class:`~tds2stac.NestedCollectionInspector` is responsible for
    determining the nested collection's `ID`, `Title`, and `url` of subdirectories. Other procedures follow
    in succession. For example, :class:`~tds2stac.CollectionHarvester` harvests the collection's information
    and :class:`~tds2stac.STACCreator` creates the STAC-Catalog and -Collection. Then, :class:`~tds2stac.ItemHarvester`
    harvests the item's information and :class:`~tds2stac.STACCreator` creates the STAC-Item and connect them
    to the related STAC-Collections. At the end each STAC-Collection will be connected to the main STAC-Catalog.

    The whole workflow runs inside the constructor; the property dictionaries passed in
    are copied first, so the caller's dictionaries are left untouched.

    Args:
        TDS_catalog(str) : The URL address of the TDS catalog that will be harvested.
        stac_dir (str) : Directory of saving created STAC catalogs.
        stac_id (str): STAC catalog ID. default value is 'TDS2STAC'.
        stac_title (str, optional): STAC catalog Title. default value is 'TDS2STAC'.
        stac_description (str, optional): STAC catalog description.
        auth (tuple, optional): Authentication tuple for TDS catalog e.g.('user', 'password').
        stac_existance (bool, optional): Verifying the presence of the STAC catalog in order to
            update an existing catalog; if not, a new catalog will be generated.
        stac_existance_collection (bool, optional): Verifying the presence of the STAC Collection
            in order to update an existing catalog; if not, a new collection will be generated.
        collection_tuples (list, optional): The elements of this tuple comprise the auto-TDS2STAC-generated ID
            , the user-defined ID, title, and description of the STAC-Collection respectively. (auto-ID, user-ID,
            user-title, user-description).
        datetime_filter (list, optional): Datetime-based filtering of harvesting. It works based on the `modified`
            tag in each dataset at TDS.
        aggregated_dataset_url (str, optional): Dataset's URL of each data entry in the Aggregated datasets of TDS.
        depth_number (int, optional) : depth number of nested datasets if it is a nested collection. default value is None.
        limited_number (int, optional): The objective is to reduce the quantity of harvested items in each collection. It is beneficial
            for developing and testing purposes.
        spatial_information (list, optional): Spatial information of 2D datasets e.g. [minx, maxx, miny, maxy] or 1D dataset e.g.
            [x,y]. default value is None.
        temporal_format_by_dataname (str, optional): A preferred datetime format for datasets that include the time period in their names. e.g
            "e%y%m%d%H.%M%S%f"
        item_geometry_linestring (bool, optional): Set True to make a LineString geometry for STAC-Items from wms service.
            Otherwise it makes Polygon geometry for the given Item. default value is False.
        extension_properties (dict, optional): A dictionary of properties for extensions. default is an empty dict.
            For more information about the keys, please refer to the :class:`~tds2stac.TDS2STACIntegrator.extension_properties`.
        webservice_properties (dict, optional): A dictionary of properties for web_service. default is an empty dict.
            For more information about the keys, please refer to the :class:`~tds2stac.TDS2STACIntegrator.webservice_properties`.
        asset_properties (dict, optional): A dictionary of properties for assets. default is an empty dict.
            For more information about the keys, please refer to the :class:`~tds2stac.TDS2STACIntegrator.asset_properties`.
        logger_properties (dict, optional): A dictionary of properties for logger. default is an empty dict.
    """

    TDS_catalog: str
    """
    TDS catalog URL address. Initial point of harvesting e.g.
    https://thredds.atmohub.kit.edu/thredds/catalog/caribic/IAGOS-CARIBIC_MS_files_collection_20231017/catalog.html
    """
    stac_dir: str
    """
    Directory of saving created STAC catalogs e.g. /path/to/stac/directory/
    """
    stac_id: str
    """
    STAC catalog ID. default value `TDS2STAC`
    """
    stac_title: Union[str, None]
    """
    STAC catalog Title. default value `TDS2STAC`
    """
    stac_description: Union[str, None]
    """
    STAC catalog description
    """
    auth: Union[Tuple[str, str], None]
    """
    Authentication tuple for TDS catalog e.g.('user', 'password')
    """
    stac_existance: bool
    """
    Verifying the existence of STAC catalog.
    If the catalog exists in the directory, it updates a existed catalog,
    otherwise it creates new catalog. default value `False`
    """
    stac_existance_collection: bool
    """
    Verifying the existence of STAC Collections.
    If the collection exists in the directory, it updates a existed collection,
    otherwise it creates new collection. default value `False`
    """
    collection_tuples: Union[list[tuple], None]
    """
    STAC collection auto-generated ID, user-ID, user-Title and user-Description defined by user.
    It is worth mentioning that in order to obtain the list of automatically generated collection
    IDs, one can employ the :class:`~tds2stac.NestedCollectionInspector` for the given
    TDS Catalog and subsequently utilize this argument.
    Warning - Identifiers should consist of only lowercase characters, numbers, '_', and '-'.
    Default value `None`. e.g. (ID, Title, Description)
    """
    datetime_filter: Union[list, None]
    """
    Datetime-based filtering. e.g. ``['2010-02-18T00:00:00.000Z','2020-02-22T00:00:00.000Z']``
    Default value `None`. It should be noted it works based on the `modified` tag in each dataset at TDS.
    """
    aggregated_dataset_url: Union[str, None]
    """
    Dataset's URL of each data entry in the Aggregated datasets of TDS. default value `None`.
    The `HTTPServer` is not functional in the aggregated dataset. Therefore, in order to utilize
    this service as an asset in our STAC-Item, we should employ the `aggregated_dataset_url`, which
    links the individual datasets to the `HTTPServer` asset of the relevant Item.
    """
    depth_number: Union[int, None]
    """
    The depth refers to the number of layered datasets. If the collection is nested, this argument
    is applicable; otherwise, employing this argument would be futile. default value `None` (optional)
    """
    limited_number: Union[int, None]
    """
    The objective is to reduce the quantity of harvested items in each collection. It is beneficial
    for developing and testing purposes. default value `None` (optional)
    """
    spatial_information: Union[list, None]
    """
    Spatial information of 2D datasets e.g. [minx, maxx, miny, maxy] or 1D dataset e.g. [x,y].
    Default value `None`(optional)
    """
    temporal_format_by_dataname: Union[str, None]
    """
    A preferred datetime format for datasets that include the time period in their names e.g "e%y%m%d%H.%M%S%f".
    Default value `None` (optional)
    """
    item_geometry_linestring: bool
    """
    The default value for the LineString geometry in the STAC Items from the WMS
    service is set to False and the default geometry type for the STAC-Item is Polygon.
    However, in instances where the item has a POINT geometry, it can be automatically detected.
    However, in order to obtain the LineString geometry, it is necessary to set this argument
    to True.
    """
    extension_properties: Union[dict, None]
    """
    A dictionary of properties for extensions. default is an empty dict.

    **item_extensions (list[str, tuple], optional)**:
        The argument can consist of either a list of extension names (string) or a
        list of tuples containing three elements: the extension name, the function
        name or class name associated with the extension, and the Python script
        required for execution. For more explanation, refer to the :ref:`custom-extension`.
    **collection_extensions (Union[list, tuple], optional)**:
        It works as same as `item_extensions` argument. For more explanation,
        refer to the :ref:`custom-extension`.
    """
    webservice_properties: Union[dict, None]
    """
    A dictionary of properties for web_service. default is an empty dict.
    It has the following keys.

    **web_service_config_file(str, optional)**:
        The primary `tag_config.json` file is situated in the primary directory
        of the installed TDS2STAC. However, the user has the ability to declare
        an alternative `tag_config.json` file, which allows for customization of
        the settings. The user can specify the location of their own JSON file
        in this section. To obtain further details on the creation of a
        `tag_config.json` file, refer: :ref:`tag-config`. The default
        value is set to `tag_config.json` in the root directory of the installed
        app.
    """
    asset_properties: Union[dict, None]
    """
    A dictionary of properties for assets. default is an empty dict.
    The following keys are supported:

    **item_thumbnail (bool, optional)**:
        A `thumbnail` asset for STAC-Items sourced from the Web Map Service (WMS)
        of the TDS. The default value is set to False.
    **item_overview (bool, optional)**:
        A `overview` asset for STAC-Items sourced from the Web Map Service (WMS)
        of the TDS. The default value is set to False.
    **item_getminmax_thumbnail (bool, optional)**:
        The TDS offers a function that allows users to obtain the minimum and maximum
        values of the colorbar associated with an image through the use of `metadata`.
        The aforementioned attribute is contingent upon both the `item_thumbnail` and
        `item_overview`. The default value is set to False.
    **collection_thumbnail (str, optional)**:
        A `thumbnail` asset for STAC-collection sourced from the Web Map Service (WMS)
        of the TDS. It can be chosen from `wms`, `link`, or `None`. The default value
        is set to None.
    **collection_overview (str, optional)**:
        A `overview` asset for STAC-collection sourced from the Web Map Service (WMS)
        of the TDS. It can be chosen from `wms`, `link`, or `None`. The default value
        is set to None.
    **collection_link (str, optional)**:
        This property is reliant upon the values of `collection_thumbnail` and
        `collection_overview`. When the value of either of these attributes is
        set to `link`, it allows for the inclusion of a hyperlink to an image
        for `collection_thumbnail` or `collection_overview`.
    **item_assets_list_allowed (list, optional)**:
        This is a list of permissible web services that will be incorporated as assets
        in the STAC-Item. The :class:`~tds2stac.WebServiceScraper` class provides access
        to the list of available web services. Default value is None.
    **item_assets_list_avoided (list, optional)**:
        This is a list of web services that will be excluded from the STAC-Item asset
        list. The :class:`~tds2stac.WebServiceScraper` class provides access to the
        list of available webservices. Default value is None.
    **collection_assets_list_allowed (list, optional)**:
        This is a list of permissible web services that will be incorporated as assets
        in the STAC-Collection. The :class:`~tds2stac.WebServiceScraper` class provides access
        to the list of available web services. Default value is None.
    **collection_assets_list_avoided (list, optional)**:
        This is a list of web services that will be excluded from the STAC-Collection asset
        list. The :class:`~tds2stac.WebServiceScraper` class provides access to the
        list of available webservices. Default value is None.
    **explore_data (bool, optional)**:
        By enabling the `True` setting, the inclusion of Godiva3 as an exploration asset will be implemented.
    **verify_explore_data (bool, optional)**:
        This argument verifies the availability of the `GetMetadata` function. The provided
        function facilitates the retrieval of data necessary for generating maps using the
        Web Map Service (WMS) protocol. However, an error occurs when attempting to open
        `Godiva3` when this function doesn't work. In order to mitigate such errors, it
        would be advisable to establish this argument.
    **jupyter_notebook (bool, optional)**:
        This argument posits the inclusion of the Jupyter Notebook as an asset.
    """
    logger_properties: dict
    """
    A dictionary of properties for logger. default is an empty dict.
    You can look at keys in :class:`~tds2stac.logger.Logger` class.
    """

    # Format accepted for both entries of `datetime_filter`.
    _DATETIME_FILTER_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    # Blank record handed to `ItemHarvester` for every dataset.
    # TODO: we should get this dictionary from the JSON file
    _HARVESTING_VARS_TEMPLATE = dict.fromkeys(
        (
            "item_id",
            "description",
            "horizontal_ids_lat",
            "horizontal_ids_lon",
            "horizontal_axis_x",
            "horizontal_axis_y",
            "horizontal_extent_lon_min",
            "horizontal_extent_lon_max",
            "horizontal_extent_lat_min",
            "horizontal_extent_lat_max",
            "horizontal_description_lon",
            "horizontal_description_lat",
            "horizontal_reference_system",
            "vertical_ids",
            "vertical_axis",
            "vertical_extent_upper",
            "vertical_extent_lower",
            "vertical_description",
            "temporal_id",
            "temporal_axis",
            "temporal_extent_start_datetime",
            "temporal_extent_end_datetime",
            "temporal_description",
            "variable_description",  # Description of each variable
            "variable_dimensions",  # dimension of each variable
            "variable_ids",  # Variable names
            "variable_unit",  # Variable units
            "variable_types",  # Variable types
            "services",
            "dataset",
            "catalog_url",
            "main_dataset_url",
            "catalog_id",
            "item_bbox",
            "item_footprint",
            "collection_bbox",
            "collection_footprint",
            "collection_footprint_point",
            "collection_interval_time",
            "modified_date_time",
            "collection_interval_time_final",
        )
    )

    # Collection-level values copied from the previously harvested item
    # into the record of the next one.
    _CARRIED_COLLECTION_KEYS = (
        "collection_interval_time",
        "collection_interval_time_final",
        "collection_bbox",
        "collection_footprint",
        "collection_footprint_point",
    )

    def __init__(
        self,
        TDS_catalog: str,
        stac_dir: str,
        # web_service: str,
        stac_id: str = "TDS2STAC",
        stac_title: Union[str, None] = "TDS2STAC",
        stac_description: Union[str, None] = None,
        auth: Union[Tuple[str, str], None] = None,
        stac_existance: bool = False,
        stac_existance_collection: bool = False,
        collection_tuples: Union[list[tuple], None] = None,
        datetime_filter: Union[list, None] = None,
        aggregated_dataset_url: Union[str, None] = None,
        depth_number: Union[int, None] = None,
        limited_number: Union[int, None] = None,
        spatial_information: Union[list, None] = None,
        temporal_format_by_dataname: Union[str, None] = None,
        item_geometry_linestring: bool = False,
        webservice_properties: Union[dict, None] = dict(),
        asset_properties: Union[dict, None] = dict(),
        extension_properties: Union[dict, None] = dict(),
        logger_properties: dict = dict(),
    ) -> None:
        """Harvest `TDS_catalog` and write the resulting STAC metadata to `stac_dir`.

        See the class documentation for the meaning of every argument.
        """
        ################################################
        # Verifying the `webservice_properties`,
        # `asset_properties`, `extension_properties`
        # and `logger_properties`.
        # Each dictionary is copied first: they are
        # modified below, which would otherwise leak
        # into the caller's objects and into the shared
        # default dictionaries of the signature.
        ################################################
        verifier = Verifier()
        if isinstance(webservice_properties, dict):
            webservice_properties = dict(webservice_properties)
            verifier.webservice_properties(webservice_properties)
        if isinstance(asset_properties, dict):
            asset_properties = dict(asset_properties)
            verifier.asset_properties(asset_properties)
        if isinstance(extension_properties, dict):
            extension_properties = dict(extension_properties)
            verifier.extension_properties(extension_properties)
        # Anything that is not a dict (e.g. `None`) is treated like the
        # default empty dict, so `self.logger_properties` always exists.
        logger_properties = (
            dict(logger_properties)
            if isinstance(logger_properties, dict)
            else {}
        )
        verifier.logger_properties(logger_properties)
        self.logger_properties = logger_properties
        self._log("DEBUG", "Harvesting is started!")

        ################################################
        # Getting the list of used webservices names in
        # `tag_config.json`
        ################################################
        if webservice_properties is not None:
            # `.get` keeps a missing key from raising KeyError.
            config_file = webservice_properties.get("web_service_config_file")
            if config_file is not None:
                webservice_properties["webservices"] = list(
                    core.ConfigFileWebServicesScraper(
                        constants.default_tag_config_file
                        if config_file == "default"
                        else config_file
                    )
                )

        ################################################
        # Details of the most recently harvested item.
        # It is deliberately kept across catalogs and
        # collections, exactly as the collection-level
        # values are carried from item to item.
        ################################################
        item_harvested_details = None

        ################################################
        # Parsing the bounds of `datetime_filter`; TDS
        # datasets outside of them are skipped according
        # to the 'modified' attribute in their `date` tag
        ################################################
        datetime_after = None
        datetime_before = None
        if datetime_filter is not None:
            datetime_after = self._parse_datetime_bound(datetime_filter[0])
            datetime_before = self._parse_datetime_bound(datetime_filter[1])

        ###############################
        # Finding dataset's scenario
        ###############################
        recognizer = Recognizer(
            TDS_catalog, logger_properties=self.logger_properties
        )

        ###############################
        # STAC-Catalog creation
        ###############################
        STAC_creator = creator.STACCreator()
        stac_catalog = STAC_creator.STACCatalog(
            url=TDS_catalog,
            stac_id=stac_id,
            stac_title=stac_title,
            stac_desc=stac_description,
            stac_dir=stac_dir,
            stac_existance=stac_existance,
            auth=auth,
        )

        def create_collection(subdirs, tuples, collection_auth):
            """Harvest one collection and create its STAC-Collection."""
            collection_dict = harvester.CollectionHarvester(
                TDS_catalog,
                recognizer.status,
                subdirs=subdirs,
                collection_tuples=tuples,
                auth=collection_auth,
                logger_properties=self.logger_properties,
            )
            collection_info = dict(collection_dict)
            stac_collection = STAC_creator.STACCollection(
                catalog=stac_catalog,
                collection_id=collection_info["collection_id"],
                collection_title=collection_info["collection_title"],
                collection_description=collection_info[
                    "collection_description"
                ],
                stac_existance_collection=stac_existance_collection,
            )
            return collection_dict, stac_collection

        def harvest_catalog(
            catalog_url, collection_dict, stac_collection, shared_vars
        ):
            """Create the STAC-Items (and collection assets) of one TDS catalog.

            Returns False when the catalog XML cannot be parsed, True otherwise.
            With `shared_vars` the module-level `constants.harvesting_vars`
            dictionary is reused for every item (and exposed as
            `self.harvesting_vars`); otherwise each item gets a blank record.
            """
            nonlocal item_harvested_details
            xml_url_catalog, _id_catalog, xml = utils.xml_processing(
                catalog_url, auth
            )
            recognizer_output = Recognizer(
                xml_url_catalog,
                logger_properties=self.logger_properties,
            )
            try:
                tree = etree.XML(xml)
            except Exception:
                # `Exception` instead of `BaseException`: Ctrl-C and
                # interpreter shutdown must not be swallowed here.
                self._log(
                    "WARNING",
                    "The XML of TDS catalog `%s` could not be parsed."
                    % xml_url_catalog,
                )
                return False

            datasets = tree.findall(
                ".//{%s}dataset[@urlPath]" % constants.unidata
            )
            # The counter also advances for datasets that are skipped below.
            for data_counted, data in enumerate(
                tqdm(datasets, colour="red"), start=1
            ):
                if limited_number is not None and data_counted > limited_number:
                    break
                item_id = utils.replacement_func_collection_item_id(
                    utils.replacement_func(data.get("ID"))
                )
                if item_id in stac_collection["existed_items_id_list"]:
                    continue

                if shared_vars:
                    self.harvesting_vars = constants.harvesting_vars
                    harvesting_vars = self.harvesting_vars
                else:
                    harvesting_vars = dict(self._HARVESTING_VARS_TEMPLATE)

                # Carry the collection-level values of the previous item
                # over, or start from empty values for the very first item.
                if item_harvested_details is None:
                    harvesting_vars["collection_interval_time"] = []
                    harvesting_vars["collection_interval_time_final"] = []
                    harvesting_vars["collection_bbox"] = []
                    harvesting_vars["collection_footprint"] = None
                    harvesting_vars["collection_footprint_point"] = None
                else:
                    previous = dict(item_harvested_details)
                    for key in self._CARRIED_COLLECTION_KEYS:
                        harvesting_vars[key] = previous[key]

                ################################
                # - Harvest Item
                #################################
                item_harvested_details = harvester.ItemHarvester(
                    xml_url_catalog,
                    data,
                    harvesting_vars,
                    webservice_properties,
                    datetime_after=datetime_after,
                    datetime_before=datetime_before,
                    spatial_information=spatial_information,
                    temporal_format_by_dataname=temporal_format_by_dataname,
                    extension_properties=extension_properties,
                    linestring=item_geometry_linestring,
                    logger_properties=self.logger_properties,
                )
                ################################
                # - Create STAC-Item
                #################################
                STAC_creator.STACItem(
                    xml_url_catalog,
                    stac_catalog,
                    dict(item_harvested_details),
                    recognizer_output.status,
                    dict(collection_dict)["collection_id"],
                    aggregated_dataset_url=aggregated_dataset_url,
                    extension_properties=extension_properties,
                    asset_properties=asset_properties,
                )

            ################################
            # - Collection thumbnail/overview
            #################################
            if (
                asset_properties is not None
                and (
                    asset_properties.get("collection_thumbnail")
                    in ["wms", "link"]
                    or asset_properties.get("collection_overview")
                    in ["wms", "link"]
                )
                and item_harvested_details is not None
            ):
                assets.Assets().collection(
                    harvesting_vars=dict(item_harvested_details),
                    collection_dict=dict(collection_dict),
                    stac_catalog=stac_catalog,
                    asset_properties=asset_properties,
                    logger_properties=self.logger_properties,
                )
            return True

        ########################################
        # STAC-Collection and -Item creation
        # 1. Harvest for nested collections
        ########################################
        if any(
            scenario in str(recognizer.status)
            for scenario in constants.nested_scenarios
        ):
            list_of_collection_details = NestedCollectionInspector(
                TDS_catalog,
                nested_number=depth_number,
                logger_properties=self.logger_properties,
            )
            for k in list_of_collection_details:
                collection_dict, stac_collection = create_collection(
                    k, collection_tuples, auth
                )
                if k[3] != []:
                    # An unparsable sub-catalog is skipped; the remaining
                    # ones are still harvested.
                    for url_for_items in k[3]:
                        harvest_catalog(
                            url_for_items,
                            collection_dict,
                            stac_collection,
                            shared_vars=True,
                        )
                elif not harvest_catalog(
                    k[0], collection_dict, stac_collection, shared_vars=False
                ):
                    # Unchanged behaviour: harvesting is abandoned and
                    # nothing is saved.
                    return
        ########################################
        # 2. Harvest for none-nested collections
        ########################################
        else:
            # NOTE(review): the user's `collection_tuples` and `auth` are
            # not forwarded to the collection harvester in this branch
            # (kept as in the original) -- confirm that this is intended.
            collection_dict, stac_collection = create_collection(
                None, None, None
            )
            if not harvest_catalog(
                TDS_catalog, collection_dict, stac_collection, shared_vars=False
            ):
                # Unchanged behaviour: nothing is saved.
                return

        #############################
        # Saving STAC catalog
        #############################
        self._log("DEBUG", "Harvesting is Finished!")
        self._log("DEBUG", "Creating STAC-Metadata is started!")
        STAC_creator.SaveCatalog(catalog=stac_catalog, catalog_dir=stac_dir)
        self._log("DEBUG", "Creating STAC-Metadata is finished!")

    def _log(self, level: str, message: str) -> None:
        """Emit `message` with `level` through the configured TDS2STAC logger."""
        self.logger_properties["logger_level"] = level
        self.logger_properties["logger_msg"] = message
        logger.Logger(self.logger_properties)

    def _parse_datetime_bound(
        self, value: Union[str, None]
    ) -> Union[datetime, None]:
        """Turn one `datetime_filter` entry into a UTC-aware datetime.

        Returns None when `value` is None, or when it cannot be parsed; in
        the latter case a warning is logged and the traceback is printed,
        so that bound is simply not applied to the harvesting.
        """
        if value is None:
            return None
        try:
            parsed = datetime.strptime(value, self._DATETIME_FILTER_FORMAT)
        except Exception as error:
            self._log(
                "WARNING",
                "`datetime_filter` warning: %s : %s"
                % (type(error).__name__, error),
            )
            print(traceback.format_exc())
            return None
        if parsed.tzinfo:
            return parsed.astimezone(pytz.utc)
        return parsed.replace(tzinfo=pytz.utc)