diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 3d2651e..ff078e1 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -25,4 +25,3 @@ Any relevant screenshots - Run `pre-commit install && pre-commit run --all` locally for formatting and linting. - [ ] Includes screenshots of documentation updates. - Run `mkdocs serve` view documentation locally. -- [ ] Summarizes PR's changes in [CHANGELOG.md](https://github.com/PrefectHQ/prefect-gcp/blob/main/CHANGELOG.md) diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index 124db21..0000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,365 +0,0 @@ -# Changelog - -All notable changes to this project will be documented in this file. - -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - -## Unreleased - -### Added - -- Added `service_account_name` to Cloud Run v2 default template and variables - [#231](https://github.com/PrefectHQ/prefect-gcp/pull/231) - -### Changed - -### Deprecated - -### Removed - -### Fixed - -### Security - -## 0.5.5 - -Released December 11th, 2023. - -### Added - -- Ability to publish `CloudRun` blocks as cloud-run work pools - [#237](https://github.com/PrefectHQ/prefect-gcp/pull/237) -- Ability to publish `VertexAICustomTrainingJob` blocks as a vertex-ai work pool - [#238](https://github.com/PrefectHQ/prefect-gcp/pull/238) - -### Fixed - -- Modified default command logic in `CloudRunWorkerJobV2Configuration` to utilize the `BaseJobConfiguration._base_flow_run_command` method. - -## 0.5.4 - -Released November 29th, 2023. - -### Changed - -- Fix display name for Google Cloud Run v2 worker / work pool - [#229](https://github.com/PrefectHQ/prefect-gcp/pull/229) -- Removed credentials block creation step from the google cloud run worker guide - [#228](https://github.com/PrefectHQ/prefect-gcp/pull/228) - -## 0.5.3 - -Released November 29th, 2023. - -### Added - -- Added `CloudRunJobV2` and `CloudRunWorkerV2` for executing Prefect flows via Google Cloud Run - [#220](https://github.com/PrefectHQ/prefect-gcp/pull/220) - -### Fixed - -- Fix `list_folders` and `list_blobs` now logging bucket name and bucket path - [#184](https://github.com/PrefectHQ/prefect-gcp/pull/214) -- Fix empty `GcpCredentials` not inferring the GCP project upon initialization when running on Compute Engine, Cloud Run, and App Engine - [#219](https://github.com/PrefectHQ/prefect-gcp/pull/219) - -## 0.5.2 - -Released November 13th, 2023. - -### Added - -- GCP Cloud Run worker guide - [#203](https://github.com/PrefectHQ/prefect-gcp/pull/203) - -### Fixed - -- Default `project_id` in `GcpCredentials` if quota project is not provided - [#219](https://github.com/PrefectHQ/prefect-gcp/pull/219) - -## 0.5.1 - -Released October 12th, 2023. - -### Changed - -- Updated workers to respect default command from base worker configuration - [#216](https://github.com/PrefectHQ/prefect-gcp/pull/216) - -## 0.5.0 - -Released October 5th, 2023. - -### Fixed - -- Empty logging in `list_folders` and `list_blobs` - [#214](https://github.com/PrefectHQ/prefect-gcp/pull/214) - -### Added - -- Conditional imports to support operating with `pydantic>2` installed - [#215](https://github.com/PrefectHQ/prefect-gcp/pull/215) - -## 0.4.7 - -Released September 22nd, 2023. - -### Added - -- Vertex AI `CustomJob` worker - [#211](https://github.com/PrefectHQ/prefect-gcp/pull/211) -- Add `kill_infrastructure` method to Vertex AI worker - [#213](https://github.com/PrefectHQ/prefect-gcp/pull/213) - -### Changed - -- Use flow run name for name of created custom jobs - [#208](https://github.com/PrefectHQ/prefect-gcp/pull/208) - -## 0.4.6 - -Released September 5th, 2023. - -### Changed - -- Persist Labels to Vertex AI Custom Job - [#198](https://github.com/PrefectHQ/prefect-gcp/pull/208) - -## 0.4.5 - -Released July 20th, 2023. - -### Changed - -- Promoted workers to GA, removed beta disclaimers - -## 0.4.4 - -Released June 26th, 2023. - -### Changed - -- Vertex agent now attempts to retry create custom job up to three times to recover from transient errors - [#192](https://github.com/PrefectHQ/prefect-gcp/192) -- Updated `prefect.docker` import to `prefect.utilities.dockerutils` - [#194](https://github.com/PrefectHQ/prefect-gcp/pull/194) - -## 0.4.3 - -Released June 15th, 2023. - -### Deprecated - -- `prefect_gcp.projects` module. Use `prefect_gcp.deployments` instead. - [#189](https://github.com/PrefectHQ/prefect-gcp/pull/189) -- `pull_project_from_gcs` step. Use `pull_from_gcs` instead. - [#189](https://github.com/PrefectHQ/prefect-gcp/pull/189) -- `push_project_to_gcs` step. Use `push_to_gcs` instead. - [#189](https://github.com/PrefectHQ/prefect-gcp/pull/189) -- `PullProjectFromGcsOutput` step output. Use `PullFromGcsOutput` instead. - [#189](https://github.com/PrefectHQ/prefect-gcp/pull/189) -- `PushProjectToGcsOutput` step output. Use `PushToGcsOutput` instead. - [#189](https://github.com/PrefectHQ/prefect-gcp/pull/189) - -### Fixed - -- Bug that `list_folders` method removes dot(`"."`)s in the middle of paths - -## 0.4.2 - -Released on May 25th, 2023. - -### Added - -- `accelerator_count` property for `VertexAICustomTrainingJob` - [#174](https://github.com/PrefectHQ/prefect-gcp/pull/174) -- `result_transformer` parameter to customize the return structure of `bigquery_query` - [#176](https://github.com/PrefectHQ/prefect-gcp/pull/176) -- `boot_disk_type` and `boot_disk_size_gb` properties for `VertexAICustomTrainingJob` - [#177](https://github.com/PrefectHQ/prefect-gcp/pull/177) -- Support to stream worker logs for executed flow runs - [#183](https://github.com/PrefectHQ/prefect-gcp/pull/183) - -## 0.4.1 - -Released on April 20th, 2023. - -### Added - -- `CloudRunWorker` for executing Prefect flows via Google Cloud Run - [#172](https://github.com/PrefectHQ/prefect-gcp/pull/172) - -### Fixed - -- Fix `CloudRunJob` with VPC Connector usage. - [#170](https://github.com/PrefectHQ/prefect-gcp/pull/170) - -## 0.4.0 - -Released on April 6th, 2023. - -### Added - -- `pull_project_from_gcs` and `push_project_to_gcs` steps - [#167](https://github.com/PrefectHQ/prefect-gcp/pull/167) - -### Fixed - -- `upload_from_dataframe` docstring - [#162](https://github.com/PrefectHQ/prefect-gcp/pull/162) -- `upload_from_dataframe` file extensions for compressed parquet ('.snappy.parquet', '.gz.parquet') - [#166](https://github.com/PrefectHQ/prefect-gcp/pull/166) - -## 0.3.0 - -Released on February 28th, 2023. - -### Added - -- `upload_from_dataframe` method in `GcsBucket` - [#140](https://github.com/PrefectHQ/prefect-gcp/pull/140) - -### Fixed - -- Using `GcsBucket` as a deployment storage option - [#147](https://github.com/PrefectHQ/prefect-gcp/pull/147) -- Breaking: Stop decoding and return a `bytes` type in `GcpSecret.read_secret`, as originally annotated - [#149](https://github.com/PrefectHQ/prefect-gcp/pull/149) -- Resolving paths in `GcsBucket` unintentionally generating an arbitrary UUID when path is an empty string - [#150](https://github.com/PrefectHQ/prefect-gcp/pull/150) - -## 0.2.6 - -Released on February 7th, 2023. - -### Fixed - -- `get_directory` method in `GcsBucket` returns the list of downloaded file paths and supports relative path for `local_path` - [#129](https://github.com/PrefectHQ/prefect-gcp/pull/129). -- Reporting the state of the `VertexAICustomTrainingJob` to the Prefect API by passing the base env in job spec - [#132](https://github.com/PrefectHQ/prefect-gcp/pull/132). - -## 0.2.5 - -Released on January 27th, 2023. - -### Added - -- `vpc_connector_name` field to `CloudRunJob` - [#123](https://github.com/PrefectHQ/prefect-gcp/pull/123) -- `list_folders` method for GcsBucket - [#121](https://github.com/PrefectHQ/prefect-gcp/pull/121) - -### Fixed - -- Listing blobs at the root folder - [#120](https://github.com/PrefectHQ/prefect-gcp/pull/120) - -## 0.2.4 - -Released on January 20th, 2023. - -### Fixed - -- Correctly format GCS paths on Windows machines - [#117](https://github.com/PrefectHQ/prefect-gcp/pull/117) - -## 0.2.3 - -Released on January 5th, 2023. - -### Fixed - -- Wrapping type annotations in quotes to prevent them from loading if the object is not found - [#105](https://github.com/PrefectHQ/prefect-gcp/pull/105) - -## 0.2.2 - -Released on January 3rd, 2023. - -### Added - -- The `CloudRunJob` timeout parameter is now passed to the GCP TaskSpec. This allows Cloud Run tasks to run for longer than their default of 10min - [#99](https://github.com/PrefectHQ/prefect-gcp/pull/99) - -### Fixed - -- Improper imports on the top level - [#100](https://github.com/PrefectHQ/prefect-gcp/pull/100) - -## 0.2.1 - -Released on December 23rd, 2022. - -### Changed - -- Adds handling for ` service_account_info` supplied to `GcpCredentials` as a json formatted string - [#94](https://github.com/PrefectHQ/prefect-gcp/pull/94) - -## 0.2.0 - -Released on December 22nd, 2022. - -### Added - -- `list_blobs`, `download_object_to_path`, `download_object_to_file_object`, `download_folder_to_path`, `upload_from_path`, `upload_from_file_object`, `upload_from_folder` methods in `GcsBucket` - [#85](https://github.com/PrefectHQ/prefect-gcp/pull/85) -- `GcpSecret` block with `read_secret`, `write_secret`, and `delete_secret` methods - [#86](https://github.com/PrefectHQ/prefect-gcp/pull/86) -- `BigQueryWarehouse` block with `get_connection`, `fetch_one`, `fetch_many`, `fetch_all`, `execute`, `execute_many`, methods - [#88](https://github.com/PrefectHQ/prefect-gcp/pull/88) - -### Changed - -- Made `GcpCredentials.get_access_token` sync compatible - [#80](https://github.com/PrefectHQ/prefect-gcp/pull/80) -- Breaking: Obfuscated `GcpCredentials.service_account_info` by using `SecretDict` type - [#88](https://github.com/PrefectHQ/prefect-gcp/pull/88) -- `GcsBucket` additionally inherits from `ObjectStorageBlock` - [#85](https://github.com/PrefectHQ/prefect-gcp/pull/85) -- Expose all blocks available in the collection to top level init - [#88](https://github.com/PrefectHQ/prefect-gcp/pull/88) -- Inherit `CredentialsBlock` in `GcpCredentials` - [#92](https://github.com/PrefectHQ/prefect-gcp/pull/92) - -### Fixed - -- Warning stating `Failed to load collection 'prefect_gcp_aiplatform'` - [#87](https://github.com/PrefectHQ/prefect-gcp/pull/87) - -## 0.1.8 - -Released on December 5th, 2022. - -### Added - -- `VertexAICustomTrainingJob` infrastructure block - [#75](https://github.com/PrefectHQ/prefect-gcp/pull/75) - -## 0.1.7 - -Released on December 2nd, 2022. - -### Added - -- `CloudJobRun.kill` method for cancellation support - [#76](https://github.com/PrefectHQ/prefect-gcp/pull/76) - -## 0.1.6 - -Released on October 7th, 2022. - -### Fixed - -- Validation errors for CPU and Memory being raised incorrectly - [#64](https://github.com/PrefectHQ/prefect-gcp/pull/64) - -## 0.1.5 - -Released on September 28th, 2022. - -### Changed - -- Invoke `google.auth.default` if both `service_account_info` and `service_account_file` is not specified - [#57](https://github.com/PrefectHQ/prefect-gcp/pull/57) - -### Fixed - -- Retrieving the `project_id` from service account or `quota_project_id` from gcloud CLI if `project` is not specified - [#57](https://github.com/PrefectHQ/prefect-gcp/pull/57) - -## 0.1.4 - -Released on September 19th, 2022. - -### Added - -- `CloudRunJob` infrastructure block - [#48](https://github.com/PrefectHQ/prefect-gcp/pull/48) -- `GcsBucket` block - [#41](https://github.com/PrefectHQ/prefect-gcp/pull/41) -- `external_config` keyword argument in `bigquery_create_table` task - [#53](https://github.com/PrefectHQ/prefect-gcp/pull/53) -- `content_type` keyword argument in `cloud_storage_upload_blob_from_file` task - [#47](https://github.com/PrefectHQ/prefect-gcp/pull/47) -- `**kwargs` for all tasks in the module `cloud_storage.py` - [#47](https://github.com/PrefectHQ/prefect-gcp/pull/47) - -### Changed - -- Made `schema` keyword argument optional in `bigquery_create_table` task, thus the position of the keyword changed - [#53](https://github.com/PrefectHQ/prefect-gcp/pull/53) -- Allowed `~` character to be used in the path for service account file - [#38](https://github.com/PrefectHQ/prefect-gcp/pull/38) - -### Fixed - -- `ValidationError` using `GcpCredentials.service_account_info` in `prefect-dbt` - [#44](https://github.com/PrefectHQ/prefect-gcp/pull/44) - -## 0.1.3 - -Released on July 22nd, 2022. - -### Added - -- Added setup.py entry point - [#35](https://github.com/PrefectHQ/prefect-gcp/pull/35) - -## 0.1.2 - -Released on July 22nd, 2022. - -### Changed - -- Updated tests to be compatible with core Prefect library (v2.0b9) and bumped required version - [#30](https://github.com/PrefectHQ/prefect-gcp/pull/30) -- Converted GcpCredentials into a Block - [#31](https://github.com/PrefectHQ/prefect-gcp/pull/31). - -## 0.1.1 - -Released on July 11th, 2022 - -### Changed - -- Improved error handle and instruction for extras - [#18](https://github.com/PrefectHQ/prefect-gcp/pull/18) - -## 0.1.0 - -Released on March 17th, 2022. - -### Added - -- `cloud_storage_copy_blob`, `cloud_storage_create_bucket`, `cloud_storage_download_blob_as_bytes`, `cloud_storage_download_blob_to_file`, `cloud_storage_upload_blob_from_file`, and `cloud_storage_upload_blob_from_string` tasks - [#1](https://github.com/PrefectHQ/prefect-gcp/pull/1) -- `bigquery_create_table`, `bigquery_insert_stream`, `bigquery_load_cloud_storage`, `bigquery_load_file`, and `bigquery_query`, tasks - [#2](https://github.com/PrefectHQ/prefect-gcp/pull/2) -- `create_secret`, `delete_secret`, `delete_secret_version`, `read_secret`, and `update_secret` tasks - [#3](https://github.com/PrefectHQ/prefect-gcp/pull/5) diff --git a/prefect_gcp/cloud_run.py b/prefect_gcp/cloud_run.py index fd4c0ba..b16989e 100644 --- a/prefect_gcp/cloud_run.py +++ b/prefect_gcp/cloud_run.py @@ -27,6 +27,7 @@ ``` """ + from __future__ import annotations import json @@ -298,6 +299,16 @@ class CloudRunJob(Infrastructure): "before raising an exception." ), ) + max_retries: Optional[int] = Field( + default=3, + ge=0, + le=10, + title="Max Retries", + description=( + "The maximum retries setting specifies the number of times a task is " + "allowed to restart in case of failure before being failed permanently." + ), + ) # For private use _job_name: str = None _execution: Optional[Execution] = None @@ -678,6 +689,7 @@ def _jobs_body(self) -> dict: "spec": { "containers": containers, "timeoutSeconds": timeout_seconds, + "maxRetries": self.max_retries, } # TaskSpec } }, diff --git a/prefect_gcp/utilities.py b/prefect_gcp/utilities.py new file mode 100644 index 0000000..2fba9ee --- /dev/null +++ b/prefect_gcp/utilities.py @@ -0,0 +1,29 @@ +from typing import Optional + +from slugify import slugify + + +def slugify_name(name: str, max_length: int = 30) -> Optional[str]: + """ + Slugify text for use as a name. + + Keeps only alphanumeric characters and dashes, and caps the length + of the slug at 30 chars. + + The 30 character length allows room to add a uuid for generating a unique + name for the job while keeping the total length of a name below 63 characters, + which is the limit for Cloud Run job names. + + Args: + name: The name of the job + + Returns: + The slugified job name or None if the slugified name is empty + """ + slug = slugify( + name, + max_length=max_length, + regex_pattern=r"[^a-zA-Z0-9-]+", + ) + + return slug if slug else None diff --git a/prefect_gcp/workers/cloud_run.py b/prefect_gcp/workers/cloud_run.py index e1f0263..85d0d34 100644 --- a/prefect_gcp/workers/cloud_run.py +++ b/prefect_gcp/workers/cloud_run.py @@ -31,7 +31,6 @@ "annotations": { "run.googleapis.com/launch-stage": "BETA", - "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}" } }, "spec": @@ -70,7 +69,15 @@ } } } + }, + "metadata": + { + "annotations": + { + "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}" + } } + }, }, "timeout": "{{ timeout }}", "keep_job": "{{ keep_job }}" @@ -101,7 +108,6 @@ { "run.googleapis.com/my-custom-annotation": "{{ my_custom_annotation }}", "run.googleapis.com/launch-stage": "BETA", - "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}" }, ... }, @@ -126,17 +132,21 @@ { "apiVersion": "run.googleapis.com/v1", "kind": "Job", - "metadata": + "spec": { - "name": "{{ name }}", - "annotations": + "template": { - "run.googleapis.com/launch-stage": "BETA", - "run.googleapis.com/vpc-access-connector": "my-vpc-connector" - } - ... - }, - ... + "metadata": + { + "annotations": + { + "run.googleapis.com/vpc-access-connector": "my-vpc-connector" + } + }, + ... + }, + ... + } } ``` """ @@ -145,6 +155,7 @@ import shlex import time from typing import TYPE_CHECKING, Any, Dict, Optional +from uuid import uuid4 import anyio import googleapiclient @@ -172,6 +183,7 @@ from prefect_gcp.cloud_run import Execution, Job from prefect_gcp.credentials import GcpCredentials +from prefect_gcp.utilities import slugify_name if TYPE_CHECKING: from prefect.client.schemas import FlowRun @@ -189,7 +201,6 @@ def _get_default_job_body_template() -> Dict[str, Any]: "annotations": { # See: https://cloud.google.com/run/docs/troubleshooting#launch-stage-validation # noqa "run.googleapis.com/launch-stage": "BETA", - "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}", }, }, "spec": { # JobSpec @@ -218,6 +229,11 @@ def _get_default_job_body_template() -> Dict[str, Any]: } }, }, + "metadata": { + "annotations": { + "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}" # noqa + } + }, }, }, } @@ -338,7 +354,9 @@ def _populate_name_if_not_present(self): """Adds the flow run name to the job if one is not already provided.""" try: if "name" not in self.job_body["metadata"]: - self.job_body["metadata"]["name"] = self.name + base_job_name = slugify_name(self.name) + job_name = f"{base_job_name}-{uuid4().hex}" + self.job_body["metadata"]["name"] = job_name except KeyError: raise ValueError("Unable to verify name due to invalid job body template.") diff --git a/prefect_gcp/workers/cloud_run_v2.py b/prefect_gcp/workers/cloud_run_v2.py index 4906ab5..956beb4 100644 --- a/prefect_gcp/workers/cloud_run_v2.py +++ b/prefect_gcp/workers/cloud_run_v2.py @@ -2,6 +2,7 @@ import shlex import time from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional +from uuid import uuid4 from anyio.abc import TaskStatus from google.api_core.client_options import ClientOptions @@ -24,12 +25,13 @@ from pydantic import VERSION as PYDANTIC_VERSION if PYDANTIC_VERSION.startswith("2."): - from pydantic.v1 import Field, validator + from pydantic.v1 import Field, PrivateAttr, validator else: - from pydantic import Field, validator + from pydantic import Field, validator, PrivateAttr from prefect_gcp.credentials import GcpCredentials from prefect_gcp.models.cloud_run_v2 import CloudRunJobV2Result, ExecutionV2, JobV2 +from prefect_gcp.utilities import slugify_name if TYPE_CHECKING: from prefect.client.schemas import FlowRun @@ -52,6 +54,7 @@ def _get_default_job_body_template() -> Dict[str, Any]: "serviceAccount": "{{ service_account_name }}", "maxRetries": "{{ max_retries }}", "timeout": "{{ timeout }}", + "vpcAccess": "{{ vpc_connector_name }}", "containers": [ { "env": [], @@ -125,6 +128,7 @@ class CloudRunWorkerJobV2Configuration(BaseJobConfiguration): "complete before raising an exception." ), ) + _job_name: str = PrivateAttr(default=None) @property def project(self) -> str: @@ -137,18 +141,19 @@ def project(self) -> str: return self.credentials.project @property - def job_name(self): + def job_name(self) -> str: """ - Returns the job name, if it does not exist, it creates it. - """ - pre_trim_cr_job_name = f"prefect-{self.name}" - - if len(pre_trim_cr_job_name) > 40: - pre_trim_cr_job_name = pre_trim_cr_job_name[:40] + Returns the name of the job. - pre_trim_cr_job_name = pre_trim_cr_job_name.rstrip("-") + Returns: + str: The name of the job. + """ + if self._job_name is None: + base_job_name = slugify_name(self.name) + job_name = f"{base_job_name}-{uuid4().hex}" + self._job_name = job_name - return pre_trim_cr_job_name + return self._job_name def prepare_for_flow_run( self, @@ -179,6 +184,7 @@ def prepare_for_flow_run( self._format_args_if_present() self._populate_image_if_not_present() self._populate_timeout() + self._populate_vpc_if_present() def _populate_timeout(self): """ @@ -229,6 +235,15 @@ def _format_args_if_present(self): "args" ] = shlex.split(args) + def _populate_vpc_if_present(self): + """ + Populates the job body with the VPC connector if present. + """ + if self.job_body["template"]["template"].get("vpcAccess") is not None: + self.job_body["template"]["template"]["vpcAccess"] = { + "connector": self.job_body["template"]["template"]["vpcAccess"], + } + # noinspection PyMethodParameters @validator("job_body") def _ensure_job_includes_all_required_components(cls, value: Dict[str, Any]): diff --git a/tests/test_cloud_run.py b/tests/test_cloud_run.py index 69da669..27c4145 100644 --- a/tests/test_cloud_run.py +++ b/tests/test_cloud_run.py @@ -520,6 +520,15 @@ def test_timeout_added_correctly(self, cloud_run_job): "timeoutSeconds" ] == str(timeout) + def test_max_retries_added_correctly(self, cloud_run_job): + max_retries = 2 + cloud_run_job.max_retries = max_retries + result = cloud_run_job._jobs_body() + assert ( + result["spec"]["template"]["spec"]["template"]["spec"]["maxRetries"] + == max_retries + ) + def test_vpc_connector_name_added_correctly(self, cloud_run_job): cloud_run_job.vpc_connector_name = "vpc_name" result = cloud_run_job._jobs_body() diff --git a/tests/test_cloud_run_worker.py b/tests/test_cloud_run_worker.py index 6d5fa74..7c62514 100644 --- a/tests/test_cloud_run_worker.py +++ b/tests/test_cloud_run_worker.py @@ -17,6 +17,7 @@ from prefect.server.schemas.actions import DeploymentCreate from prefect_gcp.credentials import GcpCredentials +from prefect_gcp.utilities import slugify_name from prefect_gcp.workers.cloud_run import ( CloudRunWorker, CloudRunWorkerJobConfiguration, @@ -68,11 +69,28 @@ def cloud_run_worker_job_config(service_account_info, jobs_body): ) +@pytest.fixture +def cloud_run_worker_job_config_noncompliant_name(service_account_info, jobs_body): + return CloudRunWorkerJobConfiguration( + name="MY_JOB_NAME", + image="gcr.io//not-a/real-image", + region="middle-earth2", + job_body=jobs_body, + credentials=GcpCredentials(service_account_info=service_account_info), + ) + + class TestCloudRunWorkerJobConfiguration: def test_job_name(self, cloud_run_worker_job_config): cloud_run_worker_job_config.job_body["metadata"]["name"] = "my-job-name" assert cloud_run_worker_job_config.job_name == "my-job-name" + def test_job_name_is_slug(self, cloud_run_worker_job_config_noncompliant_name): + cloud_run_worker_job_config_noncompliant_name._populate_name_if_not_present() + assert cloud_run_worker_job_config_noncompliant_name.job_name[ + :-33 + ] == slugify_name("MY_JOB_NAME") + def test_populate_envs( self, cloud_run_worker_job_config, @@ -124,7 +142,21 @@ def test_populate_name_if_not_present(self, cloud_run_worker_job_config): cloud_run_worker_job_config._populate_name_if_not_present() assert "name" in metadata - assert metadata["name"] == cloud_run_worker_job_config.name + assert metadata["name"][:-33] == cloud_run_worker_job_config.name + + def test_job_name_different_after_retry(self, cloud_run_worker_job_config): + metadata = cloud_run_worker_job_config.job_body["metadata"] + + assert "name" not in metadata + + cloud_run_worker_job_config._populate_name_if_not_present() + job_name_1 = metadata.pop("name") + + cloud_run_worker_job_config._populate_name_if_not_present() + job_name_2 = metadata.pop("name") + + assert job_name_1[:-33] == job_name_2[:-33] + assert job_name_1 != job_name_2 def test_populate_or_format_command_doesnt_exist(self, cloud_run_worker_job_config): container = cloud_run_worker_job_config.job_body["spec"]["template"]["spec"][ diff --git a/tests/test_cloud_run_worker_v2.py b/tests/test_cloud_run_worker_v2.py index 23468d9..5422430 100644 --- a/tests/test_cloud_run_worker_v2.py +++ b/tests/test_cloud_run_worker_v2.py @@ -2,6 +2,7 @@ from prefect.utilities.dockerutils import get_prefect_image_name from prefect_gcp.credentials import GcpCredentials +from prefect_gcp.utilities import slugify_name from prefect_gcp.workers.cloud_run_v2 import CloudRunWorkerJobV2Configuration @@ -14,6 +15,7 @@ def job_body(): "template": { "maxRetries": None, "timeout": None, + "vpcAccess": "projects/my_project/locations/us-central1/connectors/my-connector", # noqa: E501 "containers": [ { "env": [], @@ -44,12 +46,39 @@ def cloud_run_worker_v2_job_config(service_account_info, job_body): ) +@pytest.fixture +def cloud_run_worker_v2_job_config_noncompliant_name(service_account_info, job_body): + return CloudRunWorkerJobV2Configuration( + name="MY_JOB_NAME", + job_body=job_body, + credentials=GcpCredentials(service_account_info=service_account_info), + region="us-central1", + timeout=86400, + env={"ENV1": "VALUE1", "ENV2": "VALUE2"}, + ) + + class TestCloudRunWorkerJobV2Configuration: def test_project(self, cloud_run_worker_v2_job_config): assert cloud_run_worker_v2_job_config.project == "my_project" def test_job_name(self, cloud_run_worker_v2_job_config): - assert cloud_run_worker_v2_job_config.job_name == "prefect-my-job-name" + assert cloud_run_worker_v2_job_config.job_name[:-33] == "my-job-name" + + def test_job_name_is_slug(self, cloud_run_worker_v2_job_config_noncompliant_name): + assert cloud_run_worker_v2_job_config_noncompliant_name.job_name[ + :-33 + ] == slugify_name("MY_JOB_NAME") + + def test_job_name_different_after_retry(self, cloud_run_worker_v2_job_config): + job_name_1 = cloud_run_worker_v2_job_config.job_name + + cloud_run_worker_v2_job_config._job_name = None + + job_name_2 = cloud_run_worker_v2_job_config.job_name + + assert job_name_1[:-33] == job_name_2[:-33] + assert job_name_1 != job_name_2 def test_populate_timeout(self, cloud_run_worker_v2_job_config): cloud_run_worker_v2_job_config._populate_timeout() @@ -92,3 +121,13 @@ def test_format_args_if_present(self, cloud_run_worker_v2_job_config): assert cloud_run_worker_v2_job_config.job_body["template"]["template"][ "containers" ][0]["args"] == ["-m", "prefect.engine"] + + def test_populate_vpc_if_present(self, cloud_run_worker_v2_job_config): + cloud_run_worker_v2_job_config._populate_vpc_if_present() + + assert ( + cloud_run_worker_v2_job_config.job_body["template"]["template"][ + "vpcAccess" + ]["connector"] + == "projects/my_project/locations/us-central1/connectors/my-connector" + )