Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support for MSSQL row validation #570

Merged
merged 7 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ def get_aggregate_config(args, config_manager):
"int64",
"decimal",
"timestamp",
"float64[non-nullable]",
"float32[non-nullable]",
"int8[non-nullable]",
"int16[non-nullable]",
"int32[non-nullable]",
"int64[non-nullable]",
"decimal[non-nullable]",
"timestamp[non-nullable]",
]

if args.wildcard_include_string_len:
Expand Down
7 changes: 5 additions & 2 deletions data_validation/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@
"start_time",
"end_time",
"aggregation_type",
"source_agg_value",
"target_agg_value",
"difference",
"primary_keys",
"group_by_columns",
"num_random_rows",
"pct_threshold",
]
11 changes: 1 addition & 10 deletions data_validation/query_builder/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,9 @@ def compile(self, ibis_table):

if self.left_field:
self.left = ibis_table[self.left_field]
# Cast All Datetime to Date (TODO this may be a bug in BQ)
if isinstance(
ibis_table[self.left_field].type(), ibis.expr.datatypes.Timestamp
):
self.left = self.left.cast("date")

if self.right_field:
self.right = ibis_table[self.right_field]
# Cast All Datetime to Date (TODO this may be a bug in BQ)
if isinstance(
ibis_table[self.right_field].type(), ibis.expr.datatypes.Timestamp
):
self.right = self.right.cast("date")

return self.expr(self.left, self.right)

Expand Down
107 changes: 86 additions & 21 deletions tests/system/data_sources/test_sql_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
SQL_SERVER_USER = os.getenv("SQL_SERVER_USER", "sqlserver")
SQL_SERVER_PASSWORD = os.getenv("SQL_SERVER_PASSWORD")
PROJECT_ID = os.getenv("PROJECT_ID")
# Connection settings shared by the SQL Server system tests in this module.
# The tests pass this same dict as both the source and the target connection.
# Host, user and password come from the environment variables read above.
CONN = {
"source_type": "MSSQL",
"host": SQL_SERVER_HOST,
"user": SQL_SERVER_USER,
"password": SQL_SERVER_PASSWORD,
"port": 1433,
"database": "guestbook",
}


@pytest.fixture
Expand All @@ -55,19 +63,10 @@ def cloud_sql(request):

def test_sql_server_count(cloud_sql):
"""Test count validation on SQL Server instance"""
conn = {
"source_type": "MSSQL",
"host": SQL_SERVER_HOST,
"user": SQL_SERVER_USER,
"password": SQL_SERVER_PASSWORD,
"port": 1433,
"database": "guestbook",
}

config_count_valid = {
# SQL Server connection config (same connection used as source and target)
consts.CONFIG_SOURCE_CONN: conn,
consts.CONFIG_TARGET_CONN: conn,
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
# Validation Type
consts.CONFIG_TYPE: "Column",
# Configuration Required Depending on Validator Type
Expand All @@ -92,19 +91,85 @@ def test_sql_server_count(cloud_sql):
assert df["source_agg_value"][0] == df["target_agg_value"][0]


def test_schema_validation():
conn = {
"source_type": "MSSQL",
"host": SQL_SERVER_HOST,
"user": SQL_SERVER_USER,
"password": SQL_SERVER_PASSWORD,
"port": 1433,
"database": "guestbook",
def test_sql_server_row(cloud_sql):
    """Row validation of dbo.entries against itself on the SQL Server instance.

    Source and target use the same connection, so the hash computed for the
    first returned row must be identical on both sides.
    """
    # Column compared between source and target: the final hash alias.
    comparison_fields = [
        {
            "source_column": "hash__all",
            "target_column": "hash__all",
            "field_alias": "hash__all",
            "cast": None,
        }
    ]

    # Calculated-field chain on the `content` column, one step per depth:
    # cast -> ifnull -> rstrip -> upper -> hash. Each step consumes the alias
    # produced by the previous one.
    calculated_fields = [
        {
            "source_calculated_columns": ["content"],
            "target_calculated_columns": ["content"],
            "field_alias": "cast__content",
            "type": "cast",
            "depth": 0,
        },
        {
            "source_calculated_columns": ["cast__content"],
            "target_calculated_columns": ["cast__content"],
            "field_alias": "ifnull__cast__content",
            "type": "ifnull",
            "depth": 1,
        },
        {
            "source_calculated_columns": ["ifnull__cast__content"],
            "target_calculated_columns": ["ifnull__cast__content"],
            "field_alias": "rstrip__ifnull__cast__content",
            "type": "rstrip",
            "depth": 2,
        },
        {
            "source_calculated_columns": ["rstrip__ifnull__cast__content"],
            "target_calculated_columns": ["rstrip__ifnull__cast__content"],
            "field_alias": "upper__rstrip__ifnull__cast__content",
            "type": "upper",
            "depth": 3,
        },
        {
            "source_calculated_columns": ["upper__rstrip__ifnull__cast__content"],
            "target_calculated_columns": ["upper__rstrip__ifnull__cast__content"],
            "field_alias": "hash__all",
            "type": "hash",
            "depth": 4,
        },
    ]

    # Key used to pair source rows with target rows.
    primary_keys = [
        {
            "source_column": "entryID",
            "target_column": "entryID",
            "field_alias": "entryID",
            "cast": None,
        }
    ]

    row_config = {
        # SQL Server connection, used for both sides of the comparison
        consts.CONFIG_SOURCE_CONN: CONN,
        consts.CONFIG_TARGET_CONN: CONN,
        # Validation Type
        consts.CONFIG_TYPE: "Row",
        # Configuration Required Depending on Validator Type
        consts.CONFIG_SCHEMA_NAME: "dbo",
        consts.CONFIG_TABLE_NAME: "entries",
        consts.CONFIG_COMPARISON_FIELDS: comparison_fields,
        consts.CONFIG_CALCULATED_FIELDS: calculated_fields,
        consts.CONFIG_PRIMARY_KEYS: primary_keys,
        consts.CONFIG_FORMAT: "table",
    }

    validator = data_validation.DataValidation(row_config, verbose=False)
    result_df = validator.execute()
    assert result_df["source_agg_value"][0] == result_df["target_agg_value"][0]


def test_schema_validation():
config = {
consts.CONFIG_SOURCE_CONN: conn,
consts.CONFIG_TARGET_CONN: conn,
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
consts.CONFIG_TYPE: "Schema",
consts.CONFIG_SCHEMA_NAME: "dbo",
consts.CONFIG_TABLE_NAME: "entries",
Expand Down
21 changes: 16 additions & 5 deletions third_party/ibis/ibis_addon/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
"""

import ibis
import sqlalchemy
import sqlalchemy as sa

import ibis.expr.api
from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator
import ibis.expr.datatypes as dt
from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp
import ibis.expr.rules as rlz

from data_validation.clients import _raise_missing_client_error
from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator
from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp
from ibis.expr.types import BinaryValue, IntegerColumn, StringValue
from ibis.backends.impala.compiler import ImpalaExprTranslator
from ibis.backends.pandas import client as _pandas_client
Expand All @@ -39,8 +41,9 @@
from ibis.backends.base_sql.compiler import BaseExprTranslator
from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator
from third_party.ibis.ibis_teradata.compiler import TeradataExprTranslator
from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator


# from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator # TODO figure how to add RAWSQL
# from third_party.ibis.ibis_snowflake.compiler import SnowflakeExprTranslator
# from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator <<<<<< DB2

Expand Down Expand Up @@ -163,7 +166,14 @@ def format_raw_sql(translator, expr):
def sa_format_raw_sql(translator, expr):
    """Translate a RawSQL expression into a SQLAlchemy text clause.

    Args:
        translator: expression translator (not used by this formatter).
        expr: expression whose op carries two arguments; only the second
            (the raw SQL operand) is used.

    Returns:
        The result of ``sa.text`` applied to the first argument of the raw
        SQL operand's op (presumably the SQL string itself -- confirm against
        the RawSQL op definition).
    """
    # The first op argument is intentionally ignored; only the raw SQL
    # operand is needed to build the clause.
    _, raw_sql = expr.op().args
    # Single return through the `sa` alias; the earlier duplicate return via
    # the un-aliased module name made this line unreachable.
    return sa.text(raw_sql.op().args[0])

def sa_format_hashbytes(translator, expr):
    """Translate a HashBytes expression into a SQL Server hashing expression.

    The generated expression has the shape
    ``lower(convert(CHAR(64), hashbytes('SHA2_256', <arg>), 2))``.

    Args:
        translator: expression translator used to compile the hashed argument.
        expr: expression whose op carries ``(arg, how)``.

    Returns:
        A SQLAlchemy function expression wrapping the compiled argument.
    """
    # NOTE(review): `how` is unpacked but never consulted -- the algorithm is
    # always 'SHA2_256' regardless of what the caller requested.
    value, _how = expr.op().args
    sa_value = translator.translate(value)

    digest = sa.func.hashbytes(sa.sql.literal_column("'SHA2_256'"), sa_value)
    # CONVERT with style 2 turns the binary digest into a 64-character string
    # (presumably hex without a 0x prefix -- confirm against the CONVERT docs).
    digest_as_text = sa.func.convert(
        sa.sql.literal_column('CHAR(64)'),
        digest,
        sa.sql.literal_column('2'),
    )
    return sa.func.lower(digest_as_text)


_pandas_client._inferable_pandas_dtypes["floating"] = _pandas_client.dt.float64
Expand All @@ -177,6 +187,7 @@ def sa_format_raw_sql(translator, expr):
# Install the translation functions for the custom RawSQL and HashBytes
# operations into each backend translator's registry.
# The MSSQL translator gets its own HashBytes formatter (sa_format_hashbytes)
# in addition to the generic Alchemy entry registered just above it.
BigQueryExprTranslator._registry[HashBytes] = format_hashbytes_bigquery
AlchemyExprTranslator._registry[RawSQL] = format_raw_sql
AlchemyExprTranslator._registry[HashBytes] = format_hashbytes_alchemy
MSSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes
BaseExprTranslator._registry[RawSQL] = format_raw_sql
BaseExprTranslator._registry[HashBytes] = format_hashbytes_base
BigQueryExprTranslator._registry[RawSQL] = format_raw_sql
Expand Down
9 changes: 6 additions & 3 deletions third_party/ibis/ibis_mssql/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import math

import pyodbc
import sqlalchemy as sa
import sqlalchemy.dialects.mssql as mssql

Expand All @@ -27,7 +26,7 @@
import ibis.backends.base_sqlalchemy.alchemy as alch

from ibis import literal as L
from ibis.backends.base_sql import _cumulative_to_reduction, fixed_arity, unary
from ibis.backends.base_sqlalchemy.alchemy import fixed_arity, unary, _cumulative_to_reduction


def raise_unsupported_op_error(translator, expr, *args):
Expand Down Expand Up @@ -350,6 +349,9 @@ def _day_of_week_name(t, expr):
(sa_arg,) = map(t.translate, expr.op().args)
return sa.func.trim(sa.func.format(sa_arg, 'dddd'))

def _string_join(t, expr):
    """Translate a StringJoin operation into a SQL Server ``CONCAT`` call.

    Args:
        t: expression translator used to compile the separator and elements.
        expr: expression whose op carries ``(sep, elements)``.

    Returns:
        A SQLAlchemy ``concat(...)`` function expression whose arguments are
        the translated elements with the translated separator between each
        adjacent pair.
    """
    sep, elements = expr.op().args
    sa_sep = t.translate(sep)

    # Interleave the separator between elements rather than dropping it.
    # Plain CONCAT is kept (instead of CONCAT_WS) so the generated SQL does not
    # require a newer SQL Server version; with an empty-string separator the
    # result is the same as concatenating the elements directly.
    pieces = []
    for element in elements:
        if pieces:
            pieces.append(sa_sep)
        pieces.append(t.translate(element))
    return sa.func.concat(*pieces)

_operation_registry = alch._operation_registry.copy()

Expand All @@ -370,6 +372,7 @@ def _day_of_week_name(t, expr):
ops.StringFind: _string_find,
ops.StringLength: unary(sa.func.length),
ops.StringReplace: fixed_arity(sa.func.replace, 3),
ops.StringJoin: _string_join,
ops.Strip: unary(sa.func.trim),
ops.Substring: _substr,
ops.Uppercase: unary(sa.func.upper),
Expand Down Expand Up @@ -465,7 +468,7 @@ class MSSQLExprTranslator(alch.AlchemyExprTranslator):
_type_map = alch.AlchemyExprTranslator._type_map.copy()
_type_map.update(
{
dt.Boolean: pyodbc.SQL_BIT,
dt.Boolean: mssql.BIT,
dt.Int8: mssql.TINYINT,
dt.Int32: mssql.INTEGER,
dt.Int64: mssql.BIGINT,
Expand Down