feat: Support for MSSQL row validation (#570)
* feat: support for MSSQL row validation, update schema print to stdout, remove unneeded cast to date in filter

* lint, make import optional

* fix import error

* chore: release 2.0.0

Release-As: 2.0.0

* removed unnecessary import

* chore: release 2.1.1

Release-As: 2.1.1

* fix typo in tests
nehanene15 committed Aug 29, 2022
1 parent 68e5058 commit 61dabe0
Showing 6 changed files with 122 additions and 41 deletions.
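Before the per-file diffs, a quick orientation: the headline change is row-level validation against SQL Server, exercised end to end by the new test further down. Below is a minimal sketch of driving such a validation through the Python API, mirroring the config shapes used in that test. The connection values are placeholders, and comparing a raw column (instead of the hashed calculated-field chain the test builds) and omitting CONFIG_CALCULATED_FIELDS are simplifying assumptions, not the committed behaviour:

from data_validation import data_validation, consts

# Placeholder connection values; the new test below uses the same dict shape.
mssql_conn = {
    "source_type": "MSSQL",
    "host": "localhost",
    "user": "sqlserver",
    "password": "<password>",
    "port": 1433,
    "database": "guestbook",
}

row_config = {
    consts.CONFIG_SOURCE_CONN: mssql_conn,
    consts.CONFIG_TARGET_CONN: mssql_conn,
    consts.CONFIG_TYPE: "Row",
    consts.CONFIG_SCHEMA_NAME: "dbo",
    consts.CONFIG_TABLE_NAME: "entries",
    # Rows are matched on the primary key and compared on "content" directly.
    consts.CONFIG_PRIMARY_KEYS: [
        {
            "source_column": "entryID",
            "target_column": "entryID",
            "field_alias": "entryID",
            "cast": None,
        }
    ],
    consts.CONFIG_COMPARISON_FIELDS: [
        {
            "source_column": "content",
            "target_column": "content",
            "field_alias": "content",
            "cast": None,
        }
    ],
    consts.CONFIG_FORMAT: "table",
}

result_df = data_validation.DataValidation(row_config, verbose=False).execute()
print(result_df[["source_agg_value", "target_agg_value"]])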
8 changes: 8 additions & 0 deletions data_validation/__main__.py
@@ -62,6 +62,14 @@ def get_aggregate_config(args, config_manager):
"int64",
"decimal",
"timestamp",
"float64[non-nullable]",
"float32[non-nullable]",
"int8[non-nullable]",
"int16[non-nullable]",
"int32[non-nullable]",
"int64[non-nullable]",
"decimal[non-nullable]",
"timestamp[non-nullable]",
]

if args.wildcard_include_string_len:
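The type names above gate which columns qualify for wildcard numeric aggregations; the new entries presumably cover columns that ibis reports as non-nullable, whose type stringifies with a "[non-nullable]" suffix. A hypothetical illustration (the names and helper below are not from the repo):

# Hypothetical: a NOT NULL column's ibis type renders as e.g. "int64[non-nullable]",
# so matching on the bare name alone would silently skip such columns.
SUPPORTED_NUMERIC_TYPES = {
    "int64",
    "decimal",
    "timestamp",
    "int64[non-nullable]",
    "decimal[non-nullable]",
    "timestamp[non-nullable]",
}

def is_numeric_wildcard_type(col_type) -> bool:
    return str(col_type) in SUPPORTED_NUMERIC_TYPES

assert is_numeric_wildcard_type("int64[non-nullable]")
assert not is_numeric_wildcard_type("string")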
7 changes: 5 additions & 2 deletions data_validation/consts.py
@@ -148,6 +148,9 @@
"start_time",
"end_time",
"aggregation_type",
"source_agg_value",
"target_agg_value",
"difference",
"primary_keys",
"group_by_columns",
"num_random_rows",
"pct_threshold",
]
11 changes: 1 addition & 10 deletions data_validation/query_builder/query_builder.py
@@ -176,18 +176,9 @@ def compile(self, ibis_table):

if self.left_field:
self.left = ibis_table[self.left_field]
# Cast All Datetime to Date (TODO this may be a bug in BQ)
if isinstance(
ibis_table[self.left_field].type(), ibis.expr.datatypes.Timestamp
):
self.left = self.left.cast("date")

if self.right_field:
self.right = ibis_table[self.right_field]
# Cast All Datetime to Date (TODO this may be a bug in BQ)
if isinstance(
ibis_table[self.right_field].type(), ibis.expr.datatypes.Timestamp
):
self.right = self.right.cast("date")

return self.expr(self.left, self.right)

107 changes: 86 additions & 21 deletions tests/system/data_sources/test_sql_server.py
@@ -29,6 +29,14 @@
SQL_SERVER_USER = os.getenv("SQL_SERVER_USER", "sqlserver")
SQL_SERVER_PASSWORD = os.getenv("SQL_SERVER_PASSWORD")
PROJECT_ID = os.getenv("PROJECT_ID")
CONN = {
"source_type": "MSSQL",
"host": SQL_SERVER_HOST,
"user": SQL_SERVER_USER,
"password": SQL_SERVER_PASSWORD,
"port": 1433,
"database": "guestbook",
}


@pytest.fixture
@@ -55,19 +63,10 @@ def cloud_sql(request):

def test_sql_server_count(cloud_sql):
"""Test count validation on SQL Server instance"""
conn = {
"source_type": "MSSQL",
"host": SQL_SERVER_HOST,
"user": SQL_SERVER_USER,
"password": SQL_SERVER_PASSWORD,
"port": 1433,
"database": "guestbook",
}

config_count_valid = {
# BigQuery Specific Connection Config
consts.CONFIG_SOURCE_CONN: conn,
consts.CONFIG_TARGET_CONN: conn,
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
# Validation Type
consts.CONFIG_TYPE: "Column",
# Configuration Required Depending on Validator Type
@@ -92,19 +91,85 @@ def test_sql_server_count(cloud_sql):
assert df["source_agg_value"][0] == df["target_agg_value"][0]


def test_schema_validation():
conn = {
"source_type": "MSSQL",
"host": SQL_SERVER_HOST,
"user": SQL_SERVER_USER,
"password": SQL_SERVER_PASSWORD,
"port": 1433,
"database": "guestbook",
def test_sql_server_row(cloud_sql):
"""Test row validation on SQL Server instance"""
config_row_valid = {
# BigQuery Specific Connection Config
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
# Validation Type
consts.CONFIG_TYPE: "Row",
# Configuration Required Depending on Validator Type
consts.CONFIG_SCHEMA_NAME: "dbo",
consts.CONFIG_TABLE_NAME: "entries",
consts.CONFIG_COMPARISON_FIELDS: [
{
"source_column": "hash__all",
"target_column": "hash__all",
"field_alias": "hash__all",
"cast": None,
}
],
consts.CONFIG_CALCULATED_FIELDS: [
{
"source_calculated_columns": ["content"],
"target_calculated_columns": ["content"],
"field_alias": "cast__content",
"type": "cast",
"depth": 0,
},
{
"source_calculated_columns": ["cast__content"],
"target_calculated_columns": ["cast__content"],
"field_alias": "ifnull__cast__content",
"type": "ifnull",
"depth": 1,
},
{
"source_calculated_columns": ["ifnull__cast__content"],
"target_calculated_columns": ["ifnull__cast__content"],
"field_alias": "rstrip__ifnull__cast__content",
"type": "rstrip",
"depth": 2,
},
{
"source_calculated_columns": ["rstrip__ifnull__cast__content"],
"target_calculated_columns": ["rstrip__ifnull__cast__content"],
"field_alias": "upper__rstrip__ifnull__cast__content",
"type": "upper",
"depth": 3,
},
{
"source_calculated_columns": ["upper__rstrip__ifnull__cast__content"],
"target_calculated_columns": ["upper__rstrip__ifnull__cast__content"],
"field_alias": "hash__all",
"type": "hash",
"depth": 4,
},
],
consts.CONFIG_PRIMARY_KEYS: [
{
"source_column": "entryID",
"target_column": "entryID",
"field_alias": "entryID",
"cast": None,
}
],
consts.CONFIG_FORMAT: "table",
}

data_validator = data_validation.DataValidation(
config_row_valid,
verbose=False,
)
df = data_validator.execute()
assert df["source_agg_value"][0] == df["target_agg_value"][0]


def test_schema_validation():
config = {
consts.CONFIG_SOURCE_CONN: conn,
consts.CONFIG_TARGET_CONN: conn,
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
consts.CONFIG_TYPE: "Schema",
consts.CONFIG_SCHEMA_NAME: "dbo",
consts.CONFIG_TABLE_NAME: "entries",
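The chain of calculated fields in the new test_sql_server_row config (cast → ifnull → rstrip → upper → hash) is what feeds the hash__all comparison. Expressed per row in plain Python for intuition only — the real computation happens in SQL on both source and target, and the NULL-replacement value and string-cast semantics here are assumptions rather than repo behaviour:

import hashlib

def hash__all(content):
    value = "" if content is None else str(content)     # cast + ifnull (assumed default)
    value = value.rstrip()                               # rstrip
    value = value.upper()                                # upper
    return hashlib.sha256(value.encode()).hexdigest()    # SHA2_256 -> lowercase hex

assert hash__all("hello ") == hash__all("HELLO")

The lowercase hex output lines up with the LOWER(CONVERT(..., 2)) shape that the operations.py change below emits for SQL Server.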
21 changes: 16 additions & 5 deletions third_party/ibis/ibis_addon/operations.py
@@ -24,13 +24,15 @@
"""

import ibis
import sqlalchemy
import sqlalchemy as sa

import ibis.expr.api
from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator
import ibis.expr.datatypes as dt
from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp
import ibis.expr.rules as rlz

from data_validation.clients import _raise_missing_client_error
from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator
from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp
from ibis.expr.types import BinaryValue, IntegerColumn, StringValue
from ibis.backends.impala.compiler import ImpalaExprTranslator
from ibis.backends.pandas import client as _pandas_client
@@ -39,8 +41,9 @@
from ibis.backends.base_sql.compiler import BaseExprTranslator
from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator
from third_party.ibis.ibis_teradata.compiler import TeradataExprTranslator
from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator


# from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator # TODO figure how to add RAWSQL
# from third_party.ibis.ibis_snowflake.compiler import SnowflakeExprTranslator
# from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator <<<<<< DB2

@@ -163,7 +166,14 @@ def format_raw_sql(translator, expr):
def sa_format_raw_sql(translator, expr):
op = expr.op()
rand_col, raw_sql = op.args
return sqlalchemy.text(raw_sql.op().args[0])
return sa.text(raw_sql.op().args[0])

def sa_format_hashbytes(translator, expr):
arg, how = expr.op().args
compiled_arg = translator.translate(arg)
hash_func = sa.func.hashbytes(sa.sql.literal_column("'SHA2_256'"), compiled_arg)
hash_to_string = sa.func.convert(sa.sql.literal_column('CHAR(64)'), hash_func, sa.sql.literal_column('2'))
return sa.func.lower(hash_to_string)


_pandas_client._inferable_pandas_dtypes["floating"] = _pandas_client.dt.float64
@@ -177,6 +187,7 @@ def sa_format_raw_sql(translator, expr):
BigQueryExprTranslator._registry[HashBytes] = format_hashbytes_bigquery
AlchemyExprTranslator._registry[RawSQL] = format_raw_sql
AlchemyExprTranslator._registry[HashBytes] = format_hashbytes_alchemy
MSSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes
BaseExprTranslator._registry[RawSQL] = format_raw_sql
BaseExprTranslator._registry[HashBytes] = format_hashbytes_base
BigQueryExprTranslator._registry[RawSQL] = format_raw_sql
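For reference, the new sa_format_hashbytes translation above produces the SQL Server expression LOWER(CONVERT(CHAR(64), HASHBYTES('SHA2_256', <arg>), 2)): CONVERT style 2 renders the binary digest as bare hex digits, and LOWER normalises the result for comparison against other engines' hash output. A standalone SQLAlchemy sketch of the same construction, with a placeholder column name:

import sqlalchemy as sa

content = sa.column("content")  # placeholder column

# Same construction as sa_format_hashbytes above.
hashed = sa.func.hashbytes(sa.sql.literal_column("'SHA2_256'"), content)
as_hex = sa.func.convert(
    sa.sql.literal_column("CHAR(64)"), hashed, sa.sql.literal_column("2")
)
print(sa.func.lower(as_hex))
# -> roughly: lower(convert(CHAR(64), hashbytes('SHA2_256', content), 2))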
9 changes: 6 additions & 3 deletions third_party/ibis/ibis_mssql/compiler.py
@@ -15,7 +15,6 @@

import math

import pyodbc
import sqlalchemy as sa
import sqlalchemy.dialects.mssql as mssql

@@ -27,7 +26,7 @@
import ibis.backends.base_sqlalchemy.alchemy as alch

from ibis import literal as L
from ibis.backends.base_sql import _cumulative_to_reduction, fixed_arity, unary
from ibis.backends.base_sqlalchemy.alchemy import fixed_arity, unary, _cumulative_to_reduction


def raise_unsupported_op_error(translator, expr, *args):
@@ -350,6 +349,9 @@ def _day_of_week_name(t, expr):
(sa_arg,) = map(t.translate, expr.op().args)
return sa.func.trim(sa.func.format(sa_arg, 'dddd'))

def _string_join(t, expr):
sep, elements = expr.op().args
return sa.func.concat(*map(t.translate, elements))

_operation_registry = alch._operation_registry.copy()

@@ -370,6 +372,7 @@ def _day_of_week_name(t, expr):
ops.StringFind: _string_find,
ops.StringLength: unary(sa.func.length),
ops.StringReplace: fixed_arity(sa.func.replace, 3),
ops.StringJoin: _string_join,
ops.Strip: unary(sa.func.trim),
ops.Substring: _substr,
ops.Uppercase: unary(sa.func.upper),
@@ -465,7 +468,7 @@ class MSSQLExprTranslator(alch.AlchemyExprTranslator):
_type_map = alch.AlchemyExprTranslator._type_map.copy()
_type_map.update(
{
dt.Boolean: pyodbc.SQL_BIT,
dt.Boolean: mssql.BIT,
dt.Int8: mssql.TINYINT,
dt.Int32: mssql.INTEGER,
dt.Int64: mssql.BIGINT,
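On the Boolean type-map change above: sqlalchemy.dialects.mssql.BIT is a real SQLAlchemy type that can take part in DDL and cast compilation, whereas pyodbc.SQL_BIT is only an integer ODBC constant — presumably why the mapping was swapped and the pyodbc import dropped. A small sketch under that reading:

import sqlalchemy as sa
from sqlalchemy.dialects import mssql

# mssql.BIT compiles like any other SQLAlchemy type; an int constant cannot.
flag = sa.Column("is_active", mssql.BIT)
print(sa.schema.CreateColumn(flag).compile(dialect=mssql.dialect()))
# -> roughly: is_active BIT

Worth noting in passing: the new _string_join translation concatenates its element expressions and ignores the separator argument, which is sufficient for the concatenation the hash chain needs.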
