diff --git a/requirements.txt b/requirements.txt
index ca5b5ff4f..7d20fe8a4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,15 @@
 dataclasses==0.6
-google-api-python-client==1.7.11
+google-api-python-client==1.10.0
 ibis-framework==1.3.0
 impyla==0.16.2
-sqlalchemy==1.3.13
-pymysql==0.9.3
+sqlalchemy==1.3.18
+pymysql==0.10.0
 psycopg2-binary==2.8.5
 PyYAML==5.3.1
-pyspark==2.4.5
-apache-airflow==1.10.9
-pandas==0.25.3
-google-cloud-bigquery==1.22.0
+pyspark==3.0.0
+apache-airflow==1.10.11
+pandas==1.0.5
+pyarrow==0.17.1
+google-cloud-bigquery==1.26.1
+google-cloud-bigquery-storage==1.0.0
 setuptools>=34.0.0
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
new file mode 100644
index 000000000..44dccf00e
--- /dev/null
+++ b/tests/system/conftest.py
@@ -0,0 +1,49 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import os
+import random
+
+import google.cloud.bigquery
+import pytest
+
+
+ALPHABET = "abcdefghijklmnopqrstuvwxyz"
+
+
+@pytest.fixture(scope="module")
+def bigquery_client():
+    project_id = os.environ["PROJECT_ID"]
+
+    return google.cloud.bigquery.Client(project=project_id)
+
+
+@pytest.fixture(scope="module")
+def bigquery_dataset_id(bigquery_client):
+    now = datetime.datetime.now()
+    project_id = os.environ["PROJECT_ID"]
+    dataset_id = (
+        f"{project_id}.data_validator_tests_"
+        + now.strftime("%Y%m%d%H%M")
+        + random.choice(ALPHABET)
+        + random.choice(ALPHABET)
+        + random.choice(ALPHABET)
+        + random.choice(ALPHABET)
+        + random.choice(ALPHABET)
+        + random.choice(ALPHABET)
+    )
+    bigquery_client.create_dataset(dataset_id)
+    yield dataset_id
+    bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
diff --git a/tests/system/result_handlers/test_bigquery.py b/tests/system/result_handlers/test_bigquery.py
new file mode 100644
index 000000000..1f3b6e7a2
--- /dev/null
+++ b/tests/system/result_handlers/test_bigquery.py
@@ -0,0 +1,141 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import pathlib
+import time
+
+import google.cloud.bigquery
+import pandas
+import pandas.testing
+
+
+REPO_ROOT = pathlib.Path(__file__).parent.parent.parent.parent
+SCHEMA_PATH = REPO_ROOT / "terraform" / "results_schema.json"
+_NAN = float("nan")
+GET_DATAFRAME_TIMEOUT_SECONDS = 30
+
+
+def get_now():
+    # Round to nearest seconds. For some reason, when microsecond precision is
+    # used the time ends up 1 microsecond different in the round-trip. Not
+    # sure if it's due to pandas, arrow, or BigQuery.
+    now = datetime.datetime.now(datetime.timezone.utc)
+    return datetime.datetime(
+        now.year,
+        now.month,
+        now.day,
+        now.hour,
+        now.minute,
+        now.second,
+        tzinfo=datetime.timezone.utc,
+    )
+
+
+def create_bigquery_results_table(bigquery_client, table_id):
+    schema = bigquery_client.schema_from_json(SCHEMA_PATH)
+    table = google.cloud.bigquery.Table(table_id, schema=schema)
+    return bigquery_client.create_table(table)
+
+
+def get_dataframe(bigquery_client, table_id):
+    timeout = time.time() + GET_DATAFRAME_TIMEOUT_SECONDS
+    while True:
+        # Run a query rather than call list_rows so that rows are fetched from
+        # the streaming buffer.
+        result = bigquery_client.query(
+            "SELECT run_id, start_time, end_time, source_table_name, "
+            "source_column_name, target_table_name, target_column_name, "
+            "validation_type, aggregation_type, validation_name, "
+            "source_agg_value, target_agg_value, group_by_columns, "
+            "difference, pct_difference "
+            f" FROM `{table_id}` ORDER BY target_agg_value ASC"
+        ).to_dataframe()
+
+        if len(result.index) > 0 or time.time() > timeout:
+            return result
+
+
+def get_handler(bigquery_client, table_id):
+    import data_validation.result_handlers.bigquery
+
+    return data_validation.result_handlers.bigquery.BigQueryResultHandler(
+        bigquery_client, table_id=table_id
+    )
+
+
+def test_execute_with_nan(bigquery_client, bigquery_dataset_id):
+    table_id = f"{bigquery_dataset_id}.test_execute_with_nan"
+    object_under_test = get_handler(bigquery_client, table_id)
+    create_bigquery_results_table(bigquery_client, table_id)
+    end = get_now()
+    start = end - datetime.timedelta(minutes=1)
+    df = pandas.DataFrame(
+        {
+            "run_id": ["grouped-test"] * 6,
+            "start_time": [start] * 6,
+            "end_time": [end] * 6,
+            "source_table_name": [
+                "test_source",
+                "test_source",
+                _NAN,
+                _NAN,
+                "test_source",
+                "test_source",
+            ],
+            "source_column_name": [
+                "source_column",
+                "source_column",
+                _NAN,
+                _NAN,
+                "source_column",
+                "source_column",
+            ],
+            "target_table_name": [
+                "test_target",
+                "test_target",
+                "test_target",
+                "test_target",
+                _NAN,
+                _NAN,
+            ],
+            "target_column_name": [
+                "target_column",
+                "target_column",
+                "target_column",
+                "target_column",
+                _NAN,
+                _NAN,
+            ],
+            "validation_type": ["GroupedColumn"] * 6,
+            "aggregation_type": ["count"] * 6,
+            "validation_name": ["count"] * 6,
+            "source_agg_value": ["2", "4", _NAN, _NAN, "6", "8"],
+            "target_agg_value": ["1", "3", "5", "7", "8", "9"],
+            "group_by_columns": [
+                '{"grp_a": "a", "grp_i": "0"}',
+                '{"grp_a": "a", "grp_i": "1"}',
+                '{"grp_a": "b", "grp_i": "0"}',
+                '{"grp_a": "b", "grp_i": "1"}',
+                '{"grp_a": "c", "grp_i": "0"}',
+                '{"grp_a": "c", "grp_i": "1"}',
+            ],
+            "difference": [-1.0, -1.0, _NAN, _NAN, _NAN, _NAN],
+            "pct_difference": [-50.0, -25.0, _NAN, _NAN, _NAN, _NAN],
+        }
+    )
+    object_under_test.execute(None, df)
+    result = get_dataframe(bigquery_client, table_id)
+    pandas.testing.assert_frame_equal(result, df)
+    bigquery_client.delete_table(table_id)
diff --git a/tests/unit/test_combiner.py b/tests/unit/test_combiner.py
index e91bd7a3f..2c16eb45e 100644
--- a/tests/unit/test_combiner.py
+++ b/tests/unit/test_combiner.py
@@ -146,8 +146,8 @@ def test_generate_report_with_too_many_rows(module_under_test):
                 "source_agg_value": ["2020-07-01 16:00:00+00:00"],
                 "target_agg_value": ["2020-07-01 16:00:00+00:00"],
                 "group_by_columns": [None],
-                "difference": [float("nan")],
-                "pct_difference": [float("nan")],
+                "difference": [_NAN],
+                "pct_difference": [_NAN],
             }
         ),
    ),
@@ -392,22 +392,8 @@ def test_generate_report_without_group_by(
                     '{"grp_a": "c", "grp_i": "0"}',
                     '{"grp_a": "c", "grp_i": "1"}',
                 ],
-                "difference": [
-                    -1.0,
-                    -1.0,
-                    float("nan"),
-                    float("nan"),
-                    float("nan"),
-                    float("nan"),
-                ],
-                "pct_difference": [
-                    -50.0,
-                    -25.0,
-                    float("nan"),
-                    float("nan"),
-                    float("nan"),
-                    float("nan"),
-                ],
+                "difference": [-1.0, -1.0, _NAN, _NAN, _NAN, _NAN],
+                "pct_difference": [-50.0, -25.0, _NAN, _NAN, _NAN, _NAN],
             }
         ),
    ),