From 61dabe0f96192f065c67ba24af9d6764e21c53f7 Mon Sep 17 00:00:00 2001 From: Neha Nene Date: Mon, 29 Aug 2022 13:56:03 -0500 Subject: [PATCH] feat: Support for MSSQL row validation (#570) * feat: support for mssql row validation, updating schema print to stdout, removing unneeded cast to date in filter * lint, make import optional * import error * chore: release 2.0.0 Release-As: 2.0.0 * removed unnecessary import * chore: release 2.1.1 Release-As: 2.1.1 * fix typo in tests --- data_validation/__main__.py | 8 ++ data_validation/consts.py | 7 +- .../query_builder/query_builder.py | 11 +- tests/system/data_sources/test_sql_server.py | 107 ++++++++++++++---- third_party/ibis/ibis_addon/operations.py | 21 +++- third_party/ibis/ibis_mssql/compiler.py | 9 +- 6 files changed, 122 insertions(+), 41 deletions(-) diff --git a/data_validation/__main__.py b/data_validation/__main__.py index b0a53c234..cee4036b3 100644 --- a/data_validation/__main__.py +++ b/data_validation/__main__.py @@ -62,6 +62,14 @@ def get_aggregate_config(args, config_manager): "int64", "decimal", "timestamp", + "float64[non-nullable]", + "float32[non-nullable]", + "int8[non-nullable]", + "int16[non-nullable]", + "int32[non-nullable]", + "int64[non-nullable]", + "decimal[non-nullable]", + "timestamp[non-nullable]", ] if args.wildcard_include_string_len: diff --git a/data_validation/consts.py b/data_validation/consts.py index 264f0a434..330060084 100644 --- a/data_validation/consts.py +++ b/data_validation/consts.py @@ -148,6 +148,9 @@ "start_time", "end_time", "aggregation_type", - "source_agg_value", - "target_agg_value", + "difference", + "primary_keys", + "group_by_columns", + "num_random_rows", + "pct_threshold", ] diff --git a/data_validation/query_builder/query_builder.py b/data_validation/query_builder/query_builder.py index 2233ce902..63dcafb4f 100644 --- a/data_validation/query_builder/query_builder.py +++ b/data_validation/query_builder/query_builder.py @@ -176,18 +176,9 @@ def compile(self, ibis_table): if self.left_field: self.left = ibis_table[self.left_field] - # Cast All Datetime to Date (TODO this may be a bug in BQ) - if isinstance( - ibis_table[self.left_field].type(), ibis.expr.datatypes.Timestamp - ): - self.left = self.left.cast("date") + if self.right_field: self.right = ibis_table[self.right_field] - # Cast All Datetime to Date (TODO this may be a bug in BQ) - if isinstance( - ibis_table[self.right_field].type(), ibis.expr.datatypes.Timestamp - ): - self.right = self.right.cast("date") return self.expr(self.left, self.right) diff --git a/tests/system/data_sources/test_sql_server.py b/tests/system/data_sources/test_sql_server.py index 049cc9671..23b1435a2 100644 --- a/tests/system/data_sources/test_sql_server.py +++ b/tests/system/data_sources/test_sql_server.py @@ -29,6 +29,14 @@ SQL_SERVER_USER = os.getenv("SQL_SERVER_USER", "sqlserver") SQL_SERVER_PASSWORD = os.getenv("SQL_SERVER_PASSWORD") PROJECT_ID = os.getenv("PROJECT_ID") +CONN = { + "source_type": "MSSQL", + "host": SQL_SERVER_HOST, + "user": SQL_SERVER_USER, + "password": SQL_SERVER_PASSWORD, + "port": 1433, + "database": "guestbook", +} @pytest.fixture @@ -55,19 +63,10 @@ def cloud_sql(request): def test_sql_server_count(cloud_sql): """Test count validation on SQL Server instance""" - conn = { - "source_type": "MSSQL", - "host": SQL_SERVER_HOST, - "user": SQL_SERVER_USER, - "password": SQL_SERVER_PASSWORD, - "port": 1433, - "database": "guestbook", - } - config_count_valid = { # BigQuery Specific Connection Config - consts.CONFIG_SOURCE_CONN: conn, - consts.CONFIG_TARGET_CONN: conn, + consts.CONFIG_SOURCE_CONN: CONN, + consts.CONFIG_TARGET_CONN: CONN, # Validation Type consts.CONFIG_TYPE: "Column", # Configuration Required Depending on Validator Type @@ -92,19 +91,85 @@ def test_sql_server_count(cloud_sql): assert df["source_agg_value"][0] == df["target_agg_value"][0] -def test_schema_validation(): - conn = { - "source_type": "MSSQL", - "host": SQL_SERVER_HOST, - "user": SQL_SERVER_USER, - "password": SQL_SERVER_PASSWORD, - "port": 1433, - "database": "guestbook", +def test_sql_server_row(cloud_sql): + """Test row validation on SQL Server instance""" + config_row_valid = { + # BigQuery Specific Connection Config + consts.CONFIG_SOURCE_CONN: CONN, + consts.CONFIG_TARGET_CONN: CONN, + # Validation Type + consts.CONFIG_TYPE: "Row", + # Configuration Required Depending on Validator Type + consts.CONFIG_SCHEMA_NAME: "dbo", + consts.CONFIG_TABLE_NAME: "entries", + consts.CONFIG_COMPARISON_FIELDS: [ + { + "source_column": "hash__all", + "target_column": "hash__all", + "field_alias": "hash__all", + "cast": None, + } + ], + consts.CONFIG_CALCULATED_FIELDS: [ + { + "source_calculated_columns": ["content"], + "target_calculated_columns": ["content"], + "field_alias": "cast__content", + "type": "cast", + "depth": 0, + }, + { + "source_calculated_columns": ["cast__content"], + "target_calculated_columns": ["cast__content"], + "field_alias": "ifnull__cast__content", + "type": "ifnull", + "depth": 1, + }, + { + "source_calculated_columns": ["ifnull__cast__content"], + "target_calculated_columns": ["ifnull__cast__content"], + "field_alias": "rstrip__ifnull__cast__content", + "type": "rstrip", + "depth": 2, + }, + { + "source_calculated_columns": ["rstrip__ifnull__cast__content"], + "target_calculated_columns": ["rstrip__ifnull__cast__content"], + "field_alias": "upper__rstrip__ifnull__cast__content", + "type": "upper", + "depth": 3, + }, + { + "source_calculated_columns": ["upper__rstrip__ifnull__cast__content"], + "target_calculated_columns": ["upper__rstrip__ifnull__cast__content"], + "field_alias": "hash__all", + "type": "hash", + "depth": 4, + }, + ], + consts.CONFIG_PRIMARY_KEYS: [ + { + "source_column": "entryID", + "target_column": "entryID", + "field_alias": "entryID", + "cast": None, + } + ], + consts.CONFIG_FORMAT: "table", } + data_validator = data_validation.DataValidation( + config_row_valid, + verbose=False, + ) + df = data_validator.execute() + assert df["source_agg_value"][0] == df["target_agg_value"][0] + + +def test_schema_validation(): config = { - consts.CONFIG_SOURCE_CONN: conn, - consts.CONFIG_TARGET_CONN: conn, + consts.CONFIG_SOURCE_CONN: CONN, + consts.CONFIG_TARGET_CONN: CONN, consts.CONFIG_TYPE: "Schema", consts.CONFIG_SCHEMA_NAME: "dbo", consts.CONFIG_TABLE_NAME: "entries", diff --git a/third_party/ibis/ibis_addon/operations.py b/third_party/ibis/ibis_addon/operations.py index da58014d7..e8eadd2ba 100644 --- a/third_party/ibis/ibis_addon/operations.py +++ b/third_party/ibis/ibis_addon/operations.py @@ -24,13 +24,15 @@ """ import ibis -import sqlalchemy +import sqlalchemy as sa import ibis.expr.api -from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator import ibis.expr.datatypes as dt -from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp import ibis.expr.rules as rlz + +from data_validation.clients import _raise_missing_client_error +from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator +from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp from ibis.expr.types import BinaryValue, IntegerColumn, StringValue from ibis.backends.impala.compiler import ImpalaExprTranslator from ibis.backends.pandas import client as _pandas_client @@ -39,8 +41,9 @@ from ibis.backends.base_sql.compiler import BaseExprTranslator from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator from third_party.ibis.ibis_teradata.compiler import TeradataExprTranslator +from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator + -# from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator # TODO figure how to add RAWSQL # from third_party.ibis.ibis_snowflake.compiler import SnowflakeExprTranslator # from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator <<<<<< DB2 @@ -163,7 +166,14 @@ def format_raw_sql(translator, expr): def sa_format_raw_sql(translator, expr): op = expr.op() rand_col, raw_sql = op.args - return sqlalchemy.text(raw_sql.op().args[0]) + return sa.text(raw_sql.op().args[0]) + +def sa_format_hashbytes(translator, expr): + arg, how = expr.op().args + compiled_arg = translator.translate(arg) + hash_func = sa.func.hashbytes(sa.sql.literal_column("'SHA2_256'"), compiled_arg) + hash_to_string = sa.func.convert(sa.sql.literal_column('CHAR(64)'), hash_func, sa.sql.literal_column('2')) + return sa.func.lower(hash_to_string) _pandas_client._inferable_pandas_dtypes["floating"] = _pandas_client.dt.float64 @@ -177,6 +187,7 @@ def sa_format_raw_sql(translator, expr): BigQueryExprTranslator._registry[HashBytes] = format_hashbytes_bigquery AlchemyExprTranslator._registry[RawSQL] = format_raw_sql AlchemyExprTranslator._registry[HashBytes] = format_hashbytes_alchemy +MSSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes BaseExprTranslator._registry[RawSQL] = format_raw_sql BaseExprTranslator._registry[HashBytes] = format_hashbytes_base BigQueryExprTranslator._registry[RawSQL] = format_raw_sql diff --git a/third_party/ibis/ibis_mssql/compiler.py b/third_party/ibis/ibis_mssql/compiler.py index b67db8be3..41ce2c6bf 100644 --- a/third_party/ibis/ibis_mssql/compiler.py +++ b/third_party/ibis/ibis_mssql/compiler.py @@ -15,7 +15,6 @@ import math -import pyodbc import sqlalchemy as sa import sqlalchemy.dialects.mssql as mssql @@ -27,7 +26,7 @@ import ibis.backends.base_sqlalchemy.alchemy as alch from ibis import literal as L -from ibis.backends.base_sql import _cumulative_to_reduction, fixed_arity, unary +from ibis.backends.base_sqlalchemy.alchemy import fixed_arity, unary, _cumulative_to_reduction def raise_unsupported_op_error(translator, expr, *args): @@ -350,6 +349,9 @@ def _day_of_week_name(t, expr): (sa_arg,) = map(t.translate, expr.op().args) return sa.func.trim(sa.func.format(sa_arg, 'dddd')) +def _string_join(t, expr): + sep, elements = expr.op().args + return sa.func.concat(*map(t.translate, elements)) _operation_registry = alch._operation_registry.copy() @@ -370,6 +372,7 @@ def _day_of_week_name(t, expr): ops.StringFind: _string_find, ops.StringLength: unary(sa.func.length), ops.StringReplace: fixed_arity(sa.func.replace, 3), + ops.StringJoin: _string_join, ops.Strip: unary(sa.func.trim), ops.Substring: _substr, ops.Uppercase: unary(sa.func.upper), @@ -465,7 +468,7 @@ class MSSQLExprTranslator(alch.AlchemyExprTranslator): _type_map = alch.AlchemyExprTranslator._type_map.copy() _type_map.update( { - dt.Boolean: pyodbc.SQL_BIT, + dt.Boolean: mssql.BIT, dt.Int8: mssql.TINYINT, dt.Int32: mssql.INTEGER, dt.Int64: mssql.BIGINT,