feat: add data source for Cloud Spanner (#206)
* feat: add data source for Cloud Spanner

* remove dtype conversions

* fix spanner tests on Python 3.9

* 2nd try at workaround for iteritems bug

* update comment

* increase timeout

* bump timeout

* remove unused constant
tswast committed Apr 23, 2021
1 parent a27b39e commit c63f68e
Showing 13 changed files with 295 additions and 54 deletions.
2 changes: 1 addition & 1 deletion cloudbuild.yaml
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-timeout: 2400s
+timeout: 7200s
steps:
- id: lint
name: 'gcr.io/pso-kokoro-resources/python-multi'
6 changes: 6 additions & 0 deletions data_validation/cli_tools.py
@@ -99,6 +99,12 @@
["password", "Password for authentication of user"],
["database", "Database in postgres to connect to (default postgres)"],
],
"Spanner": [
["project_id", "GCP Project to use for Spanner"],
["instance_id", "ID of Spanner instance to connect to"],
["database_id", "ID of Spanner database (schema) to connect to"],
["google_service_account_key_path", "(Optional) GCP SA Key Path"],
],
}


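With these fields registered, a Spanner connection can be added through the same "connections add" flow as the other sources. A minimal sketch in Python, mirroring the spanner_connection_args fixture added in tests/system/data_sources/test_spanner.py later in this diff; the connection name and the project, instance, and database values are placeholder assumptions, not values from this commit:

from data_validation import cli_tools
from data_validation import __main__ as main

# Hypothetical example values; substitute a real project, instance, and database.
connection_args = [
    "connections",
    "add",
    "--connection-name", "my-spanner-conn",
    "Spanner",
    "--project-id", "my-project",
    "--instance-id", "my-instance",
    "--database-id", "my-database",
]
parser = cli_tools.configure_arg_parser()
main.run_connections(parser.parse_args(connection_args))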
4 changes: 3 additions & 1 deletion data_validation/clients.py
@@ -23,8 +23,9 @@
from ibis.backends.mysql.client import MySQLClient
from ibis.backends.postgres.client import PostgreSQLClient

-from third_party.ibis.ibis_impala.api import impala_connect
import third_party.ibis.ibis_addon.datatypes
+from third_party.ibis.ibis_cloud_spanner.api import connect as spanner_connect
+from third_party.ibis.ibis_impala.api import impala_connect
from data_validation import client_info


@@ -184,4 +185,5 @@ def get_all_tables(client, allowed_schemas=None):
"Teradata": TeradataClient,
"MSSQL": mssql_connect,
"Snowflake": snowflake_connect,
"Spanner": spanner_connect,
}
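With the "Spanner" entry in place, a connection config whose source_type is "Spanner" resolves to the Cloud Spanner Ibis client. A minimal sketch of such a config, mirroring the spanner_connection_config fixture in the new system test; the instance and database IDs here are placeholder assumptions:

import os

spanner_connection_config = {
    "source_type": "Spanner",
    "project_id": os.environ["PROJECT_ID"],  # assumes PROJECT_ID is set, as in the tests
    "instance_id": "my-instance",  # hypothetical example value
    "database_id": "my-database",  # hypothetical example value
}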
16 changes: 13 additions & 3 deletions data_validation/data_validation.py
@@ -212,9 +212,19 @@ def _get_pandas_schema(self, source_df, target_df, join_on_fields, verbose=False
source_df[join_on_field] = source_df[join_on_field].astype(str)
target_df[join_on_field] = target_df[join_on_field].astype(str)

-        pd_schema = source_df.dtypes[
-            [i for i, v in source_df.dtypes.iteritems() if v not in [numpy.dtype("O")]]
-        ]
+        # Loop over index keys() instead of iteritems() because pandas is
+        # failing with datetime64[ns, UTC] data type on Python 3.9.
+        schema_data = []
+        schema_index = []
+        for key in source_df.dtypes.keys():
+            dtype = source_df.dtypes[key]
+            # The Ibis pandas backend fails with `KeyError: dtype('O')` if
+            # object dtypes are passed in.
+            if dtype in {numpy.dtype("O")}:
+                continue
+            schema_data.append(dtype)
+            schema_index.append(key)
+        pd_schema = pandas.Series(schema_data, index=schema_index)
if verbose:
print("-- ** Pandas Schema ** --")
print(pd_schema)
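The replacement schema-building logic can be exercised on its own. A standalone sketch of the same pattern with a toy DataFrame; the column names are illustrative only, not from this commit:

import numpy
import pandas

df = pandas.DataFrame(
    {
        "id": [1, 2],
        "name": ["a", "b"],  # object dtype, expected to be excluded
        "ts": pandas.to_datetime(["2021-01-01", "2021-02-01"], utc=True),
    }
)

schema_data = []
schema_index = []
for key in df.dtypes.keys():  # keys() sidesteps the iteritems() failure on Python 3.9
    dtype = df.dtypes[key]
    if dtype in {numpy.dtype("O")}:  # skip object columns
        continue
    schema_data.append(dtype)
    schema_index.append(key)
pd_schema = pandas.Series(schema_data, index=schema_index)
print(pd_schema)  # "id" -> int64, "ts" -> datetime64[ns, UTC]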
2 changes: 1 addition & 1 deletion noxfile.py
@@ -187,5 +187,5 @@ def integration_spanner(session):
if not os.environ.get(env_var, ""):
raise Exception("Expected Env Var: %s" % env_var)

# TODO: Add tests for DVT data sources. See integration_bigquery.
session.run("pytest", "third_party/ibis/ibis_cloud_spanner/tests", *session.posargs)
session.run("pytest", "tests/system/data_sources/test_spanner.py", *session.posargs)
6 changes: 3 additions & 3 deletions requirements.txt
@@ -11,12 +11,12 @@ SQLAlchemy==1.3.22
PyMySQL==1.0.2
psycopg2-binary==2.8.6
PyYAML==5.4.1
-pandas==1.2.1
+pandas==1.2.3
proto-plus==1.13.0
pyarrow==3.0.0
pydata-google-auth==1.1.0
-google-cloud-bigquery==2.7.0
-google-cloud-bigquery-storage==2.2.1
+google-cloud-bigquery==2.11.0
+google-cloud-bigquery-storage==2.3.0
google-cloud-spanner==3.1.0
setuptools>=34.0.0
jellyfish==0.8.2
2 changes: 1 addition & 1 deletion tests/system/data_sources/test_bigquery.py
@@ -185,7 +185,7 @@ def test_count_validator():
count_tripduration_value = df[df["validation_name"] == "count_tripduration"][
"source_agg_value"
].values[0]
-avg_tripduration_value = df[df["validation_name"] == "count_tripduration"][
+avg_tripduration_value = df[df["validation_name"] == "avg_tripduration"][
"source_agg_value"
].values[0]
max_birth_year_value = df[df["validation_name"] == "max_birth_year"][
Expand Down
242 changes: 242 additions & 0 deletions tests/system/data_sources/test_spanner.py
@@ -0,0 +1,242 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import os

import pytest

from data_validation import cli_tools, consts, data_validation
from data_validation import __main__ as main
from third_party.ibis.ibis_cloud_spanner.tests import conftest


SPANNER_CONN_NAME = "spanner-integration-test"
CLI_FIND_TABLES_ARGS = [
"find-tables",
"--source-conn",
SPANNER_CONN_NAME,
"--target-conn",
SPANNER_CONN_NAME,
]


# Copy test fixtures from Spanner Ibis client tests, because it's a bit verbose
# to create a Spanner instance and load data to it. Relevant test fixtures can
# be copied here after clients are upstreamed to Ibis.
spanner_client = conftest.spanner_client
instance_id = conftest.instance_id
database_id = conftest.database_id


@pytest.fixture
def spanner_connection_config(instance_id, database_id):
return {
"source_type": "Spanner",
"project_id": os.environ["PROJECT_ID"],
"instance_id": instance_id,
"database_id": database_id,
}


@pytest.fixture
def spanner_connection_args(instance_id, database_id):
return [
"connections",
"add",
"--connection-name",
SPANNER_CONN_NAME,
"Spanner",
"--project-id",
os.environ["PROJECT_ID"],
"--instance-id",
instance_id,
"--database-id",
database_id,
]


@pytest.fixture
def count_config(spanner_connection_config, database_id):
return {
# Connection Name
consts.CONFIG_SOURCE_CONN: spanner_connection_config,
consts.CONFIG_TARGET_CONN: spanner_connection_config,
# Validation Type
consts.CONFIG_TYPE: "Column",
# Configuration Required Depending on Validator Type
consts.CONFIG_SCHEMA_NAME: database_id,
consts.CONFIG_TABLE_NAME: "functional_alltypes",
consts.CONFIG_GROUPED_COLUMNS: [],
consts.CONFIG_AGGREGATES: [
{
consts.CONFIG_TYPE: "count",
consts.CONFIG_SOURCE_COLUMN: None,
consts.CONFIG_TARGET_COLUMN: None,
consts.CONFIG_FIELD_ALIAS: "count",
},
{
consts.CONFIG_TYPE: "count",
consts.CONFIG_SOURCE_COLUMN: "string_col",
consts.CONFIG_TARGET_COLUMN: "string_col",
consts.CONFIG_FIELD_ALIAS: "count_string_col",
},
{
consts.CONFIG_TYPE: "avg",
consts.CONFIG_SOURCE_COLUMN: "float_col",
consts.CONFIG_TARGET_COLUMN: "float_col",
consts.CONFIG_FIELD_ALIAS: "avg_float_col",
},
{
consts.CONFIG_TYPE: "max",
consts.CONFIG_SOURCE_COLUMN: "timestamp_col",
consts.CONFIG_TARGET_COLUMN: "timestamp_col",
consts.CONFIG_FIELD_ALIAS: "max_timestamp_col",
},
{
consts.CONFIG_TYPE: "min",
consts.CONFIG_SOURCE_COLUMN: "int_col",
consts.CONFIG_TARGET_COLUMN: "int_col",
consts.CONFIG_FIELD_ALIAS: "min_int_col",
},
],
}


@pytest.fixture
def grouped_config(spanner_connection_config, database_id):
return {
# Connection Name
consts.CONFIG_SOURCE_CONN: spanner_connection_config,
consts.CONFIG_TARGET_CONN: spanner_connection_config,
# Validation Type
consts.CONFIG_TYPE: "GroupedColumn",
# Configuration Required Depending on Validator Type
consts.CONFIG_SCHEMA_NAME: database_id,
consts.CONFIG_TABLE_NAME: "functional_alltypes",
consts.CONFIG_AGGREGATES: [
{
consts.CONFIG_TYPE: "count",
consts.CONFIG_SOURCE_COLUMN: None,
consts.CONFIG_TARGET_COLUMN: None,
consts.CONFIG_FIELD_ALIAS: "count",
},
{
consts.CONFIG_TYPE: "sum",
consts.CONFIG_SOURCE_COLUMN: "float_col",
consts.CONFIG_TARGET_COLUMN: "float_col",
consts.CONFIG_FIELD_ALIAS: "sum_float_col",
},
],
consts.CONFIG_GROUPED_COLUMNS: [
{
consts.CONFIG_FIELD_ALIAS: "timestamp_col",
consts.CONFIG_SOURCE_COLUMN: "timestamp_col",
consts.CONFIG_TARGET_COLUMN: "timestamp_col",
consts.CONFIG_CAST: "date",
},
],
}


CLI_CONFIG_FILE = "example_test.yaml"
CLI_RUN_CONFIG_ARGS = ["run-config", "--config-file", CLI_CONFIG_FILE]


STRING_MATCH_RESULT = '{"schema_name": "pso_data_validator", "table_name": "results", "target_schema_name": "pso_data_validator", "target_table_name": "results"}'


def test_count_validator(count_config):
validator = data_validation.DataValidation(count_config, verbose=True)
df = validator.execute()

count_value = df[df["validation_name"] == "count"]["source_agg_value"].values[0]
count_string_value = df[df["validation_name"] == "count_string_col"][
"source_agg_value"
].values[0]
avg_float_value = df[df["validation_name"] == "avg_float_col"][
"source_agg_value"
].values[0]
max_timestamp_value = df[df["validation_name"] == "max_timestamp_col"][
"source_agg_value"
].values[0]
min_int_value = df[df["validation_name"] == "min_int_col"][
"source_agg_value"
].values[0]

assert float(count_value) > 0
assert float(count_string_value) > 0
assert float(avg_float_value) > 0
assert datetime.datetime.strptime(
max_timestamp_value, "%Y-%m-%d %H:%M:%S%z",
) > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
assert float(min_int_value) > 0


def test_grouped_count_validator(grouped_config):
validator = data_validation.DataValidation(grouped_config, verbose=True)
df = validator.execute()
rows = list(df[df["validation_name"] == "count"].iterrows())

# Check that all partitions are unique.
partitions = frozenset(df["group_by_columns"])
assert len(rows) == len(partitions)
assert len(rows) > 1
assert df["source_agg_value"].sum() == df["target_agg_value"].sum()

for _, row in rows:
assert float(row["source_agg_value"]) > 0
assert row["source_agg_value"] == row["target_agg_value"]


def test_cli_find_tables(spanner_connection_args, database_id):
_store_spanner_conn(spanner_connection_args)

parser = cli_tools.configure_arg_parser()
args = parser.parse_args(CLI_FIND_TABLES_ARGS)
tables_json = main.find_tables_using_string_matching(args)
tables = json.loads(tables_json)
assert isinstance(tables_json, str)
assert {
"schema_name": database_id,
"table_name": "array_table",
"target_schema_name": database_id,
"target_table_name": "array_table",
} in tables
assert {
"schema_name": database_id,
"table_name": "functional_alltypes",
"target_schema_name": database_id,
"target_table_name": "functional_alltypes",
} in tables
assert {
"schema_name": database_id,
"table_name": "students_pointer",
"target_schema_name": database_id,
"target_table_name": "students_pointer",
} in tables

_remove_spanner_conn()


def _store_spanner_conn(spanner_connection_args):
parser = cli_tools.configure_arg_parser()
mock_args = parser.parse_args(spanner_connection_args)
main.run_connections(mock_args)


def _remove_spanner_conn():
file_path = cli_tools._get_connection_file(SPANNER_CONN_NAME)
os.remove(file_path)
4 changes: 2 additions & 2 deletions third_party/ibis/ibis_cloud_spanner/tests/ddl.sql
@@ -31,8 +31,8 @@ CREATE TABLE functional_alltypes
bool_col BOOL,
date DATE,
date_string_col STRING(MAX),
-double_col NUMERIC,
-float_col NUMERIC,
+numeric_col NUMERIC,
+float_col FLOAT64,
index INT64,
int_col INT64,
month INT64,
12 changes: 6 additions & 6 deletions third_party/ibis/ibis_cloud_spanner/tests/dml.sql
@@ -38,32 +38,32 @@ VALUES(106, 'Phoebe', 10, 490, 'Chemistry', 9.6, '2019-02-09');


INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(1, 10001, TRUE, '2016-02-09', '01/01/2001', 2.5, 12.16, 101, 21, 4, 16, 'David', '2002-02-10 15:30:00+00', 6, 99, 2010);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(2, 10002, FALSE, '2016-10-10', '02/02/2002', 2.6, 13.16, 102, 22, 5, 18, 'Ryan', '2009-02-12 10:06:00+00', 7, 98, 2012);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(3, 10003, TRUE, '2018-02-09', '03/03/2003', 9.5, 44.16, 201, 41, 6, 56, 'Steve', '2010-06-10 12:12:00+00', 12, 66, 2006);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(4, 10004, TRUE, '2018-10-10', '04/04/2004', 9.6, 45.16, 202, 42, 9, 58, 'Chandler', '2014-06-12 10:04:00+00', 14, 69, 2009);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(5, 10005, FALSE, '2020-06-12', '05/05/2005', 6.6, 66.12, 401, 62, 12, 98, 'Rose', '2018-02-10 10:06:00+00', 16, 96, 2012);

INSERT INTO functional_alltypes
-(id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+(id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
VALUES
(6, 10006, TRUE, '2020-12-12', '06/06/2006', 6.9, 66.19, 402, 69, 14, 99, 'Rachel', '2019-04-12 12:09:00+00', 18, 99, 2014);

(3 more changed files not shown.)
