feat: Added Partition support to generate multiple YAML config files (#653) (Issue #619, #662)

Features:
1. New command 'generate-table-partitions' added to generate partitions for `row` type validation
2. --partition-num: Number of partitions/config files to create.
    Range=[1,1000]
    If the specified value is greater than count(*), the value is coalesced to count(*)
3. --config-dir: Directory path to store YAML config files. Either a local or a GCS path can be supplied
4. Added required arguments group to distinguish from optional arguments
5. Added mutually exclusive arguments group for --hash and --concat
6. --partition-key: Column on which the partitions are generated. Column type must be integer. Defaults to primary key
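The coalescing rule for `--partition-num` can be sketched as follows. This is a minimal illustration only, not DVT code, and the function name is hypothetical:

```python
# Hypothetical sketch of the --partition-num coalescing rule: the requested
# partition count is clamped to the documented [1, 1000] range and, if it
# still exceeds the table's row count, coalesced down to count(*).
def coalesce_partition_num(requested: int, row_count: int) -> int:
    clamped = max(1, min(requested, 1000))
    return min(clamped, row_count)
```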

Tests:
Added unit tests for partition_builder.py, which also provide coverage for partition_row_builder.py

README.md & examples.md:
1. Added description for usage of 'generate-table-partitions' command
2. Added examples
mohdt786 committed Jan 25, 2023
1 parent 37f5cad commit f79c308
Showing 9 changed files with 1,124 additions and 16 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -84,7 +84,13 @@ terraform.rc

# Custom
*.yaml
partitions_dir

# Test temp files
source_table_data.json
target_table_data.json
source-sql.sql

# Commit msg
commits.txt
test.py
43 changes: 43 additions & 0 deletions README.md
@@ -197,7 +197,50 @@ data-validation (--verbose or -v) (--log-level or -ll) validate row
[--filter-status or -fs STATUSES_LIST]
Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned.
```
#### Generate Table Partitions for Large Table Row Validations

Below is the command syntax for generating table partitions, so that row validations can be performed on large tables under memory constraints.

The command generates and stores multiple YAML configs that represent chunks of the large table using filters (`WHERE partition_key > X AND partition_key < Y`). You can then run the configs in the directory serially with the `data-validation configs run --config-dir PATH` command as described [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator#yaml-configuration-files).

The command takes the same parameters required for `Row Validation`, *plus* a few additional arguments that implement the partitioning logic.

(Note: currently, only a monotonically increasing key is supported for `--partition-key`.)

```
data-validation (--verbose or -v) (--log-level or -ll) generate-table-partitions
--source-conn or -sc SOURCE_CONN
Source connection details
See: *Data Source Configurations* section for each data source
--target-conn or -tc TARGET_CONN
Target connection details
See: *Connections* section for each data source
--tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
e.g. 'bigquery-public-data.new_york_citibike.citibike_trips'
--primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS
Comma separated list of primary key columns 'col_a,col_b'
--comparison-fields or -comp-fields FIELDS
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
--hash COLUMNS Comma separated list of columns to hash or * for all columns
--concat COLUMNS Comma separated list of columns to concatenate or * for all columns (use if a common hash function is not available between databases)
--config-dir CONFIG_DIR, -cdir CONFIG_DIR
Directory Path to store YAML Config Files
GCS: Provide a full gs:// path of the target directory. Eg: `gs://<BUCKET>/partitions_dir`
Local: Provide a relative path of the target directory. Eg: `partitions_dir`
--partition-num [1-1000], -pn [1-1000]
Number of partitions/config files to generate
If this value exceeds the row count of the source/target table, it will be reduced to max(source_row_count, target_row_count)
[--partition-key PARTITION_KEY, -partkey PARTITION_KEY]
Column on which the partitions would be generated. Column type must be integer. Defaults to Primary key
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon-separated string values of source and target filters.
If the target filter is not provided, the source filter will run on both source and target tables.
See: *Filters* section
```
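As a rough illustration of what the generated configs contain, the partition filters described above can be derived by splitting the integer partition key's range into N half-open intervals. This is a hedged sketch under the assumption of a monotonically increasing integer key; it is not the DVT implementation, and the function name is illustrative:

```python
# Illustrative only: split [min_key, max_key] of an integer partition key
# into `partition_num` half-open ranges and emit one WHERE-style filter each.
def build_partition_filters(min_key: int, max_key: int, partition_num: int):
    span = max_key - min_key + 1
    step = -(-span // partition_num)  # ceiling division
    filters = []
    for i in range(partition_num):
        lo = min_key + i * step
        hi = min(lo + step, max_key + 1)
        filters.append(f"partition_key >= {lo} and partition_key < {hi}")
    return filters
```

Each resulting filter string would become the row-validation filter of one generated YAML config.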
#### Schema Validations

Below is the syntax for schema validations. These can be used to compare case insensitive column names and
52 changes: 37 additions & 15 deletions data_validation/__main__.py
@@ -18,7 +18,7 @@
import sys

from yaml import Dumper, dump

from argparse import Namespace
from data_validation import (
cli_tools,
clients,
@@ -28,6 +28,7 @@
)
from data_validation.config_manager import ConfigManager
from data_validation.data_validation import DataValidation
from data_validation.partition_builder import PartitionBuilder

# by default yaml dumps lists as pointers. This disables that feature
Dumper.ignore_aliases = lambda *args: True
@@ -50,15 +51,7 @@ def _get_arg_config_file(args):
return args.config_file


def _get_arg_config_dir(args):
"""Return String yaml config directory path."""
if not args.config_dir:
raise ValueError("YAML Config Directory was not supplied.")

return args.config_dir


def get_aggregate_config(args, config_manager):
def get_aggregate_config(args, config_manager: ConfigManager):
"""Return list of formated aggregation objects.
Args:
@@ -217,11 +210,15 @@ def build_config_from_args(args, config_manager):
return config_manager


def build_config_managers_from_args(args):
def build_config_managers_from_args(args: Namespace, validate_cmd: str = None):
"""Return a list of config managers ready to execute."""
configs = []

validate_cmd = args.validate_cmd.capitalize()
# Since `generate-table-partitions` defaults to `validate_cmd=row`,
# `validate_cmd` is passed along while calling this method
if validate_cmd is None:
validate_cmd = args.validate_cmd.capitalize()

if validate_cmd == "Schema":
config_type = consts.SCHEMA_VALIDATION
elif validate_cmd == "Column":
@@ -492,10 +489,33 @@ def store_yaml_config_file(args, config_managers):
cli_tools.store_validation(config_file_path, yaml_configs)


def run(args):
""" """
config_managers = build_config_managers_from_args(args)
def partition_and_store_config_files(args: Namespace) -> None:
"""Build multiple YAML Config files using user specified partition logic
Args:
args (Namespace): User specified Arguments
Returns:
None
"""
# Default Validate Type
config_managers = build_config_managers_from_args(args, consts.ROW_VALIDATION)
partition_builder = PartitionBuilder(config_managers, args)
partition_builder.partition_configs()


def run(args) -> None:
"""Splits execution into:
1. Build and save single Yaml Config file
2. Run Validations
Args:
args (Namespace): User specified Arguments.
Returns:
None
"""
config_managers = build_config_managers_from_args(args)
if args.config_file:
store_yaml_config_file(args, config_managers)
else:
@@ -564,6 +584,8 @@ def main():
print(run_raw_query_against_connection(args))
elif args.command == "validate":
validate(args)
elif args.command == "generate-table-partitions":
partition_and_store_config_files(args)
elif args.command == "deploy":
from data_validation import app

180 changes: 179 additions & 1 deletion data_validation/cli_tools.py
@@ -40,6 +40,9 @@
-c ex_yaml.yaml
data-validation run-config -c ex_yaml.yaml
command:
data-validation
"""

import argparse
@@ -48,6 +51,7 @@
import logging
import sys
import uuid
from argparse import Namespace

from data_validation import consts, state_manager

@@ -138,7 +142,7 @@
}


def get_parsed_args():
def get_parsed_args() -> Namespace:
"""Return ArgParser with configured CLI arguments."""
parser = configure_arg_parser()
args = ["--help"] if len(sys.argv) == 1 else None
@@ -168,9 +172,104 @@ def configure_arg_parser():
_configure_find_tables(subparsers)
_configure_raw_query(subparsers)
_configure_beta_parser(subparsers)
_configure_partition_parser(subparsers)
return parser


def _configure_partition_parser(subparsers):
"""Configure arguments to generate partitioned config files."""
partition_parser = subparsers.add_parser(
"generate-table-partitions",
help=(
"Generate partitions for validation and store the Config files in "
"a directory"
),
)

# Group all optional arguments together
optional_arguments = partition_parser.add_argument_group("optional arguments")
optional_arguments.add_argument(
"--threshold",
"-th",
type=threshold_float,
help="Float max threshold for percent difference",
)
optional_arguments.add_argument(
"--filters",
"-filters",
help="Filters in the format source_filter:target_filter",
)
optional_arguments.add_argument(
"--use-random-row",
"-rr",
action="store_true",
help="Finds a set of random rows of the first primary key supplied.",
)
optional_arguments.add_argument(
"--random-row-batch-size",
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)
# Keep these in order to support data-validation run command for
# backwards-compatibility
optional_arguments.add_argument("--type", "-t", help="Type of Validation")
optional_arguments.add_argument(
"--result-handler-config", "-rc", help="Result handler config details"
)

"""Configure arguments to generate partitions for row based validation."""

# Group all required arguments together
required_arguments = partition_parser.add_argument_group("required arguments")
required_arguments.add_argument(
"--primary-keys",
"-pk",
required=True,
help="Comma separated list of primary key columns 'col_a,col_b'",
)
required_arguments.add_argument(
"--tables-list",
"-tbls",
required=True,
help=(
"Comma separated tables list in the form "
"'schema.table=target_schema.target_table'"
),
)

# Group for mutually exclusive required arguments. Either must be supplied
mutually_exclusive_arguments = required_arguments.add_mutually_exclusive_group(
required=True
)
mutually_exclusive_arguments.add_argument(
"--hash",
"-hash",
help=(
"Comma separated list of columns for hash 'col_a,col_b' or * for "
"all columns"
),
)
mutually_exclusive_arguments.add_argument(
"--concat",
"-concat",
help=(
"Comma separated list of columns for concat 'col_a,col_b' or * "
"for all columns"
),
)

mutually_exclusive_arguments.add_argument(
"--comparison-fields",
"-comp-fields",
help=(
"Individual columns to compare. If comparing a calculated field use "
"the column alias."
),
)

_add_common_partition_arguments(optional_arguments, required_arguments)

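The required mutually exclusive group configured above can be demonstrated in isolation. This standalone sketch (not DVT code) shows argparse enforcing that exactly one of the three flags is supplied:

```python
import argparse

# Minimal reproduction of the pattern: a mutually exclusive group created
# with required=True makes argparse reject both zero flags and more than
# one flag from the group.
parser = argparse.ArgumentParser(prog="demo")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--hash")
group.add_argument("--concat")
group.add_argument("--comparison-fields")

args = parser.parse_args(["--hash", "*"])  # exactly one flag: accepted
```

Passing `--hash` together with `--concat`, or passing neither, makes `parse_args` exit with a usage error.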

def _configure_beta_parser(subparsers):
"""Configure beta commands for the parser."""
connection_parser = subparsers.add_parser(
@@ -646,6 +745,79 @@ def _add_common_arguments(parser):
)


def _add_common_partition_arguments(optional_arguments, required_arguments=None):
"""Add all arguments common to get-partition command"""

# Group all Required Arguments together
required_arguments.add_argument(
"--source-conn", "-sc", required=True, help="Source connection name"
)
required_arguments.add_argument(
"--target-conn", "-tc", required=True, help="Target connection name"
)
required_arguments.add_argument(
"--config-dir",
"-cdir",
required=True,
help="Directory Path to store YAML Config Files. "
"GCS: Provide a full gs:// path of the target directory. "
"Eg: `gs://<BUCKET>/partiitons_dir`. "
"Local: Provide a relative path of the target directory. "
"Eg: `partitions_dir`",
)
required_arguments.add_argument(
"--partition-num",
"-pn",
required=True,
help="Number of partitions/config files to generate",
type=int,
choices=range(1, 1001),
metavar="[1-1000]",
)

# Optional arguments
optional_arguments.add_argument(
"--bq-result-handler",
"-bqrh",
help="BigQuery result handler config details",
)
optional_arguments.add_argument(
"--labels", "-l", help="Key value pair labels for validation run"
)
optional_arguments.add_argument(
"--service-account",
"-sa",
help="Path to SA key file for result handler output",
)
optional_arguments.add_argument(
"--format",
"-fmt",
default="table",
help=(
"Set the format for printing command output, Supported formats are "
"(text, csv, json, table). Defaults to table"
),
)
optional_arguments.add_argument(
"--filter-status",
"-fs",
# TODO: update if we start to support other statuses
help=(
"Comma separated list of statuses to filter the validation results. "
"Supported statuses are (success, fail). If no list is provided, "
"all statuses are returned"
),
)
optional_arguments.add_argument(
"--partition-key",
"-partkey",
help=(
"Column on which the partitions would be generated. "
"Column type must be integer. Defaults to Primary key"
),
)


def get_connection_config_from_args(args):
"""Return dict with connection config supplied."""
config = {consts.SOURCE_TYPE: args.connect_type}
@@ -710,6 +882,12 @@ def store_validation(validation_file_name, yaml_config):
mgr.create_validation_yaml(validation_file_name, yaml_config)


def store_partition(target_file_path, yaml_config, target_folder_path=None):
"""Store the partition YAML config under the given name."""
mgr = state_manager.StateManager(target_folder_path)
mgr.create_partition_yaml(target_file_path, yaml_config)


def get_validation(validation_name, config_dir=None):
"""Return validation YAML for a specific connection."""
if config_dir: