diff --git a/README.md b/README.md
index d3420e1c2..57247b299 100644
--- a/README.md
+++ b/README.md
@@ -93,6 +93,11 @@
 grouped column validations a step further by providing the `--primary-key`
 flag. With this flag, if a mismatch was found, DVT will dive deeper into the
 slice with the error and find the row (primary key value) with the
 inconsistency.
+You can specify a list of string columns for aggregations, in which case the
+aggregation is calculated over `length(string_col)`. Running an aggregation
+over all columns (`'*'`) only includes numeric columns, unless the
+`--wildcard-include-string-len` flag is present.
+
 ```
 data-validation (--verbose or -v) validate column
   --source-conn or -sc SOURCE_CONN
@@ -120,6 +125,8 @@ data-validation (--verbose or -v) validate column
                         See: *Validation Reports* section
   [--service-account or -sa PATH_TO_SA_KEY]
                         Service account to use for BigQuery result handler output.
+  [--wildcard-include-string-len or -wis]
+                        If present, include string columns in aggregations as length(string_col).
   [--filters SOURCE_FILTER:TARGET_FILTER]
                         Colon separated string values of source and target filters.
                         If target filter is not provided, the source filter will run on source and target tables.
diff --git a/data_validation/__main__.py b/data_validation/__main__.py
index 35d9692c5..57957064c 100644
--- a/data_validation/__main__.py
+++ b/data_validation/__main__.py
@@ -53,6 +53,10 @@ def get_aggregate_config(args, config_manager):
         config_manager (ConfigManager): Validation config manager instance.
     """
     aggregate_configs = [config_manager.build_config_count_aggregate()]
+    supported_data_types = ["float64", "int8", "int16", "int32", "int64", "decimal"]
+
+    if args.wildcard_include_string_len:
+        supported_data_types.append("string")
 
     if args.count:
         col_args = None if args.count == "*" else cli_tools.get_arg_list(args.count)
@@ -62,27 +66,27 @@ def get_aggregate_config(args, config_manager):
     if args.sum:
         col_args = None if args.sum == "*" else cli_tools.get_arg_list(args.sum)
         aggregate_configs += config_manager.build_config_column_aggregates(
-            "sum", col_args, consts.NUMERIC_DATA_TYPES
+            "sum", col_args, supported_data_types
         )
     if args.avg:
         col_args = None if args.avg == "*" else cli_tools.get_arg_list(args.avg)
         aggregate_configs += config_manager.build_config_column_aggregates(
-            "avg", col_args, consts.NUMERIC_DATA_TYPES
+            "avg", col_args, supported_data_types
         )
     if args.min:
         col_args = None if args.min == "*" else cli_tools.get_arg_list(args.min)
         aggregate_configs += config_manager.build_config_column_aggregates(
-            "min", col_args, consts.NUMERIC_DATA_TYPES
+            "min", col_args, supported_data_types
        )
     if args.max:
         col_args = None if args.max == "*" else cli_tools.get_arg_list(args.max)
         aggregate_configs += config_manager.build_config_column_aggregates(
-            "max", col_args, consts.NUMERIC_DATA_TYPES
+            "max", col_args, supported_data_types
         )
     if args.bit_xor:
         col_args = None if args.bit_xor == "*" else cli_tools.get_arg_list(args.bit_xor)
         aggregate_configs += config_manager.build_config_column_aggregates(
-            "bit_xor", col_args, consts.NUMERIC_DATA_TYPES
+            "bit_xor", col_args, supported_data_types
         )
 
     return aggregate_configs
diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py
index ecebb9af8..91c973022 100644
--- a/data_validation/cli_tools.py
+++ b/data_validation/cli_tools.py
@@ -320,6 +320,12 @@ def _configure_run_parser(subparsers):
         "-rbs",
         help="Row batch size used for random row filters (default 10,000).",
     )
+    run_parser.add_argument(
"--wildcard-include-string-len", + "-wis", + action="store_true", + help="Include string fields for wildcard aggregations.", + ) def _configure_connection_parser(subparsers): @@ -513,6 +519,12 @@ def _configure_column_parser(column_parser): "-rbs", help="Row batch size used for random row filters (default 10,000).", ) + column_parser.add_argument( + "--wildcard-include-string-len", + "-wis", + action="store_true", + help="Include string fields for wildcard aggregations.", + ) def _configure_schema_parser(schema_parser): diff --git a/data_validation/combiner.py b/data_validation/combiner.py index 8c5fff5c7..e60756f0c 100644 --- a/data_validation/combiner.py +++ b/data_validation/combiner.py @@ -101,6 +101,10 @@ def _calculate_difference(field_differences, datatype, validation, is_value_comp if isinstance(datatype, ibis.expr.datatypes.Timestamp): source_value = field_differences["differences_source_value"].epoch_seconds() target_value = field_differences["differences_target_value"].epoch_seconds() + elif isinstance(datatype, ibis.expr.datatypes.Float64): + # Float64 type results from AVG() aggregation + source_value = field_differences["differences_source_value"].round(digits=4) + target_value = field_differences["differences_target_value"].round(digits=4) else: source_value = field_differences["differences_source_value"] target_value = field_differences["differences_target_value"] diff --git a/data_validation/config_manager.py b/data_validation/config_manager.py index dccebe0b6..2fd6e8866 100644 --- a/data_validation/config_manager.py +++ b/data_validation/config_manager.py @@ -444,6 +444,30 @@ def build_config_count_aggregate(self): return aggregate_config + def append_stringlen_calc_field(self, column, agg_type): + """ Append calculated field for length(string) for column validation""" + calculated_config = { + consts.CONFIG_CALCULATED_SOURCE_COLUMNS: [column], + consts.CONFIG_CALCULATED_TARGET_COLUMNS: [column], + consts.CONFIG_FIELD_ALIAS: f"length__{column}", + consts.CONFIG_TYPE: "length", + consts.CONFIG_DEPTH: 0, + } + + existing_calc_fields = [ + x[consts.CONFIG_FIELD_ALIAS] for x in self.calculated_fields + ] + if calculated_config[consts.CONFIG_FIELD_ALIAS] not in existing_calc_fields: + self.append_calculated_fields([calculated_config]) + + aggregate_config = { + consts.CONFIG_SOURCE_COLUMN: f"length__{column}", + consts.CONFIG_TARGET_COLUMN: f"length__{column}", + consts.CONFIG_FIELD_ALIAS: f"{agg_type}__length__{column}", + consts.CONFIG_TYPE: agg_type, + } + return aggregate_config + def build_config_column_aggregates(self, agg_type, arg_value, supported_types): """Return list of aggregate objects of given agg_type.""" aggregate_configs = [] @@ -453,6 +477,9 @@ def build_config_column_aggregates(self, agg_type, arg_value, supported_types): casefold_source_columns = {x.casefold(): str(x) for x in source_table.columns} casefold_target_columns = {x.casefold(): str(x) for x in target_table.columns} + if arg_value and supported_types: + supported_types.append("string") + allowlist_columns = arg_value or casefold_source_columns for column in casefold_source_columns: # Get column type and remove precision/scale attributes @@ -463,13 +490,19 @@ def build_config_column_aggregates(self, agg_type, arg_value, supported_types): continue elif column not in casefold_target_columns: logging.info( - f"Skipping Agg {agg_type}: {source_table.op().name}.{column}" + f"Skipping {agg_type} on {column} as column is not present in target table" ) continue elif supported_types and column_type not in 
                 if self.verbose:
-                    msg = f"Skipping Agg {agg_type}: {source_table.op().name}.{column} {column_type}"
-                    print(msg)
+                    logging.info(
+                        f"Skipping {agg_type} on {column} due to data type: {column_type}"
+                    )
+                continue
+
+            if column_type == "string":
+                aggregate_config = self.append_stringlen_calc_field(column, agg_type)
+                aggregate_configs.append(aggregate_config)
                 continue
 
             aggregate_config = {
@@ -500,12 +533,12 @@ def build_config_calculated_fields(
                 continue
             elif column not in casefold_target_columns:
                 logging.info(
-                    f"Skipping Calc {calc_type}: {source_table.op().name}.{column} {column_type}"
+                    f"Skipping {calc_type} on {column} as column is not present in target table"
                 )
                 continue
             elif supported_types and column_type not in supported_types:
                 if self.verbose:
-                    msg = f"Skipping Calc {calc_type}: {source_table.op().name}.{column} {column_type}"
+                    msg = f"Skipping {calc_type} on {column} due to data type: {column_type}"
                     print(msg)
                 continue
diff --git a/data_validation/consts.py b/data_validation/consts.py
index ad77edc46..85317523f 100644
--- a/data_validation/consts.py
+++ b/data_validation/consts.py
@@ -124,9 +124,6 @@
 RESULT_TYPE_SOURCE = "source"
 RESULT_TYPE_TARGET = "target"
 
-# Ibis Object Info
-NUMERIC_DATA_TYPES = ["float64", "int8", "int16", "int32", "int64", "decimal"]
-
 FORMAT_TYPES = ["csv", "json", "table", "text"]
 
 # Text Result Handler column filter list
diff --git a/tests/system/data_sources/test_bigquery.py b/tests/system/data_sources/test_bigquery.py
index 06c9c11c1..fe3a9609b 100644
--- a/tests/system/data_sources/test_bigquery.py
+++ b/tests/system/data_sources/test_bigquery.py
@@ -177,10 +177,27 @@
     "--config-file",
     CLI_CONFIG_FILE,
 ]
-EXPECTED_NUM_YAML_LINES = 36  # Expected number of lines for validation config geenrated by CLI_STORE_COLUMN_ARGS
+EXPECTED_NUM_YAML_LINES = 47  # Expected number of lines for validation config generated by CLI_STORE_COLUMN_ARGS
 CLI_RUN_CONFIG_ARGS = ["run-config", "--config-file", CLI_CONFIG_FILE]
 CLI_CONFIGS_RUN_ARGS = ["configs", "run", "--config-file", CLI_CONFIG_FILE]
 
+CLI_WILDCARD_STRING_ARGS = [
+    "validate",
+    "column",
+    "--source-conn",
+    BQ_CONN_NAME,
+    "--target-conn",
+    BQ_CONN_NAME,
+    "--tables-list",
+    "bigquery-public-data.new_york_citibike.citibike_trips",
+    "--sum",
+    "*",
+    "--wildcard-include-string-len",
+    "--config-file",
+    CLI_CONFIG_FILE,
+]
+EXPECTED_NUM_YAML_LINES_WILDCARD = 112
+
 CLI_FIND_TABLES_ARGS = [
     "find-tables",
     "--source-conn",
@@ -331,6 +348,29 @@ def test_cli_store_yaml_then_run_local():
     os.environ[consts.ENV_DIRECTORY_VAR] = gcs_path
 
 
+def test_wildcard_column_agg_yaml():
+    """Test storing a column validation YAML that includes string length aggregations."""
+    # Unset the GCS env var so that the YAML is saved locally
+    gcs_path = os.environ[consts.ENV_DIRECTORY_VAR]
+    os.environ[consts.ENV_DIRECTORY_VAR] = ""
+
+    # Store BQ connection
+    _store_bq_conn()
+
+    # Build the validation and store it to a file
+    parser = cli_tools.configure_arg_parser()
+    mock_args = parser.parse_args(CLI_WILDCARD_STRING_ARGS)
+    main.run(mock_args)
+
+    yaml_file_path = CLI_CONFIG_FILE
+    with open(yaml_file_path, "r") as yaml_file:
+        assert len(yaml_file.readlines()) == EXPECTED_NUM_YAML_LINES_WILDCARD
+
+    os.remove(yaml_file_path)
+    # Re-set the GCS env var
+    os.environ[consts.ENV_DIRECTORY_VAR] = gcs_path
+
+
 def test_cli_find_tables():
     _store_bq_conn()
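
Note: the invocation below mirrors the `CLI_WILDCARD_STRING_ARGS` test fixture above and can serve as a manual smoke test of the new flag. `MY_BQ_CONN` and `validation.yaml` are placeholders for any stored BigQuery connection and output path.

```
data-validation validate column \
  --source-conn MY_BQ_CONN \
  --target-conn MY_BQ_CONN \
  --tables-list bigquery-public-data.new_york_citibike.citibike_trips \
  --sum '*' \
  --wildcard-include-string-len \
  --config-file validation.yaml
```

With the flag present, the wildcard `--sum '*'` also emits a `sum__length__<col>` aggregation for every string column; without it, the wildcard covers only the numeric types listed in `supported_data_types`.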
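Note: as a reference for reviewers, here is a minimal sketch of the pair of config dicts that `append_stringlen_calc_field` builds for one string column. The plain-string keys stand in for the `consts.*` constants and are assumed literals for readability; the `length__`/`{agg_type}__length__` aliases are taken verbatim from the patch.

```python
def stringlen_configs(column: str, agg_type: str):
    """Sketch the calculated-field and aggregate configs for length(column)."""
    # Calculated field: wraps the column in length(column) on source and target.
    calculated_field = {
        "source_calculated_columns": [column],  # assumed key literal
        "target_calculated_columns": [column],  # assumed key literal
        "field_alias": f"length__{column}",
        "type": "length",
        "depth": 0,
    }
    # Aggregate over the calculated field, e.g. SUM(LENGTH(column)).
    aggregate = {
        "source_column": f"length__{column}",
        "target_column": f"length__{column}",
        "field_alias": f"{agg_type}__length__{column}",
        "type": agg_type,
    }
    return calculated_field, aggregate


if __name__ == "__main__":
    print(stringlen_configs("start_station_name", "sum"))
```

The `existing_calc_fields` check in the patch de-duplicates these calculated fields, since several aggregation types (e.g. `--sum` and `--avg`) may share the same `length__<col>` field.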