From b1232791f89fb39491a65cd1945f272d85e521b1 Mon Sep 17 00:00:00 2001
From: Neha Nene
Date: Mon, 23 May 2022 14:35:17 -0500
Subject: [PATCH] feat!: Add 'primary_keys' and 'num_random_rows' fields to
 result handler (#372)

* feat:adding fields to validation metadata

* feat: added primary_keys and num_random_rows to result handler, added
  support to print text summary when using bqrh

* merging new fixes, adding backwards incompatible error handling

* lint
---
 data_validation/combiner.py                   | 17 ++++
 data_validation/config_manager.py             |  8 +++
 data_validation/consts.py                     |  6 +++
 data_validation/metadata.py                   |  2 +
 data_validation/result_handlers/bigquery.py   | 14 ++++-
 data_validation/result_handlers/text.py       |  2 +-
 data_validation/validation_builder.py         |  4 ++
 samples/bq_utils/add_columns_schema.sh        |  9 ++++
 ...date_schema.sh => rename_column_schema.sh} |  2 +
 terraform/results_schema.json                 | 12 ++++-
 tests/unit/result_handlers/test_text.py       | 10 ++--
 tests/unit/test_combiner.py                   | 54 +++++++++++++++++++
 tests/unit/test_config_manager.py             | 22 ++++++++
 tests/unit/test_metadata.py                   | 16 ++----
 14 files changed, 159 insertions(+), 19 deletions(-)
 create mode 100755 samples/bq_utils/add_columns_schema.sh
 rename samples/bq_utils/{update_schema.sh => rename_column_schema.sh} (91%)

diff --git a/data_validation/combiner.py b/data_validation/combiner.py
index e4e5dd413..f65bfcaa5 100644
--- a/data_validation/combiner.py
+++ b/data_validation/combiner.py
@@ -237,6 +237,15 @@ def _pivot_result(result, join_on_fields, validations, result_type):
             continue
         else:
             validation = validations[field]
+            if validation.primary_keys:
+                primary_keys = (
+                    ibis.literal("{")
+                    + ibis.literal(", ").join(validation.primary_keys)
+                    + ibis.literal("}")
+                ).name("primary_keys")
+            else:
+                primary_keys = ibis.literal(None).cast("string").name("primary_keys")
+
             pivots.append(
                 result.projection(
                     (
@@ -255,6 +264,10 @@ def _pivot_result(result, join_on_fields, validations, result_type):
                         ibis.literal(validation.get_column_name(result_type))
                         .cast("string")
                         .name("column_name"),
+                        primary_keys,
+                        ibis.literal(validation.num_random_rows).name(
+                            "num_random_rows"
+                        ),
                         result[field].cast("string").name("agg_value"),
                     )
                     + join_on_fields
@@ -303,6 +316,8 @@ def _join_pivots(source, target, differences, join_on_fields):
             source["aggregation_type"],
             source["table_name"],
             source["column_name"],
+            source["primary_keys"],
+            source["num_random_rows"],
             source["agg_value"],
             differences["difference"],
             differences["pct_difference"],
@@ -325,6 +340,8 @@ def _join_pivots(source, target, differences, join_on_fields):
             target["column_name"].name("target_column_name"),
             target["agg_value"].name("target_agg_value"),
             group_by_columns,
+            source_difference["primary_keys"],
+            source_difference["num_random_rows"],
             source_difference["difference"],
             source_difference["pct_difference"],
             source_difference["pct_threshold"],
diff --git a/data_validation/config_manager.py b/data_validation/config_manager.py
index 276542ec2..665c7aac4 100644
--- a/data_validation/config_manager.py
+++ b/data_validation/config_manager.py
@@ -101,6 +101,10 @@ def random_row_batch_size(self):
             or consts.DEFAULT_NUM_RANDOM_ROWS
         )
 
+    def get_random_row_batch_size(self):
+        """Return number of random rows or None."""
+        return self.random_row_batch_size() if self.use_random_rows() else None
+
     def process_in_memory(self):
         """Return whether to process in memory or on a remote platform."""
         return True
@@ -194,6 +198,10 @@ def append_primary_keys(self, primary_key_configs):
             self.primary_keys + primary_key_configs
         )
 
+    def get_primary_keys_list(self):
+        """Return the list of primary key column names."""
+        return [key[consts.CONFIG_SOURCE_COLUMN] for key in self.primary_keys]
+
     @property
     def comparison_fields(self):
         """Return fields from Config"""
diff --git a/data_validation/consts.py b/data_validation/consts.py
index 142afe7ae..56bda8e89 100644
--- a/data_validation/consts.py
+++ b/data_validation/consts.py
@@ -134,6 +134,12 @@
     "pct_threshold",
     "run_id",
     "start_time",
+    "target_table_name",
+    "target_column_name",
+    "difference",
+    "primary_keys",
+    "group_by_columns",
+    "num_random_rows",
 ]
 SCHEMA_VALIDATION_COLUMN_FILTER_LIST = [
     "run_id",
diff --git a/data_validation/metadata.py b/data_validation/metadata.py
index 01c55b7e9..db8d9afcf 100644
--- a/data_validation/metadata.py
+++ b/data_validation/metadata.py
@@ -33,6 +33,8 @@ class ValidationMetadata(object):
     target_table_name: str
     source_column_name: str
     target_column_name: str
+    primary_keys: list
+    num_random_rows: int
     threshold: float
 
     def get_table_name(self, result_type):
diff --git a/data_validation/result_handlers/bigquery.py b/data_validation/result_handlers/bigquery.py
index 1a2701752..4ea395b8a 100644
--- a/data_validation/result_handlers/bigquery.py
+++ b/data_validation/result_handlers/bigquery.py
@@ -17,6 +17,7 @@
 from google.cloud import bigquery
 
 from data_validation import client_info
+from data_validation.result_handlers.text import TextResultHandler
 
 
 class BigQueryResultHandler(object):
@@ -54,6 +55,9 @@ def get_handler_for_project(
         return BigQueryResultHandler(client, table_id=table_id)
 
     def execute(self, config, result_df):
+        text_handler = TextResultHandler("table")
+        text_handler.print_formatted_(result_df)
+
         table = self._bigquery_client.get_table(self._table_id)
         chunk_errors = self._bigquery_client.insert_rows_from_dataframe(
             table, result_df
@@ -64,9 +68,17 @@ def execute(self, config, result_df):
                 == "no such field: validation_status."
             ):
                 raise RuntimeError(
-                    f"Please update your BigQuery results table schema using the script : samples/bq_utils/update_schema.sh.\n"
+                    f"Please update your BigQuery results table schema using the script: samples/bq_utils/rename_column_schema.sh.\n"
                     f"The lastest release of DVT has updated the column name 'status' to 'validation_status': {chunk_errors}"
                 )
+            elif (
+                chunk_errors[0][0]["errors"][0]["message"]
+                == "no such field: primary_keys."
+            ):
+                raise RuntimeError(
+                    f"Please update your BigQuery results table schema using the script: samples/bq_utils/add_columns_schema.sh.\n"
+                    f"The latest release of DVT has added two fields 'primary_keys' and 'num_random_rows': {chunk_errors}"
+                )
             raise RuntimeError(f"could not write rows: {chunk_errors}")
 
         return result_df
diff --git a/data_validation/result_handlers/text.py b/data_validation/result_handlers/text.py
index 73735ffb1..491888841 100644
--- a/data_validation/result_handlers/text.py
+++ b/data_validation/result_handlers/text.py
@@ -44,7 +44,7 @@ def print_formatted_(self, result_df):
         else:
             print(
                 result_df.drop(self.cols_filter_list, axis=1).to_markdown(
-                    tablefmt="fancy_grid"
+                    tablefmt="fancy_grid", index=False
                 )
             )
 
diff --git a/data_validation/validation_builder.py b/data_validation/validation_builder.py
index 561c266fe..f175c434d 100644
--- a/data_validation/validation_builder.py
+++ b/data_validation/validation_builder.py
@@ -190,6 +190,8 @@ def add_aggregate(self, aggregate_field):
             target_table_name=self.config_manager.target_table,
             source_column_name=source_field_name,
             target_column_name=target_field_name,
+            primary_keys=self.config_manager.get_primary_keys_list(),
+            num_random_rows=self.config_manager.get_random_row_batch_size(),
             threshold=self.config_manager.threshold,
         )
 
@@ -296,6 +298,8 @@ def add_comparison_field(self, comparison_field):
             target_table_name=self.config_manager.target_table,
             source_column_name=source_field_name,
             target_column_name=target_field_name,
+            primary_keys=self.config_manager.get_primary_keys_list(),
+            num_random_rows=self.config_manager.get_random_row_batch_size(),
             threshold=self.config_manager.threshold,
         )
 
diff --git a/samples/bq_utils/add_columns_schema.sh b/samples/bq_utils/add_columns_schema.sh
new file mode 100755
index 000000000..057e1c5b7
--- /dev/null
+++ b/samples/bq_utils/add_columns_schema.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+# This script adds two columns, 'primary_keys' and 'num_random_rows', to the schema as per PR #372. BigQuery natively supports adding columns to a schema.
+# Reference: https://cloud.google.com/bigquery/docs/managing-table-schemas#adding_columns_to_a_tables_schema_definition
+
+DATASET=pso_data_validator
+TABLE=results_test
+
+# The JSON schema includes two additional columns for primary_keys and num_random_rows
+bq update $PROJECT_ID:$DATASET.$TABLE ../../terraform/results_schema.json
\ No newline at end of file
diff --git a/samples/bq_utils/update_schema.sh b/samples/bq_utils/rename_column_schema.sh
similarity index 91%
rename from samples/bq_utils/update_schema.sh
rename to samples/bq_utils/rename_column_schema.sh
index 0e7820ad3..13fc50211 100755
--- a/samples/bq_utils/update_schema.sh
+++ b/samples/bq_utils/rename_column_schema.sh
@@ -12,6 +12,8 @@ bq cp $CURRENT_TABLE $DEPRECATED_TABLE
 # To create a new table with column as Validation_status
 bq query \
 --destination_table $DEST_TABLE \
+--clustering_fields validation_name,run_id \
+--time_partitioning_field start_time \
 --replace \
 --use_legacy_sql=false \
 "SELECT * EXCEPT(status), status as validation_status FROM $CURRENT_TABLE"
\ No newline at end of file
diff --git a/terraform/results_schema.json b/terraform/results_schema.json
index 0e1de9e5f..a7a462886 100644
--- a/terraform/results_schema.json
+++ b/terraform/results_schema.json
@@ -52,7 +52,17 @@
   {
     "name": "group_by_columns",
     "type": "STRING",
-    "description": "Group by columns, stored as a key-value JSON mapping."
+    "description": "Group by columns, stored as a key-value JSON mapping"
+  },
+  {
+    "name": "primary_keys",
+    "type": "STRING",
+    "description": "Primary keys for the validation"
+  },
+  {
+    "name": "num_random_rows",
+    "type": "INTEGER",
+    "description": "Number of random rows used for the validation (batch size)"
   },
   {
     "name": "source_agg_value",
diff --git a/tests/unit/result_handlers/test_text.py b/tests/unit/result_handlers/test_text.py
index 3c7576d24..d05f6fcec 100644
--- a/tests/unit/result_handlers/test_text.py
+++ b/tests/unit/result_handlers/test_text.py
@@ -86,16 +86,16 @@ def test_columns_to_print(module_under_test, capsys):
     )
     result_handler.execute(SAMPLE_CONFIG, result_df)
 
-    grid_text = "││A│C││0│0│2││1│4│6││2│8│10│"
+    grid_text = "│A│C││0│2││4│6││8│10│"
     printed_text = capsys.readouterr().out
     print(printed_text)
     printed_text = (
         printed_text.replace("\n", "")
         .replace("'", "")
         .replace(" ", "")
-        .replace("╒════╤═════╤═════╕", "")
-        .replace("╞════╪═════╪═════╡", "")
-        .replace("├────┼─────┼─────┤", "")
-        .replace("╘════╧═════╧═════╛", "")
+        .replace("╒═════╤═════╕", "")
+        .replace("╞═════╪═════╡", "")
+        .replace("├─────┼─────┤", "")
+        .replace("╘═════╧═════╛", "")
     )
     assert printed_text == grid_text
diff --git a/tests/unit/test_combiner.py b/tests/unit/test_combiner.py
index 88226aaf4..fbbd43f7a 100644
--- a/tests/unit/test_combiner.py
+++ b/tests/unit/test_combiner.py
@@ -36,6 +36,8 @@
             target_column_name="timecol",
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -123,6 +125,8 @@ def test_generate_report_with_too_many_rows(module_under_test):
             target_column_name=None,
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -146,6 +150,8 @@ def test_generate_report_with_too_many_rows(module_under_test):
             "source_agg_value": ["1"],
             "target_agg_value": ["2"],
             "group_by_columns": [None],
+            "primary_keys": [None],
+            "num_random_rows": [None],
             "difference": [1.0],
             "pct_difference": [100.0],
             "pct_threshold": [0.0],
@@ -172,6 +178,8 @@
             target_column_name="timecol",
             validation_type="Column",
             aggregation_type="max",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -195,6 +203,8 @@
             "source_agg_value": ["2020-07-01 16:00:00+00:00"],
             "target_agg_value": ["2020-07-01 16:00:00+00:00"],
             "group_by_columns": [None],
+            "primary_keys": [None],
+            "num_random_rows": [None],
             "difference": [0.0],
             "pct_difference": [0.0],
             "pct_threshold": [0.0],
@@ -229,6 +239,8 @@
             target_table_schema="bq-public.target_dataset",
             validation_type="Column",
             aggregation_type="max",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -252,6 +264,8 @@
             "source_agg_value": ["2020-09-13 12:26:40+00:00"],
             "target_agg_value": ["2033-05-18 03:33:20+00:00"],
             "group_by_columns": [None],
+            "primary_keys": [None],
+            "num_random_rows": [None],
             "difference": [400000000.0],
             "pct_difference": [25.0],
             "pct_threshold": [0.0],
@@ -274,6 +288,8 @@
             target_column_name=None,
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=30.0,
         ),
         "sum__ttteeesssttt": metadata.ValidationMetadata(
@@ -285,6 +301,8 @@ def test_generate_report_with_too_many_rows(module_under_test):
             target_column_name="ttteeesssttt_col",
             validation_type="Column",
             aggregation_type="sum",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -314,6 +332,8 @@ def test_generate_report_with_too_many_rows(module_under_test):
             "source_agg_value": ["8", "-1"],
             "target_agg_value": ["9", "1"],
             "group_by_columns": [None, None],
+            "primary_keys": [None, None],
+            "num_random_rows": [None, None],
             "difference": [1.0, 2.0],
             "pct_difference": [12.5, -200.0],
             "pct_threshold": [30.0, 0.0],
@@ -385,6 +405,8 @@ def test_generate_report_without_group_by(
             target_column_name=None,
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=7.0,
         ),
     },
@@ -413,6 +435,8 @@ def test_generate_report_without_group_by(
                 '{"grp_a": "b", "grp_i": "0"}',
                 '{"grp_a": "b", "grp_i": "1"}',
             ],
+            "primary_keys": [None] * 4,
+            "num_random_rows": [None] * 4,
             "difference": [-1.0, -1.0, -1.0, 1.0],
             "pct_difference": [-50.0, -25.0, -12.5, 6.25],
             "pct_threshold": [7.0, 7.0, 7.0, 7.0],
@@ -441,6 +465,8 @@ def test_generate_report_without_group_by(
             target_column_name=None,
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=100.0,
         ),
     },
@@ -464,6 +490,8 @@ def test_generate_report_without_group_by(
             "source_agg_value": ["1", "2"],
             "target_agg_value": ["3", "4"],
             "group_by_columns": ['{"grp": "\\""}', '{"grp": "\\\\"}'],
+            "primary_keys": [None] * 2,
+            "num_random_rows": [None] * 2,
             "difference": [2.0, 2.0],
             "pct_difference": [200.0, 100.0],
             "pct_threshold": [100.0, 100.0],
@@ -502,6 +530,8 @@ def test_generate_report_without_group_by(
             target_column_name=None,
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=25.0,
         ),
     },
@@ -546,6 +576,8 @@ def test_generate_report_without_group_by(
                 '{"grp_a": "c", "grp_i": "0"}',
                 '{"grp_a": "c", "grp_i": "1"}',
             ],
+            "primary_keys": [None] * 6,
+            "num_random_rows": [None] * 6,
             "difference": [-1.0, -1.0, _NAN, _NAN, _NAN, _NAN],
             "pct_difference": [-50.0, -25.0, _NAN, _NAN, _NAN, _NAN],
             "pct_threshold": [25.0, 25.0, _NAN, _NAN, _NAN, _NAN],
@@ -617,6 +649,8 @@ def test_generate_report_with_group_by(
             target_column_name="test_col",
             validation_type="Column",
             aggregation_type="sum",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -640,6 +674,8 @@ def test_generate_report_with_group_by(
             "source_agg_value": ["8093"],
             "target_agg_value": ["nan"],
             "group_by_columns": [None],
+            "primary_keys": [None],
+            "num_random_rows": [None],
             "difference": [_NAN],
             "pct_difference": [_NAN],
             "pct_threshold": [0.0],
@@ -662,6 +698,8 @@ def test_generate_report_with_group_by(
             target_column_name="test_col",
             validation_type="Column",
             aggregation_type="sum",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -685,6 +723,8 @@ def test_generate_report_with_group_by(
             "source_agg_value": ["nan"],
             "target_agg_value": ["8093"],
             "group_by_columns": [None],
+            "primary_keys": [None],
+            "num_random_rows": [None],
             "difference": [_NAN],
             "pct_difference": [_NAN],
             "pct_threshold": [0.0],
@@ -707,6 +747,8 @@ def test_generate_report_with_group_by(
             target_column_name="test_col",
             validation_type="Column",
             aggregation_type="sum",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -730,6 +772,8 @@ def test_generate_report_with_group_by(
             "source_agg_value": ["nan"],
             "target_agg_value": ["nan"],
             "group_by_columns": [None],
+            "primary_keys": [None],
+            "num_random_rows": [None],
             "difference": [_NAN],
             "pct_difference": [_NAN],
             "pct_threshold": [0.0],
@@ -752,6 +796,8 @@ def test_generate_report_with_group_by(
             target_column_name=None,
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -775,6 +821,8 @@ def test_generate_report_with_group_by(
             "source_agg_value": ["1"],
             "target_agg_value": ["nan"],
             "group_by_columns": [None],
+            "primary_keys": [None],
+            "num_random_rows": [None],
             "difference": [_NAN],
             "pct_difference": [_NAN],
             "pct_threshold": [0.0],
@@ -797,6 +845,8 @@ def test_generate_report_with_group_by(
             target_column_name=None,
             validation_type="Column",
             aggregation_type="count",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=30.0,
         ),
         "sum__ttteeesssttt": metadata.ValidationMetadata(
@@ -808,6 +858,8 @@ def test_generate_report_with_group_by(
             target_column_name="ttteeesssttt_col",
             validation_type="Column",
             aggregation_type="sum",
+            primary_keys=[],
+            num_random_rows=None,
             threshold=0.0,
         ),
     },
@@ -837,6 +889,8 @@ def test_generate_report_with_group_by(
             "source_agg_value": ["8", "-1"],
             "target_agg_value": ["9", "nan"],
             "group_by_columns": [None, None],
+            "primary_keys": [None, None],
+            "num_random_rows": [None, None],
             "difference": [1.0, _NAN],
             "pct_difference": [12.5, _NAN],
             "pct_threshold": [30.0, 0.0],
diff --git a/tests/unit/test_config_manager.py b/tests/unit/test_config_manager.py
index 20fa049a1..66c092102 100644
--- a/tests/unit/test_config_manager.py
+++ b/tests/unit/test_config_manager.py
@@ -295,6 +295,28 @@ def test_get_result_handler(module_under_test):
     assert handler._table_id == "dataset.table_name"
 
 
+def test_get_primary_keys_list(module_under_test):
+    config_manager = module_under_test.ConfigManager(
+        SAMPLE_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False
+    )
+    config_manager._config[consts.CONFIG_PRIMARY_KEYS] = [
+        {
+            consts.CONFIG_FIELD_ALIAS: "id",
+            consts.CONFIG_SOURCE_COLUMN: "id",
+            consts.CONFIG_TARGET_COLUMN: "id",
+            consts.CONFIG_CAST: None,
+        },
+        {
+            consts.CONFIG_FIELD_ALIAS: "sample_id",
+            consts.CONFIG_SOURCE_COLUMN: "sample_id",
+            consts.CONFIG_TARGET_COLUMN: "sample_id",
+            consts.CONFIG_CAST: None,
+        },
+    ]
+    res = config_manager.get_primary_keys_list()
+    assert res == ["id", "sample_id"]
+
+
 def test_dependent_aliases(module_under_test):
     config_manager = module_under_test.ConfigManager(
         SAMPLE_ROW_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False
diff --git a/tests/unit/test_metadata.py b/tests/unit/test_metadata.py
index afcafd42f..1ab0f449a 100644
--- a/tests/unit/test_metadata.py
+++ b/tests/unit/test_metadata.py
@@ -35,7 +35,7 @@ def test_get_column_name(
     module_under_test, source_column, target_column, result_type, expected
 ):
     validation = module_under_test.ValidationMetadata(
-        "", "", "", "", "", "", source_column, target_column, ""
+        "", "", "", "", "", "", source_column, target_column, "", "", ""
     )
     column_name = validation.get_column_name(result_type)
     assert column_name == expected
@@ -43,15 +43,7 @@
 
 def test_get_column_name_with_unexpected_result_type(module_under_test):
     validation = module_under_test.ValidationMetadata(
-        "",
-        "",
-        "",
-        "",
-        "",
-        "",
-        "",
-        "",
-        "",
+        "", "", "", "", "", "", "", "", "", "", ""
     )
     with pytest.raises(ValueError, match="Unexpected result_type"):
         validation.get_column_name("oops_i_goofed")
@@ -104,6 +96,8 @@
         None,
         None,
         "",
+        "",
+        "",
     )
     table_name = validation.get_table_name(result_type)
     assert table_name == expected
@@ -111,7 +105,7 @@ def test_get_table_name(
 
 def test_get_table_name_with_unexpected_result_type(module_under_test):
     validation = module_under_test.ValidationMetadata(
-        "", "", "", "", "", "", "", "", ""
+        "", "", "", "", "", "", "", "", "", "", ""
     )
     with pytest.raises(ValueError, match="Unexpected result_type"):
         validation.get_table_name("oops_i_goofed")
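
-- 
A minimal plain-Python sketch (illustrative only, not part of the patch) of how
the two new report columns are derived. It mirrors
ConfigManager.get_primary_keys_list() and the string formatting in
combiner._pivot_result(), without ibis; the key configs below are hypothetical,
and the dict keys assume the consts values resolve to the strings shown.

    # Hypothetical primary-key configs, shaped like consts.CONFIG_PRIMARY_KEYS
    # entries; assumes consts.CONFIG_SOURCE_COLUMN == "source_column".
    primary_key_configs = [
        {"field_alias": "id", "source_column": "id", "target_column": "id", "cast": None},
        {"field_alias": "sample_id", "source_column": "sample_id", "target_column": "sample_id", "cast": None},
    ]

    # Mirrors ConfigManager.get_primary_keys_list(): collect source column names.
    primary_keys = [key["source_column"] for key in primary_key_configs]

    # Mirrors _pivot_result(): render the keys as one braced string for the
    # results table, or NULL (None) when the validation has no primary keys.
    primary_keys_col = "{" + ", ".join(primary_keys) + "}" if primary_keys else None
    print(primary_keys_col)  # -> {id, sample_id}

    # num_random_rows comes from ConfigManager.get_random_row_batch_size():
    # the configured batch size for random-row validations, otherwise None,
    # which lands as NULL in the BigQuery results table.
    num_random_rows = None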