diff --git a/README.md b/README.md index 3e0fba27..2fd8e63a 100644 --- a/README.md +++ b/README.md @@ -435,7 +435,7 @@ For example, this flag can be used as follows: Running DVT with YAML configuration files is the recommended approach if: * you want to customize the configuration for any given validation OR -* you want to run DVT at scale (i.e. row validations across many partitions) +* you want to run DVT at scale (i.e. run multiple validations sequentially or in parallel) We recommend generating YAML configs with the `--config-file ` flag when running a validation command, which supports GCS and local paths. @@ -451,7 +451,7 @@ data-validation (--verbose or -v) (--log-level or -ll) configs run [--dry-run or -dr] If this flag is present, prints the source and target SQL generated in lieu of running the validation. [--kube-completions or -kc] Flag to indicate usage in Kubernetes index completion mode. - See *Scaling DVT to run 10's to 1000's of validations concurrently* section + See the *Scaling DVT* section ``` ``` @@ -469,19 +469,17 @@ View the complete YAML file for a Grouped Column validation on the [Examples](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/examples.md#sample-yaml-config-grouped-column-validation) page. -#### Scaling DVT to run 10's to 1000's of validations concurrently +### Scaling DVT -As described above, you can run multiple validation configs sequentially with the `--config-dir` flag. To optimize the validation speed for large tables, you can run DVT concurrently using GKE Jobs ([Google Kubernetes Jobs](https://cloud.google.com/kubernetes-engine/docs/how-to/deploying-workloads-overview#batch_jobs)) or [Cloud Run Jobs](https://cloud.google.com/run/docs/create-jobs). If you are not familiar with Kubernetes or Cloud Run Jobs, see [Scaling DVT with Kubernetes](docs/internal/kubernetes_jobs.md) for a detailed overview. +You can scale DVT to validate large tables by running the tool in a distributed manner. To optimize validation speed, you can use GKE Jobs ([Google Kubernetes Jobs](https://cloud.google.com/kubernetes-engine/docs/how-to/deploying-workloads-overview#batch_jobs)) or [Cloud Run Jobs](https://cloud.google.com/run/docs/create-jobs). If you are not familiar with Kubernetes or Cloud Run Jobs, see [Scaling DVT with Distributed Jobs](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/internal/distributed_jobs.md) for a detailed overview. -In order to validate partitions concurrently, both the `--kube-completions` and `--config-dir` flags are required. The `--kube-completions` flag specifies that the validation is being run in indexed completion mode in Kubernetes or as multiple independent tasks in Cloud Run. The `--config-dir` flag will specify the directory with the YAML files to be executed in parallel. If you used `generate-table-partitions` to generate the YAMLs, this would be the directory where the partition files numbered `0000.yaml` to `.yaml` are stored i.e (`my_config_dir/source_schema.source_table/`) -In order to run DVT in a container, you have to build a Docker image - [see instructions here](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples/docker#readme). You will also need to set the `PSO_DV_CONN_HOME` environment variable to point to a GCS directory where the connection configuration files are stored. 
In Cloud Run you can use the option `--set-env-vars` or `--update-env-vars` to pass [the environment variable](https://cloud.google.com/run/docs/configuring/services/environment-variables#setting). We recommend that you use the `bq-result-handler` to save your validation results to BigQuery. +For large tables, we recommend first generating table partitions with the `generate-table-partitions` command. Then, use Cloud Run or GKE to validate each partition in parallel. See the [Cloud Run Jobs Quickstart sample](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples/cloud_run_jobs) to get started. -The Cloud Run and Kubernetes pods must run in a network with access to the database servers. Every Cloud Run job is associated with a [service account](https://cloud.google.com/run/docs/securing/service-identity). You need to ensure that this service account has access to Google Cloud Storage (to read connection configuration and YAML files) and BigQuery (to publish results). If you are using Kubernetes, you will need to use a service account with the same privileges as mentioned for Cloud Run. In Kubernetes, you will need to set up [workload identity](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity) so the DVT container can impersonate the service account. +When running DVT in a distributed fashion, both the `--kube-completions` and `--config-dir` flags are required. The `--kube-completions` flag specifies that the validation is being run in indexed completion mode in Kubernetes or as multiple independent tasks in Cloud Run. If the `-kc` option is used and you are not running in indexed mode, you will receive a warning and the container will process all the validations sequentially. If the `-kc` option is used and a config directory is not provided (i.e. a `--config-file` is provided instead), a warning is issued. -In Cloud Run, the [job](https://cloud.google.com/run/docs/create-jobs) must be run as multiple, independent tasks with the task count set to the number of partitions generated. In Kubernetes, set the number of completions to the number of partitions generated - see [Kubernetes Parallel Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs). The option `--kube-completions or -kc` tells DVT that many DVT containers are running in a Kubernetes cluster. Each DVT container only validates the specific partition YAML (based on the index assigned by Kubernetes control plane). If the `-kc` option is used and you are not running in indexed mode, you will receive a warning and the container will process all the validations sequentially. If the `-kc` option is used and a config directory is not provided (a `--config-file` is provided instead), a warning is issued. +The `--config-dir` flag specifies the directory with the YAML files to be executed in parallel. If you used `generate-table-partitions` to generate the YAMLs, this would be the directory where the partition files numbered `0000.yaml` to `.yaml` are stored, i.e. `gs://my_config_dir/source_schema.source_table/`. When creating your Cloud Run Job, set the number of tasks equal to the number of table partitions so that each task index matches a YAML file to be validated. When executed, each Cloud Run task will validate a partition in parallel. -By default, each partition validation is retried up to 3 times if there is an error. In Kubernetes and Cloud Run, you can set the parallelism to the number you want. 
Keep in mind that if you are validating 1000's of partitions in parallel, you may find that setting the parallelism too high (say 100) may result in timeouts and slow down the validation. ### Validation Reports diff --git a/docs/examples.md b/docs/examples.md index 962e09c7..36a7f0b7 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -51,10 +51,10 @@ data-validation generate-table-partitions \ --primary-keys tree_id \ --hash '*' \ --filters 'tree_id>3000' \ - -cdir partitions_dir \ + --config-dir partitions_dir \ --partition-num 200 ```` -Above command creates multiple partitions based on `--partition-key`. Number of generated configuration files is decided by `--partition-num` +The above command creates multiple partitions based on the primary key. The number of generated configuration files is determined by `--partition-num` #### Run COUNT validations for all columns ````shell script diff --git a/docs/internal/kubernetes_jobs.md b/docs/internal/distributed_jobs.md similarity index 72% rename from docs/internal/kubernetes_jobs.md rename to docs/internal/distributed_jobs.md index 855a9321..13303c8b 100644 --- a/docs/internal/kubernetes_jobs.md +++ b/docs/internal/distributed_jobs.md @@ -1,22 +1,28 @@ -# Scaling Data Validation with Kubernetes Jobs +# Scaling Data Validation with Distributed Jobs ## Nature of Data Validation -Data Validation by nature is a batch process. We are presented with a set of arguments, the validation is performed, the results are provided and Data Validation completes. Data Validation can also take time (multiple secs, minutes) if a large amount of data needs to be validated. -Data Validation has the `generate-table-partitions` function that partitions a row validation into a specified number of smaller, equally sized validations. Using this feature, validation of large tables can be split into row validations of many partitions of the same tables. See [partition table PRD](partition_table_prd.md) for details on partitioning. This process generates a sequence of yaml files which can be used to validate the individual partitions. +Data Validation by nature is a batch process. We are presented with a set of arguments, the validation is performed, the results are provided and Data Validation completes. Data Validation can also take time (seconds to minutes) if a large amount of data needs to be validated. + +Data Validation has the `generate-table-partitions` function that partitions a row validation into a specified number of smaller, equally sized validations. Using this feature, validation of a large table can be split into row validations of many partitions of that table. See [partition table PRD](partition_table_prd.md) for details on partitioning. This process generates a sequence of YAML files which can be used to validate the individual partitions. + +## GKE and Cloud Run Jobs -## Kubernetes Workloads Kubernetes supports different types of workloads including a few batch workload types. The Job workload is a batch workload that retries execution until a specified number of them successfully complete. If a row validation has been split into `n` partitions, then we need to validate each partition and merge the results of the validation. Using Kubernetes Jobs we need to successfully run `n` completions of the job, as long as we guarantee that each completion is associated with a different partition. Since release 1.21, Kubernetes provides a type of job management called indexed completions that supports the Data Validation use case. 
A Kubernetes job can use multiple parallel worker processes. Each worker process has an index number that the control plane sets which identifies which part of the overall task (i.e. which partition) to work on. Cloud Run uses slightly different terminology for a similar model - referring to each job completion as a task and all the tasks together as the job. The index is available in the environment variable `JOB_COMPLETION_INDEX` (in Cloud Run the environment variable is `CLOUD_RUN_TASK_INDEX`). An explanation of this is provided in [Introducing Indexed Jobs](https://kubernetes.io/blog/2021/04/19/introducing-indexed-jobs/#:~:text=Indexed%20%3A%20the%20Job%20is%20considered,and%20the%20JOB_COMPLETION_INDEX%20environment%20variable). -Indexed completion mode supports partitioned yaml files generated by the `generate-table-partitions` command if each worker process runs only the yaml file corresponding to its index. The`--kube-completions` or `-kc` flag when running configs from a directory indicates that the validation is running in indexed jobs mode on Kubernetes or Cloud Run. This way, each container only processes the specific validation yaml file corresponding to its index. +Indexed completion mode supports partitioned YAML files generated by the `generate-table-partitions` command if each worker process runs only the YAML file corresponding to its index. The `--kube-completions` (or `-kc`) flag, when running configs from a directory, indicates that the validation is running in indexed jobs mode on Kubernetes or Cloud Run. This way, each container only processes the specific validation YAML file corresponding to its index. ### Passing database connection parameters + Usually, DVT stores database connection parameters in the `$HOME/.config/google-pso-data-validator` directory. If the environment variable `PSO_DV_CONN_HOME` is set, the database connection information can be fetched from this location. When running DVT in a container, there are two options regarding where to locate the database connection parameters. -1. Build the container image with the database connection parameters. This is not recommended because the container image is specific to the databases being used OR -2. Build a DVT container image as specified in the [Docker samples directory](../../samples/docker/README.md). When running the container, pass the `PSO_DV_CONN_HOME` environment variable to point to a GCS location containing the connection configuration. This is the suggested approach. -## Future Work -### Use GCP secret manager to store database connection configuration -DVT currently uses the GCP secret manager to secure each element of the database connection configuration. The current implementation stores the name of the secret for each element (host, port, user, password etc) in the database configuration file. When the connection needs to be made, the values are retrieved from the secret manager and used to connect to the database. While this is secure, database configurations are still stored in the filesystem. +1. Build the container image with the database connection parameters. This is not recommended because the container image would be limited to the databases being used. +1. Build a DVT container image as specified in the [Cloud Run Jobs samples directory](../../samples/cloud_run_jobs/README.md). When creating a job, specify the `PSO_DV_CONN_HOME` environment variable to point to a GCS location containing the connection configuration. This is the suggested approach, as shown in the sketch below. 
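+For example, a minimal sketch of option 2 when creating a Cloud Run Job (the job name `dvt-worker`, the GCS bucket path, and the image are placeholders and assume you have already built the image described in the samples directory): + +``` +# Hypothetical GCS location that holds the connection configuration files +export PSO_DV_CONN_HOME=gs://my-dvt-connections/ +# Create the job with the environment variable so each task can read the connections +gcloud run jobs create dvt-worker \ + --image gcr.io/${PROJECT_ID}/data-validation \ + --set-env-vars PSO_DV_CONN_HOME=${PSO_DV_CONN_HOME} +``` 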
+ +## Future Work + +### Use GCP Secret Manager to store database connection configuration + +DVT currently uses the GCP Secret Manager to secure each element of the database connection configuration. The current implementation stores the name of the secret for each element (host, port, user, password, etc.) in the database configuration file. When the connection needs to be made, the values are retrieved from the Secret Manager and used to connect to the database. While this is secure, connection configurations referencing secrets are still stored in the file system. -Another way to use the secret manager is to store the complete connection configuration as the value of the secret. Then while specifying the connection, we can also indicate to DVT to look for the connection configuration in the secret manager rather than the file system. Since most DVT commands connect to one or more databases, nearly every DVT command can take optional arguments that tells DVT to look for the connection configuration in the secret manager. \ No newline at end of file +Another way to use the Secret Manager is to store the complete connection configuration as the value of the secret. Then, while specifying the connection, DVT can look for the connection configuration in the Secret Manager rather than the file system. diff --git a/docs/internal/partition_table_prd.md b/docs/internal/partition_table_prd.md index fce3149d..08eda43a 100644 --- a/docs/internal/partition_table_prd.md +++ b/docs/internal/partition_table_prd.md @@ -1,21 +1,29 @@ -# Partition Table +# Partitioned Tables Design -## Why Partition ? -Data Validation Tool performs row validation by comparing every row in the source database with the corresponding row in the target database. Since the comparison is done in memory, all the rows have to be in memory. Databases typically have a large number of rows that don't all fit in memory, therefore Data Validation Tool can run into MemoryError and fail. One way to address this error is to partition the source and target table into several corresponding partitions. Then validate the corresponding source and target partitions one at a time (either sequentially or in parallel). If the validation is performed in parallel on multiple VMs or containers, this approach has the added benefit of speeding up data validation. There are other approaches available to address the MemoryError - see future work below. Column validation and schema validation do not bring each row of the table into memory. Therefore, they do not run into MemoryErrors. +## Why Partition? + +The Data Validation Tool performs row validation by comparing every row in the source database with the corresponding row in the target database. Since the comparison is done in memory, this can create memory constraints with large tables. One way to address this is to partition the source and target tables into several corresponding partitions. Then, validate the source and target partitions one at a time (either sequentially or in parallel). If the validation is performed in parallel on multiple VMs or containers, this approach has the added benefit of speeding up data validation. This primarily applies to row validation, as column and schema validations don't bring each row into memory. ## How to partition? + +Data Validation Tool matches rows based on the specified primary key(s). If we split up the table into small enough partitions, then each partition can be validated without running into MemoryError. 
This can be depicted pictorially as shown below: + +Data Validation Tool matches rows based on the specified primary key(s). If we split up the table into small enough partitions, then each partition can be validated without running into memory errors. This can be depicted pictorially as shown below: ![Alt text](./partition_picture.png?raw=true "Title") Here, both tables are sorted by primary key(s). The blue line denotes the first row of each partition. With tables partitioned as shown, the complete tables can be row validated by concatenating the results of validating each partition source table against the corresponding partition of the target table. + ### What filter clause to use? + Data Validation Tool has an option in the row validation command for a `--filters` parameter which allows a WHERE statement to be applied to the query used to validate. This parameter can be used to partition the table. The filter clauses are: + 1. For the first partition, it is every row with primary key(s) value smaller than the first row of second partition -2. For the last partition, it is every row with primary key(s) value larger than or equal to the the first row of last partition. -3. For all other partitions, it is every row with primary key(s) value larger than or equal to to the first row of the current partition *AND* with primary key(s) values less than the first row of the next partition. +1. For the last partition, it is every row with primary key(s) value larger than or equal to the first row of the last partition. +1. For all other partitions, it is every row with primary key(s) value larger than or equal to the first row of the current partition *AND* with primary key(s) values less than the first row of the next partition. + ### How to calculate the first row of each partition? -The first version of generate partitions used the NTILE function. Unfortunately, Teradata does not have the NTILE function. Most every database has a ROWNUMBER() function which assigns a row number to each row. This function can be used to generate equal sized partitions. Let the rows be numbered from 0 to count where count is the number of rows in the table. Let us assume we want to partition the table in n equal sized partitions. The partition number associated with a row is its _ceiling(rownumber * n / count)_. We specifically only need to identify first element of the partition. The first element of a paritition is the one whose remainder of _(rownumber * n) % count_ is _> 0 and <= n_. The following SQL statement gets the value of the primary key(s) for the first row of each partition: -``` -SELECT + +The first version of generate partitions used the NTILE function. Unfortunately, Teradata does not have the NTILE function. Almost every database has a ROW_NUMBER() function which assigns a row number to each row. This function can be used to generate equal sized partitions. Let the rows be numbered from 1 to count, where count is the number of rows in the table, and assume we want to partition the table into n equal sized partitions. The partition number associated with a row is _ceiling(rownumber * n / count)_. We only need to identify the first element of each partition. The first element of a partition is the one for which _(rownumber * n) % count_ is _> 0 and \<= n_. The following SQL statement gets the value of the primary key(s) for the first row of each partition: + +```sql +SELECT primary_key_1, primary_key_2, .... 
@@ -29,10 +37,13 @@ SELECT WHERE ((row_num * n) % count > 0) and ((row_num *n) % count <=n) ORDER BY row_num ASC; ``` -### How to generate the where clauses -Once we have the first row of each partition, we have to generate the where clauses for each partition in the source and target tables. The best way may be to generate the ibis table expression including the provided filter clause and the additional filter clause from the first rows we have calculated. We can then have _ibis_ `to_sql` convert the table expression into plain text, extract the where clause and use that. _ibis_ depends on _sqlalchemy_, which has a bug in that it does not support rendering date and timestamps by `to_sql` for versions of _sqlalchemy_ prior to 2.0. Until we migrate to using _sqlalchemy_ 2.0, we may not be able to support dates and timestamps as a primary key column. + +### How to generate the WHERE clauses + +Once we have the first row of each partition, we have to generate the WHERE clauses for each partition in the source and target tables. We generate the Ibis table expression including the provided filter clause and the additional filter clause from the first rows we have calculated. We can then have Ibis `to_sql` convert the table expression into plain text, extract the WHERE clause and use that. Ibis depends on SQLAlchemy, which has a bug in that it does not support rendering dates and timestamps via `to_sql` in versions prior to 2.0. Until we migrate to SQLAlchemy 2.0, we may not be able to support dates and timestamps as primary key columns. + ## Future Work -### How many partitions do I need? -Partition table requires that the user decide on the number of partitions into which they need to divide the table to avoid MemoryError. Data Validation Tool can run on different VMs with different shapes, so the number of partitions depends on the amount of memory available. How does the user figure out the number of partitions they need? Right now, it is by trial and error, say start with 10, then try 100, try to see if 50 still results in MemoryError etc. This is not optimal. Python's `psutil` package has a function [virtual_memory()](https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory) which tell us the total and available memory. `generate-table-partitions` is provided with all the parameters used in `validate row`, and the memory grows linearly to the number of rows being validated. `generate-table-partitions` can bring say 10,000 rows into memory as though performing a row validation. Using the virtual_memory() function in `psutil`, `generate-table-partitions` can estimate the number of rows that will fit in memory for row validation. Since we can calculate the total number of rows, we can estimate the number of partitions needed. This may need some experimentation, as we may need to allow for memory usage by other functions/objects in Data Validation Tool. -### Can Data Validation Tool run without MemoryError? -The above paragraph suggests that Data Validation Tool can bring in a limited number of rows into memory at a time, perform row validation and avoid MemoryError. This is certainly possible and is complicated. If every row in the source has a corresponding row in the target (and vice versa), and the source and target table are sorted, then `validate row` can read a fixed number of rows from both source and target tables into memory and perform row validation and repeat until all rows have been processed. 
There may not be a corresponding row in the target for every row in the source, so the target may have additional rows in memory for which the corresponding rows in the source table are in the next partition of rows. Therefore validating in this situation without MemoryError can be challenging. `generate-table-partitions` allows parallelization of data validation, so it is a useful function to have even if Data Validation Tool is modified to validate rows without MemoryError. + +### Recommending partition numbers + +The `generate-table-partitions` command requires that the user provide the number of partitions to create. Currently, the user needs to decide the partition number by trial and error based on the compute resources available. Python's `psutil` package has a function [virtual_memory()](https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory) which tells us the total and available memory. `generate-table-partitions` is provided with all the parameters used in `validate row`, and memory usage grows linearly with the number of rows being validated. `generate-table-partitions` can bring, say, 10,000 rows into memory as though performing a row validation. Using the `virtual_memory()` function in `psutil`, `generate-table-partitions` can estimate the number of rows that will fit in memory for row validation. Since we can calculate the total number of rows, we can estimate the number of partitions needed. This may need some experimentation, as we may need to allow for memory usage by other functions/objects in the Data Validation Tool. diff --git a/samples/functions/README.md b/samples/cloud_functions/README.md similarity index 100% rename from samples/functions/README.md rename to samples/cloud_functions/README.md diff --git a/samples/functions/deploy.sh b/samples/cloud_functions/deploy.sh similarity index 100% rename from samples/functions/deploy.sh rename to samples/cloud_functions/deploy.sh diff --git a/samples/functions/main.py b/samples/cloud_functions/main.py similarity index 100% rename from samples/functions/main.py rename to samples/cloud_functions/main.py diff --git a/samples/functions/requirements.txt b/samples/cloud_functions/requirements.txt similarity index 100% rename from samples/functions/requirements.txt rename to samples/cloud_functions/requirements.txt diff --git a/samples/run/Dockerfile b/samples/cloud_run/Dockerfile similarity index 100% rename from samples/run/Dockerfile rename to samples/cloud_run/Dockerfile diff --git a/samples/run/README.md b/samples/cloud_run/README.md similarity index 99% rename from samples/run/README.md rename to samples/cloud_run/README.md index 1cf1f1bb..9b42d928 100644 --- a/samples/run/README.md +++ b/samples/cloud_run/README.md @@ -1,4 +1,4 @@ -# Data Validation on Cloud Run +# Data Validation on Cloud Run (Service) ### Quick Steps diff --git a/samples/run/deploy.sh b/samples/cloud_run/deploy.sh similarity index 100% rename from samples/run/deploy.sh rename to samples/cloud_run/deploy.sh diff --git a/samples/run/main.py b/samples/cloud_run/main.py similarity index 100% rename from samples/run/main.py rename to samples/cloud_run/main.py diff --git a/samples/cloud_run_jobs/Dockerfile b/samples/cloud_run_jobs/Dockerfile new file mode 100644 index 00000000..12e29c76 --- /dev/null +++ b/samples/cloud_run_jobs/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.9.0-slim +
+RUN apt-get update \ + && apt-get install -y git gcc \ + && apt-get clean +RUN pip install --upgrade pip +RUN git clone 
https://github.com/GoogleCloudPlatform/professional-services-data-validator.git +WORKDIR professional-services-data-validator +RUN pip install . +ENTRYPOINT ["python","-m","data_validation"] diff --git a/samples/cloud_run_jobs/README.md b/samples/cloud_run_jobs/README.md new file mode 100644 index 00000000..9a45bc15 --- /dev/null +++ b/samples/cloud_run_jobs/README.md @@ -0,0 +1,89 @@ +# Distributed Data Validation with Cloud Run Jobs + +This is an example of distributed DVT usage with [Cloud Run Jobs](https://cloud.google.com/run/docs/create-jobs). Running DVT in a distributed manner alleviates memory constraints when validating large tables. In this sample, you will first generate table partitions with the `generate-table-partitions` command, which will partition your large table into smaller pieces with filters. Read more on generating partitions [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator?#generate-table-partitions-for-large-table-row-validations). The Cloud Run Job can then distribute each partition's YAML configuration as a Cloud Run Task in parallel. + +## Quickstart + +### Build a Docker Image + +You will need to build a Docker image with DVT installed to be used by your Cloud Run Job. + +``` +export PROJECT_ID= +gcloud builds submit --tag gcr.io/${PROJECT_ID}/data-validation \ + --project=${PROJECT_ID} +``` + +### Store connections in Cloud Storage + +Store your connections in GCS so they will be accessible to your Cloud Run Job. + +``` +export PSO_DV_CONN_HOME= +data-validation connections add --connection-name bq BigQuery --project-id ${PROJECT_ID} +``` + +The `PSO_DV_CONN_HOME` environment variable indicates that you want your connection files stored in and retrieved from GCS automatically. Read more about it [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/connections.md#gcs-connection-management-recommended). + +### Generate Table Partition YAMLs in GCS + +Generate table partitions for the large table you want to validate. In this example, we will use the public table `bigquery-public-data.new_york_trees.tree_census_2015` +and validate the table against itself. + +First, create a GCS path to store the YAML configs for this table. + +``` +export CONFIG_GCS_BUCKET_NAME= +export CONFIG_GCS_BUCKET_LOCATION= # e.g. 'us-central1' +gcloud storage buckets create gs://${CONFIG_GCS_BUCKET_NAME} \ + --location=${CONFIG_GCS_BUCKET_LOCATION} +``` + +Next, generate the table partitions and store the YAMLs in the GCS bucket you created. + +``` +export BQ_RESULT_TABLE= # e.g. 'project.dataset.table' +data-validation generate-table-partitions \ + -sc bq \ + -tc bq \ + -tbls bigquery-public-data.new_york_trees.tree_census_2015 \ + --primary-keys tree_id \ + --hash '*' \ + --config-dir "gs://${CONFIG_GCS_BUCKET_NAME}/" \ + --partition-num 50 \ + -bqrh ${BQ_RESULT_TABLE} +``` + +The `generate-table-partitions` command will create a folder named `bigquery-public-data.new_york_trees.tree_census_2015` within your GCS bucket populated with the 50 YAML files. + +### Create a Cloud Run Job + +First, ensure you have the [correct permissions](https://cloud.google.com/run/docs/create-jobs#iam_permissions_required_to_create_and_execute) to create and +execute Cloud Run Jobs. Also, make sure that the Cloud Run service account has access to Google Cloud Storage (to read connection configuration and YAML files) and BigQuery (to publish results). + +Set the number of tasks to the number of partitions you created. 
In this example, we created 50 partitions. + +By default, each partition validation is retried up to 3 times if there is an error. Keep in mind that if you are validating thousands of partitions in parallel, setting the parallelism too high (say, 100) can result in timeouts and slow down the validation. + +``` +export JOB_NAME= +export REGION= # e.g. us-central1 +gcloud run jobs create ${JOB_NAME} \ + --image gcr.io/${PROJECT_ID}/data-validation \ + --tasks 50 --max-retries 2 --parallelism 15 \ + --set-env-vars PSO_DV_CONN_HOME=${PSO_DV_CONN_HOME} \ + --args "-ll,WARNING,configs,run,-kc,-cdir,gs://${CONFIG_GCS_BUCKET_NAME}/bigquery-public-data.new_york_trees.tree_census_2015" \ + --region ${REGION} +``` + +We set the `--log-level (-ll)` flag to 'WARNING' so that validation results are written only to BigQuery and not also logged to stdout. The `--kube-completions (-kc)` flag indicates you are running in Kubernetes or Cloud Run and signals DVT to only run the validation corresponding to the `CLOUD_RUN_TASK_INDEX` environment variable that is set by Cloud Run. + +See the full list of supported flags for the `gcloud run jobs create` command [here](https://cloud.google.com/sdk/gcloud/reference/run/jobs/create). + +### Execute the Cloud Run Job + +Finally, execute the Cloud Run Job and see your validation results in your BigQuery results table. + +``` +gcloud run jobs execute ${JOB_NAME} --region ${REGION} +``` diff --git a/samples/run/test.py b/samples/run/test.py deleted file mode 100644 index 8c263cfd..00000000 --- a/samples/run/test.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import re -import requests - -PROJECT_ID = os.environ.get("PROJECT_ID") - -DESCRIBE_SERVICE = """ -gcloud run services describe {service_name} --region=us-central1 --project={project_id} -""" - - -def get_token(): - with os.popen("gcloud auth print-identity-token") as cmd: - token = cmd.read().strip() - - return token - - -def get_cloud_run_url(service_name, project_id): - describe_service = DESCRIBE_SERVICE.format( - service_name=service_name, project_id=project_id ) - with os.popen(describe_service) as service: - description = service.read() - - return re.findall("URL:.*\n", description)[0].split()[1].strip() -# You can get the JSON content specific for your scenario by using our CLI and providing the argument to generate the JSON config file [`--config-file-json` or `-cj .json`]. -# IMPORTANT: do not forget to make the necessary adjustments between JSON and Python objects, check this link as a reference: https://python-course.eu/applications-python/json-and-python.php. 
-data = { - "source_conn": { - "source_type": "BigQuery", - "project_id": PROJECT_ID, - }, - "target_conn": { - "source_type": "BigQuery", - "project_id": PROJECT_ID, - }, - "type": "Column", - "schema_name": "bigquery-public-data.new_york_citibike", - "table_name": "citibike_stations", - "target_schema_name": "bigquery-public-data.new_york_citibike", - "target_table_name": "citibike_stations", - "aggregates": [ - { - "source_column": None, - "target_column": None, - "field_alias": "count", - "type": "count", - } - ], -} - -url = get_cloud_run_url("data-validation", PROJECT_ID) -res = requests.post(url, headers={"Authorization": "Bearer " + get_token()}, json=data) -print(res.content.decode())