pipeline_checks

Pipelines

Bases: CustomBaseModel

Model to validate multiple pipelines at once

Source code in deployer/pipeline_checks.py
class Pipelines(CustomBaseModel):
    """Model to validate multiple pipelines at once"""

    pipelines: Dict[str, Pipeline]

    @model_validator(mode="wrap")
    def _init_remove_temp_directory(self, handler: ModelWrapValidatorHandler) -> Any:
        """Create and remove temporary directory"""
        Path(TEMP_LOCAL_PACKAGE_PATH).mkdir(exist_ok=True)

        try:
            validated_self = handler(self)
        except ValidationError as e:
            raise e
        finally:
            shutil.rmtree(TEMP_LOCAL_PACKAGE_PATH)

        return validated_self
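
A minimal usage sketch (pipeline names and root paths are hypothetical; a context mapping is passed explicitly because validate_configs, shown further down, reads info.context):

from pathlib import Path

from deployer.pipeline_checks import Pipelines

Pipelines.model_validate(
    {
        "pipelines": {
            name: {
                "pipeline_name": name,
                "pipelines_root_path": Path("pipelines"),
                "configs_root_path": Path("configs"),
            }
            for name in ("training", "scoring")  # hypothetical pipeline names
        }
    },
    context={"raise_for_defaults": False},
)

The wrap validator creates TEMP_LOCAL_PACKAGE_PATH before validation and removes it afterwards, so compiled artifacts never outlive the check.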

Pipeline

Bases: CustomBaseModel

Validation of one pipeline and its configs

Source code in deployer/pipeline_checks.py
class Pipeline(CustomBaseModel):
    """Validation of one pipeline and its configs"""

    pipeline_name: str
    config_paths: Annotated[List[Path], Field(validate_default=True)] = None
    pipelines_root_path: Path
    configs_root_path: Path
    configs: Optional[Dict[str, ConfigDynamicModel]] = None  # Optional because populated after

    @model_validator(mode="before")
    @classmethod
    def populate_config_names(cls, data: Any) -> Any:
        """Populate config names before validation"""
        if data.get("config_paths") is None:
            data["config_paths"] = list_config_filepaths(
                str(data["configs_root_path"]), data["pipeline_name"]
            )
        return data

    @computed_field
    @property
    def pipeline(self) -> graph_component.GraphComponent:
        """Import pipeline"""
        if getattr(self, "_pipeline", None) is None:
            with DisableLogger("deployer.utils.utils"):
                self._pipeline = import_pipeline_from_dir(
                    str(self.pipelines_root_path), self.pipeline_name
                )
        return self._pipeline

    @model_validator(mode="after")
    def import_pipeline(self):
        """Validate that the pipeline can be imported by calling pipeline computed field"""
        logger.debug(f"Importing pipeline {self.pipeline_name}")
        try:
            _ = self.pipeline
        except (ImportError, ModuleNotFoundError) as e:
            raise ValueError(f"Pipeline import failed: {e}") from e
        return self

    @model_validator(mode="after")
    def compile_pipeline(self):
        """Validate that the pipeline can be compiled"""
        logger.debug(f"Compiling pipeline {self.pipeline_name}")
        try:
            with DisableLogger("deployer.pipeline_deployer"):
                VertexPipelineDeployer(
                    pipeline_name=self.pipeline_name,
                    pipeline_func=self.pipeline,
                    local_package_path=TEMP_LOCAL_PACKAGE_PATH,
                ).compile()
        except Exception as e:
            raise ValueError(f"Pipeline compilation failed: {e.__repr__()}")  # noqa: B904
        return self

    @model_validator(mode="after")
    def validate_configs(self, info: ValidationInfo):
        """Validate configs against pipeline parameters definition"""
        logger.debug(f"Validating configs for pipeline {self.pipeline_name}")
        pipelines_dynamic_model = create_model_from_func(
            self.pipeline.pipeline_func,
            type_converter=_convert_artifact_type_to_str,
            exclude_defaults=info.context.get("raise_for_defaults", False),
        )
        config_model = ConfigsDynamicModel[pipelines_dynamic_model]
        self.configs = config_model.model_validate(
            {"configs": {x.name: {"config_path": x} for x in self.config_paths}}
        )
        return self

populate_config_names classmethod

populate_config_names(data: Any) -> Any

Populate config names before validation

Source code in deployer/pipeline_checks.py
@model_validator(mode="before")
@classmethod
def populate_config_names(cls, data: Any) -> Any:
    """Populate config names before validation"""
    if data.get("config_paths") is None:
        data["config_paths"] = list_config_filepaths(
            str(data["configs_root_path"]), data["pipeline_name"]
        )
    return data
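
Omitting config_paths is equivalent to listing the discovered files yourself; a sketch of the equivalence (names, paths, and layout are assumptions):

explicit = {
    "pipeline_name": "training",                     # hypothetical
    "pipelines_root_path": "pipelines",
    "configs_root_path": "configs",
    "config_paths": ["configs/training/dev.yaml"],   # hypothetical config file
}
implicit = {k: v for k, v in explicit.items() if k != "config_paths"}
# For `implicit`, this validator fills config_paths via
# list_config_filepaths("configs", "training") before field validation runs.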

pipeline property

pipeline: GraphComponent

Import pipeline
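
The property is lazy: the first access imports the pipeline from pipelines_root_path and caches it on _pipeline, so the validators below share a single import. A sketch using model_construct to bypass validation (hypothetical paths):

from pathlib import Path

from deployer.pipeline_checks import Pipeline

p = Pipeline.model_construct(
    pipeline_name="training",              # hypothetical
    pipelines_root_path=Path("pipelines"),
    configs_root_path=Path("configs"),
)
func = p.pipeline          # first access runs import_pipeline_from_dir
assert func is p.pipeline  # cached on p._pipeline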

import_pipeline

import_pipeline()

Validate that the pipeline can be imported by calling pipeline computed field

Source code in deployer/pipeline_checks.py
@model_validator(mode="after")
def import_pipeline(self):
    """Validate that the pipeline can be imported by calling pipeline computed field"""
    logger.debug(f"Importing pipeline {self.pipeline_name}")
    try:
        _ = self.pipeline
    except (ImportError, ModuleNotFoundError) as e:
        raise ValueError(f"Pipeline import failed: {e}") from e
    return self
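
Because this runs as a model validator, a broken pipeline module surfaces as a pydantic ValidationError rather than a raw ImportError. A hedged sketch (assuming the configs root exists so path discovery succeeds):

from pydantic import ValidationError

from deployer.pipeline_checks import Pipeline

try:
    Pipeline.model_validate(
        {
            "pipeline_name": "broken_pipeline",  # hypothetical unimportable pipeline
            "pipelines_root_path": "pipelines",
            "configs_root_path": "configs",
        },
        context={"raise_for_defaults": False},
    )
except ValidationError as e:
    print(e.errors()[0]["msg"])  # "Value error, Pipeline import failed: ..."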

compile_pipeline

compile_pipeline()

Validate that the pipeline can be compiled

Source code in deployer/pipeline_checks.py
@model_validator(mode="after")
def compile_pipeline(self):
    """Validate that the pipeline can be compiled"""
    logger.debug(f"Compiling pipeline {self.pipeline_name}")
    try:
        with DisableLogger("deployer.pipeline_deployer"):
            VertexPipelineDeployer(
                pipeline_name=self.pipeline_name,
                pipeline_func=self.pipeline,
                local_package_path=TEMP_LOCAL_PACKAGE_PATH,
            ).compile()
    except Exception as e:
        raise ValueError(f"Pipeline compilation failed: {e.__repr__()}")  # noqa: B904
    return self
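
The compiled package is written under TEMP_LOCAL_PACKAGE_PATH, which Pipelines creates and removes around the whole run. When validating a single Pipeline directly, managing that directory yourself appears necessary; a sketch (assuming the constant is importable from this module):

import shutil
from pathlib import Path

from deployer.pipeline_checks import TEMP_LOCAL_PACKAGE_PATH, Pipeline

Path(TEMP_LOCAL_PACKAGE_PATH).mkdir(exist_ok=True)
try:
    # `data` as in the sketches above
    Pipeline.model_validate(data, context={"raise_for_defaults": False})
finally:
    shutil.rmtree(TEMP_LOCAL_PACKAGE_PATH)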

validate_configs

validate_configs(info: ValidationInfo)

Validate configs against pipeline parameters definition

Source code in deployer/pipeline_checks.py
@model_validator(mode="after")
def validate_configs(self, info: ValidationInfo):
    """Validate configs against pipeline parameters definition"""
    logger.debug(f"Validating configs for pipeline {self.pipeline_name}")
    pipelines_dynamic_model = create_model_from_func(
        self.pipeline.pipeline_func,
        type_converter=_convert_artifact_type_to_str,
        exclude_defaults=info.context.get("raise_for_defaults", False),
    )
    config_model = ConfigsDynamicModel[pipelines_dynamic_model]
    self.configs = config_model.model_validate(
        {"configs": {x.name: {"config_path": x} for x in self.config_paths}}
    )
    return self
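
The raise_for_defaults flag is read from the validation context and forwarded to create_model_from_func as exclude_defaults, so the strictness of the generated parameter model is chosen at the call site:

# Hedged sketch: `data` as in the examples above.
Pipeline.model_validate(data, context={"raise_for_defaults": True})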

ConfigsDynamicModel

Bases: CustomBaseModel, Generic[PipelineConfigT]

Model used to generate checks for configs based on pipeline dynamic model

Source code in deployer/pipeline_checks.py
class ConfigsDynamicModel(CustomBaseModel, Generic[PipelineConfigT]):
    """Model used to generate checks for configs based on pipeline dynamic model"""

    configs: Dict[str, ConfigDynamicModel[PipelineConfigT]]
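
validate_configs parametrizes this generic with the dynamic model built from the pipeline function. A self-contained sketch with a hand-written stand-in for that model (config is passed inline, so no file is read):

from pathlib import Path

from pydantic import BaseModel

from deployer.pipeline_checks import ConfigsDynamicModel

class TrainingParams(BaseModel):  # hypothetical stand-in for the generated model
    learning_rate: float

ConfigsDynamicModel[TrainingParams].model_validate(
    {
        "configs": {
            "dev": {
                "config_path": Path("configs/training/dev.yaml"),  # hypothetical
                "config": {"learning_rate": 0.01},
            }
        }
    }
)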

ConfigDynamicModel

Bases: CustomBaseModel, Generic[PipelineConfigT]

Model used to generate checks for configs based on pipeline dynamic model

Source code in deployer/pipeline_checks.py
class ConfigDynamicModel(CustomBaseModel, Generic[PipelineConfigT]):
    """Model used to generate checks for configs based on pipeline dynamic model"""

    config_path: Path
    config: PipelineConfigT

    @model_validator(mode="before")
    @classmethod
    def load_config_if_empty(cls, data: Any) -> Any:
        """Load config if it is empty"""
        if data.get("config") is None:
            try:
                parameter_values, input_artifacts = load_config(data["config_path"])
            except BadConfigError as e:
                raise PydanticCustomError("BadConfigError", str(e)) from e
            data["config"] = {**(parameter_values or {}), **(input_artifacts or {})}
        return data

load_config_if_empty classmethod

load_config_if_empty(data: Any) -> Any

Load config if it is empty

Source code in deployer/pipeline_checks.py
@model_validator(mode="before")
@classmethod
def load_config_if_empty(cls, data: Any) -> Any:
    """Load config if it is empty"""
    if data.get("config") is None:
        try:
            parameter_values, input_artifacts = load_config(data["config_path"])
        except BadConfigError as e:
            raise PydanticCustomError("BadConfigError", str(e)) from e
        data["config"] = {**(parameter_values or {}), **(input_artifacts or {})}
    return data
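
load_config returns a (parameter_values, input_artifacts) pair; the validator merges both into a single mapping, with input artifacts winning on key collisions:

# Merge as performed above (values are hypothetical):
parameter_values = {"learning_rate": 0.01, "train_set": "placeholder"}
input_artifacts = {"train_set": "gs://bucket/datasets/train"}
config = {**(parameter_values or {}), **(input_artifacts or {})}
# {'learning_rate': 0.01, 'train_set': 'gs://bucket/datasets/train'}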

_convert_artifact_type_to_str

_convert_artifact_type_to_str(annotation: type) -> type

Convert a kfp.dsl.Artifact type to a string.

This is mandatory for type checking, as kfp.dsl.Artifact types should be passed as strings to VertexAI. See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob for details.

Source code in deployer/pipeline_checks.py
def _convert_artifact_type_to_str(annotation: type) -> type:
    """Convert a kfp.dsl.Artifact type to a string.

    This is mandatory for type checking, as kfp.dsl.Artifact types should be passed as strings
    to VertexAI. See <https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob>
    for details.
    """
    if isinstance(annotation, _AnnotatedAlias):
        if issubclass(annotation.__origin__, kfp.dsl.Artifact):
            return str
    return annotation
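
For example, an Annotated artifact hint such as kfp.dsl.Input[...] collapses to str, while non-artifact annotations pass through unchanged:

import kfp.dsl

assert _convert_artifact_type_to_str(kfp.dsl.Input[kfp.dsl.Dataset]) is str
assert _convert_artifact_type_to_str(int) is int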