feat: add data source for Cloud Spanner #206

Merged: 12 commits, Apr 23, 2021
2 changes: 1 addition & 1 deletion cloudbuild.yaml
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-timeout: 1800s
+timeout: 7200s
steps:
- id: lint
  name: 'gcr.io/pso-kokoro-resources/python-multi'
6 changes: 6 additions & 0 deletions data_validation/cli_tools.py
@@ -99,6 +99,12 @@
["password", "Password for authentication of user"],
["database", "Database in postgres to connect to (default postgres)"],
],
"Spanner": [
["project_id", "GCP Project to use for Spanner"],
["instance_id", "ID of Spanner instance to connect to"],
["database_id", "ID of Spanner database (schema) to connect to"],
["google_service_account_key_path", "(Optional) GCP SA Key Path"],
],
}
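For illustration, an argument list like this could be wired into argparse roughly as follows (a sketch; add_spanner_args and the flag derivation are assumptions, not the actual cli_tools code):

import argparse

SPANNER_ARGS = [
    ["project_id", "GCP Project to use for Spanner"],
    ["instance_id", "ID of Spanner instance to connect to"],
    ["database_id", "ID of Spanner database (schema) to connect to"],
    ["google_service_account_key_path", "(Optional) GCP SA Key Path"],
]


def add_spanner_args(subparser):
    # Derive CLI flags such as --project-id from the field names above.
    for arg_name, help_text in SPANNER_ARGS:
        subparser.add_argument("--" + arg_name.replace("_", "-"), help=help_text)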


4 changes: 3 additions & 1 deletion data_validation/clients.py
@@ -23,8 +23,9 @@
from ibis.backends.mysql.client import MySQLClient
from ibis.backends.postgres.client import PostgreSQLClient

-from third_party.ibis.ibis_impala.api import impala_connect
import third_party.ibis.ibis_addon.datatypes
+from third_party.ibis.ibis_cloud_spanner.api import connect as spanner_connect
+from third_party.ibis.ibis_impala.api import impala_connect
from data_validation import client_info


@@ -181,4 +182,5 @@ def get_all_tables(client):
"Teradata": TeradataClient,
"MSSQL": mssql_connect,
"Snowflake": snowflake_connect,
"Spanner": spanner_connect,
}
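For context, a minimal sketch of how a source_type-to-client mapping like the one above might be consumed (the get_data_client and CLIENT_LOOKUP names and the config handling are assumptions, not the actual clients.py code):

def get_data_client(connection_config):
    # Pop source_type so the remaining keys can be passed to the client factory.
    config = dict(connection_config)
    source_type = config.pop("source_type")
    if source_type not in CLIENT_LOOKUP:
        raise ValueError("Unknown source type: %s" % source_type)
    return CLIENT_LOOKUP[source_type](**config)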
16 changes: 13 additions & 3 deletions data_validation/data_validation.py
@@ -212,9 +212,19 @@ def _get_pandas_schema(self, source_df, target_df, join_on_fields, verbose=False
            source_df[join_on_field] = source_df[join_on_field].astype(str)
            target_df[join_on_field] = target_df[join_on_field].astype(str)

-        pd_schema = source_df.dtypes[
-            [i for i, v in source_df.dtypes.iteritems() if v not in [numpy.dtype("O")]]
-        ]
        # Loop over index keys() instead of iteritems() because pandas is
        # failing with datetime64[ns, UTC] data type on Python 3.9.

Collaborator Author:

I tried to reproduce this issue:

import datetime
import pandas

print(pandas.__version__)

df = pandas.DataFrame({
    "int_col": [1],
    "datetime_col": pandas.Series(
        ["2021-03-12T12:50:25"], dtype="datetime64[ns]"
    ),
})

df["datetime_col"] = df["datetime_col"].dt.tz_localize(datetime.timezone.utc)

for i, v in df.dtypes.iteritems():
    print(f"{i}: {v}")

It was successful in my environment, so I'm not sure what's going on in the Python 3.9 session.
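Incidentally, a variant that avoids iteritems() entirely would be Series.items(), which behaves the same on the pandas versions pinned in this PR (a sketch):

for i, v in df.dtypes.items():
    print(f"{i}: {v}")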

        schema_data = []
        schema_index = []
        for key in source_df.dtypes.keys():
            dtype = source_df.dtypes[key]
            # The Ibis pandas backend fails with `KeyError: dtype('O')` if
            # object dtypes are passed in.
            if dtype in {numpy.dtype("O")}:
                continue
            schema_data.append(dtype)
            schema_index.append(key)
        pd_schema = pandas.Series(schema_data, index=schema_index)
        if verbose:
            print("-- ** Pandas Schema ** --")
            print(pd_schema)
2 changes: 1 addition & 1 deletion noxfile.py
@@ -151,5 +151,5 @@ def integration_spanner(session):
        if not os.environ.get(env_var, ""):
            raise Exception("Expected Env Var: %s" % env_var)

    # TODO: Add tests for DVT data sources. See integration_bigquery.
-    session.run("pytest", "third_party/ibis/ibis_cloud_spanner/tests", *session.posargs)
+    session.run("pytest", "tests/system/data_sources/test_spanner.py", *session.posargs)
6 changes: 3 additions & 3 deletions requirements.txt
@@ -11,12 +11,12 @@ SQLAlchemy==1.3.22
PyMySQL==1.0.2
psycopg2-binary==2.8.6
PyYAML==5.4.1
-pandas==1.2.1
+pandas==1.2.3
proto-plus==1.13.0
pyarrow==3.0.0
pydata-google-auth==1.1.0
-google-cloud-bigquery==2.7.0
-google-cloud-bigquery-storage==2.2.1
+google-cloud-bigquery==2.11.0
+google-cloud-bigquery-storage==2.3.0
google-cloud-spanner==3.1.0
setuptools>=34.0.0
jellyfish==0.8.2
2 changes: 1 addition & 1 deletion tests/system/data_sources/test_bigquery.py
@@ -183,7 +183,7 @@ def test_count_validator():
    count_tripduration_value = df[df["validation_name"] == "count_tripduration"][
        "source_agg_value"
    ].values[0]
-    avg_tripduration_value = df[df["validation_name"] == "count_tripduration"][
+    avg_tripduration_value = df[df["validation_name"] == "avg_tripduration"][
        "source_agg_value"
    ].values[0]
    max_birth_year_value = df[df["validation_name"] == "max_birth_year"][
242 changes: 242 additions & 0 deletions tests/system/data_sources/test_spanner.py
@@ -0,0 +1,242 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import os

import pytest

from data_validation import cli_tools, consts, data_validation
from data_validation import __main__ as main
from third_party.ibis.ibis_cloud_spanner.tests import conftest


SPANNER_CONN_NAME = "spanner-integration-test"
CLI_FIND_TABLES_ARGS = [
    "find-tables",
    "--source-conn",
    SPANNER_CONN_NAME,
    "--target-conn",
    SPANNER_CONN_NAME,
]


# Copy test fixtures from Spanner Ibis client tests, because it's a bit verbose
# to create a Spanner instance and load data to it. Relevant test fixtures can
# be copied here after clients are upstreamed to Ibis.
spanner_client = conftest.spanner_client
instance_id = conftest.instance_id
database_id = conftest.database_id


@pytest.fixture
def spanner_connection_config(instance_id, database_id):
    return {
        "source_type": "Spanner",
        "project_id": os.environ["PROJECT_ID"],
        "instance_id": instance_id,
        "database_id": database_id,
    }


@pytest.fixture
def spanner_connection_args(instance_id, database_id):
    return [
        "connections",
        "add",
        "--connection-name",
        SPANNER_CONN_NAME,
        "Spanner",
        "--project-id",
        os.environ["PROJECT_ID"],
        "--instance-id",
        instance_id,
        "--database-id",
        database_id,
    ]


@pytest.fixture
def count_config(spanner_connection_config, database_id):
    return {
        # Connection Name
        consts.CONFIG_SOURCE_CONN: spanner_connection_config,
        consts.CONFIG_TARGET_CONN: spanner_connection_config,
        # Validation Type
        consts.CONFIG_TYPE: "Column",
        # Configuration Required Depending on Validator Type
        consts.CONFIG_SCHEMA_NAME: database_id,
        consts.CONFIG_TABLE_NAME: "functional_alltypes",
        consts.CONFIG_GROUPED_COLUMNS: [],
        consts.CONFIG_AGGREGATES: [
            {
                consts.CONFIG_TYPE: "count",
                consts.CONFIG_SOURCE_COLUMN: None,
                consts.CONFIG_TARGET_COLUMN: None,
                consts.CONFIG_FIELD_ALIAS: "count",
            },
            {
                consts.CONFIG_TYPE: "count",
                consts.CONFIG_SOURCE_COLUMN: "string_col",
                consts.CONFIG_TARGET_COLUMN: "string_col",
                consts.CONFIG_FIELD_ALIAS: "count_string_col",
            },
            {
                consts.CONFIG_TYPE: "avg",
                consts.CONFIG_SOURCE_COLUMN: "float_col",
                consts.CONFIG_TARGET_COLUMN: "float_col",
                consts.CONFIG_FIELD_ALIAS: "avg_float_col",
            },
            {
                consts.CONFIG_TYPE: "max",
                consts.CONFIG_SOURCE_COLUMN: "timestamp_col",
                consts.CONFIG_TARGET_COLUMN: "timestamp_col",
                consts.CONFIG_FIELD_ALIAS: "max_timestamp_col",
            },
            {
                consts.CONFIG_TYPE: "min",
                consts.CONFIG_SOURCE_COLUMN: "int_col",
                consts.CONFIG_TARGET_COLUMN: "int_col",
                consts.CONFIG_FIELD_ALIAS: "min_int_col",
            },
        ],
    }


@pytest.fixture
def grouped_config(spanner_connection_config, database_id):
    return {
        # Connection Name
        consts.CONFIG_SOURCE_CONN: spanner_connection_config,
        consts.CONFIG_TARGET_CONN: spanner_connection_config,
        # Validation Type
        consts.CONFIG_TYPE: "GroupedColumn",
        # Configuration Required Depending on Validator Type
        consts.CONFIG_SCHEMA_NAME: database_id,
        consts.CONFIG_TABLE_NAME: "functional_alltypes",
        consts.CONFIG_AGGREGATES: [
            {
                consts.CONFIG_TYPE: "count",
                consts.CONFIG_SOURCE_COLUMN: None,
                consts.CONFIG_TARGET_COLUMN: None,
                consts.CONFIG_FIELD_ALIAS: "count",
            },
            {
                consts.CONFIG_TYPE: "sum",
                consts.CONFIG_SOURCE_COLUMN: "float_col",
                consts.CONFIG_TARGET_COLUMN: "float_col",
                consts.CONFIG_FIELD_ALIAS: "sum_float_col",
            },
        ],
        consts.CONFIG_GROUPED_COLUMNS: [
            {
                consts.CONFIG_FIELD_ALIAS: "timestamp_col",
                consts.CONFIG_SOURCE_COLUMN: "timestamp_col",
                consts.CONFIG_TARGET_COLUMN: "timestamp_col",
                consts.CONFIG_CAST: "date",
            },
        ],
    }


CLI_CONFIG_FILE = "example_test.yaml"
CLI_RUN_CONFIG_ARGS = ["run-config", "--config-file", CLI_CONFIG_FILE]


STRING_MATCH_RESULT = '{"schema_name": "pso_data_validator", "table_name": "results", "target_schema_name": "pso_data_validator", "target_table_name": "results"}'


def test_count_validator(count_config):
    validator = data_validation.DataValidation(count_config, verbose=True)
    df = validator.execute()

    count_value = df[df["validation_name"] == "count"]["source_agg_value"].values[0]
    count_string_value = df[df["validation_name"] == "count_string_col"][
        "source_agg_value"
    ].values[0]
    avg_float_value = df[df["validation_name"] == "avg_float_col"][
        "source_agg_value"
    ].values[0]
    max_timestamp_value = df[df["validation_name"] == "max_timestamp_col"][
        "source_agg_value"
    ].values[0]
    min_int_value = df[df["validation_name"] == "min_int_col"][
        "source_agg_value"
    ].values[0]

    assert float(count_value) > 0
    assert float(count_string_value) > 0
    assert float(avg_float_value) > 0
    assert datetime.datetime.strptime(
        max_timestamp_value, "%Y-%m-%d %H:%M:%S%z",
    ) > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    assert float(min_int_value) > 0


def test_grouped_count_validator(grouped_config):
    validator = data_validation.DataValidation(grouped_config, verbose=True)
    df = validator.execute()
    rows = list(df[df["validation_name"] == "count"].iterrows())

    # Check that all partitions are unique.
    partitions = frozenset(df["group_by_columns"])
    assert len(rows) == len(partitions)
    assert len(rows) > 1
    assert df["source_agg_value"].sum() == df["target_agg_value"].sum()

    for _, row in rows:
        assert float(row["source_agg_value"]) > 0
        assert row["source_agg_value"] == row["target_agg_value"]


def test_cli_find_tables(spanner_connection_args, database_id):
Collaborator:

This test might be less error-prone using fake fs, plus you won't need to clean up the connection file at the end?

Collaborator Author:

I tried, and am getting an error when the client authenticates. First, it couldn't find my key file, so I used an updated fixture to add it as a real path.


@pytest.fixture
def fs_with_creds(fs):
    if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
        fs.add_real_file(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])

    yield fs

But even with this, I get an error:

ERROR    grpc._plugin_wrapping:_plugin_wrapping.py:82 AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7fb3b8b9ae10>" raised exception!
Traceback (most recent call last):
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/grpc/_plugin_wrapping.py", line 78, in __call__
    context, _AuthMetadataPluginCallback(callback_state, callback))
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 86, in __call__
    callback(self._get_authorization_headers(context), None)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 73, in _get_authorization_headers
    self._request, context.method_name, context.service_url, headers
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/google/auth/credentials.py", line 133, in before_request
    self.refresh(request)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/google/oauth2/service_account.py", line 361, in refresh
    access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/google/oauth2/_client.py", line 153, in jwt_grant
    response_data = _token_endpoint_request(request, token_uri, body)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/google/oauth2/_client.py", line 105, in _token_endpoint_request
    response = request(method="POST", url=token_uri, headers=headers, body=body)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/google/auth/transport/requests.py", line 183, in __call__
    method, url, data=body, headers=headers, timeout=timeout, **kwargs
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/requests/adapters.py", line 416, in send
    self.cert_verify(conn, request.url, verify, cert)
  File "/Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/requests/adapters.py", line 228, in cert_verify
    "invalid path: {}".format(cert_loc))
OSError: Could not find a suitable TLS CA certificate bundle, invalid path: /Users/swast/miniconda3/envs/pso-data-validator-ibis-1.4/lib/python3.7/site-packages/certifi/cacert.pem

I believe it's having trouble reading the root certificates to validate a secure connection.
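A possible workaround (untested in this thread; assumes pyfakefs's fs fixture and the certifi package) would be to expose the real CA bundle to the fake filesystem as well:

import os

import certifi
import pytest


@pytest.fixture
def fs_with_creds(fs):
    # Let requests find the real certifi CA bundle inside the fake filesystem.
    fs.add_real_file(certifi.where())
    if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
        fs.add_real_file(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])

    yield fs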

Collaborator Author:

I could try adding some more "real" files / directories, but I think it's becoming more error-prone, not less :-(

    _store_spanner_conn(spanner_connection_args)

    parser = cli_tools.configure_arg_parser()
    args = parser.parse_args(CLI_FIND_TABLES_ARGS)
    tables_json = main.find_tables_using_string_matching(args)
    tables = json.loads(tables_json)
    assert isinstance(tables_json, str)
    assert {
        "schema_name": database_id,
        "table_name": "array_table",
        "target_schema_name": database_id,
        "target_table_name": "array_table",
    } in tables
    assert {
        "schema_name": database_id,
        "table_name": "functional_alltypes",
        "target_schema_name": database_id,
        "target_table_name": "functional_alltypes",
    } in tables
    assert {
        "schema_name": database_id,
        "table_name": "students_pointer",
        "target_schema_name": database_id,
        "target_table_name": "students_pointer",
    } in tables

    _remove_spanner_conn()


def _store_spanner_conn(spanner_connection_args):
    parser = cli_tools.configure_arg_parser()
    mock_args = parser.parse_args(spanner_connection_args)
    main.run_connections(mock_args)


def _remove_spanner_conn():
    file_path = cli_tools._get_connection_file(SPANNER_CONN_NAME)
    os.remove(file_path)
4 changes: 2 additions & 2 deletions third_party/ibis/ibis_cloud_spanner/tests/ddl.sql
@@ -31,8 +31,8 @@ CREATE TABLE functional_alltypes
  bool_col BOOL,
  date DATE,
  date_string_col STRING(MAX),
-  double_col NUMERIC,
-  float_col NUMERIC,
+  numeric_col NUMERIC,
+  float_col FLOAT64,
  index INT64,
  int_col INT64,
  month INT64,
12 changes: 6 additions & 6 deletions third_party/ibis/ibis_cloud_spanner/tests/dml.sql
@@ -38,32 +38,32 @@ VALUES(106, 'Phoebe', 10, 490, 'Chemistry', 9.6, '2019-02-09');


INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(1, 10001, TRUE, '2016-02-09', '01/01/2001', 2.5, 12.16, 101, 21, 4, 16, 'David', '2002-02-10 15:30:00+00', 6, 99, 2010);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(2, 10002, FALSE, '2016-10-10', '02/02/2002', 2.6, 13.16, 102, 22, 5, 18, 'Ryan', '2009-02-12 10:06:00+00', 7, 98, 2012);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(3, 10003, TRUE, '2018-02-09', '03/03/2003', 9.5, 44.16, 201, 41, 6, 56, 'Steve', '2010-06-10 12:12:00+00', 12, 66, 2006);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(4, 10004, TRUE, '2018-10-10', '04/04/2004', 9.6, 45.16, 202, 42, 9, 58, 'Chandler', '2014-06-12 10:04:00+00', 14, 69, 2009);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(5, 10005, FALSE, '2020-06-12', '05/05/2005', 6.6, 66.12, 401, 62, 12, 98, 'Rose', '2018-02-10 10:06:00+00', 16, 96, 2012);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(6, 10006, TRUE, '2020-12-12', '06/06/2006', 6.9, 66.19, 402, 69, 14, 99, 'Rachel', '2019-04-12 12:09:00+00', 18, 99, 2014);
