feat: Add support for random row sampling on binary id columns (#1135)
* fix: Cast binary random row IDs to string(hex)

* Add blacken and lint changes, comments for next steps

* feat: Add support for random row sampling on binary id columns

* tests: Add tests for random row sampling on binary id columns

* tests: Disable random row on binary key Hive test

---------

Co-authored-by: helensilva14 <[email protected]>
nj1973 and helensilva14 committed May 21, 2024
1 parent f51f327 commit c3d2155
Showing 11 changed files with 87 additions and 19 deletions.
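In outline: when the first primary key column is binary, the sampling query casts it to a hex string so the sampled values survive being fetched into the client, and the row filter then casts each value back to binary before building the IN list. A minimal standalone sketch of that round trip (plain Python with a hypothetical helper name; DVT's real code builds ibis expressions instead):

# Hypothetical sketch of the hex round trip, not DVT's actual API.
import binascii

def in_list_values(sampled_values, pk_is_binary):
    """Return IN-list values for the random row filter.

    Binary IDs arrive as hex strings (e.g. via TO_HEX on BigQuery),
    so each one must be converted back to binary before filtering.
    """
    if pk_is_binary:
        return [binascii.unhexlify(v) for v in sampled_values]
    return list(sampled_values)

# Example: two sampled binary IDs transported as hex strings.
assert in_list_values(["1fab03", "77e2c1"], pk_is_binary=True) == [
    b"\x1f\xab\x03",
    b"\x77\xe2\xc1",
]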
42 changes: 28 additions & 14 deletions data_validation/data_validation.py
@@ -106,10 +106,15 @@ def _add_random_row_filter(self):
             raise ValueError("Primary Keys are required for Random Row Filters")
 
         # Filter for only first primary key (multi-pk filter not supported)
-        primary_key_info = self.config_manager.primary_keys[0]
+        source_pk_column = self.config_manager.primary_keys[0][
+            consts.CONFIG_SOURCE_COLUMN
+        ]
+        target_pk_column = self.config_manager.primary_keys[0][
+            consts.CONFIG_TARGET_COLUMN
+        ]
 
         randomRowBuilder = RandomRowBuilder(
-            [primary_key_info[consts.CONFIG_SOURCE_COLUMN]],
+            [source_pk_column],
             self.config_manager.random_row_batch_size(),
         )
@@ -128,24 +133,33 @@ def _add_random_row_filter(self):
             self.validation_builder.source_builder,
         )
 
+        # Check if the source table's primary key is BINARY and, if so,
+        # force cast the id column to STRING (hex).
+        binary_conversion_required = False
+        if query[source_pk_column].type().is_binary():
+            binary_conversion_required = True
+            query = query.mutate(
+                **{source_pk_column: query[source_pk_column].cast("string")}
+            )
+
         random_rows = self.config_manager.source_client.execute(query)
         if len(random_rows) == 0:
             return
 
+        random_values = list(random_rows[source_pk_column])
+        if binary_conversion_required:
+            # For binary ids we have a list of hex strings for our IN list.
+            # Each of these needs to be cast back to binary.
+            random_values = [ibis.literal(_).cast("binary") for _ in random_values]
+
         filter_field = {
             consts.CONFIG_TYPE: consts.FILTER_TYPE_ISIN,
-            consts.CONFIG_FILTER_SOURCE_COLUMN: primary_key_info[
-                consts.CONFIG_FIELD_ALIAS
-            ],
-            consts.CONFIG_FILTER_SOURCE_VALUE: random_rows[
-                primary_key_info[consts.CONFIG_SOURCE_COLUMN]
-            ],
-            consts.CONFIG_FILTER_TARGET_COLUMN: primary_key_info[
-                consts.CONFIG_FIELD_ALIAS
-            ],
-            consts.CONFIG_FILTER_TARGET_VALUE: random_rows[
-                primary_key_info[consts.CONFIG_SOURCE_COLUMN]
-            ],
+            consts.CONFIG_FILTER_SOURCE_COLUMN: source_pk_column,
+            consts.CONFIG_FILTER_SOURCE_VALUE: random_values,
+            consts.CONFIG_FILTER_TARGET_COLUMN: target_pk_column,
+            consts.CONFIG_FILTER_TARGET_VALUE: random_values,
         }
 
         self.validation_builder.add_filter(filter_field)
 
     def query_too_large(self, rows_df, grouped_fields):
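Together with the compiler changes further down, the net effect on BigQuery is that the sampling query selects TO_HEX(binary_id) and the generated row filter wraps each literal back through FROM_HEX. A rough illustration of the SQL shape (hypothetical table and values, not captured DVT output):

# Illustrative only: the SQL shape implied by the casts, with made-up values.
sampled_hex = ["1fab03", "77e2c1"]  # hex strings returned by the sampling query

in_list = ", ".join(f"FROM_HEX('{v}')" for v in sampled_hex)
filter_sql = f"SELECT * FROM dvt_binary WHERE binary_id IN ({in_list})"
print(filter_sql)
# SELECT * FROM dvt_binary WHERE binary_id IN (FROM_HEX('1fab03'), FROM_HEX('77e2c1'))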
5 changes: 5 additions & 0 deletions tests/system/data_sources/test_hive.py
@@ -319,6 +319,7 @@ def test_row_validation_core_types_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """Hive to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -330,6 +331,10 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            # We have a bug in our test Hive instance that returns
+            # zero rows on binary IN lists with >1 element.
+            # "--use-random-row",
+            # "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_mysql.py
@@ -336,6 +336,7 @@ def test_row_validation_core_types_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """MySQL to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -347,6 +348,8 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            "--use-random-row",
+            "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_oracle.py
@@ -473,6 +473,7 @@ def test_row_validation_large_decimals_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """Oracle to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -484,6 +485,8 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            "--use-random-row",
+            "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_postgres.py
@@ -765,6 +765,7 @@ def test_row_validation_large_decimals_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """PostgreSQL to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -776,6 +777,8 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            "--use-random-row",
+            "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_snowflake.py
@@ -324,6 +324,7 @@ def test_row_validation_core_types_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """Snowflake to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -335,6 +336,8 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=PSO_DATA_VALIDATOR.PUBLIC.DVT_BINARY=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            "--use-random-row",
+            "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_spanner.py
@@ -348,6 +348,7 @@ def test_row_validation_core_types_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """Spanner to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -359,6 +360,8 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            "--use-random-row",
+            "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_sql_server.py
@@ -460,6 +460,7 @@ def test_row_validation_large_decimals_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """SQL Server to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -471,6 +472,8 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            "--use-random-row",
+            "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_teradata.py
@@ -457,6 +457,7 @@ def test_row_validation_large_decimals_to_bigquery():
 def test_row_validation_binary_pk_to_bigquery():
     """Teradata to BigQuery dvt_binary row validation.
     This is testing binary primary key join columns.
+    Includes random row filter test.
     """
     parser = cli_tools.configure_arg_parser()
     args = parser.parse_args(
@@ -468,6 +469,8 @@ def test_row_validation_binary_pk_to_bigquery():
             "-tbls=udf.dvt_binary=pso_data_validator.dvt_binary",
             "--primary-keys=binary_id",
             "--hash=int_id,other_data",
+            "--use-random-row",
+            "--random-row-batch-size=5",
         ]
     )
     df = run_test_from_cli_args(args)
33 changes: 28 additions & 5 deletions third_party/ibis/ibis_addon/operations.py
@@ -130,11 +130,17 @@ def compile_to_char(numeric_value, fmt):
 
 
 @bigquery_cast.register(str, dt.Binary, dt.String)
-def bigquery_cast_generate(compiled_arg, from_, to):
+def bigquery_cast_from_binary_generate(compiled_arg, from_, to):
     """Cast of binary to string should be hex conversion."""
     return f"TO_HEX({compiled_arg})"
 
 
+@bigquery_cast.register(str, dt.String, dt.Binary)
+def bigquery_cast_to_binary_generate(compiled_arg, from_, to):
+    """Cast of string to binary should be a from-hex conversion."""
+    return f"FROM_HEX({compiled_arg})"
+
+
 def format_hash_bigquery(translator, op):
     arg = translator.translate(op.arg)
     if op.how == "farm_fingerprint":
@@ -385,6 +391,9 @@ def sa_cast_hive(t, op):
     if arg_dtype.is_binary() and typ.is_string():
         # Binary to string cast is a "to hex" conversion for DVT.
         return f"lower(hex({arg_formatted}))"
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return f"unhex({arg_formatted})"
 
     # Follow the original Ibis code path.
     # Cannot use sa_fixed_cast() because of ImpalaExprTranslator ancestry.
@@ -401,10 +410,13 @@ def sa_cast_postgres(t, op):
     typ = op.to
     arg_dtype = arg.output_dtype
 
+    sa_arg = t.translate(arg)
     if arg_dtype.is_binary() and typ.is_string():
-        sa_arg = t.translate(arg)
         # Binary to string cast is a "to hex" conversion for DVT.
         return sa.func.encode(sa_arg, sa.literal("hex"))
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.decode(sa_arg, sa.literal("hex"))
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
@@ -415,17 +427,19 @@ def sa_cast_mssql(t, op):
     typ = op.to
     arg_dtype = arg.output_dtype
 
+    sa_arg = t.translate(arg)
     # Specialize going from a binary float type to a string.
     if (arg_dtype.is_float32() or arg_dtype.is_float64()) and typ.is_string():
-        sa_arg = t.translate(arg)
         # This prevents output in scientific notation, at least for my tests it did.
         return sa.func.format(sa_arg, "G")
     elif arg_dtype.is_binary() and typ.is_string():
-        sa_arg = t.translate(arg)
         # Binary to string cast is a "to hex" conversion for DVT.
         return sa.func.lower(
             sa.func.convert(sa.text("VARCHAR(MAX)"), sa_arg, sa.literal(2))
         )
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.convert(sa.text("VARBINARY(MAX)"), sa_arg, sa.literal(2))
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
@@ -437,6 +451,7 @@ def sa_cast_mysql(t, op):
     typ = op.to
     arg_dtype = arg.output_dtype
 
+    sa_arg = t.translate(arg)
     # Specialize going from numeric(p,s>0) to string
     if (
         arg_dtype.is_decimal()
@@ -452,9 +467,11 @@ def sa_cast_mysql(t, op):
         # https://stackoverflow.com/a/20111398
         return sa_fixed_cast(t, op) + sa.literal(0)
     elif arg_dtype.is_binary() and typ.is_string():
-        sa_arg = t.translate(arg)
         # Binary to string cast is a "to hex" conversion for DVT.
         return sa.func.lower(sa.func.hex(sa_arg))
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.unhex(sa_arg)
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
@@ -469,6 +486,9 @@ def sa_cast_oracle(t, op):
     if arg_dtype.is_binary() and typ.is_string():
         # Binary to string cast is a "to hex" conversion for DVT.
         return sa.func.lower(sa.func.rawtohex(sa_arg))
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.hextoraw(sa_arg)
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
@@ -487,6 +507,9 @@ def sa_cast_snowflake(t, op):
     if arg_dtype.is_binary() and typ.is_string():
         # Binary to string cast is a "to hex" conversion for DVT.
         return sa.func.hex_encode(sa_arg, sa.literal(0))
+    elif arg_dtype.is_string() and typ.is_binary():
+        # String to binary cast is a "from hex" conversion for DVT.
+        return sa.func.hex_decode_binary(sa_arg)
 
     # Follow the original Ibis code path.
     return sa_fixed_cast(t, op)
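The bigquery_cast registrations above and the teradata_cast registration below use the same multiple-dispatch idea: a cast compiler is selected by the (compiled argument, from-type, to-type) triple. A toy sketch of that pattern (standalone stand-in classes, not ibis' dt types):

# Toy sketch of multiple-dispatch cast registration; Binary/String are stand-ins.
from multipledispatch import Dispatcher

class Binary: ...
class String: ...

my_cast = Dispatcher("my_cast")

@my_cast.register(str, Binary, String)
def _binary_to_string(compiled_arg, from_, to):
    # Binary -> string is a "to hex" conversion, as in the real registry.
    return f"TO_HEX({compiled_arg})"

@my_cast.register(str, String, Binary)
def _string_to_binary(compiled_arg, from_, to):
    # String -> binary reverses it with a "from hex" conversion.
    return f"FROM_HEX({compiled_arg})"

print(my_cast("t.binary_id", Binary(), String()))  # TO_HEX(t.binary_id)
print(my_cast("'1fab03'", String(), Binary()))     # FROM_HEX('1fab03')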
5 changes: 5 additions & 0 deletions third_party/ibis/ibis_teradata/registry.py
@@ -48,6 +48,11 @@ def teradata_cast_binary_to_string(compiled_arg, from_, to):
     return "LOWER(FROM_BYTES({}, 'base16'))".format(compiled_arg)
 
 
+@teradata_cast.register(str, dt.String, dt.Binary)
+def teradata_cast_string_to_binary(compiled_arg, from_, to):
+    return "TO_BYTES({}, 'base16')".format(compiled_arg)
+
+
 @teradata_cast.register(str, dt.DataType, dt.DataType)
 def teradata_cast_generate(compiled_arg, from_, to):
     sql_type = ibis_type_to_teradata_type(to)
