From 01765b31e507e80fc6f4db579b18ec9a8ea608f3 Mon Sep 17 00:00:00 2001 From: Neha Nene Date: Thu, 22 Sep 2022 15:53:03 -0500 Subject: [PATCH] feat: Postgres row hash validation support (#589) * feat: postgres hashbytes function * adding tests, random row --- README.md | 5 +- .../query_builder/random_row_builder.py | 2 + tests/system/data_sources/test_postgres.py | 76 ++++++++++++++++++- tests/system/data_sources/test_sql_server.py | 1 - third_party/ibis/ibis_addon/operations.py | 14 +++- 5 files changed, 90 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index e6b4fb1d9..40530c626 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ perform this task. DVT supports the following validations: * Column validation (count, sum, avg, min, max, group by) -* Row validation (BQ, Hive, Teradata, Oracle, SQL Server only) +* Row validation (BQ, Hive, Teradata, Oracle, SQL Server, Postgres only) * Schema validation * Custom Query validation * Ad hoc SQL exploration @@ -133,8 +133,7 @@ The [Examples](https://github.com/GoogleCloudPlatform/professional-services-data #### Row Validations -(Note: Row hash validation is currently supported for BigQuery, Teradata, Impala/Hive, Oracle, and SQL Server. -Struct and array data types are not currently supported and random row is not yet supported for SQL Server. +(Note: Row hash validation is currently supported for BigQuery, Teradata, Impala/Hive, Oracle, SQL Server, and Postgres. Struct and array data types are not currently supported and random row is not yet supported for SQL Server. In addition, please note that SHA256 is not a supported function on Teradata systems. If you wish to perform this comparison on Teradata you will need to [deploy a UDF to perform the conversion](https://github.com/akuroda/teradata-udf-sha2/blob/master/src/sha256.c).) 
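Before the code changes below, a minimal SQLAlchemy sketch (illustrative only, not part of the patch; the column name is hypothetical) of the expression shape this patch teaches the Postgres translator to emit. Postgres has no HASHBYTES equivalent, so the value is converted to UTF8 bytes, hashed with the built-in sha256() (PostgreSQL 11+), and hex-encoded so both sides of a validation compare the same lowercase digest string:

import sqlalchemy as sa

# Hypothetical column name, for illustration only.
content = sa.column("content")

# Same shape as sa_format_hashbytes_postgres in
# third_party/ibis/ibis_addon/operations.py below:
hash_expr = sa.func.encode(
    sa.func.sha256(sa.func.convert_to(content, sa.sql.literal_column("'UTF8'"))),
    sa.sql.literal_column("'hex'"),
)

# Renders as: encode(sha256(convert_to(content, 'UTF8')), 'hex')
print(hash_expr)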
diff --git a/data_validation/query_builder/random_row_builder.py b/data_validation/query_builder/random_row_builder.py index 0675f90fe..43eb42fb2 100644 --- a/data_validation/query_builder/random_row_builder.py +++ b/data_validation/query_builder/random_row_builder.py @@ -28,6 +28,7 @@ from ibis_bigquery import BigQueryClient from ibis.backends.impala.client import ImpalaClient from ibis.backends.pandas.client import PandasClient +from ibis.backends.postgres.client import PostgreSQLClient from ibis.expr.signature import Argument as Arg from data_validation import clients from data_validation.query_builder.query_builder import QueryBuilder @@ -49,6 +50,7 @@ clients.TeradataClient: None, ImpalaClient: "RAND()", clients.OracleClient: "DBMS_RANDOM.VALUE", + PostgreSQLClient: "RANDOM()", } diff --git a/tests/system/data_sources/test_postgres.py b/tests/system/data_sources/test_postgres.py index 7e2753bea..496f3178c 100644 --- a/tests/system/data_sources/test_postgres.py +++ b/tests/system/data_sources/test_postgres.py @@ -65,7 +65,6 @@ def cloud_sql(request): def test_postgres_count(cloud_sql): """Test count validation on Postgres instance""" config_count_valid = { - # BigQuery Specific Connection Config consts.CONFIG_SOURCE_CONN: CONN, consts.CONFIG_TARGET_CONN: CONN, # Validation Type @@ -106,6 +105,81 @@ def test_postgres_count(cloud_sql): assert sorted(list(df["source_agg_value"])) == ["28", "7", "7"] +def test_postgres_row(cloud_sql): + """Test row validation on Postgres""" + config_row_valid = { + consts.CONFIG_SOURCE_CONN: CONN, + consts.CONFIG_TARGET_CONN: CONN, + # Validation Type + consts.CONFIG_TYPE: "Row", + # Configuration Required Depending on Validator Type + consts.CONFIG_SCHEMA_NAME: "public", + consts.CONFIG_TABLE_NAME: "entries", + consts.CONFIG_COMPARISON_FIELDS: [ + { + "source_column": "hash__all", + "target_column": "hash__all", + "field_alias": "hash__all", + "cast": None, + } + ], + consts.CONFIG_CALCULATED_FIELDS: [ + { + "source_calculated_columns": ["content"], + "target_calculated_columns": ["content"], + "field_alias": "cast__content", + "type": "cast", + "depth": 0, + }, + { + "source_calculated_columns": ["cast__content"], + "target_calculated_columns": ["cast__content"], + "field_alias": "ifnull__cast__content", + "type": "ifnull", + "depth": 1, + }, + { + "source_calculated_columns": ["ifnull__cast__content"], + "target_calculated_columns": ["ifnull__cast__content"], + "field_alias": "rstrip__ifnull__cast__content", + "type": "rstrip", + "depth": 2, + }, + { + "source_calculated_columns": ["rstrip__ifnull__cast__content"], + "target_calculated_columns": ["rstrip__ifnull__cast__content"], + "field_alias": "upper__rstrip__ifnull__cast__content", + "type": "upper", + "depth": 3, + }, + { + "source_calculated_columns": ["upper__rstrip__ifnull__cast__content"], + "target_calculated_columns": ["upper__rstrip__ifnull__cast__content"], + "field_alias": "hash__all", + "type": "hash", + "depth": 4, + }, + ], + consts.CONFIG_PRIMARY_KEYS: [ + { + "source_column": "entryid", + "target_column": "entryid", + "field_alias": "entryid", + "cast": None, + } + ], + consts.CONFIG_FORMAT: "table", + } + + data_validator = data_validation.DataValidation( + config_row_valid, + verbose=False, + ) + df = data_validator.execute() + + assert df["source_agg_value"][0] == df["target_agg_value"][0] + + def test_schema_validation(cloud_sql): """Test schema validation on Postgres instance""" config_count_valid = { diff --git a/tests/system/data_sources/test_sql_server.py
b/tests/system/data_sources/test_sql_server.py index 23b1435a2..4618c9fa2 100644 --- a/tests/system/data_sources/test_sql_server.py +++ b/tests/system/data_sources/test_sql_server.py @@ -94,7 +94,6 @@ def test_sql_server_count(cloud_sql): def test_sql_server_row(cloud_sql): """Test row validation on SQL Server instance""" config_row_valid = { - # BigQuery Specific Connection Config consts.CONFIG_SOURCE_CONN: CONN, consts.CONFIG_TARGET_CONN: CONN, # Validation Type diff --git a/third_party/ibis/ibis_addon/operations.py b/third_party/ibis/ibis_addon/operations.py index 01cebe562..990a311cd 100644 --- a/third_party/ibis/ibis_addon/operations.py +++ b/third_party/ibis/ibis_addon/operations.py @@ -35,7 +35,6 @@ from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp from ibis.expr.types import BinaryValue, IntegerColumn, StringValue from ibis.backends.impala.compiler import ImpalaExprTranslator -from ibis.backends.postgres.compiler import PostgreSQLExprTranslator from ibis.backends.pandas import client as _pandas_client from ibis.backends.base_sqlalchemy.alchemy import AlchemyExprTranslator from ibis.backends.base_sqlalchemy.compiler import ExprTranslator @@ -43,6 +42,7 @@ from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator from third_party.ibis.ibis_teradata.compiler import TeradataExprTranslator from third_party.ibis.ibis_mssql.compiler import MSSQLExprTranslator +from ibis.backends.postgres.compiler import PostgreSQLExprTranslator # from third_party.ibis.ibis_snowflake.compiler import SnowflakeExprTranslator # from third_party.ibis.ibis_oracle.compiler import OracleExprTranslator <<<<<< DB2 @@ -168,7 +168,7 @@ def sa_format_raw_sql(translator, expr): rand_col, raw_sql = op.args return sa.text(raw_sql.op().args[0]) -def sa_format_hashbytes(translator, expr): +def sa_format_hashbytes_mssql(translator, expr): arg, how = expr.op().args compiled_arg = translator.translate(arg) hash_func = sa.func.hashbytes(sa.sql.literal_column("'SHA2_256'"), compiled_arg) @@ -181,6 +181,13 @@ def sa_format_hashbytes_oracle(translator, expr): hash_func = sa.func.standard_hash(compiled_arg, sa.sql.literal_column("'SHA256'")) return sa.func.lower(hash_func) +def sa_format_hashbytes_postgres(translator, expr): + arg, how = expr.op().args + compiled_arg = translator.translate(arg) + convert = sa.func.convert_to(compiled_arg, sa.sql.literal_column("'UTF8'")) + hash_func = sa.func.sha256(convert) + return sa.func.encode(hash_func, sa.sql.literal_column("'hex'")) + _pandas_client._inferable_pandas_dtypes["floating"] = _pandas_client.dt.float64 IntegerColumn.bit_xor = ibis.expr.api._agg_function("bit_xor", BitXor, True) @@ -194,7 +201,7 @@ def sa_format_hashbytes_oracle(translator, expr): BigQueryExprTranslator._registry[RawSQL] = format_raw_sql AlchemyExprTranslator._registry[RawSQL] = format_raw_sql AlchemyExprTranslator._registry[HashBytes] = format_hashbytes_alchemy -MSSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes +MSSQLExprTranslator._registry[HashBytes] = sa_format_hashbytes_mssql MSSQLExprTranslator._registry[RawSQL] = sa_format_raw_sql BaseExprTranslator._registry[RawSQL] = format_raw_sql BaseExprTranslator._registry[HashBytes] = format_hashbytes_base @@ -204,4 +211,5 @@ def sa_format_hashbytes_oracle(translator, expr): OracleExprTranslator._registry[HashBytes] = sa_format_hashbytes_oracle TeradataExprTranslator._registry[RawSQL] = format_raw_sql TeradataExprTranslator._registry[HashBytes] = format_hashbytes_teradata +PostgreSQLExprTranslator._registry[HashBytes] = 
sa_format_hashbytes_postgres PostgreSQLExprTranslator._registry[RawSQL] = sa_format_raw_sql \ No newline at end of file
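For reference, a hashlib sketch (not part of the patch; it assumes the entries.content column from the test above and mirrors the calculated-field chain cast -> ifnull -> rstrip -> upper -> hash) that computes the digest the Postgres expression should produce, so a hash__all value can be cross-checked by hand:

import hashlib

def expected_hash__all(content):
    # cast + ifnull: the empty-string replacement here is an assumption;
    # DVT's ifnull calculated field chooses the actual NULL default.
    value = "" if content is None else str(content)
    # rstrip (RTRIM) then upper (UPPER), matching depths 2 and 3 in the test config.
    value = value.rstrip(" ").upper()
    # hash: SHA256 of the UTF8 bytes, hex-encoded, as in sa_format_hashbytes_postgres.
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

# Roughly equivalent Postgres SQL:
#   ENCODE(SHA256(CONVERT_TO(UPPER(RTRIM(CAST(content AS VARCHAR))), 'UTF8')), 'hex')
print(expected_hash__all("hello "))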