feat: Allow aggregation over length of string columns (#430)
* feat: treat string values as length(string) for column validation

* fix: round results of AVG() so values are comparable for BQ to Hive

* feat: update tests

* feat: update CLI with flag

* feat: adding tests

* feat: clean up code
nehanene15 committed Apr 7, 2022
1 parent a6cf3f0 commit 201f0a2
Showing 7 changed files with 111 additions and 14 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -93,6 +93,11 @@ grouped column validations a step further by providing the `--primary-key` flag.
With this flag, if a mismatch is found, DVT will dive deeper into the slice
with the error and find the row (primary key value) with the inconsistency.

You can list string columns for aggregations, in which case the aggregation
is calculated over `length(string_col)`. An aggregation over all columns
(`'*'`) runs only over numeric columns, unless the
`--wildcard-include-string-len` flag is present.
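
For example, a wildcard `--sum` validation that also covers string columns could
be invoked as below (a hypothetical invocation: `my_conn` is a placeholder
connection name; the table is the one exercised by this commit's system tests):

```
data-validation validate column \
  -sc my_conn -tc my_conn \
  --tables-list bigquery-public-data.new_york_citibike.citibike_trips \
  --sum '*' \
  --wildcard-include-string-len
```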

```
data-validation (--verbose or -v) validate column
--source-conn or -sc SOURCE_CONN
@@ -120,6 +125,8 @@ data-validation (--verbose or -v) validate column
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--wildcard-include-string-len or -wis]
If present, include string columns in aggregations as length(string_col)
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon separated string values of source and target filters.
If target filter is not provided, the source filter will run on source and target tables.
14 changes: 9 additions & 5 deletions data_validation/__main__.py
@@ -53,6 +53,10 @@ def get_aggregate_config(args, config_manager):
config_manager (ConfigManager): Validation config manager instance.
"""
aggregate_configs = [config_manager.build_config_count_aggregate()]
supported_data_types = ["float64", "int8", "int16", "int32", "int64", "decimal"]

if args.wildcard_include_string_len:
supported_data_types.append("string")

if args.count:
col_args = None if args.count == "*" else cli_tools.get_arg_list(args.count)
@@ -62,27 +66,27 @@ def get_aggregate_config(args, config_manager):
if args.sum:
col_args = None if args.sum == "*" else cli_tools.get_arg_list(args.sum)
aggregate_configs += config_manager.build_config_column_aggregates(
"sum", col_args, consts.NUMERIC_DATA_TYPES
"sum", col_args, supported_data_types
)
if args.avg:
col_args = None if args.avg == "*" else cli_tools.get_arg_list(args.avg)
aggregate_configs += config_manager.build_config_column_aggregates(
"avg", col_args, consts.NUMERIC_DATA_TYPES
"avg", col_args, supported_data_types
)
if args.min:
col_args = None if args.min == "*" else cli_tools.get_arg_list(args.min)
aggregate_configs += config_manager.build_config_column_aggregates(
"min", col_args, consts.NUMERIC_DATA_TYPES
"min", col_args, supported_data_types
)
if args.max:
col_args = None if args.max == "*" else cli_tools.get_arg_list(args.max)
aggregate_configs += config_manager.build_config_column_aggregates(
"max", col_args, consts.NUMERIC_DATA_TYPES
"max", col_args, supported_data_types
)
if args.bit_xor:
col_args = None if args.bit_xor == "*" else cli_tools.get_arg_list(args.bit_xor)
aggregate_configs += config_manager.build_config_column_aggregates(
"bit_xor", col_args, consts.NUMERIC_DATA_TYPES
"bit_xor", col_args, supported_data_types
)
return aggregate_configs

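The net effect of the edits above: a per-run `supported_data_types` list replaces
the static `consts.NUMERIC_DATA_TYPES` for every aggregate type. A minimal sketch
of the resulting type gate (a standalone rephrasing, not code from the commit):

```python
# Which column types are eligible for wildcard ('*') aggregations.
NUMERIC_DATA_TYPES = ["float64", "int8", "int16", "int32", "int64", "decimal"]

def effective_supported_types(wildcard_include_string_len: bool) -> list:
    """Mirrors the gating in get_aggregate_config above."""
    types = list(NUMERIC_DATA_TYPES)
    if wildcard_include_string_len:
        # String columns are then aggregated as length(string_col);
        # see config_manager.py below.
        types.append("string")
    return types

assert "string" in effective_supported_types(True)
assert "string" not in effective_supported_types(False)
```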
12 changes: 12 additions & 0 deletions data_validation/cli_tools.py
@@ -320,6 +326,12 @@ def _configure_run_parser(subparsers):
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)
run_parser.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)


def _configure_connection_parser(subparsers):
@@ -513,6 +519,12 @@ def _configure_column_parser(column_parser):
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)
column_parser.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)


def _configure_schema_parser(schema_parser):
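Because the flag is declared with `action="store_true"`, it takes no value:
absent means `False`, present means `True`. A self-contained sketch of that
behavior (the parser construction here is illustrative, not DVT's own):

```python
import argparse

# Reproduce just the new flag on a throwaway parser.
parser = argparse.ArgumentParser(prog="data-validation")
parser.add_argument(
    "--wildcard-include-string-len",
    "-wis",
    action="store_true",
    help="Include string fields for wildcard aggregations.",
)

assert parser.parse_args([]).wildcard_include_string_len is False
assert parser.parse_args(["-wis"]).wildcard_include_string_len is True
```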
4 changes: 4 additions & 0 deletions data_validation/combiner.py
@@ -101,6 +101,10 @@ def _calculate_difference(field_differences, datatype, validation, is_value_comp
if isinstance(datatype, ibis.expr.datatypes.Timestamp):
source_value = field_differences["differences_source_value"].epoch_seconds()
target_value = field_differences["differences_target_value"].epoch_seconds()
elif isinstance(datatype, ibis.expr.datatypes.Float64):
# Float64 type results from AVG() aggregation
source_value = field_differences["differences_source_value"].round(digits=4)
target_value = field_differences["differences_target_value"].round(digits=4)
else:
source_value = field_differences["differences_source_value"]
target_value = field_differences["differences_target_value"]
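The rounding branch exists because `AVG()` over the same data can disagree in
low-order float digits across engines (here, BigQuery vs. Hive). A plain-Python
illustration with hypothetical values (the real code rounds ibis expressions):

```python
# Hypothetical AVG() results for the same column from two engines: the raw
# floats differ, but the comparison passes once both sides are rounded.
source_avg = 1978.7865342012  # e.g. BigQuery
target_avg = 1978.7865341987  # e.g. Hive

assert source_avg != target_avg
assert round(source_avg, 4) == round(target_avg, 4)
```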
43 changes: 38 additions & 5 deletions data_validation/config_manager.py
@@ -444,6 +444,30 @@ def build_config_count_aggregate(self):

return aggregate_config

def append_stringlen_calc_field(self, column, agg_type):
""" Append calculated field for length(string) for column validation"""
calculated_config = {
consts.CONFIG_CALCULATED_SOURCE_COLUMNS: [column],
consts.CONFIG_CALCULATED_TARGET_COLUMNS: [column],
consts.CONFIG_FIELD_ALIAS: f"length__{column}",
consts.CONFIG_TYPE: "length",
consts.CONFIG_DEPTH: 0,
}

existing_calc_fields = [
x[consts.CONFIG_FIELD_ALIAS] for x in self.calculated_fields
]
if calculated_config[consts.CONFIG_FIELD_ALIAS] not in existing_calc_fields:
self.append_calculated_fields([calculated_config])

aggregate_config = {
consts.CONFIG_SOURCE_COLUMN: f"length__{column}",
consts.CONFIG_TARGET_COLUMN: f"length__{column}",
consts.CONFIG_FIELD_ALIAS: f"{agg_type}__length__{column}",
consts.CONFIG_TYPE: agg_type,
}
return aggregate_config

def build_config_column_aggregates(self, agg_type, arg_value, supported_types):
"""Return list of aggregate objects of given agg_type."""
aggregate_configs = []
@@ -453,6 +477,9 @@ def build_config_column_aggregates(self, agg_type, arg_value, supported_types):
casefold_source_columns = {x.casefold(): str(x) for x in source_table.columns}
casefold_target_columns = {x.casefold(): str(x) for x in target_table.columns}

if arg_value and supported_types:
supported_types.append("string")

allowlist_columns = arg_value or casefold_source_columns
for column in casefold_source_columns:
# Get column type and remove precision/scale attributes
Expand All @@ -463,13 +490,19 @@ def build_config_column_aggregates(self, agg_type, arg_value, supported_types):
continue
elif column not in casefold_target_columns:
logging.info(
f"Skipping Agg {agg_type}: {source_table.op().name}.{column}"
f"Skipping {agg_type} on {column} as column is not present in target table"
)
continue
elif supported_types and column_type not in supported_types:
- if self.verbose:
-     msg = f"Skipping Agg {agg_type}: {source_table.op().name}.{column} {column_type}"
-     print(msg)
+ logging.info(
+     f"Skipping {agg_type} on {column} due to data type: {column_type}"
+ )
continue

if column_type == "string":
aggregate_config = self.append_stringlen_calc_field(column, agg_type)
aggregate_configs.append(aggregate_config)
continue

aggregate_config = {
@@ -500,12 +533,12 @@ def build_config_calculated_fields(
continue
elif column not in casefold_target_columns:
logging.info(
f"Skipping Calc {calc_type}: {source_table.op().name}.{column} {column_type}"
f"Skipping {calc_type} on {column} as column is not present in target table"
)
continue
elif supported_types and column_type not in supported_types:
if self.verbose:
msg = f"Skipping Calc {calc_type}: {source_table.op().name}.{column} {column_type}"
msg = f"Skipping {calc_type} on {column} due to data type: {column_type}"
print(msg)
continue

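To make the two-step wiring concrete: for a hypothetical string column `city`
under `--sum`, `append_stringlen_calc_field` registers one calculated field and
returns one aggregate that references it, roughly as below. The literal key
strings are assumptions standing in for the `consts.*` constants; consts.py is
authoritative.

```python
# Assumed literal values for the consts.* keys (hypothetical rendering).
calculated_config = {
    "source_calculated_columns": ["city"],  # consts.CONFIG_CALCULATED_SOURCE_COLUMNS
    "target_calculated_columns": ["city"],  # consts.CONFIG_CALCULATED_TARGET_COLUMNS
    "field_alias": "length__city",          # consts.CONFIG_FIELD_ALIAS
    "type": "length",                       # consts.CONFIG_TYPE
    "depth": 0,                             # consts.CONFIG_DEPTH
}

# The aggregate targets the calculated field, not the raw column.
aggregate_config = {
    "source_column": "length__city",        # consts.CONFIG_SOURCE_COLUMN
    "target_column": "length__city",        # consts.CONFIG_TARGET_COLUMN
    "field_alias": "sum__length__city",     # consts.CONFIG_FIELD_ALIAS
    "type": "sum",                          # consts.CONFIG_TYPE
}
```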
3 changes: 0 additions & 3 deletions data_validation/consts.py
@@ -124,9 +124,6 @@
RESULT_TYPE_SOURCE = "source"
RESULT_TYPE_TARGET = "target"

# Ibis Object Info
NUMERIC_DATA_TYPES = ["float64", "int8", "int16", "int32", "int64", "decimal"]

FORMAT_TYPES = ["csv", "json", "table", "text"]

# Text Result Handler column filter list
42 changes: 41 additions & 1 deletion tests/system/data_sources/test_bigquery.py
@@ -177,10 +177,27 @@
"--config-file",
CLI_CONFIG_FILE,
]
- EXPECTED_NUM_YAML_LINES = 36  # Expected number of lines for validation config generated by CLI_STORE_COLUMN_ARGS
+ EXPECTED_NUM_YAML_LINES = 47  # Expected number of lines for validation config generated by CLI_STORE_COLUMN_ARGS
CLI_RUN_CONFIG_ARGS = ["run-config", "--config-file", CLI_CONFIG_FILE]
CLI_CONFIGS_RUN_ARGS = ["configs", "run", "--config-file", CLI_CONFIG_FILE]

CLI_WILDCARD_STRING_ARGS = [
"validate",
"column",
"--source-conn",
BQ_CONN_NAME,
"--target-conn",
BQ_CONN_NAME,
"--tables-list",
"bigquery-public-data.new_york_citibike.citibike_trips",
"--sum",
"*",
"--wildcard-include-string-len",
"--config-file",
CLI_CONFIG_FILE,
]
EXPECTED_NUM_YAML_LINES_WILDCARD = 112

CLI_FIND_TABLES_ARGS = [
"find-tables",
"--source-conn",
@@ -331,6 +348,29 @@ def test_cli_store_yaml_then_run_local():
os.environ[consts.ENV_DIRECTORY_VAR] = gcs_path


def test_wildcard_column_agg_yaml():
"""Test storing column validation YAML with string fields."""
# Unset GCS env var so that YAML is saved locally
gcs_path = os.environ[consts.ENV_DIRECTORY_VAR]
os.environ[consts.ENV_DIRECTORY_VAR] = ""

# Store BQ Connection
_store_bq_conn()

# Build validation and store to file
parser = cli_tools.configure_arg_parser()
mock_args = parser.parse_args(CLI_WILDCARD_STRING_ARGS)
main.run(mock_args)

yaml_file_path = CLI_CONFIG_FILE
with open(yaml_file_path, "r") as yaml_file:
assert len(yaml_file.readlines()) == EXPECTED_NUM_YAML_LINES_WILDCARD

os.remove(yaml_file_path)
# Re-set GCS env var
os.environ[consts.ENV_DIRECTORY_VAR] = gcs_path


def test_cli_find_tables():
_store_bq_conn()

