feat: Allow aggregation over length of string columns #430

Merged · 7 commits · Apr 7, 2022
7 changes: 7 additions & 0 deletions README.md
@@ -93,6 +93,11 @@ grouped column validations a step further by providing the `--primary-key` flag.
With this flag, if a mismatch is found, DVT will dive deeper into the slice
with the error and find the row (primary key value) with the inconsistency.

You can specify a list of string columns for aggregations in order to calculate
an aggregation over `length(string_col)`. Running an aggregation over all
columns ('*') only covers numeric columns, unless the
`--wildcard-include-string-len` flag is present.

```
data-validation (--verbose or -v) validate column
--source-conn or -sc SOURCE_CONN
@@ -120,6 +125,8 @@ data-validation (--verbose or -v) validate column
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--wildcard-include-string-len or -wis]
If present, include string columns in the aggregation as length(string_col)
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon separated string values of source and target filters.
If target filter is not provided, the source filter will run on source and target tables.
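To make the new flag concrete, a wildcard column validation that also sums string lengths could be invoked roughly as below; the connection names and table are placeholders, not values taken from this change.

```
data-validation validate column \
  --source-conn MY_SOURCE_CONN \
  --target-conn MY_TARGET_CONN \
  --tables-list my_project.my_dataset.my_table \
  --sum '*' \
  --wildcard-include-string-len
```

With the flag, the `sum` is also computed over `length(string_col)` for each string column; without it, a `'*'` aggregation covers numeric columns only.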
14 changes: 9 additions & 5 deletions data_validation/__main__.py
@@ -53,6 +53,10 @@ def get_aggregate_config(args, config_manager):
config_manager (ConfigManager): Validation config manager instance.
"""
aggregate_configs = [config_manager.build_config_count_aggregate()]
supported_data_types = ["float64", "int8", "int16", "int32", "int64", "decimal"]

if args.wildcard_include_string_len:
supported_data_types.append("string")

if args.count:
col_args = None if args.count == "*" else cli_tools.get_arg_list(args.count)
@@ -62,27 +66,27 @@ def get_aggregate_config(args, config_manager):
if args.sum:
col_args = None if args.sum == "*" else cli_tools.get_arg_list(args.sum)
aggregate_configs += config_manager.build_config_column_aggregates(
"sum", col_args, consts.NUMERIC_DATA_TYPES
"sum", col_args, supported_data_types
)
if args.avg:
col_args = None if args.avg == "*" else cli_tools.get_arg_list(args.avg)
aggregate_configs += config_manager.build_config_column_aggregates(
"avg", col_args, consts.NUMERIC_DATA_TYPES
"avg", col_args, supported_data_types
)
if args.min:
col_args = None if args.min == "*" else cli_tools.get_arg_list(args.min)
aggregate_configs += config_manager.build_config_column_aggregates(
"min", col_args, consts.NUMERIC_DATA_TYPES
"min", col_args, supported_data_types
)
if args.max:
col_args = None if args.max == "*" else cli_tools.get_arg_list(args.max)
aggregate_configs += config_manager.build_config_column_aggregates(
"max", col_args, consts.NUMERIC_DATA_TYPES
"max", col_args, supported_data_types
)
if args.bit_xor:
col_args = None if args.bit_xor == "*" else cli_tools.get_arg_list(args.bit_xor)
aggregate_configs += config_manager.build_config_column_aggregates(
"bit_xor", col_args, consts.NUMERIC_DATA_TYPES
"bit_xor", col_args, supported_data_types
)
return aggregate_configs

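A condensed, standalone sketch of the flag's effect in get_aggregate_config: the numeric allow-list now lives in the function and gains "string" only when --wildcard-include-string-len is passed. This is an illustrative helper, not part of the module.

```python
def allowed_types(wildcard_include_string_len: bool) -> list:
    # Mirrors the allow-list built in get_aggregate_config above.
    supported_data_types = ["float64", "int8", "int16", "int32", "int64", "decimal"]
    if wildcard_include_string_len:
        supported_data_types.append("string")
    return supported_data_types

print(allowed_types(False))  # numeric types only
print(allowed_types(True))   # numeric types plus "string"
```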
12 changes: 12 additions & 0 deletions data_validation/cli_tools.py
@@ -320,6 +320,12 @@ def _configure_run_parser(subparsers):
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)
run_parser.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)


def _configure_connection_parser(subparsers):
@@ -513,6 +519,12 @@ def _configure_column_parser(column_parser):
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)
column_parser.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)


def _configure_schema_parser(schema_parser):
4 changes: 4 additions & 0 deletions data_validation/combiner.py
@@ -101,6 +101,10 @@ def _calculate_difference(field_differences, datatype, validation, is_value_comp
if isinstance(datatype, ibis.expr.datatypes.Timestamp):
source_value = field_differences["differences_source_value"].epoch_seconds()
target_value = field_differences["differences_target_value"].epoch_seconds()
elif isinstance(datatype, ibis.expr.datatypes.Float64):
# Float64 type results from AVG() aggregation
source_value = field_differences["differences_source_value"].round(digits=4)
target_value = field_differences["differences_target_value"].round(digits=4)
else:
source_value = field_differences["differences_source_value"]
target_value = field_differences["differences_target_value"]
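A minimal, standalone sketch of why the Float64 branch rounds to 4 digits before comparing: AVG() results computed by different engines can disagree only in floating-point noise, and without rounding such pairs would be reported as mismatches. The numbers below are invented for the illustration; the combiner itself performs the rounding through ibis expressions as shown in the diff.

```python
# Illustrative values only: pretend the source and target engines computed AVG()
# over the same data but disagree in the low-order digits.
source_value = 37.333333333333
target_value = 37.333333333301

assert source_value != target_value                       # a raw comparison reports a difference
assert round(source_value, 4) == round(target_value, 4)   # rounding to 4 digits removes the noise
```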
43 changes: 38 additions & 5 deletions data_validation/config_manager.py
@@ -444,6 +444,30 @@ def build_config_count_aggregate(self):

return aggregate_config

def append_stringlen_calc_field(self, column, agg_type):
""" Append calculated field for length(string) for column validation"""
calculated_config = {
consts.CONFIG_CALCULATED_SOURCE_COLUMNS: [column],
consts.CONFIG_CALCULATED_TARGET_COLUMNS: [column],
consts.CONFIG_FIELD_ALIAS: f"length__{column}",
consts.CONFIG_TYPE: "length",
consts.CONFIG_DEPTH: 0,
}

existing_calc_fields = [
x[consts.CONFIG_FIELD_ALIAS] for x in self.calculated_fields
]
if calculated_config[consts.CONFIG_FIELD_ALIAS] not in existing_calc_fields:
self.append_calculated_fields([calculated_config])

aggregate_config = {
consts.CONFIG_SOURCE_COLUMN: f"length__{column}",
consts.CONFIG_TARGET_COLUMN: f"length__{column}",
consts.CONFIG_FIELD_ALIAS: f"{agg_type}__length__{column}",
consts.CONFIG_TYPE: agg_type,
}
return aggregate_config

def build_config_column_aggregates(self, agg_type, arg_value, supported_types):
"""Return list of aggregate objects of given agg_type."""
aggregate_configs = []
@@ -453,6 +477,9 @@ def build_config_column_aggregates(self, agg_type, arg_value, supported_types):
casefold_source_columns = {x.casefold(): str(x) for x in source_table.columns}
casefold_target_columns = {x.casefold(): str(x) for x in target_table.columns}

# When columns are listed explicitly, also allow string columns so their lengths can be aggregated.
if arg_value and supported_types:
supported_types.append("string")

allowlist_columns = arg_value or casefold_source_columns
for column in casefold_source_columns:
# Get column type and remove precision/scale attributes
@@ -463,13 +490,19 @@ def build_config_column_aggregates(self, agg_type, arg_value, supported_types):
continue
elif column not in casefold_target_columns:
logging.info(
f"Skipping Agg {agg_type}: {source_table.op().name}.{column}"
f"Skipping {agg_type} on {column} as column is not present in target table"
)
continue
elif supported_types and column_type not in supported_types:
if self.verbose:
msg = f"Skipping Agg {agg_type}: {source_table.op().name}.{column} {column_type}"
print(msg)
logging.info(
f"Skipping {agg_type} on {column} due to data type: {column_type}"
)
continue

if column_type == "string":
aggregate_config = self.append_stringlen_calc_field(column, agg_type)
aggregate_configs.append(aggregate_config)
continue

aggregate_config = {
@@ -500,12 +533,12 @@ def build_config_calculated_fields(
continue
elif column not in casefold_target_columns:
logging.info(
f"Skipping Calc {calc_type}: {source_table.op().name}.{column} {column_type}"
f"Skipping {calc_type} on {column} as column is not present in target table"
)
continue
elif supported_types and column_type not in supported_types:
if self.verbose:
msg = f"Skipping Calc {calc_type}: {source_table.op().name}.{column} {column_type}"
msg = f"Skipping {calc_type} on {column} due to data type: {column_type}"
print(msg)
continue

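For readers tracing the new code path, here is a rough sketch of the two dicts that append_stringlen_calc_field would build for a hypothetical string column named city with a sum aggregation, assuming the consts.CONFIG_* constants map to the literal keys noted in the comments (that mapping is not visible in this diff):

```python
# Hypothetical output of append_stringlen_calc_field("city", "sum"), with the
# consts.CONFIG_* constants replaced by assumed literal key names.
calculated_config = {
    "source_calculated_columns": ["city"],  # consts.CONFIG_CALCULATED_SOURCE_COLUMNS
    "target_calculated_columns": ["city"],  # consts.CONFIG_CALCULATED_TARGET_COLUMNS
    "field_alias": "length__city",          # consts.CONFIG_FIELD_ALIAS
    "type": "length",                       # consts.CONFIG_TYPE
    "depth": 0,                             # consts.CONFIG_DEPTH
}

aggregate_config = {
    "source_column": "length__city",        # consts.CONFIG_SOURCE_COLUMN
    "target_column": "length__city",        # consts.CONFIG_TARGET_COLUMN
    "field_alias": "sum__length__city",     # consts.CONFIG_FIELD_ALIAS
    "type": "sum",                          # consts.CONFIG_TYPE
}
```

The calculated field is appended once per column (deduplicated by its alias), and the aggregate then targets the derived length__city column instead of the raw string column.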
3 changes: 0 additions & 3 deletions data_validation/consts.py
@@ -124,9 +124,6 @@
RESULT_TYPE_SOURCE = "source"
RESULT_TYPE_TARGET = "target"

# Ibis Object Info
NUMERIC_DATA_TYPES = ["float64", "int8", "int16", "int32", "int64", "decimal"]

FORMAT_TYPES = ["csv", "json", "table", "text"]

# Text Result Handler column filter list
42 changes: 41 additions & 1 deletion tests/system/data_sources/test_bigquery.py
@@ -177,10 +177,27 @@
"--config-file",
CLI_CONFIG_FILE,
]
EXPECTED_NUM_YAML_LINES = 36 # Expected number of lines for validation config generated by CLI_STORE_COLUMN_ARGS
EXPECTED_NUM_YAML_LINES = 47 # Expected number of lines for validation config generated by CLI_STORE_COLUMN_ARGS
CLI_RUN_CONFIG_ARGS = ["run-config", "--config-file", CLI_CONFIG_FILE]
CLI_CONFIGS_RUN_ARGS = ["configs", "run", "--config-file", CLI_CONFIG_FILE]

CLI_WILDCARD_STRING_ARGS = [
"validate",
"column",
"--source-conn",
BQ_CONN_NAME,
"--target-conn",
BQ_CONN_NAME,
"--tables-list",
"bigquery-public-data.new_york_citibike.citibike_trips",
"--sum",
"*",
"--wildcard-include-string-len",
"--config-file",
CLI_CONFIG_FILE,
]
EXPECTED_NUM_YAML_LINES_WILDCARD = 112

CLI_FIND_TABLES_ARGS = [
"find-tables",
"--source-conn",
@@ -331,6 +348,29 @@ def test_cli_store_yaml_then_run_local():
os.environ[consts.ENV_DIRECTORY_VAR] = gcs_path


def test_wildcard_column_agg_yaml():
"""Test storing column validation YAML with string fields."""
# Unset GCS env var so that YAML is saved locally
gcs_path = os.environ[consts.ENV_DIRECTORY_VAR]
os.environ[consts.ENV_DIRECTORY_VAR] = ""

# Store BQ Connection
_store_bq_conn()

# Build validation and store to file
parser = cli_tools.configure_arg_parser()
mock_args = parser.parse_args(CLI_WILDCARD_STRING_ARGS)
main.run(mock_args)

yaml_file_path = CLI_CONFIG_FILE
with open(yaml_file_path, "r") as yaml_file:
assert len(yaml_file.readlines()) == EXPECTED_NUM_YAML_LINES_WILDCARD

os.remove(yaml_file_path)
# Re-set GCS env var
os.environ[consts.ENV_DIRECTORY_VAR] = gcs_path


def test_cli_find_tables():
_store_bq_conn()
