feat: Add support for random row sampling on binary id columns #1135

Merged 16 commits on May 21, 2024
42 changes: 28 additions & 14 deletions data_validation/data_validation.py
@@ -106,10 +106,15 @@ def _add_random_row_filter(self):
raise ValueError("Primary Keys are required for Random Row Filters")

# Filter for only first primary key (multi-pk filter not supported)
source_pk_column = self.config_manager.primary_keys[0][
consts.CONFIG_SOURCE_COLUMN
]
target_pk_column = self.config_manager.primary_keys[0][
consts.CONFIG_TARGET_COLUMN
]

randomRowBuilder = RandomRowBuilder(
[source_pk_column],
self.config_manager.random_row_batch_size(),
)

@@ -128,24 +133,33 @@ def _add_random_row_filter(self):
self.validation_builder.source_builder,
)

# If the source table's primary key is BINARY, force cast the id
# columns to STRING (HEX).
binary_conversion_required = False
if query[source_pk_column].type().is_binary():
binary_conversion_required = True
query = query.mutate(
**{source_pk_column: query[source_pk_column].cast("string")}
)

random_rows = self.config_manager.source_client.execute(query)
if len(random_rows) == 0:
return

random_values = list(random_rows[source_pk_column])
if binary_conversion_required:
# For binary ids we have a list of hex strings for our IN list.
# Each of these needs to be cast back to binary.
random_values = [ibis.literal(_).cast("binary") for _ in random_values]

filter_field = {
consts.CONFIG_TYPE: consts.FILTER_TYPE_ISIN,
consts.CONFIG_FILTER_SOURCE_COLUMN: source_pk_column,
consts.CONFIG_FILTER_SOURCE_VALUE: random_values,
consts.CONFIG_FILTER_TARGET_COLUMN: target_pk_column,
consts.CONFIG_FILTER_TARGET_VALUE: random_values,
}

self.validation_builder.add_filter(filter_field)

def query_too_large(self, rows_df, grouped_fields):
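For context, here is a minimal sketch of the hex round-trip this change introduces, written against the ibis expression API. The table and column names are hypothetical, and a plain limit() stands in for RandomRowBuilder's random sampling:

import ibis

# Hypothetical unbound table standing in for the table under validation.
t = ibis.table([("binary_id", "binary"), ("other_data", "string")], name="dvt_binary")

# Step 1: sample rows with the binary PK cast to string; the cast
# overrides in this PR compile it to a hex conversion, e.g. TO_HEX
# on BigQuery or lower(hex(...)) on Hive.
sample = t.mutate(binary_id=t["binary_id"].cast("string")).limit(5)

# Step 2: each fetched hex string is cast back to binary for the IN
# filter, compiling to the dialect's from-hex function (FROM_HEX,
# unhex, hextoraw, ...), so binary is compared against binary.
hex_values = ["1fee", "c0de"]  # stand-ins for values fetched in step 1
in_list = [ibis.literal(v).cast("binary") for v in hex_values]
filtered = t.filter(t["binary_id"].isin(in_list))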
5 changes: 5 additions & 0 deletions tests/system/data_sources/test_hive.py
@@ -319,6 +319,7 @@ def test_row_validation_core_types_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""Hive to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Random row filter test is currently disabled due to a test Hive issue (see below).
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -330,6 +331,10 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
# We have a bug in our test Hive instance that returns
# zero rows on binary IN lists with >1 element.
# "--use-random-row",
# "--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_mysql.py
@@ -336,6 +336,7 @@ def test_row_validation_core_types_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""MySQL to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Includes random row filter test.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -347,6 +348,8 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
"--use-random-row",
"--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_oracle.py
@@ -473,6 +473,7 @@ def test_row_validation_large_decimals_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""Oracle to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Includes random row filter test.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -484,6 +485,8 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
"--use-random-row",
"--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_postgres.py
@@ -765,6 +765,7 @@ def test_row_validation_large_decimals_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""PostgreSQL to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Includes random row filter test.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -776,6 +777,8 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
"--use-random-row",
"--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_snowflake.py
@@ -324,6 +324,7 @@ def test_row_validation_core_types_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""Snowflake to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Includes random row filter test.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -335,6 +336,8 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=PSO_DATA_VALIDATOR.PUBLIC.DVT_BINARY=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
"--use-random-row",
"--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_spanner.py
@@ -348,6 +348,7 @@ def test_row_validation_core_types_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""Spanner to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Includes random row filter test.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -359,6 +360,8 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
"--use-random-row",
"--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_sql_server.py
@@ -460,6 +460,7 @@ def test_row_validation_large_decimals_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""SQL Server to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Includes random row filter test.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -471,6 +472,8 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
"--use-random-row",
"--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
3 changes: 3 additions & 0 deletions tests/system/data_sources/test_teradata.py
@@ -457,6 +457,7 @@ def test_row_validation_large_decimals_to_bigquery():
def test_row_validation_binary_pk_to_bigquery():
"""Teradata to BigQuery dvt_binary row validation.
This is testing binary primary key join columns.
Includes random row filter test.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
@@ -468,6 +469,8 @@ def test_row_validation_binary_pk_to_bigquery():
"-tbls=udf.dvt_binary=pso_data_validator.dvt_binary",
"--primary-keys=binary_id",
"--hash=int_id,other_data",
"--use-random-row",
"--random-row-batch-size=5",
]
)
df = run_test_from_cli_args(args)
33 changes: 28 additions & 5 deletions third_party/ibis/ibis_addon/operations.py
@@ -130,11 +130,17 @@ def compile_to_char(numeric_value, fmt):


@bigquery_cast.register(str, dt.Binary, dt.String)
def bigquery_cast_from_binary_generate(compiled_arg, from_, to):
"""Cast of binary to string should be hex conversion."""
return f"TO_HEX({compiled_arg})"


@bigquery_cast.register(str, dt.String, dt.Binary)
def bigquery_cast_to_binary_generate(compiled_arg, from_, to):
"""Cast of binary to string should be hex conversion."""
return f"FROM_HEX({compiled_arg})"


def format_hash_bigquery(translator, op):
arg = translator.translate(op.arg)
if op.how == "farm_fingerprint":
@@ -385,6 +391,9 @@ def sa_cast_hive(t, op):
if arg_dtype.is_binary() and typ.is_string():
# Binary to string cast is a "to hex" conversion for DVT.
return f"lower(hex({arg_formatted}))"
elif arg_dtype.is_string() and typ.is_binary():
# Binary from string cast is a "from hex" conversion for DVT.
return f"unhex({arg_formatted})"

# Follow the original Ibis code path.
# Cannot use sa_fixed_cast() because of ImpalaExprTranslator ancestry.
@@ -401,10 +410,13 @@ def sa_cast_postgres(t, op):
typ = op.to
arg_dtype = arg.output_dtype

sa_arg = t.translate(arg)
if arg_dtype.is_binary() and typ.is_string():
# Binary to string cast is a "to hex" conversion for DVT.
return sa.func.encode(sa_arg, sa.literal("hex"))
elif arg_dtype.is_string() and typ.is_binary():
# Binary from string cast is a "from hex" conversion for DVT.
return sa.func.decode(sa_arg, sa.literal("hex"))

# Follow the original Ibis code path.
return sa_fixed_cast(t, op)
@@ -415,17 +427,19 @@ def sa_cast_mssql(t, op):
typ = op.to
arg_dtype = arg.output_dtype

sa_arg = t.translate(arg)
# Specialize going from a binary float type to a string.
if (arg_dtype.is_float32() or arg_dtype.is_float64()) and typ.is_string():
# This prevents output in scientific notation, at least in our tests.
return sa.func.format(sa_arg, "G")
elif arg_dtype.is_binary() and typ.is_string():
# Binary to string cast is a "to hex" conversion for DVT.
return sa.func.lower(
sa.func.convert(sa.text("VARCHAR(MAX)"), sa_arg, sa.literal(2))
)
elif arg_dtype.is_string() and typ.is_binary():
# Binary from string cast is a "from hex" conversion for DVT.
return sa.func.convert(sa.text("VARBINARY(MAX)"), sa_arg, sa.literal(2))

# Follow the original Ibis code path.
return sa_fixed_cast(t, op)
@@ -437,6 +451,7 @@ def sa_cast_mysql(t, op):
typ = op.to
arg_dtype = arg.output_dtype

sa_arg = t.translate(arg)
# Specialize going from numeric(p,s>0) to string
if (
arg_dtype.is_decimal()
@@ -452,9 +467,11 @@ def sa_cast_mysql(t, op):
# https://stackoverflow.com/a/20111398
return sa_fixed_cast(t, op) + sa.literal(0)
elif arg_dtype.is_binary() and typ.is_string():
# Binary to string cast is a "to hex" conversion for DVT.
return sa.func.lower(sa.func.hex(sa_arg))
elif arg_dtype.is_string() and typ.is_binary():
# Binary from string cast is a "from hex" conversion for DVT.
return sa.func.unhex(sa_arg)

# Follow the original Ibis code path.
return sa_fixed_cast(t, op)
@@ -469,6 +486,9 @@ def sa_cast_oracle(t, op):
if arg_dtype.is_binary() and typ.is_string():
# Binary to string cast is a "to hex" conversion for DVT.
return sa.func.lower(sa.func.rawtohex(sa_arg))
elif arg_dtype.is_string() and typ.is_binary():
# Binary from string cast is a "from hex" conversion for DVT.
return sa.func.hextoraw(sa_arg)

# Follow the original Ibis code path.
return sa_fixed_cast(t, op)
@@ -487,6 +507,9 @@ def sa_cast_snowflake(t, op):
if arg_dtype.is_binary() and typ.is_string():
# Binary to string cast is a "to hex" conversion for DVT.
return sa.func.hex_encode(sa_arg, sa.literal(0))
elif arg_dtype.is_string() and typ.is_binary():
# Binary from string cast is a "from hex" conversion for DVT.
return sa.func.hex_decode_binary(sa_arg)

# Follow the original Ibis code path.
return sa_fixed_cast(t, op)
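As a quick reference (this summary is not code from the PR), the string-to-binary casts registered above compile to each backend's from-hex function, with col as a placeholder column:

# SQL emitted for an ibis string-to-binary cast by the handlers above.
FROM_HEX_SQL = {
    "bigquery": "FROM_HEX(col)",
    "hive": "unhex(col)",
    "postgres": "decode(col, 'hex')",
    "mssql": "CONVERT(VARBINARY(MAX), col, 2)",
    "mysql": "unhex(col)",
    "oracle": "hextoraw(col)",
    "snowflake": "hex_decode_binary(col)",
}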
5 changes: 5 additions & 0 deletions third_party/ibis/ibis_teradata/registry.py
@@ -48,6 +48,11 @@ def teradata_cast_binary_to_string(compiled_arg, from_, to):
return "LOWER(FROM_BYTES({}, 'base16'))".format(compiled_arg)


@teradata_cast.register(str, dt.String, dt.Binary)
def teradata_cast_string_to_binary(compiled_arg, from_, to):
return "TO_BYTES({}, 'base16')".format(compiled_arg)


@teradata_cast.register(str, dt.DataType, dt.DataType)
def teradata_cast_generate(compiled_arg, from_, to):
sql_type = ibis_type_to_teradata_type(to)
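For completeness, a sketch of the Teradata SQL the pair of registrations above emits for the binary round-trip (column name and hex literal are hypothetical):

# Casting binary to string emits: LOWER(FROM_BYTES(binary_id, 'base16'))
# Casting string back to binary, as in the random-row IN list, emits:
#   TO_BYTES('c0de', 'base16')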