Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Issue619 determine the method for generating batches efficiently #653

Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7d99ebf
init
mohdt786 Dec 15, 2022
80c0f8b
init
mohdt786 Dec 15, 2022
032f883
init
mohdt786 Dec 16, 2022
bf95ea4
init
mohdt786 Dec 16, 2022
2703427
Added Function: build_and_store_config_files
mohdt786 Dec 18, 2022
1dedce3
feat: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
7863f67
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Dec 21, 2022
09e2132
fix: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
524db2d
testing and linting
mohdt786 Dec 21, 2022
259c548
fix: linting and test
mohdt786 Dec 21, 2022
96cf91d
fix: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
b6255bc
feat: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
4bf5f4f
fix: removed type hint for method - get_pandas_df
mohdt786 Dec 21, 2022
58a7a00
linting and fixes
mohdt786 Dec 22, 2022
07333d8
type hints and lint
mohdt786 Dec 22, 2022
32fd216
fix: Raise value error for:
mohdt786 Dec 22, 2022
b5145b8
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Dec 28, 2022
da69e8b
fix: Updated arguments and argument parser
mohdt786 Dec 28, 2022
99bb79c
feat: Added Partition support to generate multiple yaml config files
mohdt786 Jan 4, 2023
1e28a4a
Linting and TypeHint added
mohdt786 Jan 4, 2023
2b2e76a
fix: Added Warning logs and Documentation
mohdt786 Jan 4, 2023
1af3f35
fix: removed unwanted Docs
mohdt786 Jan 4, 2023
45e51b9
fix: Added --tables-list to required for generate-partitions row command
mohdt786 Jan 6, 2023
57549b3
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 6, 2023
c34bd8a
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 11, 2023
d208a18
refactor: Removed ROW command and added desc to README.md
mohdt786 Jan 16, 2023
06bb8e3
feat: Added support to save partition configs to GCS bucket
mohdt786 Jan 18, 2023
654061f
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 18, 2023
e031ca5
fix: fixed path conflicts
mohdt786 Jan 18, 2023
fda93a4
fix: re-wrote logs which are used in tests/unit/test_cli_tools.py::te…
mohdt786 Jan 18, 2023
ffba13e
test: Added test cases for partition_builder.py
mohdt786 Jan 19, 2023
20d9556
fix: extra line adjustment in consts.py
mohdt786 Jan 19, 2023
040d801
fix: updated source/target partition_key datatype check to support py…
mohdt786 Jan 19, 2023
4c2e330
Added test cases for storing partitions in local dir
mohdt786 Jan 20, 2023
56e4354
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 20, 2023
35d99bc
lint: tests/unit/test_partition_builder.py
mohdt786 Jan 20, 2023
936421f
Added test inputs to tests/unit/test_inputs/test_partition_builder.json
mohdt786 Jan 24, 2023
bb82253
lint: tests/unit.test_partition_builder.py
mohdt786 Jan 24, 2023
4fc5a55
fix: Reduced test input size
mohdt786 Jan 24, 2023
e531d8e
fix: Added logging before writing partitions
mohdt786 Jan 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ terraform.rc

# Custom
*.yaml
partitions_dir

# Test temp files
source_table_data.json
target_table_data.json
source-sql.sql

# Commit msg
commits.txt
test.py
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,50 @@ data-validation (--verbose or -v) (--log-level or -ll) validate row
[--filter-status or -fs STATUSES_LIST]
Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned.
```
#### Generate Table Partitions for Large Table Row Validations

Below is the command syntax for generating table partitions in order to perform row validations on large tables with memory constraints.

The command generates and stores multiple YAML configs that represent chunks of the large table using filters (`WHERE partition_key > X AND partition_key < Y`). You can then run the configs in the directory serially with the `data-validation configs run --config-dir PATH` command as described [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator#yaml-configuration-files).

The command takes the same parameters as required for `Row Validation` *plus* few commands to implement the partitioning logic.

(Note: As of now, only monotonically increasing key is supported for `--partition-key`.)

```
data-validation (--verbose or -v) (--log-level or -ll) generate-table-partitions

--source-conn or -sc SOURCE_CONN
Source connection details
See: *Data Source Configurations* section for each data source
--target-conn or -tc TARGET_CONN
Target connection details
See: *Connections* section for each data source
--tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
i.e 'bigquery-public-data.new_york_citibike.citibike_trips'
--primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS
Comma separated list of primary key columns 'col_a,col_b'
--comparison-fields or -comp-fields FIELDS
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
--hash COLUMNS Comma separated list of columns to hash or * for all columns
--concat COLUMNS Comma separated list of columns to concatenate or * for all columns (use if a common hash function is not available between databases)
--config-dir CONFIG_DIR, -cdir CONFIG_DIR
Directory Path to store YAML Config Files
nehanene15 marked this conversation as resolved.
Show resolved Hide resolved
GCS: Provide a full gs:// path of the target directory. Eg: `gs://<BUCKET>/partitions_dir`
Local: Provide a relative path of the target directory. Eg: `partitions_dir`
--partition-num [1-1000], -pn [1-1000]
Number of partitions/config files to generate
In case this value exceeds the row count of the source/target table, its will be decreased to max(source_row_count, target_row_count)
[--partition-key PARTITION_KEY, -partkey PARTITION_KEY]
Column on which the partitions would be generated. Column type must be integer. Defaults to Primary key
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon spearated string values of source and target filters.
If target filter is not provided, the source filter will run on source and target tables.
See: *Filters* section
```
#### Schema Validations

Below is the syntax for schema validations. These can be used to compare case insensitive column names and
Expand Down
52 changes: 37 additions & 15 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import sys

from yaml import Dumper, dump

from argparse import Namespace
from data_validation import (
cli_tools,
clients,
Expand All @@ -28,6 +28,7 @@
)
from data_validation.config_manager import ConfigManager
from data_validation.data_validation import DataValidation
from data_validation.partition_builder import PartitionBuilder

# by default yaml dumps lists as pointers. This disables that feature
Dumper.ignore_aliases = lambda *args: True
Expand All @@ -50,15 +51,7 @@ def _get_arg_config_file(args):
return args.config_file


def _get_arg_config_dir(args):
"""Return String yaml config directory path."""
if not args.config_dir:
raise ValueError("YAML Config Directory was not supplied.")

return args.config_dir


def get_aggregate_config(args, config_manager):
def get_aggregate_config(args, config_manager: ConfigManager):
"""Return list of formated aggregation objects.

Args:
Expand Down Expand Up @@ -217,11 +210,15 @@ def build_config_from_args(args, config_manager):
return config_manager


def build_config_managers_from_args(args):
def build_config_managers_from_args(args: Namespace, validate_cmd: str = None):
"""Return a list of config managers ready to execute."""
configs = []

validate_cmd = args.validate_cmd.capitalize()
# Since `generate-table-partitions` defaults to `validate_cmd=row`,
# `validate_cmd` is passed along while calling this method
if validate_cmd is None:
validate_cmd = args.validate_cmd.capitalize()

if validate_cmd == "Schema":
config_type = consts.SCHEMA_VALIDATION
elif validate_cmd == "Column":
Expand Down Expand Up @@ -492,10 +489,33 @@ def store_yaml_config_file(args, config_managers):
cli_tools.store_validation(config_file_path, yaml_configs)


def run(args):
""" """
config_managers = build_config_managers_from_args(args)
def partition_and_store_config_files(args: Namespace) -> None:
"""Build multiple YAML Config files using user specified partition logic

Args:
args (Namespace): User specified Arguments

Returns:
None
"""
# Default Validate Type
config_managers = build_config_managers_from_args(args, consts.ROW_VALIDATION)
partition_builder = PartitionBuilder(config_managers, args)
partition_builder.partition_configs()


def run(args) -> None:
"""Splits execution into:
1. Build and save single Yaml Config file
2. Run Validations

Args:
args (Namespace): User specified Arguments.

Returns:
None
"""
config_managers = build_config_managers_from_args(args)
if args.config_file:
store_yaml_config_file(args, config_managers)
else:
Expand Down Expand Up @@ -564,6 +584,8 @@ def main():
print(run_raw_query_against_connection(args))
elif args.command == "validate":
validate(args)
elif args.command == "generate-table-partitions":
partition_and_store_config_files(args)
elif args.command == "deploy":
from data_validation import app

Expand Down
180 changes: 179 additions & 1 deletion data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
-c ex_yaml.yaml

data-validation run-config -c ex_yaml.yaml

command:
data-validation
"""

import argparse
Expand All @@ -48,6 +51,7 @@
import logging
import sys
import uuid
from argparse import Namespace

from data_validation import consts, state_manager

Expand Down Expand Up @@ -138,7 +142,7 @@
}


def get_parsed_args():
def get_parsed_args() -> Namespace:
"""Return ArgParser with configured CLI arguments."""
parser = configure_arg_parser()
args = ["--help"] if len(sys.argv) == 1 else None
Expand Down Expand Up @@ -168,9 +172,104 @@ def configure_arg_parser():
_configure_find_tables(subparsers)
_configure_raw_query(subparsers)
_configure_beta_parser(subparsers)
_configure_partition_parser(subparsers)
return parser


def _configure_partition_parser(subparsers):
"""Configure arguments to generate partitioned config files."""
partition_parser = subparsers.add_parser(
"generate-table-partitions",
help=(
"Generate partitions for validation and store the Config files in "
"a directory"
),
)

# Group all optional arguments together
optional_arguments = partition_parser.add_argument_group("optional arguments")
optional_arguments.add_argument(
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
"--threshold",
"-th",
type=threshold_float,
help="Float max threshold for percent difference",
)
optional_arguments.add_argument(
"--filters",
"-filters",
help="Filters in the format source_filter:target_filter",
)
optional_arguments.add_argument(
"--use-random-row",
"-rr",
action="store_true",
help="Finds a set of random rows of the first primary key supplied.",
)
optional_arguments.add_argument(
"--random-row-batch-size",
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)
# Keep these in order to support data-validation run command for
# backwards-compatibility
optional_arguments.add_argument("--type", "-t", help="Type of Validation")
optional_arguments.add_argument(
"--result-handler-config", "-rc", help="Result handler config details"
)

"""Configure arguments to generate partitions for row based validation."""

# Group all required arguments together
required_arguments = partition_parser.add_argument_group("required arguments")
required_arguments.add_argument(
"--primary-keys",
"-pk",
required=True,
help="Comma separated list of primary key columns 'col_a,col_b'",
)
required_arguments.add_argument(
"--tables-list",
"-tbls",
required=True,
help=(
"Comma separated tables list in the form "
"'schema.table=target_schema.target_table'"
),
)

# Group for mutually exclusive required arguments. Either must be supplied
mutually_exclusive_arguments = required_arguments.add_mutually_exclusive_group(
required=True
)
mutually_exclusive_arguments.add_argument(
"--hash",
"-hash",
help=(
"Comma separated list of columns for hash 'col_a,col_b' or * for "
"all columns"
),
)
mutually_exclusive_arguments.add_argument(
"--concat",
"-concat",
help=(
"Comma separated list of columns for concat 'col_a,col_b' or * "
"for all columns"
),
)

mutually_exclusive_arguments.add_argument(
"--comparison-fields",
"-comp-fields",
help=(
"Individual columns to compare. If comparing a calculated field use "
"the column alias."
),
)

_add_common_partition_arguments(optional_arguments, required_arguments)


def _configure_beta_parser(subparsers):
"""Configure beta commands for the parser."""
connection_parser = subparsers.add_parser(
Expand Down Expand Up @@ -646,6 +745,79 @@ def _add_common_arguments(parser):
)


def _add_common_partition_arguments(optional_arguments, required_arguments=None):
"""Add all arguments common to get-partition command"""

# Group all Required Arguments together
required_arguments.add_argument(
"--source-conn", "-sc", required=True, help="Source connection name"
)
required_arguments.add_argument(
"--target-conn", "-tc", required=True, help="Target connection name"
)
required_arguments.add_argument(
"--config-dir",
"-cdir",
required=True,
help="Directory Path to store YAML Config Files. "
"GCS: Provide a full gs:// path of the target directory. "
"Eg: `gs://<BUCKET>/partiitons_dir`. "
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
"Local: Provide a relative path of the target directory. "
"Eg: `partitions_dir`",
)
required_arguments.add_argument(
"--partition-num",
"-pn",
required=True,
help="Number of partitions/config files to generate",
type=int,
choices=range(1, 1001),
metavar="[1-1000]",
)

# Optional arguments
optional_arguments.add_argument(
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
"--bq-result-handler",
"-bqrh",
help="BigQuery result handler config details",
)
optional_arguments.add_argument(
"--labels", "-l", help="Key value pair labels for validation run"
)
optional_arguments.add_argument(
"--service-account",
"-sa",
help="Path to SA key file for result handler output",
)
optional_arguments.add_argument(
"--format",
"-fmt",
default="table",
help=(
"Set the format for printing command output, Supported formats are "
"(text, csv, json, table). Defaults to table"
),
)
optional_arguments.add_argument(
"--filter-status",
"-fs",
# TODO: update if we start to support other statuses
help=(
"Comma separated list of statuses to filter the validation results. "
"Supported statuses are (success, fail). If no list is provided, "
"all statuses are returned"
),
)
optional_arguments.add_argument(
"--partition-key",
"-partkey",
help=(
"Column on which the partitions would be generated. "
"Column type must be integer. Defaults to Primary key"
),
)


def get_connection_config_from_args(args):
"""Return dict with connection config supplied."""
config = {consts.SOURCE_TYPE: args.connect_type}
Expand Down Expand Up @@ -710,6 +882,12 @@ def store_validation(validation_file_name, yaml_config):
mgr.create_validation_yaml(validation_file_name, yaml_config)


def store_partition(target_file_path, yaml_config, target_folder_path=None):
"""Store the partition YAML config under the given name."""
mgr = state_manager.StateManager(target_folder_path)
mgr.create_partition_yaml(target_file_path, yaml_config)


def get_validation(validation_name, config_dir=None):
"""Return validation YAML for a specific connection."""
if config_dir:
Expand Down
Loading