Skip to content

utils

deployer.utils.models

create_model_from_func

create_model_from_func(
    func: Callable,
    model_name: Optional[str] = None,
    type_converter: Optional[TypeConverterType] = None,
    exclude_defaults: bool = False,
) -> CustomBaseModel

Create a Pydantic model from pipeline parameters.

Source code in deployer/utils/models.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def create_model_from_func(
    func: Callable,
    model_name: Optional[str] = None,
    type_converter: Optional[TypeConverterType] = None,
    exclude_defaults: bool = False,
) -> CustomBaseModel:
    """Create a Pydantic model from pipeline parameters.

    Every parameter in the signature of `func` becomes one field of the generated
    model. The field type is the parameter annotation passed through
    `type_converter`; the field default is the parameter default, or `...` when
    there is none.

    Args:
        func (Callable): The callable whose signature is inspected.
        model_name (Optional[str]): Name given to the model. Defaults to
            `func.__name__`.
        type_converter (Optional[TypeConverterType]): Callable applied to each
            parameter annotation. Defaults to `_dummy_type_converter`.
        exclude_defaults (bool): If True, parameter defaults are discarded and
            every field is declared with `...`.

    Returns:
        CustomBaseModel: The model built by `create_model` with `CustomBaseModel`
            as its base.
    """
    if model_name is None:
        model_name = func.__name__

    if type_converter is None:
        type_converter = _dummy_type_converter

    def _field_default(param: Parameter):
        # `Parameter.empty` is a sentinel: compare by identity. Using `==` would
        # invoke the default value's own `__eq__`, which can misbehave for
        # objects with custom equality (e.g. array-like defaults).
        if exclude_defaults or param.default is Parameter.empty:
            return ...
        return param.default

    func_typing = {
        p.name: (type_converter(p.annotation), _field_default(p))
        for p in signature(func).parameters.values()
    }

    return create_model(
        model_name,
        __base__=CustomBaseModel,
        **func_typing,
    )

deployer.utils.utils

make_enum_from_python_package_dir

make_enum_from_python_package_dir(
    dir_path: Path, raise_if_not_found: bool = False
) -> Enum

Create an Enum of file names without extension from a directory of python modules.

Source code in deployer/utils/utils.py
19
20
21
22
23
24
25
26
27
def make_enum_from_python_package_dir(dir_path: Path, raise_if_not_found: bool = False) -> Enum:
    """Create an Enum of file names without extension from a directory of python modules.

    Each `*.py` file directly inside `dir_path`, except `__init__.py`, yields one
    member whose name and value are both the file stem. The Enum itself is named
    after the stem of `dir_path`.

    Args:
        dir_path (Path): Directory to scan (not recursive).
        raise_if_not_found (bool): If True, raise when `dir_path` does not exist.

    Returns:
        Enum: The Enum class listing the module names, in alphabetical order.

    Raises:
        FileNotFoundError: If `raise_if_not_found` is True and `dir_path` does not exist.
    """
    dir_path_ = Path(dir_path)
    if raise_if_not_found and not dir_path_.exists():
        raise FileNotFoundError(f"Directory {dir_path_} not found.")

    # `glob` yields entries in an arbitrary, file-system dependent order; sort the
    # stems so that the member order of the Enum is reproducible everywhere.
    module_names = sorted(
        file_path.stem for file_path in dir_path_.glob("*.py") if file_path.stem != "__init__"
    )
    return Enum(dir_path_.stem, {name: name for name in module_names})

import_pipeline_from_dir

import_pipeline_from_dir(
    dirpath: Path, pipeline_name: str
) -> GraphComponentType

Import a pipeline from a directory.

Source code in deployer/utils/utils.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def import_pipeline_from_dir(dirpath: Path, pipeline_name: str) -> GraphComponentType:
    """Import a pipeline from a directory.

    `dirpath` is resolved and expressed relative to the current working directory,
    then turned into a dotted module path. The module `<dirpath>.<pipeline_name>`
    is imported and the pipeline object is read from the attribute named
    `pipeline_name`, falling back to the deprecated attribute name `pipeline`.

    Args:
        dirpath (Path): Directory containing the pipeline module.
        pipeline_name (str): Name of the module, and expected name of the pipeline
            object inside it.

    Returns:
        GraphComponentType: The pipeline object found in the module.

    Raises:
        ModuleNotFoundError: If the module cannot be found (re-raised unchanged).
        ImportError: If importing the module fails for any other reason, or if the
            module has neither a `pipeline_name` nor a `pipeline` attribute.
    """
    # NOTE: `relative_to` raises `ValueError` when `dirpath` is not under the cwd.
    dirpath_ = Path(dirpath).resolve().relative_to(Path.cwd())
    parent_module = ".".join(dirpath_.parts)
    module_import_path = f"{parent_module}.{pipeline_name}"  # used with import statements
    module_folder_path = dirpath_ / f"{pipeline_name}.py"  # used as a path to a file

    try:
        pipeline_module = importlib.import_module(module_import_path)
    except ModuleNotFoundError:
        # Let "module not found" through untouched so callers can tell it apart
        # from an error raised while executing the module.
        raise
    except Exception as e:
        raise ImportError(
            f"Error while importing pipeline from {module_import_path}: \n"
            f"    {type(e).__name__}: {e} \n\nPotential sources of error:\n"
            f"{filter_lines_from(e.__traceback__, module_folder_path)}"
        ) from e

    pipeline: GraphComponentType = getattr(pipeline_module, pipeline_name, None)
    if pipeline is None:
        # Deprecated fallback: the pipeline object used to be called `pipeline`.
        try:
            pipeline = pipeline_module.pipeline
        except AttributeError as e:
            raise ImportError(
                f"Pipeline object not found in `{module_import_path}`. "
                "Please check that the pipeline is correctly defined and named. "
                f"It should be named `{pipeline_name}` or `pipeline` (deprecated)."
            ) from e
        warnings.warn(
            f"Pipeline in `{module_import_path}` is named `pipeline` instead of "
            f"`{pipeline_name}`. This is deprecated and will be removed in a future version. "
            f"Please rename your pipeline to `{pipeline_name}`.",
            FutureWarning,
            stacklevel=1,
        )

    logger.debug(f"Pipeline {module_import_path} imported successfully.")

    return pipeline

deployer.utils.config

VertexPipelinesSettings

Bases: BaseSettings

Source code in deployer/utils/config.py
21
22
23
24
25
26
27
28
29
class VertexPipelinesSettings(BaseSettings):  # noqa: D101
    # Settings model holding the values needed to work with Vertex pipelines.
    # `extra="ignore"`: inputs that match no declared field are ignored instead of
    # being rejected. `case_sensitive=True`: names must match the exact case below.
    model_config = SettingsConfigDict(extra="ignore", case_sensitive=True)

    # Every field is a `str` without a default, so all of them must be provided.
    # NOTE(review): the meaning of each field is inferred from its name only
    # (GCP project, region, Artifact Registry location/repo, staging bucket,
    # service account) -- confirm against the code that consumes them.
    PROJECT_ID: str
    GCP_REGION: str
    GAR_LOCATION: str
    GAR_PIPELINES_REPO_ID: str
    VERTEX_STAGING_BUCKET_NAME: str
    VERTEX_SERVICE_ACCOUNT: str

load_vertex_settings

load_vertex_settings(
    env_file: Optional[Path] = None,
) -> VertexPipelinesSettings

Load the settings from the environment.

Source code in deployer/utils/config.py
32
33
34
35
36
37
38
39
40
41
42
43
44
def load_vertex_settings(env_file: Optional[Path] = None) -> VertexPipelinesSettings:
    """Build a `VertexPipelinesSettings` instance from the environment.

    Args:
        env_file (Optional[Path]): Optional `.env` file handed to the settings
            class (read as UTF-8). When `None`, no file is passed.

    Returns:
        VertexPipelinesSettings: The validated settings.

    Raises:
        ValueError: If validation of the settings fails. The message says where to
            look (the `.env` file or the environment variables) and embeds the
            original validation error.
    """
    try:
        return VertexPipelinesSettings(_env_file=env_file, _env_file_encoding="utf-8")
    except ValidationError as validation_error:
        # Point the user at the most likely origin of the invalid values.
        if env_file is None:
            hint = "No `.env` file provided. Please check your environment variables"
        else:
            hint = f"Please check your `.env` file: `{env_file}`"
        raise ValueError(
            f"Validation failed for VertexPipelinesSettings. {hint}\n{validation_error}"
        ) from validation_error

list_config_filepaths

list_config_filepaths(
    configs_root_path: Path, pipeline_name: str
) -> List[Path]

List the config filepaths for a pipeline.

Parameters:

Name Type Description Default
configs_root_path Path

A Path object representing the root path of the configs.

required
pipeline_name str

The name of the pipeline.

required

Returns:

Type Description
List[Path]

List[Path]: A list of Path objects representing the config filepaths.

Source code in deployer/utils/config.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def list_config_filepaths(configs_root_path: Path, pipeline_name: str) -> List[Path]:
    """Collect the config files available for a pipeline.

    The files are searched directly inside `<configs_root_path>/<pipeline_name>`,
    one glob per member name of `ConfigType` used as the file extension.

    Args:
        configs_root_path (Path): Root directory of the configs.
        pipeline_name (str): Name of the pipeline (sub-directory of the root).

    Returns:
        List[Path]: The matching config file paths, grouped by extension in
            `ConfigType` declaration order.
    """
    pipeline_configs_dir = Path(configs_root_path) / pipeline_name

    matches: List[Path] = []
    for extension in ConfigType.__members__:
        matches.extend(pipeline_configs_dir.glob(f"*.{extension}"))
    return matches

load_config

load_config(
    config_filepath: Path,
) -> Tuple[Optional[dict], Optional[dict]]

Load the parameter values and input artifacts from a config file.

The config file can be a JSON, TOML, YAML or Python file. If JSON, it should be a dict of parameter values (TOML and YAML files likewise provide parameter values only). If Python, it should contain a parameter_values dict and/or an input_artifacts dict.

Parameters:

Name Type Description Default
config_filepath Path

A Path object representing the path to the config file.

required

Returns:

Type Description
Tuple[Optional[dict], Optional[dict]]

Tuple[Optional[dict], Optional[dict]]: A tuple containing the loaded parameter values and input artifacts (or None if not available).

Raises:

Type Description
UnsupportedConfigFileError

If the file has an unsupported extension.

Source code in deployer/utils/config.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def load_config(config_filepath: Path) -> Tuple[Optional[dict], Optional[dict]]:
    """Read parameter values and input artifacts from a config file.

    The loader is chosen from the file extension:
        - `.json`, `.toml`, `.yaml` / `.yml`: the file provides the parameter
          values only; input artifacts are returned as `None`.
        - `.py`: the file may define a `parameter_values` dict and / or an
          `input_artifacts` dict.

    Args:
        config_filepath (Path): Path to the config file.

    Returns:
        Tuple[Optional[dict], Optional[dict]]: The parameter values and the input
            artifacts (each `None` if not available).

    Raises:
        UnsupportedConfigFileError: If the file has an unsupported extension.
    """
    path = Path(config_filepath)
    extension = path.suffix

    if extension == ".py":
        python_parameters, python_artifacts = _load_config_python(path)
        return python_parameters, python_artifacts

    if extension in (".yaml", ".yml"):
        return _load_config_yaml(path), None

    if extension == ".toml":
        return _load_config_toml(path), None

    if extension == ".json":
        with open(path, "r") as json_file:
            json_parameters = json.load(json_file)
        return json_parameters, None

    raise UnsupportedConfigFileError(
        f"{path}: Config file extension '{path.suffix}' is not supported."
        f" Supported config file types are: {', '.join(ConfigType.__members__.values())}"
    )

_load_config_python

_load_config_python(
    config_filepath: Path,
) -> Tuple[Optional[dict], Optional[dict]]

Load the parameter values and input artifacts from a Python config file.

Parameters:

Name Type Description Default
config_filepath Path

A Path object representing the path to the config file.

required

Returns:

Type Description
Tuple[Optional[dict], Optional[dict]]

Tuple[Optional[dict], Optional[dict]]: A tuple containing the loaded parameter values (or None if not available) and input artifacts (or None if not available).

Raises:

Type Description
ValueError

If the config file does not contain a parameter_values and/or input_artifacts dict.

ValueError

If the config file contains common keys in parameter_values and input_artifacts dict.

Source code in deployer/utils/config.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def _load_config_python(config_filepath: Path) -> Tuple[Optional[dict], Optional[dict]]:
    """Load the parameter values and input artifacts from a Python config file.

    Args:
        config_filepath (Path): A `Path` object representing the path to the config file.

    Returns:
        Tuple[Optional[dict], Optional[dict]]: A tuple containing the loaded parameter values
            (or `None` if not available) and input artifacts (or `None` if not available).

    Raises:
        ValueError: If the config file does not contain a `parameter_values` and/or
            `input_artifacts` dict.
        ValueError: If the config file contains common keys in `parameter_values` and
            `input_artifacts` dict.
    """
    spec = importlib.util.spec_from_file_location("module.name", config_filepath)
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
    except Exception as e:
        raise BadConfigError(
            f"{config_filepath}: invalid Python config file.\n{e.__class__.__name__}: {e}"
        ) from e

    parameter_values = getattr(module, "parameter_values", None)
    input_artifacts = getattr(module, "input_artifacts", None)

    if parameter_values is None and input_artifacts is None:
        raise BadConfigError(
            f"{config_filepath}: Python config file must contain a `parameter_values` "
            "and/or `input_artifacts` dict."
        )

    if parameter_values is not None and input_artifacts is not None:
        common_keys = set(parameter_values.keys()).intersection(set(input_artifacts.keys()))
        if common_keys:
            raise BadConfigError(
                f"{config_filepath}: Python config file must not contain common keys in "
                "`parameter_values` and `input_artifacts` dict. Common keys: {common_keys}"
            )

    return parameter_values, input_artifacts

_load_config_yaml

_load_config_yaml(config_filepath: Path) -> dict

Load the parameter values from a YAML config file.

Parameters:

Name Type Description Default
config_filepath Path

A Path object representing the path to the config file.

required

Returns:

Name Type Description
dict dict

The loaded parameter values.

Source code in deployer/utils/config.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def _load_config_yaml(config_filepath: Path) -> dict:
    """Read the parameter values stored in a YAML config file.

    Args:
        config_filepath (Path): Path to the YAML config file.

    Returns:
        dict: Whatever `yaml.safe_load` produces for the file content.

    Raises:
        BadConfigError: If the content is not valid YAML.
    """
    try:
        with open(config_filepath, "r") as yaml_file:
            return yaml.safe_load(yaml_file)
    except yaml.YAMLError as yaml_error:
        # Only parsing errors are translated; I/O errors propagate unchanged.
        raise BadConfigError(
            f"{config_filepath}: invalid YAML config file.\n"
            f"{yaml_error.__class__.__name__}: {yaml_error}"
        ) from yaml_error

_load_config_toml

_load_config_toml(config_filepath: Path) -> dict

Load the parameter values from a TOML config file.

Parameters:

Name Type Description Default
config_filepath Path

A Path object representing the path to the config file.

required

Returns:

Name Type Description
dict dict

The loaded parameter values.

Source code in deployer/utils/config.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def _load_config_toml(config_filepath: Path) -> dict:
    """Read the parameter values stored in a TOML config file as a flat dict.

    Nested tables are collapsed into single-level keys whose parts are joined
    with `_`.

    Args:
        config_filepath (Path): Path to the TOML config file.

    Returns:
        dict: The flattened parameter values.

    Raises:
        BadConfigError: If reading or flattening the file raises any exception.
    """

    def collapse_tables(
        d_: Union[TOMLDocument, tomlkit.items.Table],
        parent_key: Optional[str] = None,
        sep: str = ".",
    ) -> dict:
        """Flatten a tomlkit.TOMLDocument. Inline tables are not flattened"""
        flat = {}
        for key, value in d_.items():
            full_key = key if not parent_key else f"{parent_key}{sep}{key}"
            if not isinstance(value, tomlkit.items.Table):
                # leaf value (inline tables included): stored as is
                flat[full_key] = value
                continue
            flat.update(collapse_tables(value, full_key, sep=sep))
        return flat

    toml_file = TOMLFile(config_filepath)

    try:
        document = toml_file.read()
        flattened_values = collapse_tables(document, sep="_")
    except Exception as toml_error:
        raise BadConfigError(
            f"{config_filepath}: invalid TOML config file.\n"
            f"{toml_error.__class__.__name__}: {toml_error}"
        ) from toml_error

    return flattened_values