Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding scaffold for concatenate as a cli operation #566

Merged
merged 14 commits into from
Oct 6, 2022
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ data-validation (--verbose or -v) (--log-level or -ll) validate row
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
--hash COLUMNS Comma separated list of columns to hash or * for all columns
--concat COLUMNS Comma separated list of columns to concatenate or * for all columns
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
Expand Down
23 changes: 20 additions & 3 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,22 @@ def get_calculated_config(args, config_manager):
col_list = None if args.hash == "*" else cli_tools.get_arg_list(args.hash)
fields = config_manager._build_dependent_aliases("hash", col_list)
aliases = [field["name"] for field in fields]

# config_manager.append_dependent_aliases(aliases)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove comment

# Add to list of necessary columns for selective hashing in order to drop
# excess columns with invalid data types (e.g. structs) when generating source/target DFs
if col_list:
nehanene15 marked this conversation as resolved.
Show resolved Hide resolved
config_manager.append_dependent_aliases(col_list)
config_manager.append_dependent_aliases(aliases)
elif args.concat:
col_list = None if args.concat == "*" else cli_tools.get_arg_list(args.concat)
fields = config_manager._build_dependent_aliases("concat", col_list)
aliases = [field["name"] for field in fields]
# config_manager.append_dependent_aliases(aliases)
# Add to list of necessary columns for selective concatenation in order to drop
# excess columns with invalid data types (e.g. structs) when generating source/target DFs
if col_list:
config_manager.append_dependent_aliases(col_list)
config_manager.append_dependent_aliases(aliases)

if len(fields) > 0:
max_depth = max([x["depth"] for x in fields])
Expand All @@ -158,6 +168,12 @@ def get_calculated_config(args, config_manager):
["hash__all"], depth=max_depth
)
)
elif args.concat:
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(
["concat__all"], depth=max_depth
)
)
return calculated_configs


Expand All @@ -177,22 +193,23 @@ def build_config_from_args(args, config_manager):
config_manager.build_column_configs(grouped_columns)
)
elif config_manager.validation_type == consts.ROW_VALIDATION:
calc_type = args.hash or args.concat
if args.comparison_fields is not None:
comparison_fields = cli_tools.get_arg_list(
args.comparison_fields, default_value=[]
)
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(comparison_fields)
)
if args.hash != "*":
if calc_type is not None and calc_type != "*":
config_manager.append_dependent_aliases(comparison_fields)

if args.primary_keys is not None:
primary_keys = cli_tools.get_arg_list(args.primary_keys)
config_manager.append_primary_keys(
config_manager.build_column_configs(primary_keys)
)
if args.hash != "*":
nehanene15 marked this conversation as resolved.
Show resolved Hide resolved
if calc_type is not None and calc_type != "*":
config_manager.append_dependent_aliases(primary_keys)

if config_manager.validation_type == consts.CUSTOM_QUERY:
Expand Down
10 changes: 10 additions & 0 deletions data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ def _configure_row_parser(row_parser):
"-hash",
help="Comma separated list of columns for hash 'col_a,col_b' or * for all columns",
)
row_parser.add_argument(
renzokuken marked this conversation as resolved.
Show resolved Hide resolved
"--concat",
"-concat",
help="Comma separated list of columns for concat 'col_a,col_b' or * for all columns",
)
row_parser.add_argument(
"--comparison-fields",
"-comp-fields",
Expand Down Expand Up @@ -410,6 +415,11 @@ def _configure_column_parser(column_parser):
"-hash",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--concat",
"-concat",
help="Comma separated list of columns for a concatenate operation 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--bit_xor",
"-bit_xor",
Expand Down
5 changes: 0 additions & 5 deletions data_validation/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,12 @@ def generate_report(
"Expected source and target to have same schema, got "
f"source: {source_names} target: {target_names}"
)

differences_pivot = _calculate_differences(
source, target, join_on_fields, run_metadata.validations, is_value_comparison
)

source_pivot = _pivot_result(
source, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_SOURCE
)

target_pivot = _pivot_result(
target, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_TARGET
)
Expand Down Expand Up @@ -242,7 +239,6 @@ def _pivot_result(result, join_on_fields, validations, result_type):
else all_fields
)
pivots = []

for field in validation_fields:
if field not in validations:
continue
Expand Down Expand Up @@ -284,7 +280,6 @@ def _pivot_result(result, join_on_fields, validations, result_type):
+ join_on_fields
)
)

pivot = functools.reduce(lambda pivot1, pivot2: pivot1.union(pivot2), pivots)
return pivot

Expand Down
8 changes: 8 additions & 0 deletions data_validation/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,14 @@ def _build_dependent_aliases(self, calc_type, col_list=None):
"concat",
"hash",
]
if calc_type == "concat":
renzokuken marked this conversation as resolved.
Show resolved Hide resolved
order_of_operations = [
"cast",
"ifnull",
"rstrip",
"upper",
"concat",
]
column_aliases = {}
col_names = []
for i, calc in enumerate(order_of_operations):
Expand Down