Skip to content

Commit

Permalink
feat: Postgres row hash validation support (#589)
Browse files Browse the repository at this point in the history
* feat: postgres hashbytes function

* adding tests, random row
  • Loading branch information
nehanene15 committed Sep 22, 2022
1 parent ac3460a commit 01765b3
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 8 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ perform this task.

DVT supports the following validations:
* Column validation (count, sum, avg, min, max, group by)
* Row validation (BQ, Hive, Teradata, Oracle, SQL Server only)
* Row validation (BQ, Hive, Teradata, Oracle, SQL Server, Postgres only)
* Schema validation
* Custom Query validation
* Ad hoc SQL exploration
Expand Down Expand Up @@ -133,8 +133,7 @@ The [Examples](https://github.com/GoogleCloudPlatform/professional-services-data

#### Row Validations

(Note: Row hash validation is currently supported for BigQuery, Teradata, Impala/Hive, Oracle, and SQL Server.
Struct and array data types are not currently supported and random row is not yet supported for SQL Server.
(Note: Row hash validation is currently supported for BigQuery, Teradata, Impala/Hive, Oracle, SQL Server, and Postgres. Struct and array data types are not currently supported and random row is not yet supported for SQL Server.
In addition, please note that SHA256 is not a supported function on Teradata systems.
If you wish to perform this comparison on Teradata you will need to
[deploy a UDF to perform the conversion](https://github.com/akuroda/teradata-udf-sha2/blob/master/src/sha256.c).)
Expand Down
2 changes: 2 additions & 0 deletions data_validation/query_builder/random_row_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ibis_bigquery import BigQueryClient
from ibis.backends.impala.client import ImpalaClient
from ibis.backends.pandas.client import PandasClient
from ibis.backends.postgres.client import PostgreSQLClient
from ibis.expr.signature import Argument as Arg
from data_validation import clients
from data_validation.query_builder.query_builder import QueryBuilder
Expand All @@ -49,6 +50,7 @@
clients.TeradataClient: None,
ImpalaClient: "RAND()",
clients.OracleClient: "DBMS_RANDOM.VALUE",
PostgreSQLClient: "RANDOM()",
}


Expand Down
76 changes: 75 additions & 1 deletion tests/system/data_sources/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def cloud_sql(request):
def test_postgres_count(cloud_sql):
"""Test count validation on Postgres instance"""
config_count_valid = {
# BigQuery Specific Connection Config
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
# Validation Type
Expand Down Expand Up @@ -106,6 +105,81 @@ def test_postgres_count(cloud_sql):
assert sorted(list(df["source_agg_value"])) == ["28", "7", "7"]


def test_postgres_row(cloud_sql):
    """Test row hash validation on a Postgres instance.

    Builds a row-validation config that derives a SHA256 hash of the
    ``content`` column via the standard calculated-field chain
    (cast -> ifnull -> rstrip -> upper -> hash) and compares the hash
    between source and target, keyed on ``entryid``. Source and target
    use the same connection, so the aggregated hashes must match.

    Args:
        cloud_sql: fixture that ensures the Cloud SQL test instance is up.
    """
    config_row_valid = {
        consts.CONFIG_SOURCE_CONN: CONN,
        consts.CONFIG_TARGET_CONN: CONN,
        # Validation Type
        consts.CONFIG_TYPE: "Row",
        # Configuration Required Depending on Validator Type
        consts.CONFIG_SCHEMA_NAME: "public",
        consts.CONFIG_TABLE_NAME: "entries",
        # Compare the final hash column produced by the calculated fields below.
        consts.CONFIG_COMPARISON_FIELDS: [
            {
                "source_column": "hash__all",
                "target_column": "hash__all",
                "field_alias": "hash__all",
                "cast": None,
            }
        ],
        # Depth-ordered pipeline: each step consumes the alias produced by the
        # previous one, ending in the hash__all column compared above.
        consts.CONFIG_CALCULATED_FIELDS: [
            {
                "source_calculated_columns": ["content"],
                "target_calculated_columns": ["content"],
                "field_alias": "cast__content",
                "type": "cast",
                "depth": 0,
            },
            {
                "source_calculated_columns": ["cast__content"],
                "target_calculated_columns": ["cast__content"],
                "field_alias": "ifnull__cast__content",
                "type": "ifnull",
                "depth": 1,
            },
            {
                "source_calculated_columns": ["ifnull__cast__content"],
                "target_calculated_columns": ["ifnull__cast__content"],
                "field_alias": "rstrip__ifnull__cast__content",
                "type": "rstrip",
                "depth": 2,
            },
            {
                "source_calculated_columns": ["rstrip__ifnull__cast__content"],
                "target_calculated_columns": ["rstrip__ifnull__cast__content"],
                "field_alias": "upper__rstrip__ifnull__cast__content",
                "type": "upper",
                "depth": 3,
            },
            {
                "source_calculated_columns": ["upper__rstrip__ifnull__cast__content"],
                "target_calculated_columns": ["upper__rstrip__ifnull__cast__content"],
                "field_alias": "hash__all",
                "type": "hash",
                "depth": 4,
            },
        ],
        consts.CONFIG_PRIMARY_KEYS: [
            {
                "source_column": "entryid",
                "target_column": "entryid",
                "field_alias": "entryid",
                "cast": None,
            }
        ],
        consts.CONFIG_FORMAT: "table",
    }

    data_validator = data_validation.DataValidation(
        config_row_valid,
        verbose=False,
    )
    df = data_validator.execute()

    # Same connection on both sides: first row's hashes must agree.
    assert df["source_agg_value"][0] == df["target_agg_value"][0]


def test_schema_validation(cloud_sql):
"""Test schema validation on Postgres instance"""
config_count_valid = {
Expand Down
1 change: 0 additions & 1 deletion tests/system/data_sources/test_sql_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ def test_sql_server_count(cloud_sql):
def test_sql_server_row(cloud_sql):
"""Test row validation on SQL Server instance"""
config_row_valid = {
# BigQuery Specific Connection Config
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
# Validation Type
Expand Down
14 changes: 11 additions & 3 deletions third_party/ibis/ibis_addon/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp
from ibis.expr.types import BinaryValue, IntegerColumn, StringValue
from ibis.backends.impala.compiler import ImpalaExprTranslator
from ibis.backends.postgres.compiler import PostgreSQLExprTranslator
from ibis.backends.pandas import client as _pandas_client
from ibis.backends.base_sqlalchemy.alchemy import AlchemyExprTranslator
from ibis.backends.base_sqlalchemy.compiler import ExprTranslator
from ibis.backends.base_sql.compiler import BaseExprTranslator
from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator
from third_party.ibis.ibis_teradata.compiler import TeradataExprTranslator
from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator
from ibis.backends.postgres.compiler import PostgreSQLExprTranslator

# from third_party.ibis.ibis_snowflake.compiler import SnowflakeExprTranslator
# from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator <<<<<< DB2
Expand Down Expand Up @@ -168,7 +168,7 @@ def sa_format_raw_sql(translator, expr):
rand_col, raw_sql = op.args
return sa.text(raw_sql.op().args[0])

def sa_format_hashbytes(translator, expr):
def sa_format_hashbytes_mssql(translator, expr):
arg, how = expr.op().args
compiled_arg = translator.translate(arg)
hash_func = sa.func.hashbytes(sa.sql.literal_column("'SHA2_256'"), compiled_arg)
Expand All @@ -181,6 +181,13 @@ def sa_format_hashbytes_oracle(translator, expr):
hash_func = sa.func.standard_hash(compiled_arg, sa.sql.literal_column("'SHA256'"))
return sa.func.lower(hash_func)

def sa_format_hashbytes_postgres(translator, expr):
    """Compile a HashBytes expression for Postgres.

    Emits ``encode(sha256(convert_to(<arg>, 'UTF8')), 'hex')`` so the
    result is a lowercase hex digest string, matching the other backends'
    hash output format. The ``how`` argument is accepted but unused here
    (SHA256 is the only algorithm emitted).
    """
    arg, how = expr.op().args
    translated_arg = translator.translate(arg)
    # Force a deterministic byte representation before hashing.
    utf8_bytes = sa.func.convert_to(translated_arg, sa.sql.literal_column("'UTF8'"))
    digest = sa.func.sha256(utf8_bytes)
    return sa.func.encode(digest, sa.sql.literal_column("'hex'"))


_pandas_client._inferable_pandas_dtypes["floating"] = _pandas_client.dt.float64
IntegerColumn.bit_xor = ibis.expr.api._agg_function("bit_xor", BitXor, True)
Expand All @@ -194,7 +201,7 @@ def sa_format_hashbytes_oracle(translator, expr):
BigQueryExprTranslator._registry[RawSQL] = format_raw_sql
AlchemyExprTranslator._registry[RawSQL] = format_raw_sql
AlchemyExprTranslator._registry[HashBytes] = format_hashbytes_alchemy
MSSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes
MSSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes_mssql
MSSQLExprTranslator._registry[RawSQL] = sa_format_raw_sql
BaseExprTranslator._registry[RawSQL] = format_raw_sql
BaseExprTranslator._registry[HashBytes] = format_hashbytes_base
Expand All @@ -204,4 +211,5 @@ def sa_format_hashbytes_oracle(translator, expr):
OracleExprTranslator._registry[HashBytes] = sa_format_hashbytes_oracle
TeradataExprTranslator._registry[RawSQL] = format_raw_sql
TeradataExprTranslator._registry[HashBytes] = format_hashbytes_teradata
PostgreSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes_postgres
PostgreSQLExprTranslator._registry[RawSQL] = sa_format_raw_sql

0 comments on commit 01765b3

Please sign in to comment.