diff --git a/cloudbuild.yaml b/cloudbuild.yaml
index 3ecfc2972..4843ccb99 100644
--- a/cloudbuild.yaml
+++ b/cloudbuild.yaml
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-timeout: 2400s
+timeout: 7200s
 steps:
 - id: lint
   name: 'gcr.io/pso-kokoro-resources/python-multi'
diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py
index 7c3bfff6f..8cc41b0d3 100644
--- a/data_validation/cli_tools.py
+++ b/data_validation/cli_tools.py
@@ -99,6 +99,12 @@
         ["password", "Password for authentication of user"],
         ["database", "Database in postgres to connect to (default postgres)"],
     ],
+    "Spanner": [
+        ["project_id", "GCP Project to use for Spanner"],
+        ["instance_id", "ID of Spanner instance to connect to"],
+        ["database_id", "ID of Spanner database (schema) to connect to"],
+        ["google_service_account_key_path", "(Optional) GCP SA Key Path"],
+    ],
 }
diff --git a/data_validation/clients.py b/data_validation/clients.py
index a34780cd2..d2d7997ce 100644
--- a/data_validation/clients.py
+++ b/data_validation/clients.py
@@ -23,8 +23,9 @@
 from ibis.backends.mysql.client import MySQLClient
 from ibis.backends.postgres.client import PostgreSQLClient
 
-from third_party.ibis.ibis_impala.api import impala_connect
 import third_party.ibis.ibis_addon.datatypes
+from third_party.ibis.ibis_cloud_spanner.api import connect as spanner_connect
+from third_party.ibis.ibis_impala.api import impala_connect
 
 from data_validation import client_info
@@ -184,4 +185,5 @@
     "Teradata": TeradataClient,
     "MSSQL": mssql_connect,
     "Snowflake": snowflake_connect,
+    "Spanner": spanner_connect,
 }
diff --git a/data_validation/data_validation.py b/data_validation/data_validation.py
index 9c146c46e..24d4bd375 100644
--- a/data_validation/data_validation.py
+++ b/data_validation/data_validation.py
@@ -212,9 +212,19 @@ def _get_pandas_schema(self, source_df, target_df, join_on_fields, verbose=False
             source_df[join_on_field] = source_df[join_on_field].astype(str)
             target_df[join_on_field] = target_df[join_on_field].astype(str)
 
-        pd_schema = source_df.dtypes[
-            [i for i, v in source_df.dtypes.iteritems() if v not in [numpy.dtype("O")]]
-        ]
+        # Loop over the index keys() instead of iteritems() because pandas
+        # fails with the datetime64[ns, UTC] data type on Python 3.9.
+        schema_data = []
+        schema_index = []
+        for key in source_df.dtypes.keys():
+            dtype = source_df.dtypes[key]
+            # The Ibis pandas backend fails with `KeyError: dtype('O')` if
+            # object dtypes are passed in.
+            if dtype in {numpy.dtype("O")}:
+                continue
+            schema_data.append(dtype)
+            schema_index.append(key)
+        pd_schema = pandas.Series(schema_data, index=schema_index)
         if verbose:
             print("-- ** Pandas Schema ** --")
             print(pd_schema)
diff --git a/noxfile.py b/noxfile.py
index df414d5b6..843f1b87c 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -187,5 +187,5 @@ def integration_spanner(session):
         if not os.environ.get(env_var, ""):
             raise Exception("Expected Env Var: %s" % env_var)
 
-    # TODO: Add tests for DVT data sources. See integration_bigquery.
session.run("pytest", "third_party/ibis/ibis_cloud_spanner/tests", *session.posargs) + session.run("pytest", "tests/system/data_sources/test_spanner.py", *session.posargs) diff --git a/requirements.txt b/requirements.txt index bcf893400..020a091c4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,12 +11,12 @@ SQLAlchemy==1.3.22 PyMySQL==1.0.2 psycopg2-binary==2.8.6 PyYAML==5.4.1 -pandas==1.2.1 +pandas==1.2.3 proto-plus==1.13.0 pyarrow==3.0.0 pydata-google-auth==1.1.0 -google-cloud-bigquery==2.7.0 -google-cloud-bigquery-storage==2.2.1 +google-cloud-bigquery==2.11.0 +google-cloud-bigquery-storage==2.3.0 google-cloud-spanner==3.1.0 setuptools>=34.0.0 jellyfish==0.8.2 diff --git a/tests/system/data_sources/test_bigquery.py b/tests/system/data_sources/test_bigquery.py index 501b31f9a..b05ac924a 100644 --- a/tests/system/data_sources/test_bigquery.py +++ b/tests/system/data_sources/test_bigquery.py @@ -185,7 +185,7 @@ def test_count_validator(): count_tripduration_value = df[df["validation_name"] == "count_tripduration"][ "source_agg_value" ].values[0] - avg_tripduration_value = df[df["validation_name"] == "count_tripduration"][ + avg_tripduration_value = df[df["validation_name"] == "avg_tripduration"][ "source_agg_value" ].values[0] max_birth_year_value = df[df["validation_name"] == "max_birth_year"][ diff --git a/tests/system/data_sources/test_spanner.py b/tests/system/data_sources/test_spanner.py new file mode 100644 index 000000000..24aafd080 --- /dev/null +++ b/tests/system/data_sources/test_spanner.py @@ -0,0 +1,242 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os + +import pytest + +from data_validation import cli_tools, consts, data_validation +from data_validation import __main__ as main +from third_party.ibis.ibis_cloud_spanner.tests import conftest + + +SPANNER_CONN_NAME = "spanner-integration-test" +CLI_FIND_TABLES_ARGS = [ + "find-tables", + "--source-conn", + SPANNER_CONN_NAME, + "--target-conn", + SPANNER_CONN_NAME, +] + + +# Copy text fixtures from Spanner Ibis client tests, because it's a bit verbose +# to create a Spanner instance and load data to it. Relevant test fixtures can +# be copied here after clients are upstreamed to Ibis. 
+spanner_client = conftest.spanner_client
+instance_id = conftest.instance_id
+database_id = conftest.database_id
+
+
+@pytest.fixture
+def spanner_connection_config(instance_id, database_id):
+    return {
+        "source_type": "Spanner",
+        "project_id": os.environ["PROJECT_ID"],
+        "instance_id": instance_id,
+        "database_id": database_id,
+    }
+
+
+@pytest.fixture
+def spanner_connection_args(instance_id, database_id):
+    return [
+        "connections",
+        "add",
+        "--connection-name",
+        SPANNER_CONN_NAME,
+        "Spanner",
+        "--project-id",
+        os.environ["PROJECT_ID"],
+        "--instance-id",
+        instance_id,
+        "--database-id",
+        database_id,
+    ]
+
+
+@pytest.fixture
+def count_config(spanner_connection_config, database_id):
+    return {
+        # Connection Name
+        consts.CONFIG_SOURCE_CONN: spanner_connection_config,
+        consts.CONFIG_TARGET_CONN: spanner_connection_config,
+        # Validation Type
+        consts.CONFIG_TYPE: "Column",
+        # Configuration Required Depending on Validator Type
+        consts.CONFIG_SCHEMA_NAME: database_id,
+        consts.CONFIG_TABLE_NAME: "functional_alltypes",
+        consts.CONFIG_GROUPED_COLUMNS: [],
+        consts.CONFIG_AGGREGATES: [
+            {
+                consts.CONFIG_TYPE: "count",
+                consts.CONFIG_SOURCE_COLUMN: None,
+                consts.CONFIG_TARGET_COLUMN: None,
+                consts.CONFIG_FIELD_ALIAS: "count",
+            },
+            {
+                consts.CONFIG_TYPE: "count",
+                consts.CONFIG_SOURCE_COLUMN: "string_col",
+                consts.CONFIG_TARGET_COLUMN: "string_col",
+                consts.CONFIG_FIELD_ALIAS: "count_string_col",
+            },
+            {
+                consts.CONFIG_TYPE: "avg",
+                consts.CONFIG_SOURCE_COLUMN: "float_col",
+                consts.CONFIG_TARGET_COLUMN: "float_col",
+                consts.CONFIG_FIELD_ALIAS: "avg_float_col",
+            },
+            {
+                consts.CONFIG_TYPE: "max",
+                consts.CONFIG_SOURCE_COLUMN: "timestamp_col",
+                consts.CONFIG_TARGET_COLUMN: "timestamp_col",
+                consts.CONFIG_FIELD_ALIAS: "max_timestamp_col",
+            },
+            {
+                consts.CONFIG_TYPE: "min",
+                consts.CONFIG_SOURCE_COLUMN: "int_col",
+                consts.CONFIG_TARGET_COLUMN: "int_col",
+                consts.CONFIG_FIELD_ALIAS: "min_int_col",
+            },
+        ],
+    }
+
+
+@pytest.fixture
+def grouped_config(spanner_connection_config, database_id):
+    return {
+        # Connection Name
+        consts.CONFIG_SOURCE_CONN: spanner_connection_config,
+        consts.CONFIG_TARGET_CONN: spanner_connection_config,
+        # Validation Type
+        consts.CONFIG_TYPE: "GroupedColumn",
+        # Configuration Required Depending on Validator Type
+        consts.CONFIG_SCHEMA_NAME: database_id,
+        consts.CONFIG_TABLE_NAME: "functional_alltypes",
+        consts.CONFIG_AGGREGATES: [
+            {
+                consts.CONFIG_TYPE: "count",
+                consts.CONFIG_SOURCE_COLUMN: None,
+                consts.CONFIG_TARGET_COLUMN: None,
+                consts.CONFIG_FIELD_ALIAS: "count",
+            },
+            {
+                consts.CONFIG_TYPE: "sum",
+                consts.CONFIG_SOURCE_COLUMN: "float_col",
+                consts.CONFIG_TARGET_COLUMN: "float_col",
+                consts.CONFIG_FIELD_ALIAS: "sum_float_col",
+            },
+        ],
+        consts.CONFIG_GROUPED_COLUMNS: [
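+            # Cast the timestamp to a date so rows group into daily partitions.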
df[df["validation_name"] == "count_string_col"][ + "source_agg_value" + ].values[0] + avg_float_value = df[df["validation_name"] == "avg_float_col"][ + "source_agg_value" + ].values[0] + max_timestamp_value = df[df["validation_name"] == "max_timestamp_col"][ + "source_agg_value" + ].values[0] + min_int_value = df[df["validation_name"] == "min_int_col"][ + "source_agg_value" + ].values[0] + + assert float(count_value) > 0 + assert float(count_string_value) > 0 + assert float(avg_float_value) > 0 + assert datetime.datetime.strptime( + max_timestamp_value, "%Y-%m-%d %H:%M:%S%z", + ) > datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + assert float(min_int_value) > 0 + + +def test_grouped_count_validator(grouped_config): + validator = data_validation.DataValidation(grouped_config, verbose=True) + df = validator.execute() + rows = list(df[df["validation_name"] == "count"].iterrows()) + + # Check that all partitions are unique. + partitions = frozenset(df["group_by_columns"]) + assert len(rows) == len(partitions) + assert len(rows) > 1 + assert df["source_agg_value"].sum() == df["target_agg_value"].sum() + + for _, row in rows: + assert float(row["source_agg_value"]) > 0 + assert row["source_agg_value"] == row["target_agg_value"] + + +def test_cli_find_tables(spanner_connection_args, database_id): + _store_spanner_conn(spanner_connection_args) + + parser = cli_tools.configure_arg_parser() + args = parser.parse_args(CLI_FIND_TABLES_ARGS) + tables_json = main.find_tables_using_string_matching(args) + tables = json.loads(tables_json) + assert isinstance(tables_json, str) + assert { + "schema_name": database_id, + "table_name": "array_table", + "target_schema_name": database_id, + "target_table_name": "array_table", + } in tables + assert { + "schema_name": database_id, + "table_name": "functional_alltypes", + "target_schema_name": database_id, + "target_table_name": "functional_alltypes", + } in tables + assert { + "schema_name": database_id, + "table_name": "students_pointer", + "target_schema_name": database_id, + "target_table_name": "students_pointer", + } in tables + + _remove_spanner_conn() + + +def _store_spanner_conn(spanner_connection_args): + parser = cli_tools.configure_arg_parser() + mock_args = parser.parse_args(spanner_connection_args) + main.run_connections(mock_args) + + +def _remove_spanner_conn(): + file_path = cli_tools._get_connection_file(SPANNER_CONN_NAME) + os.remove(file_path) diff --git a/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql b/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql index a6be8533e..e9aa27db5 100644 --- a/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql +++ b/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql @@ -31,8 +31,8 @@ CREATE TABLE functional_alltypes bool_col BOOL, date DATE, date_string_col STRING(MAX), - double_col NUMERIC, - float_col NUMERIC, + numeric_col NUMERIC, + float_col FLOAT64, index INT64, int_col INT64, month INT64, diff --git a/third_party/ibis/ibis_cloud_spanner/tests/dml.sql b/third_party/ibis/ibis_cloud_spanner/tests/dml.sql index 5849f7dac..8c093c083 100644 --- a/third_party/ibis/ibis_cloud_spanner/tests/dml.sql +++ b/third_party/ibis/ibis_cloud_spanner/tests/dml.sql @@ -38,32 +38,32 @@ VALUES(106, 'Phoebe', 10, 490, 'Chemistry', 9.6, '2019-02-09'); INSERT INTO functional_alltypes - (id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year ) + (id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col 
+    file_path = cli_tools._get_connection_file(SPANNER_CONN_NAME)
+    os.remove(file_path)
diff --git a/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql b/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql
index a6be8533e..e9aa27db5 100644
--- a/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql
+++ b/third_party/ibis/ibis_cloud_spanner/tests/ddl.sql
@@ -31,8 +31,8 @@ CREATE TABLE functional_alltypes
   bool_col BOOL,
   date DATE,
   date_string_col STRING(MAX),
-  double_col NUMERIC,
-  float_col NUMERIC,
+  numeric_col NUMERIC,
+  float_col FLOAT64,
   index INT64,
   int_col INT64,
   month INT64,
diff --git a/third_party/ibis/ibis_cloud_spanner/tests/dml.sql b/third_party/ibis/ibis_cloud_spanner/tests/dml.sql
index 5849f7dac..8c093c083 100644
--- a/third_party/ibis/ibis_cloud_spanner/tests/dml.sql
+++ b/third_party/ibis/ibis_cloud_spanner/tests/dml.sql
@@ -38,32 +38,32 @@ VALUES(106, 'Phoebe', 10, 490, 'Chemistry', 9.6, '2019-02-09');
 
 INSERT INTO functional_alltypes
-  (id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+  (id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
 VALUES
 (1, 10001, TRUE, '2016-02-09', '01/01/2001', 2.5, 12.16, 101, 21, 4, 16, 'David', '2002-02-10 15:30:00+00', 6, 99, 2010);
 
 INSERT INTO functional_alltypes
-  (id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+  (id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
 VALUES
 (2, 10002, FALSE, '2016-10-10', '02/02/2002', 2.6, 13.16, 102, 22, 5, 18, 'Ryan', '2009-02-12 10:06:00+00', 7, 98, 2012);
 
 INSERT INTO functional_alltypes
-  (id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+  (id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
 VALUES
 (3, 10003, TRUE, '2018-02-09', '03/03/2003', 9.5, 44.16, 201, 41, 6, 56, 'Steve', '2010-06-10 12:12:00+00', 12, 66, 2006);
 
 INSERT INTO functional_alltypes
-  (id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+  (id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
 VALUES
 (4, 10004, TRUE, '2018-10-10', '04/04/2004', 9.6, 45.16, 202, 42, 9, 58, 'Chandler', '2014-06-12 10:04:00+00', 14, 69, 2009);
 
 INSERT INTO functional_alltypes
-  (id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+  (id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
 VALUES
 (5, 10005, FALSE, '2020-06-12', '05/05/2005', 6.6, 66.12, 401, 62, 12, 98, 'Rose', '2018-02-10 10:06:00+00', 16, 96, 2012);
 
 INSERT INTO functional_alltypes
-  (id ,bigint_col ,bool_col ,date ,date_string_col ,double_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
+  (id ,bigint_col ,bool_col ,date ,date_string_col ,numeric_col ,float_col ,index ,int_col ,month ,smallint_col ,string_col ,timestamp_col ,tinyint_col ,Unnamed0 ,year )
 VALUES
 (6, 10006, TRUE, '2020-12-12', '06/06/2006', 6.9, 66.19, 402, 69, 14, 99, 'Rachel', '2019-04-12 12:09:00+00', 18, 99, 2014);
diff --git a/third_party/ibis/ibis_cloud_spanner/tests/test_client.py b/third_party/ibis/ibis_cloud_spanner/tests/test_client.py
index 32ca523df..1e2cd0ea3 100644
--- a/third_party/ibis/ibis_cloud_spanner/tests/test_client.py
+++ b/third_party/ibis/ibis_cloud_spanner/tests/test_client.py
@@ -188,7 +188,7 @@ def test_scalar_param_int64(alltypes, df):
 
 def test_scalar_param_double(alltypes, df):
     param = ibis.param("double")
-    expr = alltypes[alltypes.double_col == param]
+    expr = alltypes[alltypes.numeric_col == param]
     double_value = 2.5
 
     result = (
@@ -197,7 +197,7 @@
         .reset_index(drop=True)
     )
     expected = (
-        df.loc[df.double_col == double_value].sort_values("id").reset_index(drop=True)
+        df.loc[df.numeric_col == double_value].sort_values("id").reset_index(drop=True)
     )
 
     tm.assert_frame_equal(result, expected)
@@ -357,7 +357,7 @@ def test_prevent_rewrite(alltypes):
     t = alltypes
     expr = (
         t.groupby(t.string_col)
-        .aggregate(collected_double=t.double_col.collect())
+        .aggregate(collected_double=t.numeric_col.collect())
         .pipe(ibis.prevent_rewrite)
         .filter(lambda t: t.string_col != "wat")
     )
@@ -365,7 +365,7 @@
     expected = """\
 SELECT *
 FROM (
-  SELECT `string_col`, ARRAY_AGG(`double_col`) AS `collected_double`
+  SELECT `string_col`, ARRAY_AGG(`numeric_col`) AS `collected_double`
   FROM functional_alltypes
   GROUP BY 1
 ) t0
@@ -490,8 +490,8 @@ def test_array_length(array_table):
 
 
 def test_scalar_param_array(alltypes, df, client):
-    expr = alltypes.sort_by("id").limit(1).double_col.collect()
+    expr = alltypes.sort_by("id").limit(1).numeric_col.collect()
     result = client.get_data_using_query(cs_compile.compile(expr))
     result = result["tmp"][0]
-    expected = [df.sort_values("id").double_col.iat[0]]
+    expected = [df.sort_values("id").numeric_col.iat[0]]
     assert result == expected
diff --git a/third_party/ibis/ibis_cloud_spanner/tests/test_compiler.py b/third_party/ibis/ibis_cloud_spanner/tests/test_compiler.py
index 8e8e9d25e..d1663d117 100644
--- a/third_party/ibis/ibis_cloud_spanner/tests/test_compiler.py
+++ b/third_party/ibis/ibis_cloud_spanner/tests/test_compiler.py
@@ -55,10 +55,10 @@ def test_union(alltypes, distinct, expected_keyword):
 
 
 def test_ieee_divide(alltypes):
-    expr = alltypes.double_col / 0
+    expr = alltypes.numeric_col / 0
     result = cs_compile.compile(expr)
     expected = f"""\
-SELECT IEEE_DIVIDE(`double_col`, 0) AS `tmp`
+SELECT IEEE_DIVIDE(`numeric_col`, 0) AS `tmp`
 FROM functional_alltypes"""
     assert result == expected
@@ -225,25 +225,25 @@
 )
 def test_union_cte(alltypes, distinct1, distinct2, expected1, expected2):
     t = alltypes
-    expr1 = t.group_by(t.string_col).aggregate(metric=t.double_col.sum())
+    expr1 = t.group_by(t.string_col).aggregate(metric=t.numeric_col.sum())
     expr2 = expr1.view()
     expr3 = expr1.view()
     expr = expr1.union(expr2, distinct=distinct1).union(expr3, distinct=distinct2)
     result = cs_compile.compile(expr)
     expected = f"""\
 WITH t0 AS (
-  SELECT `string_col`, sum(`double_col`) AS `metric`
+  SELECT `string_col`, sum(`numeric_col`) AS `metric`
   FROM functional_alltypes
   GROUP BY 1
 )
 SELECT *
 FROM t0
 {expected1}
-SELECT `string_col`, sum(`double_col`) AS `metric`
+SELECT `string_col`, sum(`numeric_col`) AS `metric`
 FROM functional_alltypes
 GROUP BY 1
 {expected2}
-SELECT `string_col`, sum(`double_col`) AS `metric`
+SELECT `string_col`, sum(`numeric_col`) AS `metric`
 FROM functional_alltypes
 GROUP BY 1"""
     assert result == expected
@@ -323,11 +323,11 @@ def test_bool_reducers_where(alltypes):
 
 
 def test_approx_nunique(alltypes):
-    d = alltypes.double_col
+    d = alltypes.numeric_col
     expr = d.approx_nunique()
     result = cs_compile.compile(expr)
     expected = f"""\
-SELECT APPROX_COUNT_DISTINCT(`double_col`) AS `approx_nunique`
+SELECT APPROX_COUNT_DISTINCT(`numeric_col`) AS `approx_nunique`
 FROM functional_alltypes"""
     assert result == expected
@@ -342,11 +342,11 @@ def test_approx_nunique(alltypes):
 
 
 def test_approx_median(alltypes):
-    d = alltypes.double_col
+    d = alltypes.numeric_col
     expr = d.approx_median()
     result = cs_compile.compile(expr)
     expected = f"""\
-SELECT APPROX_QUANTILES(`double_col`, 2)[OFFSET(1)] AS `approx_median`
+SELECT APPROX_QUANTILES(`numeric_col`, 2)[OFFSET(1)] AS `approx_median`
 FROM functional_alltypes"""
     assert result == expected
@@ -354,7 +354,7 @@
     expr2 = d.approx_median(where=m > 6)
     result = cs_compile.compile(expr2)
     expected = f"""\
-SELECT APPROX_QUANTILES(CASE WHEN `month` > 6 THEN `double_col` ELSE NULL END, 2)[OFFSET(1)] AS `approx_median`
+SELECT APPROX_QUANTILES(CASE WHEN `month` > 6 THEN `numeric_col` ELSE NULL END, 2)[OFFSET(1)] AS `approx_median`
 FROM functional_alltypes"""  # noqa: E501
     assert result == expected
diff --git a/third_party/ibis/ibis_cloud_spanner/to_pandas.py b/third_party/ibis/ibis_cloud_spanner/to_pandas.py
index 902f88334..a318c0256 100644
--- a/third_party/ibis/ibis_cloud_spanner/to_pandas.py
+++ b/third_party/ibis/ibis_cloud_spanner/to_pandas.py
@@ -45,23 +45,4 @@ def to_pandas(snapshot, sql, query_parameters):
 
     # Creating pandas dataframe from data and columns_list
     df = DataFrame(data, columns=column_list)
 
-    # Dictionary to map spanner datatype to a pandas compatible datatype
-    SPANNER_TO_PANDAS_DTYPE = {
-        "INT64": "int64",
-        "STRING": "object",
-        "BOOL": "bool",
-        "BYTES": "object",
-        "ARRAY": "object",
-        "DATE": "datetime64[ns, UTC]",
-        "FLOAT64": "float64",
-        "NUMERIC": "object",
-        "TIMESTAMP": "datetime64[ns, UTC]",
-    }
-
-    for k, v in columns_dict.items():
-        try:
-            df[k] = df[k].astype(SPANNER_TO_PANDAS_DTYPE[v])
-        except KeyError:
-            print("Spanner Datatype is not present in datatype mapping dictionary")
-
     return df