feat: Support for Kubernetes #1058

Merged
merged 22 commits on Jan 8, 2024
Changes from 13 commits
Commits
7ab2be7
Update partition_table_prd.md potential way to address #923 and #950
sundar-mudupalli-work Aug 24, 2023
a39b9d8
Fixed bug with specifying a config file in GCS - this did not work ea…
sundar-mudupalli-work Aug 26, 2023
6bb41e2
Merge branch 'K8-indexed' of https://github.com/GoogleCloudPlatform/p…
sundar-mudupalli-work Aug 26, 2023
0f22e62
Merge branch 'develop' into K8-indexed; Got all the latest fixes from…
sundar-mudupalli-work Sep 7, 2023
7d0ab39
Added a Implementation Note on how to scale DVT with Kubernetes
sundar-mudupalli-work Oct 14, 2023
4f1ebdb
Linted files and added unit tests for config_runner changes.
sundar-mudupalli-work Dec 27, 2023
0216c7f
Updated README on how to run multiple instances concurrently.
sundar-mudupalli-work Dec 27, 2023
d1275ec
Updated README.md
sundar-mudupalli-work Dec 27, 2023
4cca42a
Merge branch 'develop' into K8-indexed
sundar-mudupalli-work Dec 27, 2023
e17f661
Merge branch 'develop' into K8-indexed
sundar-mudupalli-work Dec 27, 2023
4fa260c
Some Lint fixes
sundar-mudupalli-work Dec 27, 2023
f21cc91
Updated tests to mock build_config_managers_from_yaml.
sundar-mudupalli-work Dec 30, 2023
007e738
Fixed reference to directory.
sundar-mudupalli-work Dec 30, 2023
82cc140
Update README.md
sundar-mudupalli-work Jan 5, 2024
c972507
Update README.md
sundar-mudupalli-work Jan 5, 2024
10fa94a
Update README.md
sundar-mudupalli-work Jan 5, 2024
f557df2
Update docs/internal/kubernetes_jobs.md
sundar-mudupalli-work Jan 5, 2024
e595bac
Updated docs
sundar-mudupalli-work Jan 5, 2024
992d0f1
Update README.md
sundar-mudupalli-work Jan 6, 2024
5c2d7ac
Some more doc changes.
sundar-mudupalli-work Jan 8, 2024
492a2e7
Final changes ?
sundar-mudupalli-work Jan 8, 2024
3105d50
Final typos
sundar-mudupalli-work Jan 8, 2024
32 changes: 28 additions & 4 deletions README.md
@@ -318,6 +318,8 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query col
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--config-file or -c CONFIG_FILE]
YAML Config File Path to be used for storing validations.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
@@ -375,6 +377,8 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query row
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--config-file or -c CONFIG_FILE]
YAML Config File Path to be used for storing validations.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
@@ -411,12 +415,14 @@ For example, this flag can be used as follows:
}
```

### YAML Configuration Files
### Running DVT with YAML Configuration Files

You can customize the configuration for any given validation by providing use
case specific CLI arguments or editing the YAML configuration file.
Running DVT with YAML configuration files is the recommended approach if:
* you want to customize the configuration for any given validation by providing use case specific CLI arguments or editing the YAML configuration file OR
* you want to run DVT at scale (i.e. row validations across many partitions)
Nearly every validation command accepts the argument `-c <file-name>` (the `generate-table-partitions` command takes `-cdir <directory-name>` instead), which writes one or more YAML files. These YAML files can then be edited to make the validations more specific, as shown below.

For example, the following command creates a YAML file for the validation of the
The following command creates a YAML file for the validation of the
`new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc
my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c
citibike.yaml`.
@@ -441,6 +447,24 @@ OR

data-validation configs run -cdir gs://my-bucket/my-validations/
```
#### Scaling DVT to run tens to thousands of validations concurrently
The `configs run -cdir` example above shows how to run multiple validations with one command. If the directory was created by `generate-table-partitions`, it contains partition files numbered from `0000.yaml` to `<partno-1>.yaml`, where `<partno>` is the total number of partitions. These can be run as shown above and the partitions will be validated in order, which can take a long time if the number of partitions is large.

DVT validations can be run concurrently (horizontal scaling) using GKE (GCP's managed Kubernetes offering) or Cloud Run Jobs. In order to run DVT in a container, you have to build a Docker image, [see instructions](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples/docker#readme). Set the `PSO_DV_CONFIG` environment variable to point to a GCS prefix where the connection configuration files are stored. In Cloud Run you can use the option `--set-env-vars` or `--update-env-vars` to pass [the environment variable](https://cloud.google.com/run/docs/configuring/services/environment-variables#setting). We recommend using the `bq-result-handler` to save your validation results. To validate partitions concurrently, run DVT in Kubernetes or Cloud Run as shown below:
```
data-validation (--verbose or -v) (--log-level or -ll) configs run
[--kube-completions or -kc]
--config-dir or -cdir GCS_DIRECTORY
where GCS_DIRECTORY = CONFIG_DIR/SOURCE_SCHEMA.SOURCE_TABLE. CONFIG_DIR, SOURCE_SCHEMA and SOURCE_TABLE are
the arguments provided to generate-table-partitions when generating the partition YAMLs, and GCS_DIRECTORY is
the directory where the partition files numbered 0000.yaml to <partno-1>.yaml are stored.
```
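For illustration, the following is a minimal sketch of a Cloud Run Jobs deployment for this pattern. It assumes the container entrypoint is the `data-validation` CLI; the job name, image, region, service account, bucket paths and partition count are placeholders, not values taken from this repository.

```
# Sketch only: create a Cloud Run Job with one task per partition file.
gcloud run jobs create dvt-partition-validation \
  --image us-docker.pkg.dev/MY_PROJECT/dvt/data-validation:latest \
  --region us-central1 \
  --service-account dvt-runner@MY_PROJECT.iam.gserviceaccount.com \
  --set-env-vars PSO_DV_CONFIG=gs://my-bucket/dvt-connections/ \
  --tasks 50 \
  --parallelism 10 \
  --max-retries 3 \
  --args="configs,run,-kc,-cdir,gs://my-bucket/partitions/my_schema.my_table"

# Execute the job; each of the 50 tasks validates the partition matching its CLOUD_RUN_TASK_INDEX.
gcloud run jobs execute dvt-partition-validation --region us-central1
```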

The Cloud Run and Kubernetes pods must run in a network with access to the database servers. Every Cloud Run job is associated with a [service account](https://cloud.google.com/run/docs/securing/service-identity). Ensure that this service account has access to Google Cloud Storage (to read connection configurations and YAML files) and BigQuery (to publish results). If you are using Kubernetes, use a service account with the same privileges as described for Cloud Run, and set up [workload identity](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity) so that the DVT container can impersonate the service account.
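The workload identity binding can be sketched as follows, assuming a GKE cluster with Workload Identity enabled; the project, namespace and account names are placeholders.

```
# Sketch only: allow the Kubernetes service account (KSA) used by the DVT pods to
# impersonate a GCP service account (GSA) that can read GCS and write BigQuery results.
gcloud iam service-accounts add-iam-policy-binding \
  dvt-runner@MY_PROJECT.iam.gserviceaccount.com \
  --role roles/iam.workloadIdentityUser \
  --member "serviceAccount:MY_PROJECT.svc.id.goog[dvt-namespace/dvt-ksa]"

# Annotate the KSA so GKE knows which GSA it maps to.
kubectl annotate serviceaccount dvt-ksa --namespace dvt-namespace \
  iam.gke.io/gcp-service-account=dvt-runner@MY_PROJECT.iam.gserviceaccount.com
```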

In Cloud Run, the [job](https://cloud.google.com/run/docs/create-jobs) must be run as multiple, independent tasks with the task count set to the number of partitions generated. In Kubernetes, set the number of completions to the number of partitions generated - see [Kubernetes Parallel Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs). The option `--kube-completions` or `-kc` tells DVT that many DVT containers are running in a Kubernetes cluster, so each container validates only the partition YAML matching the index assigned by the Kubernetes control plane (or by Cloud Run). If the `-kc` option is used but DVT is not running in indexed mode, a warning is issued and the container processes all the validations sequentially. If the `-kc` option is used without a config directory (i.e. a `--config-file` is provided instead), a warning is also issued.
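As a sketch of the Kubernetes side, an indexed Job for 50 partitions might look like the manifest below, applied via a shell heredoc. The image, service account, namespace and GCS paths are placeholders, and the container entrypoint is assumed to be the `data-validation` CLI.

```
# Sketch only: indexed Job with one completion per partition file (0000.yaml .. 0049.yaml).
kubectl apply -f - <<'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  name: dvt-partition-validation
spec:
  completionMode: Indexed    # exposes JOB_COMPLETION_INDEX to each pod
  completions: 50            # total number of partition files
  parallelism: 10            # how many partitions are validated at once
  backoffLimit: 3            # retries for failed pods
  template:
    spec:
      serviceAccountName: dvt-ksa   # bound to a GCP service account via workload identity
      restartPolicy: Never
      containers:
      - name: dvt
        image: us-docker.pkg.dev/MY_PROJECT/dvt/data-validation:latest
        args: ["configs", "run", "-kc", "-cdir", "gs://my-bucket/partitions/my_schema.my_table"]
        env:
        - name: PSO_DV_CONFIG
          value: gs://my-bucket/dvt-connections/
EOF
```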

By default, each partition validation is retried up to 3 times if there is an error. In Kubernetes and Cloud Run you can set the parallelism to whatever value you want, but if it is set too high the databases being queried can become overloaded, resulting in timeouts and errors. If you are validating thousands of partitions, setting the parallelism too high (say 100) may cause timeouts and actually slow down the overall validation.

View the complete YAML file for a Grouped Column validation on the
[Examples](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/examples.md#sample-yaml-config-grouped-column-validation) page.
49 changes: 43 additions & 6 deletions data_validation/__main__.py
@@ -314,14 +314,51 @@ def build_config_managers_from_args(


def config_runner(args):
"""Config Runner is where the decision is made to run validations from one or more files.
One file can produce multiple validations - for example when more than one set of tables are being validated
between the source and target. If multiple files are to be run and if the associated configuration files
are numbered sequentially, say from '0000.yaml' to '0012.yaml' (for 13 validations),
it is possible to run them concurrently in a Kubernetes / Cloud Run environment.
If the user wants that, they need to specify a -kc or --kube-completions which tells
DVT to only run the validation corresponding to the index number provided in the
JOB_COMPLETION_INDEX (for Kubernetes) or CLOUD_RUN_TASK_INDEX (for Cloud Run) environment
variable. This environment variable is set by the Kubernetes/Cloud Run container orchestrator.
The orchestrator spins up containers to complete each validation, one at a time.
"""
if args.config_dir:
mgr = state_manager.StateManager(file_system_root_path=args.config_dir)
config_file_names = mgr.list_validations_in_dir(args.config_dir)

config_managers = []
for file in config_file_names:
config_managers.extend(build_config_managers_from_yaml(args, file))
if args.kube_completions and (
("JOB_COMPLETION_INDEX" in os.environ.keys())
or ("CLOUD_RUN_TASK_INDEX" in os.environ.keys())
):
# Running in Kubernetes in Job completions - only run the yaml file corresponding to index
job_index = (
int(os.environ.get("JOB_COMPLETION_INDEX"))
if "JOB_COMPLETION_INDEX" in os.environ.keys()
else int(os.environ.get("CLOUD_RUN_TASK_INDEX"))
)
config_file_path = (
f"{args.config_dir}{job_index:04d}.yaml"
if args.config_dir.endswith("/")
else f"{args.config_dir}/{job_index:04d}.yaml"
)
setattr(args, "config_dir", None)
setattr(args, "config_file", config_file_path)
config_managers = build_config_managers_from_yaml(args, config_file_path)
else:
Review comment (Collaborator):

Maybe we want to convert this into an elif block?

elif args.kube_completions:
   # log warning
else:
   # create state manager and config_managers

if args.kube_completions:
logging.warning(
"--kube-completions or -kc specified, however not running in Kubernetes Job completion, check your command line."
)
mgr = state_manager.StateManager(file_system_root_path=args.config_dir)
config_file_names = mgr.list_validations_in_dir(args.config_dir)
config_managers = []
for file in config_file_names:
config_managers.extend(build_config_managers_from_yaml(args, file))
else:
if args.kube_completions:
logging.warning(
"--kube-completions or -kc specified, which requires a config directory, however a specific config file is provided."
)
config_file_path = _get_arg_config_file(args)
config_managers = build_config_managers_from_yaml(args, config_file_path)

6 changes: 6 additions & 0 deletions data_validation/cli_tools.py
@@ -347,6 +347,12 @@ def _configure_validation_config_parser(subparsers):
"-cdir",
help="Directory path containing YAML Config Files to be used for running validations.",
)
run_parser.add_argument(
"--kube-completions",
"-kc",
action="store_true",
help="When validating multiple table partitions generated by generate-table-partitions, using DVT in Kubernetes in index completion mode use this flag so that all the validations are completed",
)

get_parser = configs_subparsers.add_parser(
"get", help="Get and print a validation config"
17 changes: 17 additions & 0 deletions docs/internal/kubernetes_jobs.md
@@ -0,0 +1,17 @@
# Scaling Data Validation with Kubernetes Jobs

## Nature of Data Validation
Data Validation by nature is a batch process. We are presented with a set of arguments, the validation is performed, the results are provided and Data Validation completes. Data Validation can also take time (multiple seconds to minutes) if a large amount of data needs to be validated.
Review comment (Collaborator):

Suggested change
Data Validation by nature is a batch process. We are presented with a set of arguments, the validation is performed, the results are provided and Data Validation completes. Data Validation can also take time (multiple secs, minutes) if a large amount of data that needs to be validated.
Data Validation by nature is a batch process. We are presented with a set of arguments, the validation is performed, the results are provided and Data Validation completes. Data Validation can also take time if a large amount of data needs to be validated.


Data Validation has the `generate-table-partitions` function, which splits a row validation into a specified number of smaller, equally sized validations. Using this feature, the validation of two large tables can be split into many row validations over partitions of those tables. See the [partition table PRD](partition_table_prd.md) for details on partitioning. This process generates a sequence of YAML files that can be used to validate the individual partitions.
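As a hedged example, a partitioned row validation might be generated as follows; the connection names, table, key column and partition count are placeholders, and the flag names follow my reading of the DVT CLI around the time of this PR, so they may differ slightly.

```
# Sketch only: split a row validation into 50 partition YAMLs written under -cdir.
data-validation generate-table-partitions \
  -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips \
  --primary-keys start_station_id \
  --hash '*' \
  -cdir gs://my-bucket/partitions \
  --partition-num 50
```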

## Kubernetes Workloads
Kubernetes supports different types of workloads, including a few batch workload types. The Job workload is a batch workload that retries execution until a specified number of completions succeed. If a row validation has been split into `n` partitions, we need to validate each partition and merge the results. Using Kubernetes Jobs, we need `n` successful completions of the job, as long as we guarantee that each completion is associated with a different partition. Kubernetes provides a type of job management called indexed completions that supports this use case. A Kubernetes job can use multiple parallel worker processes, and the control plane assigns each worker an index number that identifies which part of the overall task (i.e. which partition) to work on. The index is available in the environment variable `JOB_COMPLETION_INDEX` (in Cloud Run the environment variable is `CLOUD_RUN_TASK_INDEX`). An explanation of this is provided in [Introducing Indexed Jobs](https://kubernetes.io/blog/2021/04/19/introducing-indexed-jobs/#:~:text=Indexed%20%3A%20the%20Job%20is%20considered,and%20the%20JOB_COMPLETION_INDEX%20environment%20variable).

Indexed completion mode supports the partitioned YAML files generated by `generate-table-partitions`, provided each worker process runs only the YAML file corresponding to its index. I have introduced an optional parameter, `--kube-completions` or `-kc`. When this flag is used with `data-validation configs run` with a config directory and the container is running in indexed jobs mode, each container processes only the validation YAML file corresponding to its index. If the flag is used with `data-validation configs run` with a config directory but DVT is not running in indexed jobs mode, a warning is issued. In all other instances, this flag is ignored.
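For illustration, each indexed worker runs the same command; DVT maps the index to a partition file internally (e.g. index 7 resolves to `0007.yaml`). The bucket path below is a placeholder.

```
# Sketch only: run inside each pod/task of an indexed Kubernetes Job or Cloud Run Job.
data-validation configs run -kc -cdir gs://my-bucket/partitions/my_schema.my_table
```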
### IAM Permissions
### Passing database connection parameters
Review comment (Collaborator): Is this section accurate? I know we plan on creating a separate PR that tackles Secret Manager support. Until then, users should be able to use GCS connections for Kubernetes/Cloud Run orchestration.

Reply (Contributor, Author): Updated the document and added items to Future Work regarding using Secret Manager and other cleanup.

Reply (Collaborator): I think we should get rid of the 'Future Work' section since it doesn't belong in the public documentation; these are internal roadmap items that we can track with issues. If you want to keep it in, we should at least delete the line "This inconsistent behavior is challenging and should be fixed." since it doesn't belong in a product's public docs.

DVT stores database connection parameters in the `$HOME/.config/google-pso-data-validator` directory with passwords in plain text. This can be insecure and does not support rotation of passwords. A better approach would be to use the (GCP) Secret Manager and retrieve the secret only when connecting to the database. Today, DVT uses the Secret Manager to retrieve secrets and then stores them in the `.config` directory when the connections are added.

I am proposing a simple change: whenever a connection parameter is specified, allow the user to optionally specify a secret manager (provider, project-id). If a secret manager is specified, DVT retrieves the connection information directly from the secret manager at the time the connection is created. With this change, DVT can run in a container in Cloud Run or Kubernetes, fetching the connection information from the GCP Secret Manager. Cloud Run currently has a limitation that multiple secrets [cannot be mounted at the same path](https://cloud.google.com/run/docs/configuring/services/secrets#disallowed_paths_and_limitations). Since DVT requires connections to two different databases, with both connection files mounted in the same directory (`$HOME/.config/google-pso-data-validator`), DVT cannot effectively run within Cloud Run using mounted secrets.
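A purely hypothetical sketch of what that option could look like on `connections add` follows; these secret-manager flags illustrate the proposal and are not part of the current CLI, and the other flag values are placeholders.

```
# Hypothetical (proposed, not implemented): resolve the password from GCP Secret Manager
# when the connection is used, instead of storing it in $HOME/.config in plain text.
data-validation connections add \
  --secret-manager-type GCP \
  --secret-manager-project-id MY_PROJECT \
  --connection-name my_pg_conn Postgres \
  --host pg-host --port 5432 --user dvt_user \
  --password my-pg-password-secret \
  --database my_db
```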
## Future Work
121 changes: 121 additions & 0 deletions tests/unit/test__main.py
@@ -13,6 +13,8 @@
# limitations under the License.

import argparse
import logging
import os
from unittest import mock

from data_validation import cli_tools, consts
@@ -56,6 +58,36 @@
}
]

CONFIG_RUNNER_ARGS_1 = {
"verbose": False,
"log_level": "INFO",
"command": "configs",
"validation_config_cmd": "run",
"dry_run": False,
"config_file": "gs://pso-kokoro-resources/resources/test/unit/test__main/3validations/first.yaml",
"config_dir": None,
"kube_completions": True,
}
CONFIG_RUNNER_ARGS_2 = {
"verbose": False,
"log_level": "INFO",
"dry_run": False,
"command": "configs",
"validation_config_cmd": "run",
"kube_completions": True,
"config_dir": "gs://pso-kokoro-resources/resources/test/unit/test__main/3validations",
}

CONFIG_RUNNER_ARGS_3 = {
"verbose": False,
"log_level": "INFO",
"dry_run": False,
"command": "configs",
"validation_config_cmd": "run",
"kube_completions": True,
"config_dir": "gs://pso-kokoro-resources/resources/test/unit/test__main/4partitions",
}


@mock.patch(
"argparse.ArgumentParser.parse_args",
@@ -74,3 +106,92 @@ def test__compare_match_tables():
table_configs = main._compare_match_tables(SOURCE_TABLE_MAP, TARGET_TABLE_MAP)

assert table_configs == RESULT_TABLE_CONFIGS


@mock.patch("data_validation.__main__.run_validations")
@mock.patch(
"data_validation.__main__.build_config_managers_from_yaml",
return_value=["config dict from one file"],
)
@mock.patch(
"argparse.ArgumentParser.parse_args",
return_value=argparse.Namespace(**CONFIG_RUNNER_ARGS_1),
)
def test_config_runner_1(mock_args, mock_build, mock_run, caplog):
"""config_runner, runs the validations, so we have to mock run_validations and examine the arguments
passed to it. Build Config Managers reads the yaml files and builds the validation configs,
which also includes creating a connection to the database. That is beyond a unit test, so mock
build_config_managers_from_yaml.
First test - run validation on a single file - and provide the -kc argument
Expected result
1. One config manager created
2. Warning about inappropriate use of -kc
Other test cases can be developed.
"""
caplog.set_level(logging.WARNING)
args = cli_tools.get_parsed_args()
caplog.clear()
main.config_runner(args)
# assert warning is seen
assert caplog.messages == [
"--kube-completions or -kc specified, which requires a config directory, however a specific config file is provided."
]
# assert that only one config manager object is present
assert len(mock_run.call_args.args[1]) == 1


@mock.patch("data_validation.__main__.run_validations")
@mock.patch(
"data_validation.__main__.build_config_managers_from_yaml",
return_value=["config dict from one file"],
)
@mock.patch(
"argparse.ArgumentParser.parse_args",
return_value=argparse.Namespace(**CONFIG_RUNNER_ARGS_2),
)
def test_config_runner_2(mock_args, mock_build, mock_run, caplog):

"""Second test - run validation on a directory - and provide the -kc argument,
but not running in a Kubernetes Completion Configuration. Expected result:
1. Multiple (3) config managers created for validation
2. Warning about inappropriate use of -kc"""
caplog.set_level(logging.WARNING)
args = cli_tools.get_parsed_args()
caplog.clear()
main.config_runner(args)
# assert warning is seen
assert caplog.messages == [
"--kube-completions or -kc specified, however not running in Kubernetes Job completion, check your command line."
]
# assert that 3 config managers are present
assert len(mock_run.call_args.args[1]) == 3


@mock.patch("data_validation.__main__.run_validations")
@mock.patch(
"data_validation.__main__.build_config_managers_from_yaml",
return_value=["config dict from one file"],
)
@mock.patch(
"argparse.ArgumentParser.parse_args",
return_value=argparse.Namespace(**CONFIG_RUNNER_ARGS_3),
)
def test_config_runner_3(mock_args, mock_build, mock_run, caplog):
"""Second test - run validation on a directory - and provide the -kc argument,
have system believe it is running in a Kubernetes Completion Environment. Expected result
1. No warnings
2. run validation called as though config file is provided (config_dir is None)
3. run validation config file name corresponds to value of JOB_COMPLETION_INDEX
4. One config manager created for validation
"""
caplog.set_level(logging.WARNING)
os.environ["JOB_COMPLETION_INDEX"] = "2"
args = cli_tools.get_parsed_args()
caplog.clear()
main.config_runner(args)
# assert no warnings
assert caplog.messages == []
# assert that only one config manager and one validation corresponding to JOB_COMPLETION_INDEX is set.
assert mock_run.call_args.args[0].config_dir is None
assert os.path.basename(mock_run.call_args.args[0].config_file) == "0002.yaml"
assert len(mock_run.call_args.args[1]) == 1