diff --git a/cloudbuild.yaml b/cloudbuild.yaml
index 75ebfd76c..eecb81ee1 100644
--- a/cloudbuild.yaml
+++ b/cloudbuild.yaml
@@ -13,6 +13,9 @@
 # limitations under the License.
 
 timeout: 7200s
+options:
+  pool:
+    name: 'projects/pso-kokoro-resources/locations/us-central1/workerPools/private-pool'
 steps:
 - id: lint
   name: 'gcr.io/pso-kokoro-resources/python-multi'
@@ -70,6 +73,17 @@ steps:
   - 'NOX_SESSION=integration_spanner'
   - 'PROJECT_ID=pso-kokoro-resources'
   waitFor: ['-']
+- id: integration_teradata
+  name: 'gcr.io/pso-kokoro-resources/python-multi'
+  args: ['bash', './ci/build.sh']
+  env:
+  - 'NOX_SESSION=integration_teradata'
+  - 'PROJECT_ID=pso-kokoro-resources'
+  # TODO: source TERADATA_PASSWORD from Secret Manager (availableSecrets /
+  # secretEnv) instead of committing it in plain text.
+  - 'TERADATA_PASSWORD=udfs'
+  - 'TERADATA_HOST=10.128.15.235'
+  waitFor: ['-']
 - id: integration_state
   name: 'gcr.io/pso-kokoro-resources/python-multi'
   args: ['bash', './ci/build.sh']
diff --git a/noxfile.py b/noxfile.py
index 74c07d7eb..a96764a8d 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -198,6 +198,25 @@ def integration_spanner(session):
     session.run("pytest", "tests/system/data_sources/test_spanner.py", *session.posargs)
 
 
+@nox.session(python=PYTHON_VERSIONS, venv_backend="venv")
+def integration_teradata(session):
+    """Run Teradata integration tests.
+
+    Ensure Teradata validation is running as expected. The PROJECT_ID,
+    TERADATA_PASSWORD and TERADATA_HOST environment variables must be set.
+    """
+    _setup_session_requirements(session, extra_packages=["teradatasql"])
+
+    expected_env_vars = ["PROJECT_ID", "TERADATA_PASSWORD", "TERADATA_HOST"]
+    for env_var in expected_env_vars:
+        if not os.environ.get(env_var, ""):
+            raise Exception("Expected Env Var: %s" % env_var)
+
+    session.run(
+        "pytest", "tests/system/data_sources/test_teradata.py", *session.posargs
+    )
+
+
 @nox.session(python=random.choice(PYTHON_VERSIONS), venv_backend="venv")
 def integration_state(session):
     """Run StateManager integration tests.
diff --git a/tests/system/data_sources/test_teradata.py b/tests/system/data_sources/test_teradata.py
index b427d0499..2bcd8e707 100644
--- a/tests/system/data_sources/test_teradata.py
+++ b/tests/system/data_sources/test_teradata.py
@@ -12,72 +12,56 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
+import os
 
-from data_validation import data_validation
-from data_validation.query_builder import query_builder
+from data_validation import data_validation, consts
 
+TERADATA_PASSWORD = os.getenv("TERADATA_PASSWORD")
+TERADATA_HOST = os.getenv("TERADATA_HOST")
+PROJECT_ID = os.getenv("PROJECT_ID")
 
-TERADATA_CONFIG = {
-    # Configuration Required for All Data Soures
+conn = {
     "source_type": "Teradata",
-    # BigQuery Specific Connection Config
-    "config": {
-        "host": "127.0.0.1",
-        "user_name": "dbc",
-        "password": "dbc",
-        "port": 10255,
-    },
-    # Configuration Required Depending on Validator Type
-    "schema_name": "Sys_Calendar",
-    "table_name": "CALENDAR",
-    "partition_column": "year_of_calendar",
-    "format": "table",
+    "host": TERADATA_HOST,
+    "user_name": "udf",
+    "password": TERADATA_PASSWORD,
+    "port": 1025,
 }
 
 
-def create_validator(builder):
-    return data_validation.DataValidation(
-        TERADATA_CONFIG, builder=builder, result_handler=None, verbose=False
-    )
+TERADATA_CONFIG = {
+    # Specific Connection Config
+    consts.CONFIG_SOURCE_CONN: conn,
+    consts.CONFIG_TARGET_CONN: conn,
+    # Validation Type
+    consts.CONFIG_TYPE: "Column",
+    # Configuration Required Depending on Validator Type
+    consts.CONFIG_SCHEMA_NAME: "Sys_Calendar",
+    consts.CONFIG_TABLE_NAME: "CALENDAR",
+    consts.CONFIG_AGGREGATES: [
+        {
+            consts.CONFIG_TYPE: "count",
+            consts.CONFIG_SOURCE_COLUMN: "year_of_calendar",
+            consts.CONFIG_TARGET_COLUMN: "year_of_calendar",
+            consts.CONFIG_FIELD_ALIAS: "count",
+        },
+    ],
+    consts.CONFIG_FORMAT: "table",
+    consts.CONFIG_FILTERS: [
+        {
+            consts.CONFIG_TYPE: consts.FILTER_TYPE_EQUALS,
+            consts.CONFIG_FILTER_SOURCE_COLUMN: "year_of_calendar",
+            consts.CONFIG_FILTER_SOURCE_VALUE: 2010,
+            consts.CONFIG_FILTER_TARGET_COLUMN: "year_of_calendar",
+            consts.CONFIG_FILTER_TARGET_VALUE: 2010,
+        },
+    ],
+}
 
 
 def test_count_validator():
-    builder = query_builder.QueryBuilder.build_count_validator()
-    validator = create_validator(builder)
-    df = validator.execute()
-    assert df["count_inp"][0] > 0
-    assert df["count_inp"][0] == df["count_out"][0]
-
-
-def test_partitioned_count_validator():
-    builder = query_builder.QueryBuilder.build_partition_count_validator(
-        days_past=700, limit=100
-    )
-    # Add Filters for large table
-    _add_calendar_date_filters(builder)
-
-    validator = create_validator(builder)
+    """Validate that a count over Sys_Calendar.CALENDAR matches on both sides."""
+    validator = data_validation.DataValidation(TERADATA_CONFIG, verbose=True)
     df = validator.execute()
-    rows = list(df.iterrows())
-
-    # Check that all partitions are unique.
-    partitions = frozenset(df["partition_key"])
-    assert len(rows) == len(partitions)
-
-    for _, row in rows:
-        assert row["count_inp"] > 0
-        assert row["count_inp"] == row["count_out"]
-
-
-def _add_calendar_date_filters(builder):
-    # Adding custom filter for better results
-    project_start_date = datetime(2020, 1, 1, 0, 0, 0)
-    filter_obj = query_builder.FilterField.less_than(
-        "calendar_date", project_start_date
-    )
-    builder.add_filter_field(filter_obj)
-
-    in_the_past = datetime(1991, 5, 2, 0, 0, 0)
-    filter_obj = query_builder.FilterField.greater_than("calendar_date", in_the_past)
-    builder.add_filter_field(filter_obj)
+    assert int(df["source_agg_value"][0]) > 0
+    assert df["source_agg_value"][0] == df["target_agg_value"][0]
diff --git a/tests/system/ibis_addon/test_operations.py b/tests/system/ibis_addon/test_operations.py
index 3312fe72f..d3d07480d 100644
--- a/tests/system/ibis_addon/test_operations.py
+++ b/tests/system/ibis_addon/test_operations.py
@@ -20,6 +20,7 @@
 
 # Import required in order to register operations.
 import third_party.ibis.ibis_addon.operations  # noqa: F401
+from third_party.ibis import ibis_teradata
 
 
 @pytest.fixture
@@ -27,6 +28,12 @@ def bigquery_client():
     return ibis_bigquery.connect()
 
 
+@pytest.fixture
+def teradata_client():
+    """Return an ibis Teradata client created with default connect() arguments."""
+    return ibis_teradata.connect()
+
+
 def test_bit_xor_bigquery(bigquery_client):
     tbl = bigquery_client.table(
         "citibike_trips", database="bigquery-public-data.new_york_citibike"
@@ -126,3 +133,26 @@ def test_hashbytes_bigquery_binary(bigquery_client):
         """
         ).strip()
     )
+
+
+def test_hashbytes_teradata_binary(teradata_client):
+    """Check the SQL compiled for a SHA-256 hashbytes call on a binary cast."""
+    tbl = teradata_client.table("citibike_trips", database="udfs.new_york_citibike")
+    expr = tbl[
+        tbl["start_station_name"]
+        .cast(dt.binary)
+        .hashbytes(how="sha256")
+        .name("station_hash")
+    ]
+    sql = expr.compile()
+    # TODO: Update the expected SQL to be a valid query once
+    # https://github.com/ibis-project/ibis/issues/2354 is fixed.
+    assert (
+        sql
+        == textwrap.dedent(
+            """
+            SELECT hash_sha256(CAST(`start_station_name` AS BINARY)) AS `station_hash`
+            FROM `udfs.citibike_trips`
+            """
+        ).strip()
+    )
diff --git a/third_party/ibis/ibis_addon/operations.py b/third_party/ibis/ibis_addon/operations.py
index ccda25c3d..4639f5d03 100644
--- a/third_party/ibis/ibis_addon/operations.py
+++ b/third_party/ibis/ibis_addon/operations.py
@@ -155,4 +155,4 @@ def sa_format_raw_sql(translator, expr):
 ImpalaExprTranslator._registry[RawSQL] = format_raw_sql
 OracleExprTranslator._registry[RawSQL] = sa_format_raw_sql
 TeradataExprTranslator._registry[RawSQL] = format_raw_sql
-TeradataExprTranslator._registry[RawSQL] = format_hashbytes_bigquery
+TeradataExprTranslator._registry[HashBytes] = format_hashbytes_teradata