fix: Issues with validate column for time zoned timestamps #930

Merged · 5 commits · Aug 9, 2023 (showing changes from 4 commits)
88 changes: 78 additions & 10 deletions tests/system/data_sources/test_mysql.py
@@ -18,6 +18,8 @@
from data_validation import __main__ as main
from data_validation import cli_tools, data_validation, consts, exceptions
from data_validation.partition_builder import PartitionBuilder
+from tests.system.data_sources.test_bigquery import BQ_CONN


MYSQL_HOST = os.getenv("MYSQL_HOST", "localhost")
MYSQL_USER = os.getenv("MYSQL_USER", "dvt")
@@ -426,6 +428,13 @@ def test_mysql_row():
    pass


+def mock_get_connection_config(*args):
+    if args[1] in ("mysql-conn", "mock-conn"):
+        return CONN
+    elif args[1] == "bq-conn":
+        return BQ_CONN

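An aside on the patching pattern used in this file (an illustration, not part of the diff): passing new=mock_get_connection_config to mock.patch replaces the attribute with the stub function itself rather than a MagicMock, which is why the mock_conn parameters disappear from the test signatures below, and why the stub can route each connection name to its own config. A minimal sketch:

    from unittest import mock

    @mock.patch(
        "data_validation.state_manager.StateManager.get_connection_config",
        new=mock_get_connection_config,
    )
    def test_example():
        # No mock argument is injected. Inside the test, a lookup of
        # "bq-conn" returns BQ_CONN while "mysql-conn" returns CONN
        # (args[0] is the StateManager instance, args[1] the name).
        ...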

# Expected result from partitioning table on 3 keys
EXPECTED_PARTITION_FILTER = [
"course_id < 'ALG001' OR course_id = 'ALG001' AND (quarter_id < 3 OR quarter_id = 3 AND (student_id < 1234))",
Expand All @@ -439,9 +448,9 @@ def test_mysql_row():

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
-    return_value=CONN,
+    new=mock_get_connection_config,
)
-def test_mysql_generate_table_partitions(mock_conn):
+def test_mysql_generate_table_partitions():
    """Test generate table partitions on mysql
    The unit tests, specifically test_add_partition_filters_to_config and test_store_yaml_partitions_local
    check that yaml configurations are created and saved in local storage. Partitions can only be created with
@@ -476,9 +485,9 @@ def test_mysql_generate_table_partitions(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
-    return_value=CONN,
+    new=mock_get_connection_config,
)
-def test_schema_validation_core_types(mock_conn):
+def test_schema_validation_core_types():
    parser = cli_tools.configure_arg_parser()
    args = parser.parse_args(
        [
@@ -501,9 +510,9 @@ def test_schema_validation_core_types(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
-    return_value=CONN,
+    new=mock_get_connection_config,
)
-def test_column_validation_core_types(mock_conn):
+def test_column_validation_core_types():
    parser = cli_tools.configure_arg_parser()
    args = parser.parse_args(
        [
@@ -529,9 +538,40 @@ def test_column_validation_core_types(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
-    return_value=CONN,
+    new=mock_get_connection_config,
)
+def test_column_validation_core_types_to_bigquery():
+    parser = cli_tools.configure_arg_parser()
+    # TODO Change --sum string below to include col_datetime and col_tstz when issue-762 is complete.
+    # TODO Change --sum, --min and --max options to include col_char_2 when issue-842 is complete.
+    # We've excluded col_float32 because BigQuery does not have an exact same type and float32/64 are lossy and cannot be compared.
+    args = parser.parse_args(
+        [
+            "validate",
+            "column",
+            "-sc=mysql-conn",
+            "-tc=bq-conn",
+            "-tbls=pso_data_validator.dvt_core_types",
+            "--filter-status=fail",
+            "--sum=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date",
+            "--min=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime,col_tstz",
+            "--max=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime,col_tstz",
+        ]
+    )
+    config_managers = main.build_config_managers_from_args(args)
+    assert len(config_managers) == 1
+    config_manager = config_managers[0]
+    validator = data_validation.DataValidation(config_manager.config, verbose=False)
+    df = validator.execute()
+    # With filter on failures the data frame should be empty
+    assert len(df) == 0


+@mock.patch(
+    "data_validation.state_manager.StateManager.get_connection_config",
+    new=mock_get_connection_config,
+)
-def test_row_validation_core_types(mock_conn):
+def test_row_validation_core_types():
    parser = cli_tools.configure_arg_parser()
    args = parser.parse_args(
        [
@@ -556,9 +596,37 @@ def test_row_validation_core_types(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
-    return_value=CONN,
+    new=mock_get_connection_config,
)
+def test_row_validation_core_types_to_bigquery():
+    # TODO Change --hash string below to include col_float32,col_float64 when issue-841 is complete.
+    parser = cli_tools.configure_arg_parser()
+    args = parser.parse_args(
+        [
+            "validate",
+            "row",
+            "-sc=mysql-conn",
+            "-tc=bq-conn",
+            "-tbls=pso_data_validator.dvt_core_types",
+            "--primary-keys=id",
+            "--filter-status=fail",
+            "--hash=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_varchar_30,col_char_2,col_string,col_date,col_datetime,col_tstz",
+        ]
+    )
+    config_managers = main.build_config_managers_from_args(args)
+    assert len(config_managers) == 1
+    config_manager = config_managers[0]
+    validator = data_validation.DataValidation(config_manager.config, verbose=False)
+    df = validator.execute()
+    # With filter on failures the data frame should be empty
+    assert len(df) == 0


+@mock.patch(
+    "data_validation.state_manager.StateManager.get_connection_config",
+    new=mock_get_connection_config,
+)
-def test_custom_query_validation_core_types(mock_conn):
+def test_custom_query_validation_core_types():
    """MySQL to MySQL dvt_core_types custom-query validation"""
    parser = cli_tools.configure_arg_parser()
    args = parser.parse_args(
8 changes: 3 additions & 5 deletions tests/system/data_sources/test_oracle.py
@@ -218,7 +218,6 @@ def test_column_validation_core_types():
def test_column_validation_core_types_to_bigquery():
    parser = cli_tools.configure_arg_parser()
    # TODO Change --sum string below to include col_datetime and col_tstz when issue-762 is complete.
-    # TODO Change --min/max strings below to include col_tstz when issue-917 is complete.
    # We've excluded col_float32 because BigQuery does not have an exact same type and float32/64 are lossy and cannot be compared.
    args = parser.parse_args(
        [
@@ -229,8 +228,8 @@ def test_column_validation_core_types_to_bigquery():
"-tbls=pso_data_validator.dvt_core_types",
"--filter-status=fail",
"--sum=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date",
"--min=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_datetime",
"--max=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_datetime",
"--min=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_datetime,col_tstz",
"--max=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_datetime,col_tstz",
]
)
config_managers = main.build_config_managers_from_args(args)
@@ -275,7 +274,6 @@ def test_row_validation_core_types():
    new=mock_get_connection_config,
)
def test_row_validation_core_types_to_bigquery():
-    # TODO Change --hash string below to include col_tstz when issue-917 is complete.
    # TODO Change --hash string below to include col_float32,col_float64 when issue-841 is complete.
    parser = cli_tools.configure_arg_parser()
    args = parser.parse_args(
@@ -287,7 +285,7 @@ def test_row_validation_core_types_to_bigquery():
"-tbls=pso_data_validator.dvt_core_types",
"--primary-keys=id",
"--filter-status=fail",
"--hash=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_varchar_30,col_char_2,col_string,col_date,col_datetime",
"--hash=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_varchar_30,col_char_2,col_string,col_date,col_datetime,col_tstz",
]
)
config_managers = main.build_config_managers_from_args(args)
5 changes: 2 additions & 3 deletions tests/system/data_sources/test_postgres.py
@@ -616,7 +616,6 @@ def test_column_validation_core_types():
)
def test_column_validation_core_types_to_bigquery():
    parser = cli_tools.configure_arg_parser()
-    # TODO Change --min/max strings below to include col_tstz when issue-917 is complete.
    # We've excluded col_float32 because BigQuery does not have an exact same type and float32/64 are lossy and cannot be compared.
    # TODO Change --sum and --max options to include col_char_2 when issue-842 is complete.
    args = parser.parse_args(
@@ -628,8 +627,8 @@ def test_column_validation_core_types_to_bigquery():
"-tbls=pso_data_validator.dvt_core_types",
"--filter-status=fail",
"--sum=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime",
"--min=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime",
"--max=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime",
"--min=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime,col_tstz",
"--max=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime,col_tstz",
]
)
config_managers = main.build_config_managers_from_args(args)
7 changes: 3 additions & 4 deletions tests/system/data_sources/test_sql_server.py
@@ -344,7 +344,6 @@ def test_column_validation_core_types():
)
def test_column_validation_core_types_to_bigquery():
    parser = cli_tools.configure_arg_parser()
-    # TODO Change --sum/min/max strings below to include col_tstz when issue-917 is complete.
    # We've excluded col_float32 because BigQuery does not have an exact same type and float32/64 are lossy and cannot be compared.
    # We've excluded col_char_2 since the data stored in MSSQL has a trailing space which is counted in the LEN()
    args = parser.parse_args(
@@ -356,8 +355,8 @@ def test_column_validation_core_types_to_bigquery():
"-tbls=pso_data_validator.dvt_core_types",
"--filter-status=fail",
"--sum=col_int8,col_int16,col_int32,col_int64,col_float64,col_date,col_datetime,col_dec_10_2,col_dec_20,col_dec_38,col_varchar_30,col_char_2,col_string",
Collaborator:
Should we add col_tstz to the --sum flag as well?

Contributor Author:
Hmm, this is a tricky one. I added the column and the test fails, but it's not clear what the best course of action is. If I compare SQL Server to BigQuery I get an epoch-seconds mismatch:

  • SQL Server: 259206
  • BigQuery: 280806

However, if I compare SQL Server to itself, the value is 280806, which matches BigQuery.

The problem stems from these lines in config_manager.py:

        elif column_type == "timestamp" or column_type == "!timestamp":
            if (
                self.source_client.name == "bigquery"
                or self.target_client.name == "bigquery"
            ):
                calc_func = "cast"
                cast_type = "timestamp"

They say that if either the source or the target is BigQuery, then both sides are cast to timestamp before the epoch-seconds expression is applied. That cast to timestamp crops the time zone on the SQL Server side.

At first I thought I needed to change the SQL Server cast to convert to UTC first, but it also seems wrong that a BigQuery requirement changes what we do on the other engine. If BigQuery needs a pre-cast to TIMESTAMP then that should probably be handled on the BigQuery side, although I haven't looked into how hard that would be.

This needs more investigation.

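For context on the numbers above: the 21,600-second gap between 259206 and 280806 is exactly a six-hour offset. A minimal Python sketch of the cropping effect, assuming a hypothetical stored value of 1970-01-04 00:00:06-06:00 (the actual dvt_core_types row is not shown in this thread):

    from datetime import datetime, timedelta, timezone

    # Hypothetical col_tstz value, chosen only so the numbers match the thread.
    tstz = datetime(1970, 1, 4, 0, 0, 6, tzinfo=timezone(timedelta(hours=-6)))

    # Epoch seconds with the offset honoured (the 280806 result above).
    print(int(tstz.timestamp()))  # 280806

    # Cropping the zone keeps the local clock time and re-reads it as UTC,
    # which reproduces the mismatched 259206 result.
    cropped = tstz.replace(tzinfo=timezone.utc)
    print(int(cropped.timestamp()))  # 259206
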
Collaborator:
Yeah, agreed - I'm seeing something very similar in Teradata. Since we cast to timestamp, we lose the time zone and cause a mismatch when validating col_tstz for TD to BQ. TD to TD also produces the correct 280806 value.

Contributor Author:
I've created #938 for this problem.

"--min=col_int8,col_int16,col_int32,col_int64,col_float64,col_date,col_datetime,col_dec_10_2,col_dec_20,col_dec_38,col_varchar_30,col_char_2,col_string",
"--max=col_int8,col_int16,col_int32,col_int64,col_float64,col_date,col_datetime,col_dec_10_2,col_dec_20,col_dec_38,col_varchar_30,col_char_2,col_string",
"--min=col_int8,col_int16,col_int32,col_int64,col_float64,col_date,col_datetime,col_tstz,col_dec_10_2,col_dec_20,col_dec_38,col_varchar_30,col_char_2,col_string",
"--max=col_int8,col_int16,col_int32,col_int64,col_float64,col_date,col_datetime,col_tstz,col_dec_10_2,col_dec_20,col_dec_38,col_varchar_30,col_char_2,col_string",
]
)
config_managers = main.build_config_managers_from_args(args)
@@ -404,7 +403,7 @@ def test_row_validation_core_types():
def test_row_validation_core_types_to_bigquery():
    parser = cli_tools.configure_arg_parser()
    # TODO When issue-834 is complete add col_string to --hash string below.
-    # TODO Change --hash string below to include col_tstz when issue-917 is complete.
+    # TODO Change --hash string below to include col_tstz when issue-929 is complete.
    # TODO Change --hash string below to include col_float32,col_float64 when issue-841 is complete.
    args = parser.parse_args(
        [
6 changes: 3 additions & 3 deletions tests/system/data_sources/test_teradata.py
@@ -304,8 +304,8 @@ def test_column_validation_core_types_to_bigquery():
"-tbls=udf.dvt_core_types=pso_data_validator.dvt_core_types",
"--filter-status=fail",
"--sum=col_int8,col_int16,col_int32,col_int64,col_float32,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_dec_20,col_dec_38,col_dec_10_2",
"--min=col_int8,col_int16,col_int32,col_int64,col_float32,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_dec_20,col_dec_38,col_dec_10_2",
"--max=col_int8,col_int16,col_int32,col_int64,col_float32,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_dec_20,col_dec_38,col_dec_10_2",
"--min=col_int8,col_int16,col_int32,col_int64,col_float32,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_tstz,col_dec_20,col_dec_38,col_dec_10_2",
"--max=col_int8,col_int16,col_int32,col_int64,col_float32,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_tstz,col_dec_20,col_dec_38,col_dec_10_2",
]
)
config_managers = main.build_config_managers_from_args(args)
@@ -402,7 +402,7 @@ def test_row_validation_core_types_to_bigquery():
    parser = cli_tools.configure_arg_parser()
    # Excluded col_string because LONG VARCHAR column causes exception regardless of column contents:
    # [Error 3798] A column or character expression is larger than the max size.
-    # TODO Change --hash option to include col_tstz when issue-917 is complete.
+    # TODO Change --hash option to include col_tstz when issue-929 is complete.
    # TODO Change --hash option to include col_float32,col_float64 when issue-841 is complete.
    args = parser.parse_args(
        [
6 changes: 6 additions & 0 deletions third_party/ibis/ibis_oracle/registry.py
@@ -329,6 +329,12 @@ def _table_column(t, op):
    out_expr = get_col(sa_table, op)
    out_expr.quote = t._quote_column_names

+    if op.output_dtype.is_timestamp():
+        timezone = op.output_dtype.timezone
+        if timezone is not None:
+            # Using literal_column on Oracle because the time zone string cannot be a bind.
+            out_expr = sa.literal_column(f"{out_expr.name} AT TIME ZONE '{timezone}'").label(op.name)
+
    # If the column does not originate from the table set in the current SELECT
    # context, we should format as a subquery
    if t.permit_subquery and ctx.is_foreign_expr(table):
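For illustration (an assumption, not code from this PR), the expression built above compiles to SQL along these lines; per the comment in the diff, the time zone string cannot be passed as a bind parameter in this position, hence the literal:

    import sqlalchemy as sa

    # Hypothetical column name and zone; the registry derives both from the Ibis op.
    col_name, tz = "col_tstz", "UTC"
    expr = sa.literal_column(f"{col_name} AT TIME ZONE '{tz}'").label(col_name)
    print(sa.select(expr))
    # SELECT col_tstz AT TIME ZONE 'UTC' AS col_tstz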