Source code for xcmor.xcmor

import collections
import re
from datetime import datetime, timezone

# from warnings import warn
import cf_xarray as cfxr  # noqa
import xarray as xr
from xarray import DataArray

from .log import get_logger
from .mapping import dtype_map
from .resources import get_project_tables
from .rules import rules
from .tests.tables import coords as coords_default
from .tests.tables import grids as grids_default
from .utils import cf_table, key_by_attr, posix_to_python_regex, read_tables

logger = get_logger(__name__)


def _update_attrs(obj):
    pass


def _transpose(ds):
    """Transpose dataset to COARDS convention"""
    axis = ["T", "Z", "Y", "X"]
    cf_dims = list(ds.cf.dims.keys())
    order = [ax for ax in axis if ax in cf_dims]
    if order:
        logger.debug(f"transposing order: {order}")
        return ds.cf.transpose(*order, ...)
    return ds


def _encode_time(ds, cf_units=None):
    """Encode time units and calendar"""
    time = ds.cf["time"]
    cf_units = cf_units or time.attrs.get("units") or time.encoding.get("units")
    # print(time.name, cf_units)
    if cf_units is None:
        cf_units = "days since ?"
    else:
        del time.attrs["units"]

    start_format = "%Y-%m-%dT%H:%M:%S"

    # check if time is datetime-like, maybe there is a better way?
    # decode times if not datetime-like
    try:
        start_str = f"{time[0].dt.strftime(start_format).item()}"
        units = cf_units.replace("?", start_str)
        logger.debug(f"setting time units: {units}")
        time.encoding["units"] = units
    except (AttributeError, TypeError):
        cf_units = cf_units.replace("?", "1950")
        logger.warning(
            f"time axis does not seem to be datetime-like, encoding with units '{cf_units}'"
        )
        ds.time.attrs["units"] = cf_units  # .replace("?", "1950")
        ds = xr.decode_cf(ds, decode_times=True, decode_coords=False)
        time = ds.time

    if time.attrs.get("type"):
        time.encoding["dtype"] = dtype_map[time.attrs["type"]]

    return time


def _units_convert(da, format=None):
    """Use pint_xarray to convert units"""
    import pint_xarray  # noqa
    from cf_xarray.units import units  # noqa

    if format is None:
        format = "cf"
    if units.Unit(da.original_units) != units.Unit(da.units):
        logger.warn(
            f"converting units {da.original_units} from input data to CF units {da.units}"
        )
        da_quant = da.pint.quantify(da.original_units)
        da = da_quant.pint.to(da.units).pint.dequantify(format=format)
        da.attrs["history"] = (
            f"original data with units {da.original_units} converted to {da.units}"
        )
    return da


def _remove_bounds_attrs(obj):
    """Remove bounds variable attributes because they shouldn't have any"""
    for k in obj.cf.bounds:
        obj.cf.get_bounds(k).attrs = {}
        obj.cf.get_bounds(k).encoding = {}
    return obj


def _get_x_y_coords(obj):
    """Guess linear X and Y coordinates"""
    obj = obj.cf.guess_coord_axis()
    # obj = _remove_bounds_attrs(obj)

    X = None
    Y = None
    # cfxr finds the X and Y coordinates right away
    try:
        # if "X" in obj.cf.coords and "Y" in obj.cf.coords:
        X = obj.cf["X"]
        Y = obj.cf["Y"]
    except KeyError as e:
        logger.warning(e)
        # cfxr finds longitude and latitude, let's check if they are 1D
        lon = obj.cf["longitude"]
        lat = obj.cf["latitude"]
        if lon.ndim == 1 and lat.ndim == 1:
            X = lon
            Y = lat
    # ensure the attributes to make CF conform
    if X is not None and Y is not None:
        X.attrs["axis"] = "X"
        Y.attrs["axis"] = "Y"
    else:
        logger.error("could not find X and Y coordinates")
        raise Exception("could not find X and Y coordinates")
    return X, Y


def _get_lon_lat_coords(obj):
    """Return lon and lat extracted from ds

    Use cf_xarray to identify longitude and latitude coordinates.
    Might be 1D or 2D coordinates.

    """
    obj = obj.copy().cf.guess_coord_axis()
    try:
        lon = obj.cf["longitude"]
        lat = obj.cf["latitude"]
    except KeyError:
        raise KeyError("could not identify longitude/latitude")

    return lon, lat


def _is_curvilinear(obj):
    """Check for curvilinear

    Pretty naive definition here, curvilinear for us here simply
    means if longitude and latitude are not 1D coordinates.

    """
    lon, lat = _get_lon_lat_coords(obj)
    return lon.ndim > 1 and lat.ndim > 1


def _guess_dims_attr(obj):
    """Try to guess dimensions attribute"""
    obj = obj.copy().cf.guess_coord_axis()
    dimensions = []
    try:
        lon, lat = _get_lon_lat_coords(obj)
        logger.debug(f"guessing longitude, latitude: {lon.name}, {lat.name}")
        dimensions.extend(["longitude", "latitude"])
    except KeyError:
        logger.warning(
            f"Could not guess longitude and latitude coordinates from {list(obj.coords)}"
        )
    if "Z" in obj.cf.coords:
        dimensions.append(obj.cf.coords["Z"].name)
    if "time" in obj.cf.coords:
        dimensions.append("time")
    return dimensions


def _add_var_attrs(ds, mip_table):
    """add variable attributes"""

    for var in ds.data_vars:
        da = ds[var]
        mip_entry = mip_table[var]
        for k, v in mip_entry.items():
            if k in da.attrs and da.attrs[k] != v:
                # warn if we overvwrite conflicting attributes
                logger.warn(
                    f"{var}: overwriting conflicting value '{da.attrs[k]}' of attribute '{k}' with value '{v}' from mip table."
                )
                if k == "units":
                    # keep original units for later interpretation
                    da.attrs["original_units"] = v
            da.attrs[k] = v

        # derive global attributes
        ds.attrs["variable_id"] = mip_entry.get("out_name") or var

        if mip_entry.get("frequency"):
            ds.attrs["frequency"] = mip_entry["frequency"]
            del da.attrs["frequency"]
        if mip_entry.get("modeling_realm"):
            ds.attrs["realm"] = mip_entry["modeling_realm"]
            del da.attrs["modeling_realm"]

    return ds


def _interpret_var_attrs(ds, mip_table):
    """Apply variable attributes found in the mip table.

    This will interpret attributes found in the mip table, e.g.,
    valid_min, valid_max, convert dtypes, etc...
    Once attributes were interpreted they are removed from the
    variables attributes dictionary.

    """

    for v in ds.data_vars:
        attrs = ds[v].attrs.copy()
        for attr in attrs:
            if hasattr(rules, attr):
                ds = getattr(rules, attr)(ds, v)

        da = ds[v]
        # handle units
        if "original_units" in da.attrs:
            da = _units_convert(da)
        ds = ds.assign({da.name: da})

    return ds


def _interpret_coord_attrs(ds, time_units=None):
    """Apply coordinates attributes.

    This will interpret attributes found in the mip table, e.g.,
    valid_min, valid_max, convert dtypes, etc...
    Once attributes were interpreted they are removed from the
    variables attributes dictionary.

    """

    for v in ds.coords:
        # logger.debug(f"interpreting coordinate attributes: {v}")
        da = ds.coords[v]
        for attr in da.attrs.copy():
            if hasattr(rules, attr):
                # logger.debug(f"interpreting attribute: {attr}")
                apply_rule = getattr(rules, attr)
                ds = apply_rule(ds, v)
        # ds = ds.assign_coords({da.name: da})

    if "time" in ds:
        # time = _encode_time(ds, time_units)
        ds = ds.assign_coords(time=_encode_time(ds, time_units))

    return ds


def _find_coord_key(da, axis_entry):
    """find datarray coordinate by cf attributes from coordinates table"""

    keys = ["out_name", "standard_name", "axis"]
    for k in keys:
        if axis_entry[k] in da.cf.coords or axis_entry[k] in da.coords:
            # print(f"found {v[k]} by {k}")
            return axis_entry[k]
    return None


def _add_coord_attrs(da, axis_entry):
    """Add coordinate attributes from coordinates table"""

    out_name = axis_entry["out_name"]
    coord_key = out_name
    logger.debug(f"adding coordinate attribtes: {out_name}")
    if coord_key not in da.coords:
        coord_key = _find_coord_key(da, axis_entry)

    if coord_key is None:
        # we could not find the coordinate in the dataset
        logger.info(f"adding coordinate: {out_name}")
        value = float(axis_entry["value"])
        da = da.assign_coords({out_name: DataArray(value)})
    elif coord_key != out_name:
        # rename coord key to actual coordinate out_name
        logger.debug(f"renaming coordinate: {coord_key} to {out_name}")
        da = da.cf.rename({coord_key: out_name})

    # add required attributes
    da.coords[out_name].attrs = {k: v for k, v in axis_entry.items() if v}

    # dims = da.coords[out_name].dims

    ## this is a coordinate variable (not auxilliary), swap dims
    # if len(dims) == 1:
    #    da = da.swap_dims({dims[0]: out_name})

    return da


def _apply_dims(da, dims):
    """Apply dimensions from coordinates table

    Parameters
    ----------
    da : DataArray, Dataset
        DataArray of which coordinates should be cmorized.
    dims : dict
        Dictionary with dimension names a keys and cmor coordinate
        table entries as values.

    """
    # d is a cmor coordinate table key, v is the coordinates table entry
    for d, v in dims.items():
        if v:
            logger.debug(f"{d}, {v}")
            da = _add_coord_attrs(da, v)
        else:
            logger.warning(f"found no coordinate attributes for coordinate '{d}'")
        # we find the coordinate already by its correct cf out_name
        # if v["out_name"] in da.coords:
        #     da = _add_coord_attrs(da, d, v)
        #     continue

        # # search for a coordinate by attributes (using cf_xarray)
        # keys = ["out_name", "standard_name", "axis"]
        # for k in keys:
        #     if v[k] in da.cf.coords or v[k] in da.coords:
        #         # print(f"found {v[k]} by {k}")
        #         da = _add_coord_attrs(da, v[k], v)
        #         break

        # # seems to be a scalar coordinate that we need to create
        # if v["out_name"] not in da.coords:
        #     logger.info(f"adding coordinate: {d}")
        #     value = float(v["value"])
        #     coord = DataArray(value)
        #     dtype = v["type"]
        #     coord = DataArray(value).astype(dtype)
        #     da = da.assign_coords({v["out_name"]: coord})
        #     da.coords[v["out_name"]].attrs = v

    return da


def _find_table_entry(table, value):
    entry = table.get(value)
    if entry is None:
        # could not find by key, search by attributes
        attrs = ["axis", "out_name", "standard_name"]
        for attr in attrs:
            keys = key_by_attr(table, attr, value)
            if keys and len(keys) == 1:
                logger.debug(
                    f"found value '{value}' as attribute '{attr}' in key '{keys}'"
                )
                entry = table[keys[0]]
                break
            elif keys:
                logger.debug(
                    f"found several values '{value}' as attribute '{attr}' in keys '{keys}'"
                )

    if entry is None:
        logger.error(f"Could not find any unique entry with attribute value '{value}'")

    return entry


def _interpret_var_dims(ds, coords_table, grids_table=None, drop=False):
    """Interpret variable dimensions attribute.

    This will look up the dimensions defined for variables
    in the mip table and update coordinates acoording to
    meta data in the coordinates table and grids table.

    See also: https://cfconventions.org/cf-conventions/cf-conventions.html#coordinate-system

    """
    all_dims = []
    auxiliary = False
    has_xy = False
    has_lonlat = False
    curvilinear = _is_curvilinear(ds)
    has_grid_mapping = ds.cf.grid_mapping_names != {}

    logger.debug(f"curvilinear: {curvilinear}")
    logger.debug(f"has_grid_mapping: {has_grid_mapping}")

    x, y = _get_x_y_coords(ds)
    has_xy = x is not None and y is not None

    if not has_xy:
        message = "Input dataset should have 1D linear coordinates!"
        logger.critical(message)
        raise Exception(message)

    lon, lat = _get_lon_lat_coords(ds)
    has_lonlat = lon is not None and lat is not None

    logger.debug(f"has lonlat: {has_lonlat}")
    logger.debug(f"has xy: {has_xy}")
    logger.debug(f"x-axis, y-axis: {x.name}, {y.name}")
    logger.debug(f"longitude, latitude: {lon.name}, {lat.name}")

    # check if lon lat are auxilliary coordinates
    if not (lon.equals(x) and lat.equals(y)):
        auxiliary = True

    logger.debug(f"auxiliary coordinates: {auxiliary}")

    if auxiliary is True:
        # coords = (
        #     coords_table
        #     | grids_table.get("variable_entry")
        #     | grids_table.get("axis_entry")
        # )
        # lon_entry = _find_table_entry(grids_table.get("variable_entry"), "longitude")
        # lat_entry = _find_table_entry(grids_table.get("variable_entry"), "latitude")
        x_entry = _find_table_entry(grids_table["axis_entry"], x.name)
        y_entry = _find_table_entry(grids_table["axis_entry"], y.name)
        ds[x.name].attrs = x_entry
        ds[y.name].attrs = y_entry
    else:
        # lon_entry = _find_table_entry(coords_table, lon.name)
        # lat_entry = _find_table_entry(coords_table, lat.name)
        x_entry = None
        y_entry = None

    if auxiliary is True and not has_grid_mapping:
        logger.warning(
            "no grid mapping found although the dataset seems to have auxilliary coordinates"
        )

    # combine coordinates and grids table
    if grids_table and auxiliary is True:
        coords_table = (
            coords_table
            | grids_table.get("variable_entry")
            | grids_table.get("axis_entry")
        )

    for var in ds.data_vars:
        dims = ds[var].attrs.get("dimensions")
        if not dims:
            dims = _guess_dims_attr(ds[var])
            logger.debug(f"guessing dims of {var}: {dims}")
        else:
            del ds[var].attrs["dimensions"]
            dims = dims.split()

        logger.debug(f"{var}: {dims}")

        dims = {d: coords_table.get(d) or {"out_name": d} for d in dims}

        ds = _apply_dims(ds, dims)

        # add coordinates attribute, e.g., for 0D coordinate variables
        # e.g., height2m, if not a dataarray index:
        coordinates = " ".join(
            [
                d["out_name"]
                for d in dims.values()
                if d["out_name"] not in ds[var].indexes and d["out_name"] in ds.coords
            ]
        )

        if coordinates:
            # Set coordinates attribute
            ds[var].attrs["coordinates"] = coordinates
            # remove encoding entry if present
            if "coordinates" in ds[var].encoding:
                del ds[var].encoding["coordinates"]

        all_dims.extend([v["out_name"] for v in dims.values()])

        if curvilinear:
            x, y = _get_x_y_coords(ds)
            logger.debug(f"X/Y: {x.name}/{y.name}")

    logger.debug(f"added coordinates: {list(all_dims)}")
    # drop unneccessary coordinates
    if drop is True:
        drops = [c for c in ds.coords if c not in list(all_dims)]
        logger.debug(f"dropping coordinates: {drops}")
        ds = ds.drop(drops)

    return ds


def _add_version_attr(ds):
    """add version attribute"""
    now = datetime.now().strftime("%Y%m%d")
    ds.attrs["version"] = now

    return ds


def _add_creation_date(ds):
    """add version attribute"""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S%Z")
    ds.attrs["creation_date"] = now

    return ds


def _update_global_attrs(ds, dataset_table):
    ds.attrs.update(
        {
            k: v
            for k, v in dataset_table.items()
            if (not k.startswith("#") and not k.startswith("_"))
        }
    )

    return ds


def _match_regex(s, pattern):
    pyregex = posix_to_python_regex(pattern)
    pattern = re.compile(pyregex)
    return pattern.fullmatch(s)


def _looks_like_regex(s):
    """
    Heuristically determine if a string looks like a regular expression.

    Returns True if the string contains common regex metacharacters.
    """
    regex_meta = set(".^$*+?{}[]\\|()")
    return any(c in regex_meta for c in s)


def _check_required_global_attributes(ds, cv_table):
    cv = cv_table.get("CV") or cv_table

    req_attrs = cv["required_global_attributes"]

    for attr in req_attrs:
        cv_values = cv.get(attr)
        v = ds.attrs.get(attr)
        logger.debug(f"Checking global attribute '{attr}' with value '{v}'")
        if not v:
            logger.error(f"global {attr} not found but required")
        elif not cv_values:
            logger.debug(
                f"Found global attribute '{attr}' with value '{v[0:50]}...' which has no specific requirements"
            )
        elif cv_values and v in list(cv_values):
            logger.debug(f"Found valid value '{v[0:50]}...' for '{attr}'")
        elif (
            isinstance(cv_values, list)
            and len(cv_values) == 1
            and _looks_like_regex(cv_values[0])
        ):
            if not _match_regex(v, cv_values[0]):
                logger.error(
                    f"global attribute '{attr}' has value '{v[0:50]}...' which does not match expected regex '{cv_values[0]}'"
                )
        elif cv_values and v not in list(cv_values):
            logger.error(
                f"global attribute '{attr}' has value '{v[0:50]}...' which is not one of the valid values: {str(list(cv_values))[0:50]}..."
            )


def _add_derived_attrs(ds, cv_table):
    """Add derived global attributes from CV

    Attributes in the CV table might contain derived
    attributes that we add automatically.


    """
    cv = cv_table.get("CV") or cv_table

    req_attrs = cv["required_global_attributes"]

    for attr in req_attrs:
        actual_value = ds.attrs.get(attr)
        cv_values = cv.get(attr)
        if isinstance(cv_values, dict) and actual_value in cv_values.keys():
            ds = _add_derived_attr(ds, attr, cv_values.get(actual_value))

    return ds


def _add_derived_attr(ds, attr, cv_values):
    if isinstance(cv_values, str) and attr.endswith("_id"):
        # for all attributes that end with "*_id", and that
        # have a description in the CV, we add another attribute
        # containing that description, e.g. institution_id
        v = cv_values
        k = attr.replace("_id", "")
        logger.debug(f"for attribute '{k}' --> add value '{v}'")
        ds.attrs[k] = v
        return ds

    if isinstance(cv_values, str):
        # for all attributes that have a description in the CV
        # we add another attribute ending on "*_info" that
        # adds the description, .e.g, frequency
        v = cv_values
        k = attr + "_info"
        logger.debug(f"for attribute '{k}' --> add value '{v}'")
        ds.attrs[k] = v
        return ds

    for k, v in cv_values.items():
        # if cv_values is a dict,
        actual_value = ds.attrs.get(k)
        if isinstance(v, list) and actual_value:
            if actual_value not in v:
                logger.warn(
                    f"attribute '{attr}' has value '{ds.attrs.get(attr)}' but attribute '{k}' has value '{actual_value}' which is not in the list of expected values: {v}"
                )
        elif isinstance(v, str) and actual_value is None:
            message = f"attribute '{attr}' has value '{ds.attrs.get(attr)}' and requires attribute '{k}' to be set to '{v}'"
            logger.info(message)
            ds.attrs[k] = v
        elif isinstance(v, str) and actual_value:
            if actual_value != v:
                logger.warn(
                    f"attribute '{attr}' has value '{ds.attrs.get(attr)}' but attribute '{k}' is set to '{actual_value}' but CV requires '{v}'!"
                )
            ds.attrs[k] = v

    return ds


def _add_header_attrs(ds, header, cv_table=None):
    default_header_attrs = ["table_id", "realm", "product", "mip_era", "Conventions"]

    if cv_table:
        cv = cv_table.get("CV") or cv_table
        header_attrs = [
            a for a in header.keys() if a in cv["required_global_attributes"]
        ]
    else:
        header_attrs = default_header_attrs

    if "table_id" in header_attrs:
        header["table_id"] = header["table_id"].split()[-1]

    ds.attrs.update({k: header[k] for k in header_attrs})

    return ds


def _swap_dims(ds):
    """ensure all 1D coordinates to be dimension coordinates"""
    swaps = {}
    for coord in ds.coords:
        # this is a coordinate variable (not auxilliary), swap dims
        dims = ds.coords[coord].dims
        if len(dims) == 1 and dims[0] != coord:
            logger.debug(f"coord: {coord}")
            swaps[dims[0]] = coord
    logger.info(f"swap dims: {swaps}")
    return ds.swap_dims(swaps)


[docs] @read_tables( tables=["mip_table", "coords_table", "dataset_table", "cv_table", "mapping_table"] ) def cmorize( ds, mip_table=None, coords_table=None, dataset_table=None, cv_table=None, grids_table=None, mapping_table=None, guess=True, time_units=None, transpose=True, decode=True, ): """Lazy cmorization. Cmorizes an xarray Dataset or DataArray object. The cmorizations tries to follow the approach of the original `cmor <https://github.com/PCMDI/cmor>`_ library in adding, manipulating and interpreting dataseta attributes and cmor table vocabulary. All input table arguments (``*_table``) can either be a dictionary or a path to a cmor table in json or yaml format. Parameters ---------- ds : DataArray, Dataset Dataset that should be cmorized. mip_table : dict, str MIP table coords_table : dict, str The cmor coordinates table. dataset_table : dict, str The input dataset cmor table. cv_table: dict, str The controlled vocabulary table. grids_table: dict, str The grids table. mapping_table: dict The mapping table maps input variable names to mip table variable keys. time_units: str Time units for NetCDF encoding. Default is ``days since`` the beginning of the time interval. transpose: logical Transpose dataset to COARDS conventions if neccessary. decode: logical Decode output dataset, e.g., to interpret coordinates attributes. If ``decode=True``, ``xr.decode_cf`` will be applied on the output dataset. Returns ------- Cmorized Dataset. """ ds = ds.copy() # ensure dataset if isinstance(ds, DataArray): ds = ds.to_dataset() # ensure grid mappings and bounds in coords, not in data_vars # so that cf_xarray can understand everything... ds = xr.decode_cf(ds, decode_coords="all") # bounds variables should not have any attributes ds = _remove_bounds_attrs(ds) if mip_table is None: logger.debug("using default cf variable table") mip_table = cf_table().to_dict(orient="index") if coords_table is None: logger.debug("using default coords table") coords_table = coords_default if ds.cf.grid_mapping_names or _is_curvilinear(ds): logger.debug(f"grid mappings: {ds.cf.grid_mapping_names}") logger.debug(f"requires grid mapping: {_is_curvilinear(ds)}") grids_table = grids_table or grids_default if guess is True: ds = ds.cf.guess_coord_axis(verbose=True) if mapping_table is not None: ds = ds.rename_vars({v: (mapping_table.get(v) or v) for v in ds}) # add variable attributes from mip table entries ds = _add_var_attrs(ds, mip_table.get("variable_entry") or mip_table) # interprets variable attributes ds = _interpret_var_attrs(ds, mip_table.get("variable_entry") or mip_table) if coords_table: ds = _interpret_var_dims( ds, coords_table.get("axis_entry") or coords_table, grids_table ) ds = _interpret_coord_attrs(ds, time_units) # ensure all 1D coordinates to be dimension coordinates ds = _swap_dims(ds) if dataset_table: ds = _update_global_attrs(ds, dataset_table) ds = _add_version_attr(ds) ds = _add_creation_date(ds) if mip_table.get("Header"): ds = _add_header_attrs(ds, mip_table.get("Header"), cv_table) if cv_table: ds = _add_derived_attrs(ds, cv_table) _check_required_global_attributes(ds, cv_table) # sort attributes ds.attrs = collections.OrderedDict(sorted(ds.attrs.items())) # transpose to COARDS if transpose is True: ds = _transpose(ds) if decode is True: ds = xr.decode_cf(ds) return ds
[docs] class Cmorizer: """ Cmorizer class supporting preconfigured MIPs. Parameters ---------- project : str, optional Pre-configures MIP, e.g., - CMIP6 - CORDEX url : str, optional Base URL or directory of cmor tables. template : str, optional CMOR talbe naming template, e.g.:: CMIP6_{table_id}.json e.g. CMIP6_Amon.json Returns ------- cmorizer : Cmorizer object. """
[docs] def __init__(self, project=None, url=None, template=None): self._init_tables(project, url, template)
[docs] def _init_tables(self, project, url, template): if project is None and url is None: self.project = "CMIP6" else: self.project = project self.tables = get_project_tables(url, self.project, template)
@property def required(self): """List required global attributes.""" return self.tables.cv["CV"].get("required_global_attributes")
[docs] def cmorize( self, ds, mip_table, dataset_table, mapping_table=None, time_units=None, **kwargs, ): """Lazy cmorization. Cmorizes an xarray Dataset or DataArray object. The cmorizations tries to follow the approach of the original `cmor <https://github.com/PCMDI/cmor>`_ library in adding, manipulating and interpreting dataseta attributes and cmor table vocabulary. Parameters ---------- ds : DataArray, Dataset Dataset that should be cmorized. mip_table : dict, str The MIP table, can either be a dictionary or a path to a cmor table in json format or a table_id from the MIP. dataset_table : dict, str The input dataset cmor table, can either be a dictionary or a path to a cmor table in json format. mapping_table: dict The mapping table maps input variable names to mip table variable keys. time_units: str Time units for NetCDF encoding. Default is ``days since`` the beginning of the time interval. Returns ------- Cmorized Dataset. Examples -------- >>> from xcmor.datasets import reg_ds >>> from xcmor import Cmorizer >>> >>> cmor = Cmorizer() >>> ds_out = cmor.cmorize( ... reg_ds.rename(temperature="tas").tas, ... "Amon", ... cmor.tables["input_example"], ... ) """ if not isinstance(mip_table, dict): mip_table = self.tables[mip_table] return cmorize( ds, mip_table=mip_table, dataset_table=dataset_table, coords_table=self.tables.coords, cv_table=self.tables.cv, grids_table=self.tables.grids, mapping_table=mapping_table, time_units=time_units, **kwargs, )