
feat: first class support for row level hashing #345

Merged (33 commits, Feb 23, 2022)
Commits (changes shown from 26 of 33 commits)
570afed
adding scaffolding for calc field builder in config manager
renzokuken Nov 17, 2021
ef3f916
exposing cast via calculated fields. Don't know if we necessarily nee…
renzokuken Nov 24, 2021
c9c212f
diff check
renzokuken Dec 22, 2021
25aed20
config file generating as expected
renzokuken Dec 28, 2021
6140822
Merge branch 'develop' into issue300-make-hashing-user-friendly
renzokuken Dec 28, 2021
ea7f8e7
expanding cli for row level validations
renzokuken Jan 5, 2022
0358193
splitting out comparison fields from aggregates
renzokuken Jan 6, 2022
abd82af
row comparisons operational (sort of)
renzokuken Jan 6, 2022
e09dcc7
re-enabling aggregate validations
renzokuken Jan 7, 2022
2852b15
cohabitation of validation types!
renzokuken Jan 7, 2022
fb372f6
merge develop
renzokuken Jan 7, 2022
8695edb
figuring out why unit tests are borked
renzokuken Jan 11, 2022
f67e1c2
continuing field split
renzokuken Jan 13, 2022
6c3a919
stash before merge
renzokuken Jan 13, 2022
81b97bb
testing diff
renzokuken Jan 20, 2022
4647bae
tests passing
renzokuken Jan 24, 2022
8ed194b
removing extra print statements
renzokuken Jan 27, 2022
a7afbbc
tests and lint
renzokuken Jan 28, 2022
f3e5be8
adding fail tests
renzokuken Jan 29, 2022
312add8
first round of requested changes
renzokuken Feb 1, 2022
2901fbb
change requests round two.
renzokuken Feb 1, 2022
7e9644f
refactor CLI and lint
renzokuken Feb 3, 2022
7581cfc
swapping out farm fingerprint for sha256 as default
renzokuken Feb 10, 2022
1e0deb6
changes per CR
renzokuken Feb 11, 2022
2949597
fixing text result tests
renzokuken Feb 14, 2022
af887ff
adding docs
renzokuken Feb 16, 2022
6b638bc
hash example
renzokuken Feb 22, 2022
efd2966
linting
renzokuken Feb 22, 2022
8c20e31
think I found the broken test
renzokuken Feb 22, 2022
daa26be
merge conflict
renzokuken Feb 23, 2022
59b76ba
fixed tests
renzokuken Feb 23, 2022
bcc7990
setting default for depth length
renzokuken Feb 23, 2022
0feb788
relaxing system test
renzokuken Feb 23, 2022
51 changes: 51 additions & 0 deletions README.md
@@ -134,6 +134,51 @@ sum, min, etc.) is provided, the default aggregation will run.
The [Examples](docs/examples.md) page provides many examples of how the tool can
be used to run powerful validations without writing any queries.

#### Row Validations

Below is the command syntax for row validations. To run row-level validations
you need to pass a `--primary-keys` flag, which defines the field(s) on which
rows will be joined and compared, as well as a `--comparison-fields` flag,
which specifies the fields (e.g. columns) whose raw values will be compared
based on the primary key join. Additionally, you can use
[Calculated Fields](#calculated-fields) to compare derived values such as
string counts and hashes of multiple columns.

```
data-validation (--verbose or -v) validate row
--source-conn or -sc SOURCE_CONN
Source connection details
See: *Data Source Configurations* section for each data source
--target-conn or -tc TARGET_CONN
Target connection details
See: *Connections* section for each data source
--tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
e.g. 'bigquery-public-data.new_york_citibike.citibike_trips'
[--primary-keys or -pk PRIMARY_KEYS]
Comma separated list of columns to use as primary keys
[--comparison-fields or -comp-fields COMPARISON_FIELDS]
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
[--hash COLUMNS] Comma separated list of columns to perform a hash operation on or * for all columns
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon separated string values of source and target filters.
If target filter is not provided, the source filter will run on source and target tables.
See: *Filters* section
[--config-file or -c CONFIG_FILE]
YAML Config File Path to be used for storing validations.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
Defaults to table.
```
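For example, a minimal row validation against the public Citibike table might look like this (the connection name `my_bq_conn` is hypothetical and assumed to have been created with `data-validation connections add`):

```
data-validation validate row \
  -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips \
  --primary-keys bikeid \
  --comparison-fields starttime,stoptime
```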

#### Schema Validations

Below is the syntax for schema validations. These can be used to compare column
@@ -285,6 +330,12 @@ Grouped Columns contain the fields you want your aggregations to be broken out
by, e.g. `SELECT last_updated::DATE, COUNT(*) FROM my.table` will produce a
resultset that breaks down the count of rows per calendar date.
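As a concrete illustration, a hypothetical grouped validation (connection and table names invented for the example) could be invoked as:

```
data-validation validate column -sc my_conn -tc my_conn \
  -tbls my.table --count '*' --grouped-columns last_updated
```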

### Comparison Fields

For row validations, specify the columns that you want to compare. These values
are compared via a JOIN on their corresponding primary key(s) and evaluated for
an exact match.
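Conceptually, this behaves like the following sketch (illustrative SQL with hypothetical table and column names, not the query the tool actually generates):

```sql
SELECT s.id,
       s.amount AS source_amount,
       t.amount AS target_amount
FROM   source_schema.orders s
JOIN   target_schema.orders t
       ON s.id = t.id           -- join on the --primary-keys column(s)
WHERE  s.amount <> t.amount     -- flag --comparison-fields mismatches
```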

### Calculated Fields

Sometimes direct comparisons are not feasible between databases due to
82 changes: 61 additions & 21 deletions data_validation/__main__.py
@@ -14,7 +14,6 @@

import os

-import logging
import json
from yaml import dump, load, Dumper, Loader

@@ -28,6 +27,9 @@
from data_validation.config_manager import ConfigManager
from data_validation.data_validation import DataValidation

# By default, yaml dumps repeated objects as aliases (pointers). This disables
# that feature so each list is written out in full.
Dumper.ignore_aliases = lambda *args: True
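As context for this change, here is a minimal standalone sketch of the PyYAML behavior being disabled (assumes only PyYAML is installed; not part of the diff):

```python
import yaml

row = {"source": "col_a"}
doc = {"first": row, "second": row}  # the same object referenced twice

# Default Dumper emits an anchor/alias pair (&id001 / *id001) for `row`.
print(yaml.dump(doc))

# With ignore_aliases patched, the mapping is written out in full both times.
yaml.Dumper.ignore_aliases = lambda *args: True
print(yaml.dump(doc, Dumper=yaml.Dumper))
```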


def _get_arg_config_file(args):
    """Return String yaml config file path."""
@@ -78,30 +80,70 @@ def get_aggregate_config(args, config_manager):
        aggregate_configs += config_manager.build_config_column_aggregates(
            "max", col_args, consts.NUMERIC_DATA_TYPES
        )
    if args.bit_xor:
        col_args = None if args.bit_xor == "*" else cli_tools.get_arg_list(args.bit_xor)
        aggregate_configs += config_manager.build_config_column_aggregates(
            "bit_xor", col_args, consts.NUMERIC_DATA_TYPES
        )
    return aggregate_configs
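For orientation (not from the diff): the new `bit_xor` flag mirrors the other aggregate flags, so a hypothetical column validation could request it as:

```
data-validation validate column -sc my_conn -tc my_conn \
  -tbls my.table --bit_xor col_a,col_b
```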


def get_calculated_config(args, config_manager):
    """Return list of formatted calculated objects.

    Args:
        args (Namespace): User-specified CLI arguments.
        config_manager (ConfigManager): Validation config manager instance.
    """
    calculated_configs = []
    fields = []
    if args.hash:
        fields = config_manager._build_dependent_aliases("hash")
        max_depth = max([x["depth"] for x in fields])
    for field in fields:
        calculated_configs.append(
            config_manager.build_config_calculated_fields(
                field["reference"],
                field["calc_type"],
                field["name"],
                field["depth"],
                None,
            )
        )
    if args.hash:
        config_manager.append_comparison_fields(
            config_manager.build_config_comparison_fields(
                ["hash__all"], depth=max_depth
            )
        )
    return calculated_configs
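For readers unfamiliar with `_build_dependent_aliases`, the loop above implies each entry is a dict with `reference`, `calc_type`, `name`, and `depth` keys. A purely illustrative return value for a two-column hash (names and depths invented, not taken from the source) might look like:

```python
fields = [
    {"reference": ["col_a"], "calc_type": "cast", "name": "cast__col_a", "depth": 0},
    {"reference": ["col_b"], "calc_type": "cast", "name": "cast__col_b", "depth": 0},
    {"reference": ["cast__col_a", "cast__col_b"], "calc_type": "concat", "name": "concat__all", "depth": 1},
    {"reference": ["concat__all"], "calc_type": "hash", "name": "hash__all", "depth": 2},
]
```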


def build_config_from_args(args, config_manager):
    """Return config manager object ready to execute.

    Args:
        config_manager (ConfigManager): Validation config manager instance.
    """
-    config_manager.append_aggregates(get_aggregate_config(args, config_manager))
-    if args.primary_keys and not args.grouped_columns:
-        if not args.grouped_columns and not config_manager.use_random_rows():
-            logging.warning(
-                "No Grouped columns or Random Rows specified, ignoring primary keys."
-            )
+    config_manager.append_calculated_fields(get_calculated_config(args, config_manager))
+    if config_manager.validation_type == consts.COLUMN_VALIDATION:
+        config_manager.append_aggregates(get_aggregate_config(args, config_manager))
+        if args.grouped_columns is not None:
+            grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
+            config_manager.append_query_groups(
+                config_manager.build_config_grouped_columns(grouped_columns)
+            )
-    if args.grouped_columns:
-        grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
-        config_manager.append_query_groups(
-            config_manager.build_config_grouped_columns(grouped_columns)
-        )
-    if args.primary_keys:
-        primary_keys = cli_tools.get_arg_list(args.primary_keys, default_value=[])
-        config_manager.append_primary_keys(
-            config_manager.build_config_grouped_columns(primary_keys)
-        )
+    elif config_manager.validation_type == consts.ROW_VALIDATION:
+        if args.comparison_fields is not None:
+            comparison_fields = cli_tools.get_arg_list(
+                args.comparison_fields, default_value=[]
+            )
+            config_manager.append_comparison_fields(
+                config_manager.build_config_comparison_fields(comparison_fields)
+            )
+        if args.primary_keys is not None:
+            primary_keys = cli_tools.get_arg_list(args.primary_keys)
+            config_manager.append_primary_keys(
+                config_manager.build_config_comparison_fields(primary_keys)
+            )

    # TODO(GH#18): Add query filter config logic
@@ -118,11 +160,9 @@ def build_config_managers_from_args(args):
        if validate_cmd == "Schema":
            config_type = consts.SCHEMA_VALIDATION
        elif validate_cmd == "Column":
-            # TODO: We need to discuss how GroupedColumn and Row are differentiated.
-            if args.grouped_columns:
-                config_type = consts.GROUPED_COLUMN_VALIDATION
-            else:
-                config_type = consts.COLUMN_VALIDATION
+            config_type = consts.COLUMN_VALIDATION
+        elif validate_cmd == "Row":
+            config_type = consts.ROW_VALIDATION
        else:
            raise ValueError(f"Unknown Validation Type: {validate_cmd}")
    else:
@@ -140,7 +180,7 @@

    # Schema validation will not accept filters, labels, or threshold as flags
    filter_config, labels, threshold = [], [], 0.0
-    if config_type != consts.SCHEMA_VALIDATION:
+    if config_type != consts.COLUMN_VALIDATION:
        if args.filters:
            filter_config = cli_tools.get_filters(args.filters)
        if args.threshold:
@@ -369,7 +409,7 @@ def run_connections(args):

def validate(args):
    """Run commands related to data validation."""
-    if args.validate_cmd == "column" or args.validate_cmd == "schema":
+    if args.validate_cmd in ["column", "row", "schema"]:
        run(args)
    else:
        raise ValueError(f"Validation Argument '{args.validate_cmd}' is not supported")
122 changes: 86 additions & 36 deletions data_validation/cli_tools.py
@@ -64,7 +64,6 @@
["port", "Teradata port to connect on"],
["user_name", "User used to connect"],
["password", "Password for supplied user"],
["logmech", "Log on mechanism"],
],
"Oracle": [
["host", "Desired Oracle host"],
@@ -241,41 +240,6 @@ def _configure_run_parser(subparsers):
"-tbls",
help="Comma separated tables list in the form 'schema.table=target_schema.target_table'",
)
-    run_parser.add_argument(
-        "--count",
-        "-count",
-        help="Comma separated list of columns for count 'col_a,col_b' or * for all columns",
-    )
-    run_parser.add_argument(
-        "--sum",
-        "-sum",
-        help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns",
-    )
-    run_parser.add_argument(
-        "--avg",
-        "-avg",
-        help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns",
-    )
-    run_parser.add_argument(
-        "--min",
-        "-min",
-        help="Comma separated list of columns for min 'col_a,col_b' or * for all columns",
-    )
-    run_parser.add_argument(
-        "--max",
-        "-max",
-        help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
-    )
-    run_parser.add_argument(
-        "--grouped-columns",
-        "-gc",
-        help="Comma separated list of columns to use in GroupBy 'col_a,col_b'",
-    )
-    run_parser.add_argument(
-        "--primary-keys",
-        "-pk",
-        help="Comma separated list of primary key columns 'col_a,col_b'",
-    )
    run_parser.add_argument(
        "--result-handler-config", "-rc", help="Result handler config details"
    )
@@ -290,6 +254,11 @@ def _configure_run_parser(subparsers):
    run_parser.add_argument(
        "--labels", "-l", help="Key value pair labels for validation run",
    )
    run_parser.add_argument(
        "--hash",
        "-hash",
        help="Comma separated list of columns for hash 'col_a,col_b' or * for all columns",
    )
    run_parser.add_argument(
        "--service-account",
        "-sa",
@@ -380,12 +349,70 @@ def _configure_validate_parser(subparsers):
    )
    _configure_column_parser(column_parser)

    row_parser = validate_subparsers.add_parser("row", help="Run a row validation")
    _configure_row_parser(row_parser)

    schema_parser = validate_subparsers.add_parser(
        "schema", help="Run a schema validation"
    )
    _configure_schema_parser(schema_parser)


def _configure_row_parser(row_parser):
    """Configure arguments to run row level validations."""
    _add_common_arguments(row_parser)
    row_parser.add_argument(
        "--hash",
        "-hash",
        help="Comma separated list of columns for hash 'col_a,col_b' or * for all columns",
    )
    row_parser.add_argument(
        "--comparison-fields",
        "-comp-fields",
        help="Individual columns to compare. If comparing a calculated field use the column alias.",
    )
    row_parser.add_argument(
        "--calculated-fields",
        "-calc-fields",
        help="List of calculated fields to generate.",
    )
    row_parser.add_argument(
        "--primary-keys",
        "-pk",
        help="Comma separated list of primary key columns 'col_a,col_b'",
    )
    row_parser.add_argument(
        "--labels", "-l", help="Key value pair labels for validation run"
    )
    row_parser.add_argument(
        "--threshold",
        "-th",
        type=threshold_float,
        help="Float max threshold for percent difference",
    )
    row_parser.add_argument(
        "--grouped-columns",
        "-gc",
        help="Comma separated list of columns to use in GroupBy 'col_a,col_b'",
    )
    row_parser.add_argument(
        "--filters",
        "-filters",
        help="Filters in the format source_filter:target_filter",
    )
    row_parser.add_argument(
        "--use-random-row",
        "-rr",
        action="store_true",
        help="Finds a set of random rows of the first primary key supplied.",
    )
    row_parser.add_argument(
        "--random-row-batch-size",
        "-rbs",
        help="Row batch size used for random row filters (default 10,000).",
    )
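Putting these flags together, a hypothetical hash-based row validation (connection and table names invented) could be:

```
data-validation validate row -sc my_conn -tc my_conn \
  -tbls my_schema.my_table \
  --primary-keys id \
  --hash '*' \
  --use-random-row --random-row-batch-size 100
```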


def _configure_column_parser(column_parser):
    """Configure arguments to run column level validations."""
    _add_common_arguments(column_parser)
@@ -414,6 +441,26 @@ def _configure_column_parser(column_parser):
"-max",
help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--hash",
"-hash",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--bit_xor",
"-bit_xor",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--comparison-fields",
"-comp-fields",
help="list of fields to perform exact comparisons to. Use column aliases if this is calculated.",
)
column_parser.add_argument(
"--calculated-fields",
"-calc-fields",
help="list of calculated fields to generate.",
)
column_parser.add_argument(
"--grouped-columns",
"-gc",
@@ -665,6 +712,9 @@ def get_arg_list(arg_value, default_value=None):
        return default_value

    try:
        if isinstance(arg_value, list):
            arg_value = str(arg_value)
        arg_list = json.loads(arg_value)
    except json.decoder.JSONDecodeError:
        arg_list = arg_value.split(",")
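A brief behavior sketch with hypothetical inputs (assuming the elided remainder of the function returns `arg_list`):

```python
get_arg_list('["col_a", "col_b"]')    # valid JSON list -> ["col_a", "col_b"]
get_arg_list("col_a,col_b")           # JSON parse fails -> ["col_a", "col_b"]
get_arg_list("col_a")                 # -> ["col_a"]
get_arg_list(None, default_value=[])  # -> []
```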