diff --git a/data_validation/data_validation.py b/data_validation/data_validation.py
index e04b0173..e6f6c3d4 100644
--- a/data_validation/data_validation.py
+++ b/data_validation/data_validation.py
@@ -106,10 +106,15 @@ def _add_random_row_filter(self):
             raise ValueError("Primary Keys are required for Random Row Filters")
 
         # Filter for only first primary key (multi-pk filter not supported)
-        primary_key_info = self.config_manager.primary_keys[0]
+        source_pk_column = self.config_manager.primary_keys[0][
+            consts.CONFIG_SOURCE_COLUMN
+        ]
+        target_pk_column = self.config_manager.primary_keys[0][
+            consts.CONFIG_TARGET_COLUMN
+        ]
 
         randomRowBuilder = RandomRowBuilder(
-            [primary_key_info[consts.CONFIG_SOURCE_COLUMN]],
+            [source_pk_column],
             self.config_manager.random_row_batch_size(),
         )
 
@@ -128,24 +133,33 @@ def _add_random_row_filter(self):
             self.validation_builder.source_builder,
         )
 
+        # Check if the source table's primary key is BINARY and, if so,
+        # force cast the id column to STRING (HEX).
+        binary_conversion_required = False
+        if query[source_pk_column].type().is_binary():
+            binary_conversion_required = True
+            query = query.mutate(
+                **{source_pk_column: query[source_pk_column].cast("string")}
+            )
+
         random_rows = self.config_manager.source_client.execute(query)
         if len(random_rows) == 0:
             return
+
+        random_values = list(random_rows[source_pk_column])
+        if binary_conversion_required:
+            # For binary ids we have a list of hex strings for our IN list.
+            # Each of these needs to be cast back to binary.
+            random_values = [ibis.literal(_).cast("binary") for _ in random_values]
+
         filter_field = {
             consts.CONFIG_TYPE: consts.FILTER_TYPE_ISIN,
-            consts.CONFIG_FILTER_SOURCE_COLUMN: primary_key_info[
-                consts.CONFIG_FIELD_ALIAS
-            ],
-            consts.CONFIG_FILTER_SOURCE_VALUE: random_rows[
-                primary_key_info[consts.CONFIG_SOURCE_COLUMN]
-            ],
-            consts.CONFIG_FILTER_TARGET_COLUMN: primary_key_info[
-                consts.CONFIG_FIELD_ALIAS
-            ],
-            consts.CONFIG_FILTER_TARGET_VALUE: random_rows[
-                primary_key_info[consts.CONFIG_SOURCE_COLUMN]
-            ],
+            consts.CONFIG_FILTER_SOURCE_COLUMN: source_pk_column,
+            consts.CONFIG_FILTER_SOURCE_VALUE: random_values,
+            consts.CONFIG_FILTER_TARGET_COLUMN: target_pk_column,
+            consts.CONFIG_FILTER_TARGET_VALUE: random_values,
         }
+
         self.validation_builder.add_filter(filter_field)
 
     def query_too_large(self, rows_df, grouped_fields):
diff --git a/tests/system/data_sources/test_hive.py b/tests/system/data_sources/test_hive.py
index 1a5c7a37..eb9ea737 100644
--- a/tests/system/data_sources/test_hive.py
+++ b/tests/system/data_sources/test_hive.py
@@ -319,6 +319,7 @@ def test_row_validation_core_types_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """Hive to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -330,6 +331,10 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            # We have a bug in our test Hive instance that returns
+            # zero rows on binary IN lists with >1 element.
+ # "--use-random-row", + # "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/tests/system/data_sources/test_mysql.py b/tests/system/data_sources/test_mysql.py index 9810bf16..97991fb8 100644 --- a/tests/system/data_sources/test_mysql.py +++ b/tests/system/data_sources/test_mysql.py @@ -336,6 +336,7 @@ def test_row_validation_core_types_to_bigquery(): def test_row_validation_binary_pk_to_bigquery(): """MySQL to BigQuery dvt_binary row validation. This is testing binary primary key join columns. + Includes random row filter test. """ parser = cli_tools.configure_arg_parser() args = parser.parse_args( @@ -347,6 +348,8 @@ def test_row_validation_binary_pk_to_bigquery(): "-tbls=pso_data_validator.dvt_binary", "--primary-keys=binary_id", "--hash=int_id,other_data", + "--use-random-row", + "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/tests/system/data_sources/test_oracle.py b/tests/system/data_sources/test_oracle.py index 93d2582a..cab758fa 100644 --- a/tests/system/data_sources/test_oracle.py +++ b/tests/system/data_sources/test_oracle.py @@ -473,6 +473,7 @@ def test_row_validation_large_decimals_to_bigquery(): def test_row_validation_binary_pk_to_bigquery(): """Oracle to BigQuery dvt_binary row validation. This is testing binary primary key join columns. + Includes random row filter test. """ parser = cli_tools.configure_arg_parser() args = parser.parse_args( @@ -484,6 +485,8 @@ def test_row_validation_binary_pk_to_bigquery(): "-tbls=pso_data_validator.dvt_binary", "--primary-keys=binary_id", "--hash=int_id,other_data", + "--use-random-row", + "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/tests/system/data_sources/test_postgres.py b/tests/system/data_sources/test_postgres.py index 44d728ee..e5d5a8ca 100644 --- a/tests/system/data_sources/test_postgres.py +++ b/tests/system/data_sources/test_postgres.py @@ -765,6 +765,7 @@ def test_row_validation_large_decimals_to_bigquery(): def test_row_validation_binary_pk_to_bigquery(): """PostgreSQL to BigQuery dvt_binary row validation. This is testing binary primary key join columns. + Includes random row filter test. """ parser = cli_tools.configure_arg_parser() args = parser.parse_args( @@ -776,6 +777,8 @@ def test_row_validation_binary_pk_to_bigquery(): "-tbls=pso_data_validator.dvt_binary", "--primary-keys=binary_id", "--hash=int_id,other_data", + "--use-random-row", + "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/tests/system/data_sources/test_snowflake.py b/tests/system/data_sources/test_snowflake.py index 7bdad15b..3b9fe0b9 100644 --- a/tests/system/data_sources/test_snowflake.py +++ b/tests/system/data_sources/test_snowflake.py @@ -324,6 +324,7 @@ def test_row_validation_core_types_to_bigquery(): def test_row_validation_binary_pk_to_bigquery(): """Snowflake to BigQuery dvt_binary row validation. This is testing binary primary key join columns. + Includes random row filter test. 
""" parser = cli_tools.configure_arg_parser() args = parser.parse_args( @@ -335,6 +336,8 @@ def test_row_validation_binary_pk_to_bigquery(): "-tbls=PSO_DATA_VALIDATOR.PUBLIC.DVT_BINARY=pso_data_validator.dvt_binary", "--primary-keys=binary_id", "--hash=int_id,other_data", + "--use-random-row", + "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/tests/system/data_sources/test_spanner.py b/tests/system/data_sources/test_spanner.py index 1a6c6996..927a0613 100644 --- a/tests/system/data_sources/test_spanner.py +++ b/tests/system/data_sources/test_spanner.py @@ -348,6 +348,7 @@ def test_row_validation_core_types_to_bigquery(): def test_row_validation_binary_pk_to_bigquery(): """Spanner to BigQuery dvt_binary row validation. This is testing binary primary key join columns. + Includes random row filter test. """ parser = cli_tools.configure_arg_parser() args = parser.parse_args( @@ -359,6 +360,8 @@ def test_row_validation_binary_pk_to_bigquery(): "-tbls=pso_data_validator.dvt_binary", "--primary-keys=binary_id", "--hash=int_id,other_data", + "--use-random-row", + "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/tests/system/data_sources/test_sql_server.py b/tests/system/data_sources/test_sql_server.py index be0da49f..3f72bef4 100644 --- a/tests/system/data_sources/test_sql_server.py +++ b/tests/system/data_sources/test_sql_server.py @@ -460,6 +460,7 @@ def test_row_validation_large_decimals_to_bigquery(): def test_row_validation_binary_pk_to_bigquery(): """SQL Server to BigQuery dvt_binary row validation. This is testing binary primary key join columns. + Includes random row filter test. """ parser = cli_tools.configure_arg_parser() args = parser.parse_args( @@ -471,6 +472,8 @@ def test_row_validation_binary_pk_to_bigquery(): "-tbls=pso_data_validator.dvt_binary", "--primary-keys=binary_id", "--hash=int_id,other_data", + "--use-random-row", + "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/tests/system/data_sources/test_teradata.py b/tests/system/data_sources/test_teradata.py index f9bc1e3b..8be8a8d2 100644 --- a/tests/system/data_sources/test_teradata.py +++ b/tests/system/data_sources/test_teradata.py @@ -457,6 +457,7 @@ def test_row_validation_large_decimals_to_bigquery(): def test_row_validation_binary_pk_to_bigquery(): """Teradata to BigQuery dvt_binary row validation. This is testing binary primary key join columns. + Includes random row filter test. 
""" parser = cli_tools.configure_arg_parser() args = parser.parse_args( @@ -468,6 +469,8 @@ def test_row_validation_binary_pk_to_bigquery(): "-tbls=udf.dvt_binary=pso_data_validator.dvt_binary", "--primary-keys=binary_id", "--hash=int_id,other_data", + "--use-random-row", + "--random-row-batch-size=5", ] ) df = run_test_from_cli_args(args) diff --git a/third_party/ibis/ibis_addon/operations.py b/third_party/ibis/ibis_addon/operations.py index 38f465b3..9229819d 100644 --- a/third_party/ibis/ibis_addon/operations.py +++ b/third_party/ibis/ibis_addon/operations.py @@ -130,11 +130,17 @@ def compile_to_char(numeric_value, fmt): @bigquery_cast.register(str, dt.Binary, dt.String) -def bigquery_cast_generate(compiled_arg, from_, to): +def bigquery_cast_from_binary_generate(compiled_arg, from_, to): """Cast of binary to string should be hex conversion.""" return f"TO_HEX({compiled_arg})" +@bigquery_cast.register(str, dt.String, dt.Binary) +def bigquery_cast_to_binary_generate(compiled_arg, from_, to): + """Cast of binary to string should be hex conversion.""" + return f"FROM_HEX({compiled_arg})" + + def format_hash_bigquery(translator, op): arg = translator.translate(op.arg) if op.how == "farm_fingerprint": @@ -385,6 +391,9 @@ def sa_cast_hive(t, op): if arg_dtype.is_binary() and typ.is_string(): # Binary to string cast is a "to hex" conversion for DVT. return f"lower(hex({arg_formatted}))" + elif arg_dtype.is_string() and typ.is_binary(): + # Binary from string cast is a "from hex" conversion for DVT. + return f"unhex({arg_formatted})" # Follow the original Ibis code path. # Cannot use sa_fixed_cast() because of ImpalaExprTranslator ancestry. @@ -401,10 +410,13 @@ def sa_cast_postgres(t, op): typ = op.to arg_dtype = arg.output_dtype + sa_arg = t.translate(arg) if arg_dtype.is_binary() and typ.is_string(): - sa_arg = t.translate(arg) # Binary to string cast is a "to hex" conversion for DVT. return sa.func.encode(sa_arg, sa.literal("hex")) + elif arg_dtype.is_string() and typ.is_binary(): + # Binary from string cast is a "from hex" conversion for DVT. + return sa.func.decode(sa_arg, sa.literal("hex")) # Follow the original Ibis code path. return sa_fixed_cast(t, op) @@ -415,17 +427,19 @@ def sa_cast_mssql(t, op): typ = op.to arg_dtype = arg.output_dtype + sa_arg = t.translate(arg) # Specialize going from a binary float type to a string. if (arg_dtype.is_float32() or arg_dtype.is_float64()) and typ.is_string(): - sa_arg = t.translate(arg) # This prevents output in scientific notation, at least for my tests it did. return sa.func.format(sa_arg, "G") elif arg_dtype.is_binary() and typ.is_string(): - sa_arg = t.translate(arg) # Binary to string cast is a "to hex" conversion for DVT. return sa.func.lower( sa.func.convert(sa.text("VARCHAR(MAX)"), sa_arg, sa.literal(2)) ) + elif arg_dtype.is_string() and typ.is_binary(): + # Binary from string cast is a "from hex" conversion for DVT. + return sa.func.convert(sa.text("VARBINARY(MAX)"), sa_arg, sa.literal(2)) # Follow the original Ibis code path. return sa_fixed_cast(t, op) @@ -437,6 +451,7 @@ def sa_cast_mysql(t, op): typ = op.to arg_dtype = arg.output_dtype + sa_arg = t.translate(arg) # Specialize going from numeric(p,s>0) to string if ( arg_dtype.is_decimal() @@ -452,9 +467,11 @@ def sa_cast_mysql(t, op): # https://stackoverflow.com/a/20111398 return sa_fixed_cast(t, op) + sa.literal(0) elif arg_dtype.is_binary() and typ.is_string(): - sa_arg = t.translate(arg) # Binary to string cast is a "to hex" conversion for DVT. 
         return sa.func.lower(sa.func.hex(sa_arg))
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.unhex(sa_arg)
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
@@ -469,6 +486,9 @@ def sa_cast_oracle(t, op):
     if arg_dtype.is_binary() and typ.is_string():
         # Binary to string cast is a "to hex" conversion for DVT.
         return sa.func.lower(sa.func.rawtohex(sa_arg))
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.hextoraw(sa_arg)
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
@@ -487,6 +507,9 @@ def sa_cast_snowflake(t, op):
     if arg_dtype.is_binary() and typ.is_string():
         # Binary to string cast is a "to hex" conversion for DVT.
         return sa.func.hex_encode(sa_arg, sa.literal(0))
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.hex_decode_binary(sa_arg)
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
diff --git a/third_party/ibis/ibis_teradata/registry.py b/third_party/ibis/ibis_teradata/registry.py
index 192de29c..07510f2d 100644
--- a/third_party/ibis/ibis_teradata/registry.py
+++ b/third_party/ibis/ibis_teradata/registry.py
@@ -48,6 +48,11 @@ def teradata_cast_binary_to_string(compiled_arg, from_, to):
     return "LOWER(FROM_BYTES({}, 'base16'))".format(compiled_arg)
 
 
+@teradata_cast.register(str, dt.String, dt.Binary)
+def teradata_cast_string_to_binary(compiled_arg, from_, to):
+    return "TO_BYTES({}, 'base16')".format(compiled_arg)
+
+
 @teradata_cast.register(str, dt.DataType, dt.DataType)
 def teradata_cast_generate(compiled_arg, from_, to):
     sql_type = ibis_type_to_teradata_type(to)
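
Note: the core of this change is a hex round-trip so that random-row IN filters work
for BINARY primary keys. Below is a minimal sketch of the idea, assuming an Ibis table
expression `tbl` with a key column named by `pk`; the function name, the `limit()`
stand-in for RandomRowBuilder, and the final `filter()` call are illustrative only,
not part of this patch:

    import ibis

    def random_row_filter(tbl, pk: str, batch_size: int):
        # Sample a handful of primary key values (RandomRowBuilder in DVT).
        sample = tbl.select(pk).limit(batch_size)
        is_binary = sample[pk].type().is_binary()
        if is_binary:
            # Cast BINARY ids to STRING (hex) so the sampled values survive
            # the client round trip as plain strings.
            sample = sample.mutate(**{pk: sample[pk].cast("string")})
        values = list(sample.execute()[pk])
        if is_binary:
            # Cast each hex string back to BINARY for the IN list; the
            # per-engine cast rules above compile this to FROM_HEX, unhex,
            # decode, HEXTORAW, HEX_DECODE_BINARY or TO_BYTES as appropriate.
            values = [ibis.literal(v).cast("binary") for v in values]
        return tbl.filter(tbl[pk].isin(values))

The string-to-binary cast registrations are what make the second step compile on every
supported engine; without them the hex literals would be compared against raw bytes and
the IN list would match nothing.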