feat: first class support for row level hashing (#345)
* adding scaffolding for calc field builder in config manager

* exposing cast via calculated fields. Don't know if we necessarily need this just adding for consistency

* diff check

* config file generating as expected

* expanding cli for row level validations

* splitting out comparison fields from aggregates

* row comparisons operational (sort of)

* re-enabling aggregate validations

* cohabitation of validation types!

* figuring out why unit tests are borked

* continuing field split

* stash before merge

* testing diff

* tests passing

* removing extra print statements

* tests and lint

* adding fail tests

* first round of requested changes

* change requests round two.

* refactor CLI and lint

* swapping out farm fingerprint for sha256 as default

* changes per CR

* fixing text result tests

* adding docs

* hash example

* linting

* think I found the broken test

* fixed tests

* setting default for depth length

* relaxing system test
renzokuken committed Feb 23, 2022
1 parent 310747d commit 3d78ee5
Showing 15 changed files with 798 additions and 212 deletions.
51 changes: 51 additions & 0 deletions README.md
@@ -134,6 +134,51 @@ sum, min, etc.) is provided, the default aggregation will run.
The [Examples](docs/examples.md) page provides many examples of how the tool can
be used to run powerful validations without writing any queries.

#### Row Validations

Below is the command syntax for row validations. To run row-level
validations you must pass a `--primary-keys` flag, which defines the field(s)
on which the source and target rows are joined, as well as a
`--comparison-fields` flag, which specifies the columns whose raw values are
compared across that join. Additionally, you can use
[Calculated Fields](#calculated-fields) to compare derived values such as string
counts and hashes of multiple columns.

```
data-validation (--verbose or -v) validate row
--source-conn or -sc SOURCE_CONN
Source connection details
See: *Data Source Configurations* section for each data source
--target-conn or -tc TARGET_CONN
Target connection details
See: *Connections* section for each data source
--tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
e.g. 'bigquery-public-data.new_york_citibike.citibike_trips'
[--primary-keys or -pk PRIMARY_KEYS]
Comma separated list of columns to use as primary keys
[--comparison-fields or -fields comparison-fields]
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
[--hash COLUMNS] Comma separated list of columns to perform a hash operation on or * for all columns
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon-separated string values of source and target filters.
If the target filter is not provided, the source filter will run on both source and target tables.
See: *Filters* section
[--config-file or -c CONFIG_FILE]
YAML Config File Path to be used for storing validations.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
Defaults to table.
```
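For instance, a row validation comparing two columns joined on an `id` primary key might look like the following (the connection names and table are illustrative placeholders, not values from this repository):

```
data-validation validate row \
  -sc my_source_conn \
  -tc my_target_conn \
  -tbls my_schema.my_table \
  --primary-keys id \
  --comparison-fields name,amount
```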

#### Schema Validations

Below is the syntax for schema validations. These can be used to compare column
@@ -289,6 +334,12 @@ Grouped Columns contain the fields you want your aggregations to be broken out
by, e.g. `SELECT last_updated::DATE, COUNT(*) FROM my.table` will produce a
resultset that breaks down the count of rows per calendar date.

### Comparison Fields

For row validations you must specify the columns you want to compare. These
values are compared via a JOIN on the corresponding primary key(s) and
evaluated for an exact match.
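Conceptually, the comparison behaves like a join between the two tables (table and column names here are illustrative):

```
SELECT
  s.id,
  s.amount = t.amount AS amount_match
FROM source.my_table s
JOIN target.my_table t
  ON s.id = t.id
```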

### Calculated Fields

Sometimes direct comparisons are not feasible between databases due to
92 changes: 67 additions & 25 deletions data_validation/__main__.py
@@ -13,9 +13,9 @@
# limitations under the License.

import os

import logging
import json
import sys
from yaml import Dumper, dump

from data_validation import (
cli_tools,
@@ -27,8 +27,9 @@
from data_validation.config_manager import ConfigManager
from data_validation.data_validation import DataValidation

from yaml import dump
import sys

# by default yaml dumps lists as pointers. This disables that feature
Dumper.ignore_aliases = lambda *args: True
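The effect of disabling aliases can be seen in a small sketch. This uses a `Dumper` subclass rather than the lambda monkey-patch above, but the behavior is the same; it assumes PyYAML is installed:

```python
import yaml

shared = ["col_a", "col_b"]
doc = {"source_columns": shared, "target_columns": shared}

# Default behavior: the second reference to `shared` is emitted as an
# alias (*id001) pointing back at an anchor (&id001).
default_out = yaml.dump(doc)

# A Dumper that never emits aliases writes the list out in full twice,
# which is what the patch above achieves for generated config files.
class NoAliasDumper(yaml.Dumper):
    def ignore_aliases(self, data):
        return True

no_alias_out = yaml.dump(doc, Dumper=NoAliasDumper)
print("*id001" in default_out)   # True
print("*id001" in no_alias_out)  # False
```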


def _get_arg_config_file(args):
@@ -78,30 +79,73 @@ def get_aggregate_config(args, config_manager):
aggregate_configs += config_manager.build_config_column_aggregates(
"max", col_args, consts.NUMERIC_DATA_TYPES
)
if args.bit_xor:
col_args = None if args.bit_xor == "*" else cli_tools.get_arg_list(args.bit_xor)
aggregate_configs += config_manager.build_config_column_aggregates(
"bit_xor", col_args, consts.NUMERIC_DATA_TYPES
)
return aggregate_configs


def get_calculated_config(args, config_manager):
"""Return list of formatted calculated objects.
Args:
config_manager(ConfigManager): Validation config manager instance.
"""
calculated_configs = []
fields = []
if args.hash:
fields = config_manager._build_dependent_aliases("hash")
if len(fields) > 0:
max_depth = max([x["depth"] for x in fields])
else:
max_depth = 0
for field in fields:
calculated_configs.append(
config_manager.build_config_calculated_fields(
field["reference"],
field["calc_type"],
field["name"],
field["depth"],
None,
)
)
if args.hash:
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(
["hash__all"], depth=max_depth
)
)
return calculated_configs
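The depth handling above can be sketched in isolation: each generated field records how many layers of calculated fields it depends on, and the `hash__all` comparison is attached at the deepest layer. The field list below is illustrative, not the exact output of `_build_dependent_aliases`:

```python
# Hypothetical dependent aliases for a hash over two columns:
# cast each column, concatenate the casts, then hash the result.
fields = [
    {"name": "cast__col_a", "calc_type": "cast", "depth": 0},
    {"name": "cast__col_b", "calc_type": "cast", "depth": 0},
    {"name": "concat__all", "calc_type": "concat", "depth": 1},
    {"name": "hash__all", "calc_type": "hash", "depth": 2},
]

# Mirrors the guard in get_calculated_config: an empty list means depth 0.
max_depth = max((f["depth"] for f in fields), default=0)
print(max_depth)  # 2
```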


def build_config_from_args(args, config_manager):
"""Return config manager object ready to execute.
Args:
config_manager (ConfigManager): Validation config manager instance.
"""
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if args.primary_keys and not args.grouped_columns:
if not args.grouped_columns and not config_manager.use_random_rows():
logging.warning(
"No Grouped columns or Random Rows specified, ignoring primary keys."
config_manager.append_calculated_fields(get_calculated_config(args, config_manager))
if config_manager.validation_type == consts.COLUMN_VALIDATION:
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if args.grouped_columns is not None:
grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
config_manager.append_query_groups(
config_manager.build_config_grouped_columns(grouped_columns)
)
if args.grouped_columns:
grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
config_manager.append_query_groups(
config_manager.build_config_grouped_columns(grouped_columns)
)
if args.primary_keys:
primary_keys = cli_tools.get_arg_list(args.primary_keys, default_value=[])
elif config_manager.validation_type == consts.ROW_VALIDATION:
if args.comparison_fields is not None:
comparison_fields = cli_tools.get_arg_list(
args.comparison_fields, default_value=[]
)
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(comparison_fields)
)
if args.primary_keys is not None:
primary_keys = cli_tools.get_arg_list(args.primary_keys)
config_manager.append_primary_keys(
config_manager.build_config_grouped_columns(primary_keys)
config_manager.build_config_comparison_fields(primary_keys)
)

# TODO(GH#18): Add query filter config logic
@@ -118,11 +162,9 @@ def build_config_managers_from_args(args):
if validate_cmd == "Schema":
config_type = consts.SCHEMA_VALIDATION
elif validate_cmd == "Column":
# TODO: We need to discuss how GroupedColumn and Row are differentiated.
if args.grouped_columns:
config_type = consts.GROUPED_COLUMN_VALIDATION
else:
config_type = consts.COLUMN_VALIDATION
config_type = consts.COLUMN_VALIDATION
elif validate_cmd == "Row":
config_type = consts.ROW_VALIDATION
else:
raise ValueError(f"Unknown Validation Type: {validate_cmd}")
else:
@@ -140,7 +182,7 @@

# Schema validation will not accept filters, labels, or threshold as flags
filter_config, labels, threshold = [], [], 0.0
if config_type != consts.SCHEMA_VALIDATION:
if config_type != consts.COLUMN_VALIDATION:
if args.filters:
filter_config = cli_tools.get_filters(args.filters)
if args.threshold:
@@ -386,8 +428,8 @@ def run_validation_configs(args):


def validate(args):
"""Run commands related to data validation."""
if args.validate_cmd == "column" or args.validate_cmd == "schema":
""" Run commands related to data validation."""
if args.validate_cmd in ["column", "row", "schema"]:
run(args)
else:
raise ValueError(f"Validation Argument '{args.validate_cmd}' is not supported")
122 changes: 86 additions & 36 deletions data_validation/cli_tools.py
@@ -61,7 +61,6 @@
["port", "Teradata port to connect on"],
["user_name", "User used to connect"],
["password", "Password for supplied user"],
["logmech", "Log on mechanism"],
],
"Oracle": [
["host", "Desired Oracle host"],
@@ -269,41 +268,6 @@ def _configure_run_parser(subparsers):
"-tbls",
help="Comma separated tables list in the form 'schema.table=target_schema.target_table'",
)
run_parser.add_argument(
"--count",
"-count",
help="Comma separated list of columns for count 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--sum",
"-sum",
help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--avg",
"-avg",
help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--min",
"-min",
help="Comma separated list of columns for min 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--max",
"-max",
help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--grouped-columns",
"-gc",
help="Comma separated list of columns to use in GroupBy 'col_a,col_b'",
)
run_parser.add_argument(
"--primary-keys",
"-pk",
help="Comma separated list of primary key columns 'col_a,col_b'",
)
run_parser.add_argument(
"--result-handler-config", "-rc", help="Result handler config details"
)
@@ -318,6 +282,11 @@ def _configure_run_parser(subparsers):
run_parser.add_argument(
"--labels", "-l", help="Key value pair labels for validation run",
)
run_parser.add_argument(
"--hash",
"-hash",
help="Comma separated list of columns for hash 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--service-account",
"-sa",
@@ -408,12 +377,70 @@ def _configure_validate_parser(subparsers):
)
_configure_column_parser(column_parser)

row_parser = validate_subparsers.add_parser("row", help="Run a row validation")
_configure_row_parser(row_parser)

schema_parser = validate_subparsers.add_parser(
"schema", help="Run a schema validation"
)
_configure_schema_parser(schema_parser)


def _configure_row_parser(row_parser):
"""Configure arguments to run row level validations."""
_add_common_arguments(row_parser)
row_parser.add_argument(
"--hash",
"-hash",
help="Comma separated list of columns for hash 'col_a,col_b' or * for all columns",
)
row_parser.add_argument(
"--comparison-fields",
"-comp-fields",
help="Individual columns to compare. If comparing a calculated field use the column alias.",
)
row_parser.add_argument(
"--calculated-fields",
"-calc-fields",
help="list of calculated fields to generate.",
)
row_parser.add_argument(
"--primary-keys",
"-pk",
help="Comma separated list of primary key columns 'col_a,col_b'",
)
row_parser.add_argument(
"--labels", "-l", help="Key value pair labels for validation run"
)
row_parser.add_argument(
"--threshold",
"-th",
type=threshold_float,
help="Float max threshold for percent difference",
)
row_parser.add_argument(
"--grouped-columns",
"-gc",
help="Comma separated list of columns to use in GroupBy 'col_a,col_b'",
)
row_parser.add_argument(
"--filters",
"-filters",
help="Filters in the format source_filter:target_filter",
)
row_parser.add_argument(
"--use-random-row",
"-rr",
action="store_true",
help="Finds a set of random rows of the first primary key supplied.",
)
row_parser.add_argument(
"--random-row-batch-size",
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)


def _configure_column_parser(column_parser):
"""Configure arguments to run column level validations."""
_add_common_arguments(column_parser)
@@ -442,6 +469,26 @@ def _configure_column_parser(column_parser):
"-max",
help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--hash",
"-hash",
help="Comma separated list of columns to concatenate and hash 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--bit_xor",
"-bit_xor",
help="Comma separated list of columns for bit_xor 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--comparison-fields",
"-comp-fields",
help="list of fields to perform exact comparisons to. Use column aliases if this is calculated.",
)
column_parser.add_argument(
"--calculated-fields",
"-calc-fields",
help="list of calculated fields to generate.",
)
column_parser.add_argument(
"--grouped-columns",
"-gc",
@@ -713,6 +760,9 @@ def get_arg_list(arg_value, default_value=None):
return default_value

try:
if isinstance(arg_value, list):
arg_value = str(arg_value)
arg_list = json.loads(arg_value)
except json.decoder.JSONDecodeError:
arg_list = arg_value.split(",")
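The parsing fallback in `get_arg_list` can be sketched standalone: a value is first tried as JSON, and anything that fails to parse falls back to a comma split. This is a simplified re-implementation for illustration, not the library function itself, and the trailing single-value normalization is an assumption about the elided remainder of the function:

```python
import json

def parse_arg_list(arg_value, default_value=None):
    """Parse a CLI string into a list: try JSON first, then comma-split."""
    if arg_value is None:
        return default_value
    try:
        arg_list = json.loads(arg_value)
    except json.decoder.JSONDecodeError:
        # Not valid JSON: treat it as a plain comma separated string.
        arg_list = arg_value.split(",")
    if not isinstance(arg_list, list):
        # Assumed normalization so callers always receive a list.
        arg_list = [arg_list]
    return arg_list

print(parse_arg_list('["col_a", "col_b"]'))  # ['col_a', 'col_b']
print(parse_arg_list("col_a,col_b"))         # ['col_a', 'col_b']
print(parse_arg_list("*"))                   # ['*']
```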