Source code for tds2stac.utils

# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: CC0-1.0

import os
from urllib import parse as urlparse
from urllib.request import (
    HTTPBasicAuthHandler,
    HTTPPasswordMgrWithDefaultRealm,
    build_opener,
)

import requests


def replacement_func(url):
    """Derive a title-cased, space-separated name from a catalog URL.

    The URL path has ``%``, ``/catalog.xml`` and ``/thredds/`` removed and
    its remaining slashes turned into spaces. When a query string is
    present, its ``=``, ``?`` and ``/`` characters become spaces and it is
    appended after the path. Finally ``.xml`` (path only) and ``.nc`` are
    dropped, ``-``, ``_`` and ``.`` become spaces, and the result is
    title-cased.
    """
    parts = urlparse.urlsplit(url)

    # Order matters: the "/catalog.xml" and "/thredds/" markers have to be
    # removed before the remaining slashes are converted into spaces.
    cleaned_path = parts.path
    for needle, substitute in (
        ("%", ""),
        ("/catalog.xml", ""),
        ("/thredds/", ""),
        ("/", " "),
    ):
        cleaned_path = cleaned_path.replace(needle, substitute)

    # Only the path loses ".xml"; the query keeps any ".xml" it contains.
    label = cleaned_path.replace(".xml", "")
    if parts.query != "":
        cleaned_query = parts.query
        for needle in ("=", "?", "/"):
            cleaned_query = cleaned_query.replace(needle, " ")
        label = label + " " + cleaned_query

    for needle, substitute in ((".nc", ""), ("-", " "), ("_", " ")):
        label = label.replace(needle, substitute)

    return label.replace(".", " ").title()
def replacement_func_collection_item_id(url):
    """Derive a lowercase, underscore-separated identifier from a catalog URL.

    The URL path has ``%``, ``/catalog.xml`` and ``/thredds/`` removed and
    its remaining slashes turned into spaces. When a query string is
    present, its ``=``, ``?`` and ``/`` characters become spaces and it is
    appended after the path. ``.xml`` (path only) and ``.nc`` are dropped,
    and ``-`` and ``_`` become spaces. The result is then lowercased and
    every space or dot is replaced by an underscore.
    """
    parts = urlparse.urlsplit(url)

    # Order matters: the "/catalog.xml" and "/thredds/" markers have to be
    # removed before the remaining slashes are converted into spaces.
    cleaned_path = parts.path
    for needle, substitute in (
        ("%", ""),
        ("/catalog.xml", ""),
        ("/thredds/", ""),
        ("/", " "),
    ):
        cleaned_path = cleaned_path.replace(needle, substitute)

    # Only the path loses ".xml"; the query keeps any ".xml" it contains.
    identifier = cleaned_path.replace(".xml", "")
    if parts.query != "":
        cleaned_query = parts.query
        for needle in ("=", "?", "/"):
            cleaned_query = cleaned_query.replace(needle, " ")
        identifier = identifier + " " + cleaned_query

    for needle, substitute in ((".nc", ""), ("-", " "), ("_", " ")):
        identifier = identifier.replace(needle, substitute)

    identifier = identifier.lower()
    for separator in (" ", "."):
        identifier = identifier.replace(separator, "_")
    return identifier
def html2xml(url):
    """Return ``url`` with a ``.html`` path extension replaced by ``.xml``.

    Only the extension of the URL *path* is rewritten. Other occurrences of
    ``.html`` (for example in the host name or the query string) are left
    untouched. URLs whose path does not end in ``.html`` are returned as
    re-assembled by ``urlsplit(...).geturl()``.
    """
    parts = urlparse.urlsplit(url)
    stem, extension = os.path.splitext(parts.path)
    if extension == ".html":
        # Swap the extension on the path component only; a plain
        # str.replace over the whole URL would also corrupt the host or
        # query whenever they contain ".html".
        parts = parts._replace(path=stem + ".xml")
    return parts.geturl()
def xml2html(url):
    """Return ``url`` with a ``.xml`` path extension replaced by ``.html``.

    Only the extension of the URL *path* is rewritten. Other occurrences of
    ``.xml`` (for example in the host name or the query string) are left
    untouched. URLs whose path does not end in ``.xml`` are returned as
    re-assembled by ``urlsplit(...).geturl()``.
    """
    parts = urlparse.urlsplit(url)
    stem, extension = os.path.splitext(parts.path)
    if extension == ".xml":
        # Swap the extension on the path component only; a plain
        # str.replace over the whole URL would also corrupt the host or
        # query whenever they contain ".xml".
        parts = parts._replace(path=stem + ".html")
    return parts.geturl()
def get_xml(url, request_properties):
    """Fetch ``url`` and return the response body as UTF-8 encoded bytes.

    Args:
        url: Address to request with ``requests.get``.
        request_properties: ``None`` or an empty dict for a plain request;
            otherwise a mapping providing the ``"auth"``, ``"verify"`` and
            ``"timeout"`` values forwarded to ``requests.get``.

    Returns:
        The response text encoded as UTF-8, or ``None`` when the request
        (or the lookup of a required key in ``request_properties``) fails.
        The HTTP status code is not inspected.
    """
    try:
        if request_properties is not None and request_properties != {}:
            response = requests.get(
                url,
                None,
                auth=request_properties["auth"],
                verify=request_properties["verify"],
                timeout=request_properties["timeout"],
            )
        else:
            response = requests.get(url)
        return response.text.encode("utf-8")
    except Exception:
        # Best-effort fetch: any ordinary failure yields None. Catching
        # Exception rather than BaseException lets KeyboardInterrupt and
        # SystemExit propagate instead of being silently swallowed.
        return None
def references_urls(url, additional):
    """Resolve the reference ``additional`` against the catalog ``url``.

    Returns:
        * ``url`` itself when ``additional`` is empty or ``None``;
        * ``additional`` unchanged when it starts with ``http``;
        * ``additional`` joined onto the scheme and host of ``url`` when it
          starts with ``/``;
        * otherwise ``additional`` appended to the directory that contains
          the last path segment of ``url``.
    """
    parts = urlparse.urlsplit(url)
    site_root = str(parts.scheme) + "://" + str(parts.netloc)
    parent_dir = os.path.split(parts.path)[0]
    catalog_dir_url = urlparse.urljoin(site_root, parent_dir)

    if not additional:
        return url
    if additional[:4] == "http":
        # Already a full http/https address.
        return additional
    if additional[0] == "/":
        # Server-absolute path: resolve against scheme + host only.
        return urlparse.urljoin(site_root, additional)
    # Relative path: resolve against the catalog's own directory.
    return catalog_dir_url + "/" + additional
def xml_processing(catalog, request_properties):
    """Collect the XML URL, derived id and XML content for a catalog URL.

    Returns:
        A 3-tuple of the result of ``html2xml(catalog)``, the result of
        ``replacement_func`` applied to that XML URL, and the result of
        ``get_xml`` for that XML URL with ``request_properties``.
    """
    xml_address = html2xml(catalog)
    return (
        xml_address,
        replacement_func(xml_address),
        get_xml(xml_address, request_properties),
    )
def xml_tag_name_ncml(input_xml, var_name):
    """Build the lookup key ``<prefix>_<var_name>`` for an NcML element.

    For the variable names listed below, the prefix is the element's tag;
    for every other name it is the string form of the element's ``name``
    attribute (``"None"`` when the attribute is absent).
    """
    # Variable names that share one `input_xml.tag` and are therefore
    # keyed by tag instead of by the element's "name" attribute.
    tag_keyed_names = ("var_lists", "var_dims", "var_descs", "keyword")

    prefix = (
        input_xml.tag
        if var_name in tag_keyed_names
        else str(input_xml.get("name"))
    )
    return prefix + "_" + var_name
def xml_tag_finder(input_xml, web_service, var_name):
    """Return the lookup key for ``input_xml`` under the given web service.

    ``"iso"`` and ``"wms"`` use ``<tag>_<var_name>``; ``"ncml"`` uses the
    value produced by ``xml_tag_name_ncml``. Any other ``web_service``
    yields ``None``. Both candidate keys are computed on every call,
    whichever service is requested.
    """
    tag_based_key = input_xml.tag + "_" + var_name
    ncml_key = xml_tag_name_ncml(input_xml, var_name)

    keys_by_service = {
        "iso": tag_based_key,
        "ncml": ncml_key,
        "wms": tag_based_key,
    }
    return keys_by_service.get(web_service)
def validate_catalog_url(url, requests_properties):
    """Report whether a GET request to ``url`` answers with HTTP 200.

    Args:
        url: Address to request with ``requests.get``.
        requests_properties: ``None`` or an empty dict for a plain request;
            otherwise a mapping providing the ``"auth"``, ``"verify"`` and
            ``"timeout"`` values forwarded to ``requests.get``.

    Returns:
        ``True`` when the response status code is exactly 200, ``False``
        for any other status or when the request (or the lookup of a
        required key in ``requests_properties``) fails.
    """
    try:
        if requests_properties is not None and requests_properties != {}:
            response = requests.get(
                url,
                None,
                auth=requests_properties["auth"],
                verify=requests_properties["verify"],
                timeout=requests_properties["timeout"],
            )
        else:
            response = requests.get(url)
        return response.status_code == 200
    except Exception:
        # Any ordinary failure counts as "not valid". Catching Exception
        # rather than BaseException lets KeyboardInterrupt and SystemExit
        # propagate instead of being reported as an invalid URL.
        return False
def merge_bboxes(bbox1, bbox2):
    """Combine two 4-element boxes into one 4-tuple.

    The first two components of the result are the pairwise minima of the
    inputs' first two components; the last two are the pairwise maxima of
    the inputs' last two components.
    """
    low_x_a, low_y_a, high_x_a, high_y_a = bbox1
    low_x_b, low_y_b, high_x_b, high_y_b = bbox2
    return (
        min(low_x_a, low_x_b),
        min(low_y_a, low_y_b),
        max(high_x_a, high_x_b),
        max(high_y_a, high_y_b),
    )
def merge_intervals(interval1, interval2):
    """Combine two ``(start, end)`` pairs into one 2-tuple.

    The result is the smaller of the two starts paired with the larger of
    the two ends.
    """
    first_start, first_end = interval1
    second_start, second_end = interval2
    return (min(first_start, second_start), max(first_end, second_end))
def opener_module(service_url, requests_properties):
    """Open ``service_url`` once using HTTP basic authentication.

    Args:
        service_url: Address to open; also used as the URI the credentials
            are registered for.
        requests_properties: Mapping whose ``"auth"`` entry holds the
            username at index 0 and the password at index 1.

    Returns:
        ``None``. Nothing is caught here, so errors raised while opening
        the URL propagate to the caller.
    """
    credentials = requests_properties["auth"]
    username = credentials[0]
    password = credentials[1]

    # Register the credentials for this URL under the default realm.
    password_mgr = HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, service_url, username, password)

    # The opener is local to this call: it is not installed as the global
    # default, so later urlopen() calls elsewhere are unaffected.
    opener = build_opener(HTTPBasicAuthHandler(password_mgr))

    # The response body is not used; close it right away so the underlying
    # connection is released instead of lingering until garbage collection.
    opener.open(service_url).close()