Skip to content

Commit

Permalink
feat: adding scaffold for concatenate as a cli operation (#566)
Browse files Browse the repository at this point in the history
* adding scaffold for concatenate as a cli operation

* debugging diff

* stash

* changing condition for adding dependent aliases

* removing commented out lines

* adding argument to column parser

* updating per review

* Update README.md

Co-authored-by: Neha Nene <[email protected]>

* updating check for args

* linting

* footnote on readme

Co-authored-by: Neha Nene <[email protected]>
  • Loading branch information
renzokuken and nehanene15 committed Oct 6, 2022
1 parent c76311c commit ec4ef33
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ data-validation (--verbose or -v) (--log-level or -ll) validate row
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
--hash COLUMNS Comma separated list of columns to hash or * for all columns
--concat COLUMNS Comma separated list of columns to concatenate or * for all columns (use if a common hash function is not available between databases)
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
Expand Down
23 changes: 20 additions & 3 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,22 @@ def get_calculated_config(args, config_manager):
col_list = None if args.hash == "*" else cli_tools.get_arg_list(args.hash)
fields = config_manager._build_dependent_aliases("hash", col_list)
aliases = [field["name"] for field in fields]

# config_manager.append_dependent_aliases(aliases)
# Add to list of necessary columns for selective hashing in order to drop
# excess columns with invalid data types (e.g. structs) when generating source/target DFs
if col_list:
config_manager.append_dependent_aliases(col_list)
config_manager.append_dependent_aliases(aliases)
elif args.concat:
col_list = None if args.concat == "*" else cli_tools.get_arg_list(args.concat)
fields = config_manager._build_dependent_aliases("concat", col_list)
aliases = [field["name"] for field in fields]
# config_manager.append_dependent_aliases(aliases)
# Add to list of necessary columns for selective concatenation in order to drop
# excess columns with invalid data types (e.g. structs) when generating source/target DFs
if col_list:
config_manager.append_dependent_aliases(col_list)
config_manager.append_dependent_aliases(aliases)

if len(fields) > 0:
max_depth = max([x["depth"] for x in fields])
Expand All @@ -158,6 +168,12 @@ def get_calculated_config(args, config_manager):
["hash__all"], depth=max_depth
)
)
elif args.concat:
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(
["concat__all"], depth=max_depth
)
)
return calculated_configs


Expand All @@ -177,22 +193,23 @@ def build_config_from_args(args, config_manager):
config_manager.build_column_configs(grouped_columns)
)
elif config_manager.validation_type == consts.ROW_VALIDATION:
calc_type = args.hash or args.concat
if args.comparison_fields is not None:
comparison_fields = cli_tools.get_arg_list(
args.comparison_fields, default_value=[]
)
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(comparison_fields)
)
if args.hash != "*":
if calc_type is not None and calc_type != "*":
config_manager.append_dependent_aliases(comparison_fields)

if args.primary_keys is not None:
primary_keys = cli_tools.get_arg_list(args.primary_keys)
config_manager.append_primary_keys(
config_manager.build_column_configs(primary_keys)
)
if args.hash != "*":
if calc_type is not None and calc_type != "*":
config_manager.append_dependent_aliases(primary_keys)

if config_manager.validation_type == consts.CUSTOM_QUERY:
Expand Down
10 changes: 10 additions & 0 deletions data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ def _configure_row_parser(row_parser):
"-hash",
help="Comma separated list of columns for hash 'col_a,col_b' or * for all columns",
)
row_parser.add_argument(
"--concat",
"-concat",
help="Comma separated list of columns for concat 'col_a,col_b' or * for all columns",
)
row_parser.add_argument(
"--comparison-fields",
"-comp-fields",
Expand Down Expand Up @@ -410,6 +415,11 @@ def _configure_column_parser(column_parser):
"-hash",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--concat",
"-concat",
help="Comma separated list of columns for a concatenate operation 'col_a,col_b' or * for all columns",
)
column_parser.add_argument(
"--bit_xor",
"-bit_xor",
Expand Down
5 changes: 0 additions & 5 deletions data_validation/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,12 @@ def generate_report(
"Expected source and target to have same schema, got "
f"source: {source_names} target: {target_names}"
)

differences_pivot = _calculate_differences(
source, target, join_on_fields, run_metadata.validations, is_value_comparison
)

source_pivot = _pivot_result(
source, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_SOURCE
)

target_pivot = _pivot_result(
target, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_TARGET
)
Expand Down Expand Up @@ -242,7 +239,6 @@ def _pivot_result(result, join_on_fields, validations, result_type):
else all_fields
)
pivots = []

for field in validation_fields:
if field not in validations:
continue
Expand Down Expand Up @@ -284,7 +280,6 @@ def _pivot_result(result, join_on_fields, validations, result_type):
+ join_on_fields
)
)

pivot = functools.reduce(lambda pivot1, pivot2: pivot1.union(pivot2), pivots)
return pivot

Expand Down
8 changes: 8 additions & 0 deletions data_validation/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,14 @@ def _build_dependent_aliases(self, calc_type, col_list=None):
"concat",
"hash",
]
if calc_type == "concat":
order_of_operations = [
"cast",
"ifnull",
"rstrip",
"upper",
"concat",
]
column_aliases = {}
col_names = []
for i, calc in enumerate(order_of_operations):
Expand Down

0 comments on commit ec4ef33

Please sign in to comment.