From fdbdbe06bd7a5a52cae63b4d701d7eb7270f20fa Mon Sep 17 00:00:00 2001 From: sundar-mudupalli-work <106282841+sundar-mudupalli-work@users.noreply.github.com> Date: Mon, 8 Jan 2024 09:13:50 -0800 Subject: [PATCH] feat: Support for Kubernetes (#1058) * Update partition_table_prd.md potential way to address #923 and #950 * Fixed bug with specifying a config file in GCS - this did not work earlier Added functionality to support Kubernetes Indexed jobs - which when provided with a directory will only run the job corresponding to the index. Tested in a non Kubernetes setup * Added a Implementation Note on how to scale DVT with Kubernetes Shortened the option to 2 character code -kc * Linted files and added unit tests for config_runner changes. * Updated README on how to run multiple instances concurrently. * Updated README.md * Some Lint fixes * Updated tests to mock build_config_managers_from_yaml. * Fixed reference to directory. * Update README.md Co-authored-by: Neha Nene * Update README.md Co-authored-by: Neha Nene * Update README.md Co-authored-by: Neha Nene * Update docs/internal/kubernetes_jobs.md Co-authored-by: Neha Nene * Updated docs * Update README.md Co-authored-by: Neha Nene * Some more doc changes. * Final changes ? * Final typos --------- Co-authored-by: Neha Nene --- README.md | 34 ++++++++- data_validation/__main__.py | 49 +++++++++++-- data_validation/cli_tools.py | 6 ++ docs/internal/kubernetes_jobs.md | 23 ++++++ tests/unit/test__main.py | 121 +++++++++++++++++++++++++++++++ 5 files changed, 223 insertions(+), 10 deletions(-) create mode 100644 docs/internal/kubernetes_jobs.md diff --git a/README.md b/README.md index 2282cc850..181608237 100644 --- a/README.md +++ b/README.md @@ -318,6 +318,8 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query col See: *Validation Reports* section [--service-account or -sa PATH_TO_SA_KEY] Service account to use for BigQuery result handler output. 
+ [--config-file or -c CONFIG_FILE] + YAML Config File Path to be used for storing validations. [--labels or -l KEY1=VALUE1,KEY2=VALUE2] Comma-separated key value pair labels for the run. [--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table). @@ -375,6 +377,8 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query row See: *Validation Reports* section [--service-account or -sa PATH_TO_SA_KEY] Service account to use for BigQuery result handler output. + [--config-file or -c CONFIG_FILE] + YAML Config File Path to be used for storing validations. [--labels or -l KEY1=VALUE1,KEY2=VALUE2] Comma-separated key value pair labels for the run. [--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table). @@ -411,12 +415,14 @@ For example, this flag can be used as follows: } ``` -### YAML Configuration Files +### Running DVT with YAML Configuration Files -You can customize the configuration for any given validation by providing use -case specific CLI arguments or editing the YAML configuration file. +Running DVT with YAML configuration files is the recommended approach if: +* you want to customize the configuration for any given validation OR +* you want to run DVT at scale (i.e. row validations across many partitions) +Nearly every validation command can take the argument `-c ` (the `generate-table-partitions` command takes `-cdir `) where one or more yaml files are produced. These yaml files can be modified for custom validations. -For example, the following command creates a YAML file for the validation of the +The following command creates a YAML file for the validation of the `new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c citibike.yaml`. 
@@ -446,6 +452,26 @@ View the complete YAML file for a Grouped Column validation on the [Examples](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/examples.md#sample-yaml-config-grouped-column-validation) page. You can view a list of all saved validation YAML files using `data-validation configs list`, and print a YAML config using `data-validation configs get -c citibike.yaml`. +#### Scaling DVT to run 10's to 1000's of validations concurrently +The above example `configs run -cdir` shows how you can run multiple validations with one command. If the directory used has been created by `generate-table-partitions`, you will have partition files numbered from `0000.yaml` to `.yaml`, where `` is the total number of partitions. These can be run as mentioned above and the partitions will be validated in order. This could take a long time if the number of partitions is large. + +DVT validations can be run concurrently (horizontal scaling) using GKE (Kubernetes Jobs) or Cloud Run Jobs. In order to run DVT in a container, you have to build a docker image, [see instructions](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples/docker#readme). Set the `PSO_DV_CONFIG_HOME` environment variable to point to a GCS prefix where the connection configuration files are stored. In Cloud Run you can use the option `--set-env-vars` or `--update-env-vars` to pass [the environment variable](https://cloud.google.com/run/docs/configuring/services/environment-variables#setting). We recommend that you use the `bq-result-handler` to save your validation results. In order to validate partitions concurrently, run DVT in Kubernetes or Cloud Run as shown below: +``` +data-validation (--verbose or -v) (--log-level or -ll) configs run + [--kube-completions or -kc] Specifies that validation is being run in Kubernetes or Cloud Run in indexed completion mode. 
+ specifies to DVT that validation is being run in indexed completion mode in Kubernetes or as multiple independent tasks in Cloud Run.
+ --config-dir or -cdir GCS_DIRECTORY
+ where GCS_DIRECTORY = CONFIG_DIR/SOURCE_SCHEMA.SOURCE_TABLE, where CONFIG_DIR, SOURCE_SCHEMA and SOURCE_TABLE are
+ the arguments provided to generate-table-partitions to generate the partition yamls. GCS_DIRECTORY is the directory
+ where the partition files numbered `0000.yaml` to `<number of partitions - 1>.yaml` are stored.
+```
+If you are not familiar with Kubernetes or Cloud Run Jobs, review the [internal docs](docs/internal/kubernetes_jobs.md) for an overview of the orchestration of multiple validations.
+
+The Cloud Run and Kubernetes pods must run in a network with access to the database servers. Every Cloud Run job is associated with a [service account](https://cloud.google.com/run/docs/securing/service-identity). You need to ensure that this service account has access to Google Cloud Storage (to read connection configuration and yaml files) and BigQuery (to publish results). If you are using Kubernetes, you will need to use a service account with the same privileges as mentioned for Cloud Run. In Kubernetes, you will need to set up [workload identity](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity) so the DVT container can impersonate the service account.
+
+In Cloud Run, the [job](https://cloud.google.com/run/docs/create-jobs) must be run as multiple, independent tasks with the task count set to the number of partitions generated. In Kubernetes, set the number of completions to the number of partitions generated - see [Kubernetes Parallel Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs). The option `--kube-completions or -kc` tells DVT that many DVT containers are running in a Kubernetes cluster. Each DVT container only validates the specific partition yaml (based on the index assigned by Kubernetes control plane).
If the `-kc` option is used and you are not running in indexed mode, you will receive a warning and the container will process all the validations sequentially. If the `-kc` option is used and a config directory is not provided (a `--config-file` is provided instead), a warning is issued. + +By default, each partition validation is retried up to 3 times if there is an error. In Kubernetes and Cloud Run, you can set the parallelism to the number you want. Keep in mind that if you are validating 1000's of partitions in parallel, you may find that setting the parallelism too high (say 100) may result in timeouts and slow down the validation. ### Validation Reports diff --git a/data_validation/__main__.py b/data_validation/__main__.py index 2ed30c73d..e0dc7f163 100644 --- a/data_validation/__main__.py +++ b/data_validation/__main__.py @@ -314,14 +314,51 @@ def build_config_managers_from_args( def config_runner(args): + """Config Runner is where the decision is made to run validations from one or more files. + One file can produce multiple validations - for example when more than one set of tables are being validated + between the source and target. If multiple files are to be run and if the associated configuration files + are numbered sequentially, say from '0000.yaml' to '0012.yaml' (for 13 validations), + it is possible to run them concurrently in a Kubernetes / Cloud Run environment. + If the user wants that, they need to specify a -kc or --kube-completions which tells + DVT to only run the validation corresponding to the index number provided in the + JOB_COMPLETION_INDEX (for Kubernetes) or CLOUD_RUN_TASK_INDEX (for Cloud Run) environment + variable. This environment variable is set by the Kubernetes/Cloud Run container orchestrator. + The orchestrator spins up containers to complete each validation, one at a time. 
+ """ if args.config_dir: - mgr = state_manager.StateManager(file_system_root_path=args.config_dir) - config_file_names = mgr.list_validations_in_dir(args.config_dir) - - config_managers = [] - for file in config_file_names: - config_managers.extend(build_config_managers_from_yaml(args, file)) + if args.kube_completions and ( + ("JOB_COMPLETION_INDEX" in os.environ.keys()) + or ("CLOUD_RUN_TASK_INDEX" in os.environ.keys()) + ): + # Running in Kubernetes in Job completions - only run the yaml file corresponding to index + job_index = ( + int(os.environ.get("JOB_COMPLETION_INDEX")) + if "JOB_COMPLETION_INDEX" in os.environ.keys() + else int(os.environ.get("CLOUD_RUN_TASK_INDEX")) + ) + config_file_path = ( + f"{args.config_dir}{job_index:04d}.yaml" + if args.config_dir.endswith("/") + else f"{args.config_dir}/{job_index:04d}.yaml" + ) + setattr(args, "config_dir", None) + setattr(args, "config_file", config_file_path) + config_managers = build_config_managers_from_yaml(args, config_file_path) + else: + if args.kube_completions: + logging.warning( + "--kube-completions or -kc specified, however not running in Kubernetes Job completion, check your command line." + ) + mgr = state_manager.StateManager(file_system_root_path=args.config_dir) + config_file_names = mgr.list_validations_in_dir(args.config_dir) + config_managers = [] + for file in config_file_names: + config_managers.extend(build_config_managers_from_yaml(args, file)) else: + if args.kube_completions: + logging.warning( + "--kube-completions or -kc specified, which requires a config directory, however a specific config file is provided." 
+ ) config_file_path = _get_arg_config_file(args) config_managers = build_config_managers_from_yaml(args, config_file_path) diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py index 2474b9b4c..a9ea88ed3 100644 --- a/data_validation/cli_tools.py +++ b/data_validation/cli_tools.py @@ -347,6 +347,12 @@ def _configure_validation_config_parser(subparsers): "-cdir", help="Directory path containing YAML Config Files to be used for running validations.", ) + run_parser.add_argument( + "--kube-completions", + "-kc", + action="store_true", + help="When validating multiple table partitions generated by generate-table-partitions, using DVT in Kubernetes in index completion mode use this flag so that all the validations are completed", + ) get_parser = configs_subparsers.add_parser( "get", help="Get and print a validation config" diff --git a/docs/internal/kubernetes_jobs.md b/docs/internal/kubernetes_jobs.md new file mode 100644 index 000000000..c99916fca --- /dev/null +++ b/docs/internal/kubernetes_jobs.md @@ -0,0 +1,23 @@ +# Scaling Data Validation with Kubernetes Jobs + +## Nature of Data Validation +Data Validation by nature is a batch process. We are presented with a set of arguments, the validation is performed, the results are provided and Data Validation completes. Data Validation can also take time (multiple secs, minutes) if a large amount of data needs to be validated. + +Data Validation has the `generate-table-partitions` function that partitions a row validation into a specified number of smaller, equally sized validations. Using this feature, validation of large tables can be split into row validations of many partitions of the same tables. See [partition table PRD](partition_table_prd.md) for details on partitioning. This process generates a sequence of yaml files which can be used to validate the individual partitions. + +## Kubernetes Workloads +Kubernetes supports different types of workloads including a few batch workload types. 
The Job workload is a batch workload that retries execution until a specified number of them successfully complete. If a row validation has been split into `n` partitions, then we need to validate each partition and merge the results of the validation. Using Kubernetes Jobs we need to successfully run `n` completions of the job, as long as we guarantee that each completion is associated with a different partition. Since release 1.21, Kubernetes provides a type of job management called indexed completions that supports the Data Validation use case. A Kubernetes job can use multiple parallel worker processes. Each worker process has an index number that the control plane sets which identifies which part of the overall task (i.e. which partition) to work on. Cloud Run uses slightly different terminology for a similar model - referring to each job completion as a task and all the tasks together as the job. The index is available in the environment variable `JOB_COMPLETION_INDEX` (in cloud run the environment variable is `CLOUD_RUN_TASK_INDEX`). An explanation of this is provided in [Introducing Indexed Jobs](https://kubernetes.io/blog/2021/04/19/introducing-indexed-jobs/#:~:text=Indexed%20%3A%20the%20Job%20is%20considered,and%20the%20JOB_COMPLETION_INDEX%20environment%20variable).
+
+Indexed completion mode supports partitioned yaml files generated by the `generate-table-partitions` command if each worker process runs only the yaml file corresponding to its index. The `--kube-completions` or `-kc` flag when running configs from a directory indicates that the validation is running in indexed jobs mode on Kubernetes or Cloud Run. This way, each container only processes the specific validation yaml file corresponding to its index.
+### Passing database connection parameters
+Usually, DVT stores database connection parameters in the `$HOME/.config/google-pso-data-validator` directory.
If the environment variable `PSO_DV_CONFIG_HOME` is set, the database connection information can be fetched from this location. When running DVT in a container, there are two options regarding where to locate the database connection parameters. +1. Build the container image with the database connection parameters. This is not recommended because the container image is specific to the databases being used OR +2. Build a DVT container image as specified in the [docker samples directory](../../samples/docker/README.md). When running the container, pass the `PSO_DV_CONFIG_HOME` environment variable to point to a GCS location containing the connection configuration. This is the suggested approach. + +## Future Work +### Clean up the way DVT works so that config files can be stored either in the file system or GCS. +In some cases, the command `configs run` can take a GCS location where the yaml files can be found. In other cases, this has to be specified by setting the `PSO_DV_CONFIG_HOME` environment variable to point to the GCS location. This inconsistent behavior is challenging and should be fixed. +### Use GCP secret manager to store database connection configuration +DVT currently uses the GCP secret manager to secure each element of the database connection configuration. The current implementation stores the name of the secret for each element (host, port, user, password etc) in the database configuration file. When the connection needs to be made, the values are retrieved from the secret manager and used to connect to the database. While this is secure, database configurations are still stored in the filesystem. + +Another way to use the secret manager is to store the complete connection configuration as the value of the secret. Then while specifying the connection, we can also indicate to DVT to look for the connection configuration in the secret manager rather than the file system. 
Since most DVT commands connect to one or more databases, nearly every DVT command can take optional arguments that tells DVT to look for the connection configuration in the secret manager. \ No newline at end of file diff --git a/tests/unit/test__main.py b/tests/unit/test__main.py index 406e3613f..9d3e132ce 100644 --- a/tests/unit/test__main.py +++ b/tests/unit/test__main.py @@ -13,6 +13,8 @@ # limitations under the License. import argparse +import logging +import os from unittest import mock from data_validation import cli_tools, consts @@ -56,6 +58,36 @@ } ] +CONFIG_RUNNER_ARGS_1 = { + "verbose": False, + "log_level": "INFO", + "command": "configs", + "validation_config_cmd": "run", + "dry_run": False, + "config_file": "gs://pso-kokoro-resources/resources/test/unit/test__main/3validations/first.yaml", + "config_dir": None, + "kube_completions": True, +} +CONFIG_RUNNER_ARGS_2 = { + "verbose": False, + "log_level": "INFO", + "dry_run": False, + "command": "configs", + "validation_config_cmd": "run", + "kube_completions": True, + "config_dir": "gs://pso-kokoro-resources/resources/test/unit/test__main/3validations", +} + +CONFIG_RUNNER_ARGS_3 = { + "verbose": False, + "log_level": "INFO", + "dry_run": False, + "command": "configs", + "validation_config_cmd": "run", + "kube_completions": True, + "config_dir": "gs://pso-kokoro-resources/resources/test/unit/test__main/4partitions", +} + @mock.patch( "argparse.ArgumentParser.parse_args", @@ -74,3 +106,92 @@ def test__compare_match_tables(): table_configs = main._compare_match_tables(SOURCE_TABLE_MAP, TARGET_TABLE_MAP) assert table_configs == RESULT_TABLE_CONFIGS + + +@mock.patch("data_validation.__main__.run_validations") +@mock.patch( + "data_validation.__main__.build_config_managers_from_yaml", + return_value=["config dict from one file"], +) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace(**CONFIG_RUNNER_ARGS_1), +) +def test_config_runner_1(mock_args, mock_build, mock_run, 
caplog): + """config_runner, runs the validations, so we have to mock run_validations and examine the arguments + passed to it. Build Config Managers reads the yaml files and builds the validation configs, + which also includes creating a connection to the database. That is beyond a unit test, so mock + build_config_managers_from_yaml. + First test - run validation on a single file - and provide the -kc argument + Expected result + 1. One config manager created + 2. Warning about inappropriate use of -kc + Other test cases can be developed. + """ + caplog.set_level(logging.WARNING) + args = cli_tools.get_parsed_args() + caplog.clear() + main.config_runner(args) + # assert warning is seen + assert caplog.messages == [ + "--kube-completions or -kc specified, which requires a config directory, however a specific config file is provided." + ] + # assert that only one config manager object is present + assert len(mock_run.call_args.args[1]) == 1 + + +@mock.patch("data_validation.__main__.run_validations") +@mock.patch( + "data_validation.__main__.build_config_managers_from_yaml", + return_value=["config dict from one file"], +) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace(**CONFIG_RUNNER_ARGS_2), +) +def test_config_runner_2(mock_args, mock_build, mock_run, caplog): + + """Second test - run validation on a directory - and provide the -kc argument, + but not running in a Kubernetes Completion Configuration. Expected result + 1. Multiple (3) config manager created for validation + 2. Warning about inappropriate use of -kc""" + caplog.set_level(logging.WARNING) + args = cli_tools.get_parsed_args() + caplog.clear() + main.config_runner(args) + # assert warning is seen + assert caplog.messages == [ + "--kube-completions or -kc specified, however not running in Kubernetes Job completion, check your command line." 
+    ]
+    # assert that 3 config managers are present
+    assert len(mock_run.call_args.args[1]) == 3
+
+
+@mock.patch("data_validation.__main__.run_validations")
+@mock.patch(
+    "data_validation.__main__.build_config_managers_from_yaml",
+    return_value=["config dict from one file"],
+)
+@mock.patch(
+    "argparse.ArgumentParser.parse_args",
+    return_value=argparse.Namespace(**CONFIG_RUNNER_ARGS_3),
+)
+def test_config_runner_3(mock_args, mock_build, mock_run, caplog):
+    """Third test - run validation on a directory - and provide the -kc argument,
+    have system believe it is running in a Kubernetes Completion Environment. Expected result
+    1. No warnings
+    2. run validation called as though config file is provided (config_dir is None)
+    3. run validation config file name corresponds to value of JOB_COMPLETION_INDEX
+    4. One config manager created for validation
+    """
+    caplog.set_level(logging.WARNING)
+    os.environ["JOB_COMPLETION_INDEX"] = "2"
+    args = cli_tools.get_parsed_args()
+    caplog.clear()
+    main.config_runner(args)
+    # assert no warnings
+    assert caplog.messages == []
+    # assert that only one config manager and one validation corresponding to JOB_COMPLETION_INDEX is set.
+    assert mock_run.call_args.args[0].config_dir is None
+    assert os.path.basename(mock_run.call_args.args[0].config_file) == "0002.yaml"
+    assert len(mock_run.call_args.args[1]) == 1