feat: Support for Kubernetes (#1058)
* Update partition_table_prd.md potential way to address #923 and #950

* Fixed a bug with specifying a config file in GCS - this did not work earlier.
Added functionality to support Kubernetes Indexed Jobs - when provided with a directory, DVT runs only the job corresponding to the index.
Tested in a non-Kubernetes setup.

* Added an Implementation Note on how to scale DVT with Kubernetes.
Shortened the option to the two-character code `-kc`.

* Linted files and added unit tests for config_runner changes.

* Updated README on how to run multiple instances concurrently.

* Updated README.md

* Some Lint fixes

* Updated tests to mock build_config_managers_from_yaml.

* Fixed reference to directory.

* Update README.md

Co-authored-by: Neha Nene <[email protected]>

* Update README.md

Co-authored-by: Neha Nene <[email protected]>

* Update README.md

Co-authored-by: Neha Nene <[email protected]>

* Update docs/internal/kubernetes_jobs.md

Co-authored-by: Neha Nene <[email protected]>

* Updated docs

* Update README.md

Co-authored-by: Neha Nene <[email protected]>

* Some more doc changes.

* Final changes ?

* Final typos

---------

Co-authored-by: Neha Nene <[email protected]>
sundar-mudupalli-work and nehanene15 committed Jan 8, 2024
1 parent c2b660b commit fdbdbe0
Showing 5 changed files with 223 additions and 10 deletions.
34 changes: 30 additions & 4 deletions README.md
@@ -318,6 +318,8 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query column
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--config-file or -c CONFIG_FILE]
YAML Config File Path to be used for storing validations.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
@@ -375,6 +377,8 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query row
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--config-file or -c CONFIG_FILE]
YAML Config File Path to be used for storing validations.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
@@ -411,12 +415,14 @@ For example, this flag can be used as follows:
}
```

### Running DVT with YAML Configuration Files

Running DVT with YAML configuration files is the recommended approach if:
* you want to customize the configuration for any given validation, OR
* you want to run DVT at scale (i.e. row validations across many partitions).

Nearly every validation command accepts the argument `-c <file-name>` to write the validation to a YAML file (the `generate-table-partitions` command takes `-cdir <directory-name>` and writes one or more YAML files to that directory). These YAML files can then be edited for custom validations.

The following command creates a YAML file for the validation of the
`new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc
my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c
citibike.yaml`.
@@ -446,6 +452,26 @@ View the complete YAML file for a Grouped Column validation on the
[Examples](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/examples.md#sample-yaml-config-grouped-column-validation) page.

You can view a list of all saved validation YAML files using `data-validation configs list`, and print a YAML config using `data-validation configs get -c citibike.yaml`.

#### Scaling DVT to run tens to thousands of validations concurrently

The `configs run -cdir` example above shows how to run multiple validations with one command. If the directory was created by `generate-table-partitions`, it will contain partition files numbered from `0000.yaml` to `<partno-1>.yaml`, where `<partno>` is the total number of partitions. These can be run as described above, and the partitions will be validated in order - which can take a long time if the number of partitions is large.
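As a small illustration of the naming scheme above (a sketch, not DVT's own code - the helper name is ours):

```python
# Partition files produced by generate-table-partitions are zero-padded,
# four-digit YAML names: 0000.yaml through <partno-1>.yaml.
def partition_file_names(partition_count: int) -> list:
    """Return the ordered YAML file names for a partitioned validation."""
    return [f"{index:04d}.yaml" for index in range(partition_count)]

print(partition_file_names(3))  # ['0000.yaml', '0001.yaml', '0002.yaml']
```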

DVT validations can be run concurrently (horizontal scaling) using GKE (Kubernetes Jobs) or Cloud Run Jobs. To run DVT in a container, you must first build a Docker image, [see instructions](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples/docker#readme). Set the `PSO_DV_CONFIG_HOME` environment variable to point to a GCS prefix where the connection configuration files are stored. In Cloud Run, use the `--set-env-vars` or `--update-env-vars` option to pass [the environment variable](https://cloud.google.com/run/docs/configuring/services/environment-variables#setting). We recommend using the `bq-result-handler` to save your validation results. To validate partitions concurrently, run DVT in Kubernetes or Cloud Run as shown below:
```
data-validation (--verbose or -v) (--log-level or -ll) configs run
  [--kube-completions or -kc]
                        Tells DVT that the validation is running in indexed completion mode in
                        Kubernetes, or as multiple independent tasks in Cloud Run.
  --config-dir or -cdir GCS_DIRECTORY
                        GCS_DIRECTORY = CONFIG_DIR/SOURCE_SCHEMA.SOURCE_TABLE, where CONFIG_DIR,
                        SOURCE_SCHEMA and SOURCE_TABLE are the arguments provided to
                        generate-table-partitions when generating the partition YAMLs.
                        GCS_DIRECTORY is the directory where the partition files numbered
                        0000.yaml to <partno-1>.yaml are stored.
```
If you are not familiar with Kubernetes or Cloud Run Jobs, review the [internal docs](docs/internal/kubernetes_jobs.md) for an overview of the orchestration of multiple validations.

The Cloud Run and Kubernetes pods must run in a network with access to the database servers. Every Cloud Run job is associated with a [service account](https://cloud.google.com/run/docs/securing/service-identity). Ensure that this service account has access to Google Cloud Storage (to read connection configuration and YAML files) and BigQuery (to publish results). If you are using Kubernetes, use a service account with the same privileges as mentioned for Cloud Run. In Kubernetes, you will also need to set up [workload identity](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity) so the DVT container can impersonate the service account.

In Cloud Run, the [job](https://cloud.google.com/run/docs/create-jobs) must be run as multiple independent tasks, with the task count set to the number of partitions generated. In Kubernetes, set the number of completions to the number of partitions generated - see [Kubernetes Parallel Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs). The `--kube-completions` or `-kc` option tells DVT that many DVT containers are running in a Kubernetes cluster, so each container validates only the partition YAML matching the index assigned by the Kubernetes control plane. If the `-kc` option is used but you are not running in indexed mode, DVT issues a warning and the container processes all the validations sequentially. If the `-kc` option is used and a config directory is not provided (a `--config-file` is provided instead), a warning is also issued.
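The index-to-file mapping described above can be sketched as follows (a simplified illustration of the behavior this commit adds to `config_runner`; the helper name is ours, not DVT's API):

```python
import os

def partition_file_for_task(config_dir: str) -> str:
    """Pick the partition YAML matching this container's completion index.

    Kubernetes indexed Jobs expose JOB_COMPLETION_INDEX; Cloud Run jobs
    expose CLOUD_RUN_TASK_INDEX. Partition files are zero-padded four-digit
    names, so index 7 maps to 0007.yaml.
    """
    index = os.environ.get(
        "JOB_COMPLETION_INDEX", os.environ.get("CLOUD_RUN_TASK_INDEX")
    )
    if index is None:
        raise RuntimeError("Not running in indexed completion mode")
    separator = "" if config_dir.endswith("/") else "/"
    return f"{config_dir}{separator}{int(index):04d}.yaml"
```

For example, with `JOB_COMPLETION_INDEX=7` and a config directory of `gs://bucket/partitions`, the container would validate `gs://bucket/partitions/0007.yaml`.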

By default, each partition validation is retried up to 3 times if there is an error. In Kubernetes and Cloud Run you can set the parallelism as high as you want, but keep in mind that when validating thousands of partitions, setting the parallelism too high (say 100) may result in timeouts and slow down the overall validation.

### Validation Reports

49 changes: 43 additions & 6 deletions data_validation/__main__.py
@@ -314,14 +314,51 @@ def build_config_managers_from_args(


def config_runner(args):
    """Run validations from one or more YAML configuration files.

    One file can produce multiple validations - for example, when more than one
    set of tables is being validated between the source and target. If multiple
    files are to be run and the associated configuration files are numbered
    sequentially, say from '0000.yaml' to '0012.yaml' (for 13 validations),
    they can be run concurrently in a Kubernetes / Cloud Run environment. To do
    so, the user specifies -kc or --kube-completions, which tells DVT to run
    only the validation corresponding to the index number provided in the
    JOB_COMPLETION_INDEX (Kubernetes) or CLOUD_RUN_TASK_INDEX (Cloud Run)
    environment variable, which is set by the container orchestrator. The
    orchestrator spins up one container per index, so that together the
    containers complete every validation.
    """
    if args.config_dir:
        if args.kube_completions and (
            ("JOB_COMPLETION_INDEX" in os.environ.keys())
            or ("CLOUD_RUN_TASK_INDEX" in os.environ.keys())
        ):
            # Running in Kubernetes in Job completions - only run the yaml file
            # corresponding to the index.
            job_index = (
                int(os.environ.get("JOB_COMPLETION_INDEX"))
                if "JOB_COMPLETION_INDEX" in os.environ.keys()
                else int(os.environ.get("CLOUD_RUN_TASK_INDEX"))
            )
            config_file_path = (
                f"{args.config_dir}{job_index:04d}.yaml"
                if args.config_dir.endswith("/")
                else f"{args.config_dir}/{job_index:04d}.yaml"
            )
            setattr(args, "config_dir", None)
            setattr(args, "config_file", config_file_path)
            config_managers = build_config_managers_from_yaml(args, config_file_path)
        else:
            if args.kube_completions:
                logging.warning(
                    "--kube-completions or -kc specified, however not running in Kubernetes Job completion, check your command line."
                )
            mgr = state_manager.StateManager(file_system_root_path=args.config_dir)
            config_file_names = mgr.list_validations_in_dir(args.config_dir)
            config_managers = []
            for file in config_file_names:
                config_managers.extend(build_config_managers_from_yaml(args, file))
    else:
        if args.kube_completions:
            logging.warning(
                "--kube-completions or -kc specified, which requires a config directory, however a specific config file is provided."
            )
        config_file_path = _get_arg_config_file(args)
        config_managers = build_config_managers_from_yaml(args, config_file_path)

6 changes: 6 additions & 0 deletions data_validation/cli_tools.py
@@ -347,6 +347,12 @@ def _configure_validation_config_parser(subparsers):
"-cdir",
help="Directory path containing YAML Config Files to be used for running validations.",
)
run_parser.add_argument(
"--kube-completions",
"-kc",
action="store_true",
        help="Use this flag when running DVT in Kubernetes or Cloud Run in indexed completion mode to validate multiple table partitions generated by generate-table-partitions; each container then runs only the validation matching its index, so that together all the validations are completed.",
)

get_parser = configs_subparsers.add_parser(
"get", help="Get and print a validation config"
23 changes: 23 additions & 0 deletions docs/internal/kubernetes_jobs.md
@@ -0,0 +1,23 @@
# Scaling Data Validation with Kubernetes Jobs

## Nature of Data Validation
Data Validation is by nature a batch process: it is presented with a set of arguments, the validation is performed, the results are provided, and Data Validation completes. Data Validation can also take time (many seconds or even minutes) if a large amount of data needs to be validated.

Data Validation has the `generate-table-partitions` function, which partitions a row validation into a specified number of smaller, equally sized validations. Using this feature, validation of a large table can be split into row validations of many partitions of the same table. See the [partition table PRD](partition_table_prd.md) for details on partitioning. This process generates a sequence of YAML files which can be used to validate the individual partitions.

## Kubernetes Workloads
Kubernetes supports several workload types, including batch workloads. The Job workload is a batch workload that retries execution until a specified number of runs complete successfully. If a row validation has been split into `n` partitions, we need to validate each partition and merge the results. Using Kubernetes Jobs, we need `n` successful completions of the job, as long as we guarantee that each completion is associated with a different partition. Since release 1.21, Kubernetes provides a job management mode called indexed completions that supports the Data Validation use case: a Kubernetes job can use multiple parallel worker processes, and the control plane assigns each worker process an index number that identifies which part of the overall task (i.e. which partition) to work on. Cloud Run uses slightly different terminology for a similar model, referring to each job completion as a task and all the tasks together as the job. The index is available in the environment variable `JOB_COMPLETION_INDEX` (in Cloud Run, `CLOUD_RUN_TASK_INDEX`). An explanation is provided in [Introducing Indexed Jobs](https://kubernetes.io/blog/2021/04/19/introducing-indexed-jobs/#:~:text=Indexed%20%3A%20the%20Job%20is%20considered,and%20the%20JOB_COMPLETION_INDEX%20environment%20variable).

Indexed completion mode supports the partitioned YAML files generated by the `generate-table-partitions` command if each worker process runs only the YAML file corresponding to its index. The `--kube-completions` or `-kc` flag, when running configs from a directory, indicates that the validation is running in indexed jobs mode on Kubernetes or Cloud Run. This way, each container processes only the specific validation YAML file corresponding to its index.
### Passing database connection parameters
Usually, DVT stores database connection parameters in the `$HOME/.config/google-pso-data-validator` directory. If the environment variable `PSO_DV_CONFIG_HOME` is set, connection information is fetched from the location it points to instead. When running DVT in a container, there are two options for where to locate the database connection parameters:
1. Build the container image with the database connection parameters baked in. This is not recommended, because the image becomes specific to the databases being used.
2. Build a DVT container image as specified in the [docker samples directory](../../samples/docker/README.md) and, when running the container, set the `PSO_DV_CONFIG_HOME` environment variable to point to a GCS location containing the connection configuration. This is the suggested approach.
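The lookup order described above can be sketched like this (an illustration; the function name is ours, not DVT's API):

```python
import os

def connection_config_home() -> str:
    """Resolve where connection configurations are read from: the
    PSO_DV_CONFIG_HOME environment variable (which may point at a GCS
    location) wins; otherwise fall back to the default local directory."""
    return os.environ.get(
        "PSO_DV_CONFIG_HOME",
        os.path.join(os.path.expanduser("~"), ".config", "google-pso-data-validator"),
    )
```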

## Future Work
### Clean up the way DVT works so that config files can be stored either in the file system or GCS.
In some cases, the `configs run` command can take a GCS location where the YAML files are found. In other cases, the GCS location must instead be specified by setting the `PSO_DV_CONFIG_HOME` environment variable. This inconsistent behavior is confusing and should be fixed.
### Use GCP secret manager to store database connection configuration
DVT currently uses GCP Secret Manager to secure each element of the database connection configuration. The current implementation stores the name of the secret for each element (host, port, user, password, etc.) in the database configuration file. When the connection needs to be made, the values are retrieved from Secret Manager and used to connect to the database. While this is secure, the database configurations themselves are still stored in the filesystem.
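The per-element scheme described above can be sketched as follows (the secret names and the `fetch_secret` stub are illustrative; a real implementation would call Secret Manager):

```python
def resolve_connection(config: dict, fetch_secret) -> dict:
    """Replace each secret *name* in the connection config with its value.

    `config` maps connection fields (host, port, user, password, ...) to
    secret names; `fetch_secret` stands in for a Secret Manager lookup.
    """
    return {field: fetch_secret(secret_name) for field, secret_name in config.items()}

# Example with an in-memory stand-in for the secret store:
secrets = {"db-host": "10.0.0.5", "db-user": "dvt", "db-password": "s3cret"}
connection = resolve_connection(
    {"host": "db-host", "user": "db-user", "password": "db-password"},
    secrets.__getitem__,
)
print(connection["host"])  # 10.0.0.5
```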

Another way to use Secret Manager is to store the complete connection configuration as the value of a single secret. When specifying the connection, we could then indicate to DVT that it should look for the connection configuration in Secret Manager rather than the file system. Since most DVT commands connect to one or more databases, nearly every DVT command could take an optional argument telling DVT to look for the connection configuration in Secret Manager.
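A sketch of this proposed alternative (the whole connection configuration stored as one secret value, parsed on use). The parsing helper and field names are hypothetical; the commented-out client call is the real google-cloud-secret-manager API:

```python
import json

def parse_connection_secret(payload: str) -> dict:
    """Parse a complete connection configuration stored as one secret value."""
    config = json.loads(payload)
    if "source_type" not in config:  # illustrative required field
        raise ValueError("connection config must name a source_type")
    return config

# In production the payload would come from Secret Manager, e.g.:
#   from google.cloud import secretmanager
#   client = secretmanager.SecretManagerServiceClient()
#   name = f"projects/{project}/secrets/{secret_id}/versions/latest"
#   payload = client.access_secret_version(request={"name": name}).payload.data.decode("utf-8")
payload = '{"source_type": "BigQuery", "project_id": "my-project"}'
print(parse_connection_secret(payload)["source_type"])  # BigQuery
```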
