Skip to content

Commit

Permalink
Merge pull request #64 from GoogleCloudPlatform/issue50-group-by-nan
Browse files Browse the repository at this point in the history
fix: update BigQuery dependencies to fix group-by results handler
  • Loading branch information
dhercher committed Aug 19, 2020
2 parents 569a96a + 97c5b03 commit 5861514
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 25 deletions.
16 changes: 9 additions & 7 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
dataclasses==0.6
google-api-python-client==1.7.11
google-api-python-client==1.10.0
ibis-framework==1.3.0
impyla==0.16.2
sqlalchemy==1.3.13
pymysql==0.9.3
sqlalchemy==1.3.18
pymysql==0.10.0
psycopg2-binary==2.8.5
PyYAML==5.3.1
pyspark==2.4.5
apache-airflow==1.10.9
pandas==0.25.3
google-cloud-bigquery==1.22.0
pyspark==3.0.0
apache-airflow==1.10.11
pandas==1.0.5
pyarrow==0.17.1
google-cloud-bigquery==1.26.1
google-cloud-bigquery-storage==1.0.0
setuptools>=34.0.0
49 changes: 49 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
import random

import google.cloud.bigquery
import pytest


ALPHABET = "abcdefghijklmnopqrstuvwxyz"


@pytest.fixture(scope="module")
def bigquery_client():
project_id = os.environ["PROJECT_ID"]

return google.cloud.bigquery.Client(project=project_id)


@pytest.fixture(scope="module")
def bigquery_dataset_id(bigquery_client):
now = datetime.datetime.now()
project_id = os.environ["PROJECT_ID"]
dataset_id = (
f"{project_id}.data_validator_tests_"
+ now.strftime("%Y%m%d%H%M")
+ random.choice(ALPHABET)
+ random.choice(ALPHABET)
+ random.choice(ALPHABET)
+ random.choice(ALPHABET)
+ random.choice(ALPHABET)
+ random.choice(ALPHABET)
)
bigquery_client.create_dataset(dataset_id)
yield dataset_id
bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
141 changes: 141 additions & 0 deletions tests/system/result_handlers/test_bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import pathlib
import time

import google.cloud.bigquery
import pandas
import pandas.testing


REPO_ROOT = pathlib.Path(__file__).parent.parent.parent.parent
SCHEMA_PATH = REPO_ROOT / "terraform" / "results_schema.json"
_NAN = float("nan")
GET_DATAFRAME_TIMEOUT_SECONDS = 30


def get_now():
# Round to nearest seconds. For some reason, when microsecond precision is
# used the time ends up 1 microsecond different in the round-trip. Not
# sure if it's due to pandas, arrow, or BigQuery.
now = datetime.datetime.now(datetime.timezone.utc)
return datetime.datetime(
now.year,
now.month,
now.day,
now.hour,
now.minute,
now.second,
tzinfo=datetime.timezone.utc,
)


def create_bigquery_results_table(bigquery_client, table_id):
schema = bigquery_client.schema_from_json(SCHEMA_PATH)
table = google.cloud.bigquery.Table(table_id, schema=schema)
return bigquery_client.create_table(table)


def get_dataframe(bigquery_client, table_id):
timeout = time.time() + GET_DATAFRAME_TIMEOUT_SECONDS
while True:
# Run a query rather than call list_rows so that rows are fetched from
# the streaming buffer.
result = bigquery_client.query(
"SELECT run_id, start_time, end_time, source_table_name, "
"source_column_name, target_table_name, target_column_name, "
"validation_type, aggregation_type, validation_name, "
"source_agg_value, target_agg_value, group_by_columns, "
"difference, pct_difference "
f" FROM `{table_id}` ORDER BY target_agg_value ASC"
).to_dataframe()

if len(result.index) > 0 or time.time() > timeout:
return result


def get_handler(bigquery_client, table_id):
import data_validation.result_handlers.bigquery

return data_validation.result_handlers.bigquery.BigQueryResultHandler(
bigquery_client, table_id=table_id
)


def test_execute_with_nan(bigquery_client, bigquery_dataset_id):
table_id = f"{bigquery_dataset_id}.test_execute_with_nan"
object_under_test = get_handler(bigquery_client, table_id)
create_bigquery_results_table(bigquery_client, table_id)
end = get_now()
start = end - datetime.timedelta(minutes=1)
df = pandas.DataFrame(
{
"run_id": ["grouped-test"] * 6,
"start_time": [start] * 6,
"end_time": [end] * 6,
"source_table_name": [
"test_source",
"test_source",
_NAN,
_NAN,
"test_source",
"test_source",
],
"source_column_name": [
"source_column",
"source_column",
_NAN,
_NAN,
"source_column",
"source_column",
],
"target_table_name": [
"test_target",
"test_target",
"test_target",
"test_target",
_NAN,
_NAN,
],
"target_column_name": [
"target_column",
"target_column",
"target_column",
"target_column",
_NAN,
_NAN,
],
"validation_type": ["GroupedColumn"] * 6,
"aggregation_type": ["count"] * 6,
"validation_name": ["count"] * 6,
"source_agg_value": ["2", "4", _NAN, _NAN, "6", "8"],
"target_agg_value": ["1", "3", "5", "7", "8", "9"],
"group_by_columns": [
'{"grp_a": "a", "grp_i": "0"}',
'{"grp_a": "a", "grp_i": "1"}',
'{"grp_a": "b", "grp_i": "0"}',
'{"grp_a": "b", "grp_i": "1"}',
'{"grp_a": "c", "grp_i": "0"}',
'{"grp_a": "c", "grp_i": "1"}',
],
"difference": [-1.0, -1.0, _NAN, _NAN, _NAN, _NAN],
"pct_difference": [-50.0, -25.0, _NAN, _NAN, _NAN, _NAN],
}
)
object_under_test.execute(None, df)
result = get_dataframe(bigquery_client, table_id)
pandas.testing.assert_frame_equal(result, df)
bigquery_client.delete_table(table_id)
22 changes: 4 additions & 18 deletions tests/unit/test_combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ def test_generate_report_with_too_many_rows(module_under_test):
"source_agg_value": ["2020-07-01 16:00:00+00:00"],
"target_agg_value": ["2020-07-01 16:00:00+00:00"],
"group_by_columns": [None],
"difference": [float("nan")],
"pct_difference": [float("nan")],
"difference": [_NAN],
"pct_difference": [_NAN],
}
),
),
Expand Down Expand Up @@ -392,22 +392,8 @@ def test_generate_report_without_group_by(
'{"grp_a": "c", "grp_i": "0"}',
'{"grp_a": "c", "grp_i": "1"}',
],
"difference": [
-1.0,
-1.0,
float("nan"),
float("nan"),
float("nan"),
float("nan"),
],
"pct_difference": [
-50.0,
-25.0,
float("nan"),
float("nan"),
float("nan"),
float("nan"),
],
"difference": [-1.0, -1.0, _NAN, _NAN, _NAN, _NAN],
"pct_difference": [-50.0, -25.0, _NAN, _NAN, _NAN, _NAN],
}
),
),
Expand Down

0 comments on commit 5861514

Please sign in to comment.