Skip to content

Commit

Permalink
test: Add cross engine tests using BigQuery target (#843)
Browse files Browse the repository at this point in the history
* test: Add Oracle to BigQuery integration tests

* test: Add Hive to BigQuery integration tests

* test: Add SQL Server to BigQuery integration tests

* test: Add SQL Server to BigQuery integration tests

* test: Add Teradata to BigQuery integration tests

* test: Add issue-841 note for float32/64 issues

* test: Add issue-842 note for Hive char issue

* test: Add PostgreSQL to BigQuery integration tests

* test: Reformat Teradata to BigQuery integration tests

* test: Remove duplicate SQL Server test

* test: Disable Hive-to-Hive tests in favour of Hive to BigQuery tests
  • Loading branch information
nj1973 committed May 12, 2023
1 parent 0749c9e commit b3a828c
Show file tree
Hide file tree
Showing 5 changed files with 560 additions and 45 deletions.
130 changes: 124 additions & 6 deletions tests/system/data_sources/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from data_validation import __main__ as main
from data_validation import cli_tools, data_validation, consts
from tests.system.data_sources.test_bigquery import BQ_CONN


HIVE_HOST = os.getenv("HIVE_HOST", "localhost")
Expand Down Expand Up @@ -60,11 +61,23 @@ def test_count_validator():
assert df["source_agg_value"][0] == df["target_agg_value"][0]


def mock_get_connection_config(*args):
    """Stand-in for StateManager.get_connection_config used via mock.patch.

    args[0] is the StateManager instance (unused); args[1] is the
    connection name from the CLI. Returns the matching stubbed connection
    config. Raises ValueError for an unexpected name instead of silently
    returning None, which would otherwise surface later as a confusing
    downstream failure.
    """
    conn_name = args[1]
    if conn_name in ("hive-conn", "mock-conn"):
        return CONN
    if conn_name == "bq-conn":
        return BQ_CONN
    raise ValueError(f"Unexpected connection name: {conn_name}")


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
return_value=CONN,
new=mock_get_connection_config,
)
def test_schema_validation_core_types(mock_conn):
def disabled_test_schema_validation_core_types():
"""
Disabled this test in favour of test_schema_validation_core_types_to_bigquery().
The Hive integration tests are too slow and timing out but I believe
test_column_validation_core_types_to_bigquery() will cover off most of what this test does.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
[
Expand All @@ -87,9 +100,50 @@ def test_schema_validation_core_types(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
    new=mock_get_connection_config,
)
def test_schema_validation_core_types_to_bigquery():
    """Hive to BigQuery dvt_core_types schema validation.

    Expected cross-engine type mappings are declared via --allow-list so
    that only genuine schema drift is reported as a failure.
    """
    parser = cli_tools.configure_arg_parser()
    args = parser.parse_args(
        [
            "validate",
            "schema",
            "-sc=hive-conn",
            "-tc=bq-conn",
            "-tbls=pso_data_validator.dvt_core_types",
            "--filter-status=fail",
            (
                # All Hive integrals go to BigQuery INT64.
                "--allow-list=int8:int64,int16:int64,int32:int64,"
                # Hive decimals that map to BigQuery NUMERIC.
                "decimal(20,0):decimal(38,9),decimal(10,2):decimal(38,9),"
                # Hive decimals that map to BigQuery BIGNUMERIC.
                # When issue-839 is resolved we need to edit the line below as appropriate.
                "decimal(38,0):decimal(38,9),"
                # BigQuery does not have a float32 type.
                "float32:float64"
            ),
        ]
    )
    config_managers = main.build_config_managers_from_args(args)
    assert len(config_managers) == 1
    config_manager = config_managers[0]
    validator = data_validation.DataValidation(config_manager.config, verbose=False)
    df = validator.execute()
    # With --filter-status=fail a successful run yields an empty data frame.
    assert len(df) == 0


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def disabled_test_column_validation_core_types():
"""
Disabled this test in favour of test_column_validation_core_types_to_bigquery().
The Hive integration tests are too slow and timing out but I believe
test_column_validation_core_types_to_bigquery() will cover off most of what this test does.
"""
parser = cli_tools.configure_arg_parser()
# Hive tests are really slow so I've excluded --min below assuming that --max is
# effectively the same test when comparing an engine back to itself.
Expand All @@ -116,9 +170,44 @@ def test_column_validation_core_types(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
    new=mock_get_connection_config,
)
def test_column_validation_core_types_to_bigquery():
    """Hive to BigQuery dvt_core_types column validation."""
    parser = cli_tools.configure_arg_parser()
    # Hive tests are really slow so --min is excluded below on the assumption
    # that --max exercises effectively the same comparison path.
    # col_float32 is excluded because BigQuery has no exact float32 type and
    # lossy float32/64 values cannot be compared exactly.
    # TODO Change --sum and --max options to include col_char_2 when issue-842 is complete.
    args = parser.parse_args(
        [
            "validate",
            "column",
            "-sc=hive-conn",
            "-tc=bq-conn",
            "-tbls=pso_data_validator.dvt_core_types",
            "--filter-status=fail",
            "--sum=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime,col_tstz",
            "--max=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_string,col_date,col_datetime,col_tstz",
        ]
    )
    config_managers = main.build_config_managers_from_args(args)
    assert len(config_managers) == 1
    config_manager = config_managers[0]
    validator = data_validation.DataValidation(config_manager.config, verbose=False)
    df = validator.execute()
    # With --filter-status=fail a successful run yields an empty data frame.
    assert len(df) == 0


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_core_types(mock_conn):
def disabled_test_row_validation_core_types():
"""
Disabled this test in favour of test_row_validation_core_types_to_bigquery().
The Hive integration tests are too slow and timing out but I believe
test_column_validation_core_types_to_bigquery() will cover off most of what this test does.
"""
parser = cli_tools.configure_arg_parser()
# TODO Change --hash option to * below when issue-765 is complete.
args = parser.parse_args(
Expand All @@ -140,3 +229,32 @@ def test_row_validation_core_types(mock_conn):
df = validator.execute()
# With filter on failures the data frame should be empty
assert len(df) == 0


@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
    new=mock_get_connection_config,
)
def test_row_validation_core_types_to_bigquery():
    """Hive to BigQuery dvt_core_types row validation."""
    # TODO Change --hash option to include col_date,col_datetime,col_tstz when issue-765 is complete.
    # TODO Change --hash string below to include col_float32,col_float64 when issue-841 is complete.
    cli_args = [
        "validate",
        "row",
        "-sc=hive-conn",
        "-tc=bq-conn",
        "-tbls=pso_data_validator.dvt_core_types",
        "--primary-keys=id",
        "--filter-status=fail",
        "--hash=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_varchar_30,col_char_2,col_string",
    ]
    args = cli_tools.configure_arg_parser().parse_args(cli_args)
    config_managers = main.build_config_managers_from_args(args)
    assert len(config_managers) == 1
    validator = data_validation.DataValidation(
        config_managers[0].config, verbose=False
    )
    df = validator.execute()
    # Only failing comparisons are returned, so success is an empty data frame.
    assert len(df) == 0
125 changes: 115 additions & 10 deletions tests/system/data_sources/test_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from data_validation import __main__ as main
from data_validation import cli_tools, data_validation, consts
from tests.system.data_sources.test_bigquery import BQ_CONN


ORACLE_HOST = os.getenv("ORACLE_HOST", "localhost")
Expand Down Expand Up @@ -62,11 +63,19 @@ def test_count_validator():
assert df["source_agg_value"][0] == df["target_agg_value"][0]


def mock_get_connection_config(*args):
    """Stand-in for StateManager.get_connection_config used via mock.patch.

    args[0] is the StateManager instance (unused); args[1] is the
    connection name from the CLI. Returns the matching stubbed connection
    config. Raises ValueError for an unexpected name instead of silently
    returning None, which would otherwise surface later as a confusing
    downstream failure.
    """
    conn_name = args[1]
    if conn_name in ("ora-conn", "mock-conn"):
        return CONN
    if conn_name == "bq-conn":
        return BQ_CONN
    raise ValueError(f"Unexpected connection name: {conn_name}")


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
return_value=CONN,
new=mock_get_connection_config,
)
def test_schema_validation_core_types(mock_conn):
def test_schema_validation_core_types():
"""Oracle to Oracle dvt_core_types schema validation"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
[
Expand All @@ -89,14 +98,49 @@ def test_schema_validation_core_types(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
    new=mock_get_connection_config,
)
def test_schema_validation_core_types_to_bigquery():
    """Oracle to BigQuery dvt_core_types schema validation.

    Expected cross-engine type mappings are declared via --allow-list so
    that only genuine schema drift is reported as a failure.
    """
    parser = cli_tools.configure_arg_parser()
    # NOTE(review): earlier TODOs here referenced --sum/--min/--max strings,
    # which belong to the column validation test, not this schema validation;
    # they appeared to be copy-pasted and have been removed.
    args = parser.parse_args(
        [
            "validate",
            "schema",
            "-sc=ora-conn",
            "-tc=bq-conn",
            "-tbls=pso_data_validator.dvt_core_types",
            "--filter-status=fail",
            (
                # Integral Oracle NUMBERS go to BigQuery INT64.
                "--allow-list=decimal(8,0):int64,decimal(2,0):int64,decimal(4,0):int64,decimal(9,0):int64,decimal(18,0):int64,"
                # Oracle NUMBERS that map to BigQuery NUMERIC.
                "decimal(20,0):decimal(38,9),decimal(10,2):decimal(38,9),"
                # Oracle NUMBERS that map to BigQuery BIGNUMERIC.
                # When issue-839 is resolved we need to edit the line below as appropriate.
                "decimal(38,0):decimal(38,9),"
                # BigQuery does not have a float32 type.
                "float32:float64"
            ),
        ]
    )
    config_managers = main.build_config_managers_from_args(args)
    assert len(config_managers) == 1
    config_manager = config_managers[0]
    validator = data_validation.DataValidation(config_manager.config, verbose=False)
    df = validator.execute()
    # With --filter-status=fail a successful run yields an empty data frame.
    assert len(df) == 0


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_column_validation_core_types():
"""Oracle to Oracle dvt_core_types column validation"""
parser = cli_tools.configure_arg_parser()
# TODO Change --sum string below to * when issue-762 is complete.
args = parser.parse_args(
[
"validate",
Expand All @@ -121,9 +165,41 @@ def test_column_validation_core_types(mock_conn):

@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
    new=mock_get_connection_config,
)
def test_column_validation_core_types_to_bigquery():
    """Oracle to BigQuery dvt_core_types column validation."""
    parser = cli_tools.configure_arg_parser()
    # TODO Change --sum string below to include col_datetime and col_tstz when issue-762 is complete.
    # TODO Change --min/max strings below to include col_tstz when issue-706 is complete.
    # col_float32 is excluded because BigQuery has no exact float32 type and
    # lossy float32/64 values cannot be compared exactly.
    args = parser.parse_args(
        [
            "validate",
            "column",
            "-sc=ora-conn",
            "-tc=bq-conn",
            "-tbls=pso_data_validator.dvt_core_types",
            "--filter-status=fail",
            "--sum=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date",
            "--min=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_datetime",
            "--max=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_float64,col_varchar_30,col_char_2,col_string,col_date,col_datetime",
        ]
    )
    config_managers = main.build_config_managers_from_args(args)
    assert len(config_managers) == 1
    config_manager = config_managers[0]
    validator = data_validation.DataValidation(config_manager.config, verbose=False)
    df = validator.execute()
    # With --filter-status=fail a successful run yields an empty data frame.
    assert len(df) == 0


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_core_types(mock_conn):
def test_row_validation_core_types():
"""Oracle to Oracle dvt_core_types row validation"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
[
Expand All @@ -144,3 +220,32 @@ def test_row_validation_core_types(mock_conn):
df = validator.execute()
# With filter on failures the data frame should be empty
assert len(df) == 0


@mock.patch(
    "data_validation.state_manager.StateManager.get_connection_config",
    new=mock_get_connection_config,
)
def test_row_validation_core_types_to_bigquery():
    """Oracle to BigQuery dvt_core_types row validation."""
    # TODO Change --hash string below to include col_tstz when issue-706 is complete.
    # TODO Change --hash string below to include col_float32,col_float64 when issue-841 is complete.
    cli_args = [
        "validate",
        "row",
        "-sc=ora-conn",
        "-tc=bq-conn",
        "-tbls=pso_data_validator.dvt_core_types",
        "--primary-keys=id",
        "--filter-status=fail",
        "--hash=col_int8,col_int16,col_int32,col_int64,col_dec_20,col_dec_38,col_dec_10_2,col_varchar_30,col_char_2,col_string,col_date,col_datetime",
    ]
    args = cli_tools.configure_arg_parser().parse_args(cli_args)
    config_managers = main.build_config_managers_from_args(args)
    assert len(config_managers) == 1
    validator = data_validation.DataValidation(
        config_managers[0].config, verbose=False
    )
    df = validator.execute()
    # Only failing comparisons are returned, so success is an empty data frame.
    assert len(df) == 0
Loading

0 comments on commit b3a828c

Please sign in to comment.