feat: first class support for row level hashing #345

Merged 33 commits into develop from issue300-make-hashing-user-friendly on Feb 23, 2022

Commits
570afed
adding scaffolding for calc field builder in config manager
renzokuken Nov 17, 2021
ef3f916
exposing cast via calculated fields. Don't know if we necessarily nee…
renzokuken Nov 24, 2021
c9c212f
diff check
renzokuken Dec 22, 2021
25aed20
config file generating as expected
renzokuken Dec 28, 2021
6140822
Merge branch 'develop' into issue300-make-hashing-user-friendly
renzokuken Dec 28, 2021
ea7f8e7
expanding cli for row level validations
renzokuken Jan 5, 2022
0358193
splitting out comparison fields from aggregates
renzokuken Jan 6, 2022
abd82af
row comparisons operational (sort of)
renzokuken Jan 6, 2022
e09dcc7
re-enabling aggregate validations
renzokuken Jan 7, 2022
2852b15
cohabitation of validation types!
renzokuken Jan 7, 2022
fb372f6
merge develop
renzokuken Jan 7, 2022
8695edb
figuring out why unit tests are borked
renzokuken Jan 11, 2022
f67e1c2
continuing field split
renzokuken Jan 13, 2022
6c3a919
stash before merge
renzokuken Jan 13, 2022
81b97bb
testing diff
renzokuken Jan 20, 2022
4647bae
tests passing
renzokuken Jan 24, 2022
8ed194b
removing extra print statements
renzokuken Jan 27, 2022
a7afbbc
tests and lint
renzokuken Jan 28, 2022
f3e5be8
adding fail tests
renzokuken Jan 29, 2022
312add8
first round of requested changes
renzokuken Feb 1, 2022
2901fbb
change requests round two.
renzokuken Feb 1, 2022
7e9644f
refactor CLI and lint
renzokuken Feb 3, 2022
7581cfc
swapping out farm fingerprint for sha256 as default
renzokuken Feb 10, 2022
1e0deb6
changes per CR
renzokuken Feb 11, 2022
2949597
fixing text result tests
renzokuken Feb 14, 2022
af887ff
adding docs
renzokuken Feb 16, 2022
6b638bc
hash example
renzokuken Feb 22, 2022
efd2966
linting
renzokuken Feb 22, 2022
8c20e31
think I found the broken test
renzokuken Feb 22, 2022
daa26be
merge conflict
renzokuken Feb 23, 2022
59b76ba
fixed tests
renzokuken Feb 23, 2022
bcc7990
setting default for depth length
renzokuken Feb 23, 2022
0feb788
relaxing system test
renzokuken Feb 23, 2022
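
Before the file-by-file diff, a usage sketch. Based on the flags this PR adds in _configure_row_parser, a typical invocation of the new row validation looks roughly like the command below. The connection and table flags (-sc, -tc, -tbls) are assumed from the tool's existing common arguments, which are not shown in this diff; the --hash and --primary-keys flags come directly from the parser changes below.

    data-validation validate row \
      -sc my_source_conn -tc my_target_conn \
      -tbls my_project.my_dataset.my_table \
      --hash '*' \
      --primary-keys id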
63 changes: 51 additions & 12 deletions data_validation/__main__.py
@@ -28,6 +28,8 @@
from data_validation.config_manager import ConfigManager
from data_validation.data_validation import DataValidation

Dumper.ignore_aliases = lambda *args : True


def _get_arg_config_file(args):
"""Return String yaml config file path."""
@@ -78,32 +80,67 @@ def get_aggregate_config(args, config_manager):
aggregate_configs += config_manager.build_config_column_aggregates(
"max", col_args, consts.NUMERIC_DATA_TYPES
)
if args.bit_xor:
col_args = None if args.bit_xor == "*" else cli_tools.get_arg_list(args.bit_xor)
aggregate_configs += config_manager.build_config_column_aggregates(
"bit_xor", col_args, consts.NUMERIC_DATA_TYPES
)
return aggregate_configs

def get_calculated_config(args, config_manager):
"""Return list of formatted calculated objects.

Args:
config_manager(ConfigManager): Validation config manager instance.
"""
calculated_configs = []
fields = []
if hasattr(args, 'hash'):
col_args = None if args.hash == "*" else cli_tools.get_arg_list(args.hash)
fields = config_manager._build_dependent_aliases("hash")
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(["hash__all"])
)
for field in fields:
calculated_configs.append(config_manager.build_config_calculated_fields(
field['reference'],
field['calc_type'],
field['name'],
field['depth'],
None))
return calculated_configs
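
For orientation, the loop above consumes _build_dependent_aliases("hash") as a list of dicts keyed by reference, calc_type, name, and depth, with the final alias (hash__all) registered as a comparison field. A hypothetical shape, inferred only from how the loop reads it; the real helper's output may differ:

    # Hypothetical output of config_manager._build_dependent_aliases("hash"):
    # each layer of the calculated-field chain references the aliases beneath it.
    fields = [
        {"name": "cast__col_a", "reference": ["col_a"], "calc_type": "cast", "depth": 0},
        {"name": "concat__all", "reference": ["cast__col_a"], "calc_type": "concat", "depth": 1},
        {"name": "hash__all", "reference": ["concat__all"], "calc_type": "hash", "depth": 2},
    ]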


def build_config_from_args(args, config_manager):
"""Return config manager object ready to execute.

Args:
config_manager (ConfigManager): Validation config manager instance.
"""
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if args.primary_keys and not args.grouped_columns:
if not args.grouped_columns and not config_manager.use_random_rows():
logging.warning(
"No Grouped columns or Random Rows specified, ignoring primary keys."
)
if args.grouped_columns:
config_manager.append_calculated_fields(get_calculated_config(args, config_manager))
if config_manager.validation_type == consts.ROW_VALIDATION:
pass
else:
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if config_manager.primary_keys and not config_manager.query_groups:
raise ValueError(
"Grouped columns must be specified for primary key level validation."
)
if hasattr(args, 'grouped_columns'):
grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
config_manager.append_query_groups(
config_manager.build_config_grouped_columns(grouped_columns)
)
if args.primary_keys:
if hasattr(args, 'primary_keys'):
primary_keys = cli_tools.get_arg_list(args.primary_keys, default_value=[])
config_manager.append_primary_keys(
config_manager.build_config_grouped_columns(primary_keys)
config_manager.build_config_comparison_fields(primary_keys)
)
if hasattr(args, 'comparison_fields'):
comparison_fields = cli_tools.get_arg_list(args.comparison_fields, default_value=[])
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(comparison_fields)
)

# TODO(GH#18): Add query filter config logic

return config_manager
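
A note on the hasattr guards above: argparse sets every declared option on the namespace, defaulting it to None, so hasattr(args, 'hash') is True whenever the active subparser defines --hash, even if the user never passed the flag. The guards therefore test "does this subcommand support the option", not "was the option supplied". A minimal standalone sketch:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--hash")   # declared but not required

    args = parser.parse_args([])    # user supplies nothing
    print(hasattr(args, "hash"))    # True: attribute exists on the namespace
    print(args.hash)                # None: the flag was never passed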
@@ -123,6 +160,8 @@ def build_config_managers_from_args(args):
config_type = consts.GROUPED_COLUMN_VALIDATION
else:
config_type = consts.COLUMN_VALIDATION
elif validate_cmd == "Row":
config_type = consts.ROW_VALIDATION
else:
raise ValueError(f"Unknown Validation Type: {validate_cmd}")
else:
@@ -140,7 +179,7 @@

# Schema validation will not accept filters, labels, or threshold as flags
filter_config, labels, threshold = [], [], 0.0
if config_type != consts.SCHEMA_VALIDATION:
if config_type == consts.COLUMN_VALIDATION:
if args.filters:
filter_config = cli_tools.get_filters(args.filters)
if args.threshold:
@@ -369,7 +408,7 @@ def run_connections(args):

def validate(args):
""" Run commands related to data validation."""
if args.validate_cmd == "column" or args.validate_cmd == "schema":
if args.validate_cmd in ["column", "row", "schema"]:
run(args)
else:
raise ValueError(f"Validation Argument '{args.validate_cmd}' is not supported")
77 changes: 76 additions & 1 deletion data_validation/cli_tools.py
@@ -64,7 +64,6 @@
["port", "Teradata port to connect on"],
["user_name", "User used to connect"],
["password", "Password for supplied user"],
["logmech", "Log on mechanism"],
],
"Oracle": [
["host", "Desired Oracle host"],
@@ -266,6 +265,16 @@ def _configure_run_parser(subparsers):
"-max",
help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--hash",
"-hash",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
run_parser.add_argument(
"--fields",
"-fields",
help="Individual columns to compare. If comparing a calculated field use the column alias."
)
run_parser.add_argument(
"--grouped-columns",
"-gc",
@@ -380,11 +389,59 @@ def _configure_validate_parser(subparsers):
)
_configure_column_parser(column_parser)

row_parser = validate_subparsers.add_parser(
"row", help="Run a row validation"
)
_configure_row_parser(row_parser)

schema_parser = validate_subparsers.add_parser(
"schema", help="Run a schema validation"
)
_configure_schema_parser(schema_parser)

def _configure_row_parser(row_parser):
"""Configure arguments to run row level validations."""
_add_common_arguments(row_parser)
row_parser.add_argument(
"--hash",
"-hash",
help="Comma separated list of columns for hash 'col_a,col_b' or * for all columns",
)
row_parser.add_argument(
"--fields",
"-fields",
help="Individual columns to compare. If comparing a calculated field use the column alias."
)
row_parser.add_argument(
"--primary-keys",
"-pk",
help="Comma separated list of primary key columns 'col_a,col_b'",
)
row_parser.add_argument(
"--labels", "-l", help="Key value pair labels for validation run"
)
row_parser.add_argument(
"--threshold",
"-th",
type=threshold_float,
help="Float max threshold for percent difference",
)
row_parser.add_argument(
"--filters",
"-filters",
help="Filters in the format source_filter:target_filter",
)
row_parser.add_argument(
"--use-random-row",
"-rr",
action="store_true",
help="Finds a set of random rows of the first primary key supplied.",
)
row_parser.add_argument(
"--random-row-batch-size",
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)

def _configure_column_parser(column_parser):
"""Configure arguments to run column level validations."""
@@ -414,6 +471,21 @@ def _configure_column_parser(column_parser):
"-max",
help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--hash",
"-hash",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns"
)
column_parser.add_argument(
"--bit_xor",
"-bit_xor",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns"
)
column_parser.add_argument(
"--fields",
"-fields",
help="list of fields to perform exact comparisons to. Use column aliases if this is calculated."
)
column_parser.add_argument(
"--grouped-columns",
"-gc",
@@ -665,6 +737,9 @@ def get_arg_list(arg_value, default_value=None):
return default_value

try:
if isinstance (arg_value, list):
arg_value = str(arg_value)
# arg_value = "hash_all"
arg_list = json.loads(arg_value)
except json.decoder.JSONDecodeError:
arg_list = arg_value.split(",")
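
To summarize the get_arg_list behavior after this change: the value is parsed as JSON first and falls back to a comma split. A simplified sketch of just that fallback (the real function also handles None defaults and list inputs, as shown in the diff):

    import json

    def parse_arg_list(arg_value: str):
        """Parse a JSON array string, else split on commas."""
        try:
            return json.loads(arg_value)
        except json.decoder.JSONDecodeError:
            return arg_value.split(",")

    print(parse_arg_list('["col_a", "col_b"]'))  # ['col_a', 'col_b']
    print(parse_arg_list("col_a,col_b"))         # ['col_a', 'col_b']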
90 changes: 45 additions & 45 deletions data_validation/combiner.py
@@ -24,6 +24,7 @@

import ibis
import ibis.expr.datatypes
import numpy

from data_validation import consts

@@ -52,7 +53,7 @@ def generate_report(
join_on_fields (Sequence[str]):
A collection of column names to use to join source and target.
These are the columns that both the source and target queries
grouped by.
are grouped by.
is_value_comparison (boolean): Boolean representing if source and
target agg values should be compared with 'equals to' rather than
a 'difference' comparison.
@@ -72,7 +73,6 @@
"Expected source and target to have same schema, got "
f"source: {source_names} target: {target_names}"
)

differences_pivot = _calculate_differences(
source, target, join_on_fields, run_metadata.validations, is_value_comparison
)
@@ -98,11 +98,11 @@ def _calculate_difference(field_differences, datatype, validation, is_value_comp
pct_threshold = ibis.literal(validation.threshold)

if isinstance(datatype, ibis.expr.datatypes.Timestamp):
source_value = field_differences["differences_source_agg_value"].epoch_seconds()
target_value = field_differences["differences_target_agg_value"].epoch_seconds()
source_value = field_differences["differences_source_value"].epoch_seconds()
target_value = field_differences["differences_target_value"].epoch_seconds()
else:
source_value = field_differences["differences_source_agg_value"]
target_value = field_differences["differences_target_agg_value"]
source_value = field_differences["differences_source_value"]
target_value = field_differences["differences_target_value"]

# Does not calculate difference between agg values for row hash due to int64 overflow
if is_value_comparison:
@@ -161,30 +161,27 @@ def _calculate_differences(
differences_joined = source.cross_join(target)

differences_pivots = []

for field, field_type in schema.items():
if field not in validation_fields:
if field not in validations.keys():
continue

validation = validations[field]

field_differences = differences_joined.projection(
[
source[field].name("differences_source_agg_value"),
target[field].name("differences_target_agg_value"),
]
+ [source[join_field] for join_field in join_on_fields]
)
differences_pivots.append(
field_differences[
(ibis.literal(field).name("validation_name"),)
+ join_on_fields
+ _calculate_difference(
else:
validation = validations[field]
field_differences = differences_joined.projection(
[
source[field].name("differences_source_value"),
target[field].name("differences_target_value"),
]
+ [source[join_field] for join_field in join_on_fields]
)
differences_pivots.append(
field_differences[
(ibis.literal(field).name("validation_name"),)
+ join_on_fields
+ _calculate_difference(
field_differences, field_type, validation, is_value_comparison
)
]
)

)
]
)
differences_pivot = functools.reduce(
lambda pivot1, pivot2: pivot1.union(pivot2), differences_pivots
)
Expand All @@ -197,26 +194,29 @@ def _pivot_result(result, join_on_fields, validations, result_type):
pivots = []

for field in validation_fields:
validation = validations[field]
pivots.append(
result.projection(
(
ibis.literal(field).name("validation_name"),
ibis.literal(validation.validation_type).name("validation_type"),
ibis.literal(validation.aggregation_type).name("aggregation_type"),
ibis.literal(validation.get_table_name(result_type)).name(
"table_name"
),
# Cast to string to ensure types match, even when column
# name is NULL (such as for count aggregations).
ibis.literal(validation.get_column_name(result_type))
.cast("string")
.name("column_name"),
result[field].cast("string").name("agg_value"),
if field not in validations.keys():
continue
else:
validation = validations[field]
pivots.append(
result.projection(
(
ibis.literal(field).name("validation_name"),
ibis.literal(validation.validation_type).name("validation_type"),
ibis.literal(validation.aggregation_type).name("aggregation_type"),
ibis.literal(validation.get_table_name(result_type)).name(
"table_name"
),
# Cast to string to ensure types match, even when column
# name is NULL (such as for count aggregations).
ibis.literal(validation.get_column_name(result_type))
.cast("string")
.name("column_name"),
result[field].cast("string").name("agg_value"),
)
+ join_on_fields
)
+ join_on_fields
)
)
pivot = functools.reduce(lambda pivot1, pivot2: pivot1.union(pivot2), pivots)
return pivot

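
Both _calculate_differences and _pivot_result end with the same pattern: build one projection per validated field, then fold the list into a single long-format result with functools.reduce and union. The fold itself, modeled on plain Python lists (illustrative only; the real code unions Ibis table expressions):

    import functools

    # Stand-ins for per-field pivot tables; the real objects are Ibis expressions
    # whose .union() stacks rows, which list concatenation models here.
    pivots = [
        [("hash__all", "source_value", "target_value")],
        [("id", "source_value", "target_value")],
    ]

    combined = functools.reduce(lambda p1, p2: p1 + p2, pivots)
    print(combined)  # one row per validated field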