
Commit b123279

feat!: Add 'primary_keys' and 'num_random_rows' fields to result handler (#372)

* feat: adding fields to validation metadata

* feat: added primary_keys and num_random_rows to result handler; added support for printing a text summary when using the BigQuery result handler (bqrh)

* merging new fixes, adding backwards-incompatible error handling

* lint
nehanene15 committed May 23, 2022
1 parent 93d2072 commit b123279
Showing 14 changed files with 159 additions and 19 deletions.
17 changes: 17 additions & 0 deletions data_validation/combiner.py
@@ -237,6 +237,15 @@ def _pivot_result(result, join_on_fields, validations, result_type):
continue
else:
validation = validations[field]
if validation.primary_keys:
primary_keys = (
ibis.literal("{")
+ ibis.literal(", ").join(validation.primary_keys)
+ ibis.literal("}")
).name("primary_keys")
else:
primary_keys = ibis.literal(None).cast("string").name("primary_keys")

pivots.append(
result.projection(
(
@@ -255,6 +264,10 @@ def _pivot_result(result, join_on_fields, validations, result_type):
ibis.literal(validation.get_column_name(result_type))
.cast("string")
.name("column_name"),
primary_keys,
ibis.literal(validation.num_random_rows).name(
"num_random_rows"
),
result[field].cast("string").name("agg_value"),
)
+ join_on_fields
@@ -303,6 +316,8 @@ def _join_pivots(source, target, differences, join_on_fields):
source["aggregation_type"],
source["table_name"],
source["column_name"],
source["primary_keys"],
source["num_random_rows"],
source["agg_value"],
differences["difference"],
differences["pct_difference"],
@@ -325,6 +340,8 @@ def _join_pivots(source, target, differences, join_on_fields):
target["column_name"].name("target_column_name"),
target["agg_value"].name("target_agg_value"),
group_by_columns,
source_difference["primary_keys"],
source_difference["num_random_rows"],
source_difference["difference"],
source_difference["pct_difference"],
source_difference["pct_threshold"],
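Note: the primary_keys expression built above joins the key names into one brace-wrapped string constant, e.g. "{id, region}", so the key list survives as a single STRING column in the results. A minimal standalone sketch of the same ibis pattern (column names hypothetical):

import ibis

primary_keys = ["id", "region"]  # stand-in for validation.primary_keys

# Concatenate string literals: "{" + "id, region" + "}"
primary_keys_expr = (
    ibis.literal("{")
    + ibis.literal(", ").join(primary_keys)
    + ibis.literal("}")
).name("primary_keys")
# Projected alongside the aggregates, this evaluates to the constant
# string "{id, region}" on every row of the pivoted result.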
8 changes: 8 additions & 0 deletions data_validation/config_manager.py
@@ -101,6 +101,10 @@ def random_row_batch_size(self):
or consts.DEFAULT_NUM_RANDOM_ROWS
)

def get_random_row_batch_size(self):
"""Return number of random rows or None."""
return self.random_row_batch_size() if self.use_random_rows() else None

def process_in_memory(self):
"""Return whether to process in memory or on a remote platform."""
return True
@@ -194,6 +198,10 @@ def append_primary_keys(self, primary_key_configs):
self.primary_keys + primary_key_configs
)

def get_primary_keys_list(self):
"""Return list of primary key column names"""
return [key[consts.CONFIG_SOURCE_COLUMN] for key in self.primary_keys]

@property
def comparison_fields(self):
"""Return fields from Config"""
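Note: these two helpers supply the new metadata fields consumed by validation_builder.py below. A rough sketch of what they compute, assuming consts.CONFIG_SOURCE_COLUMN == "source_column" (values hypothetical):

# Stand-in for ConfigManager.get_primary_keys_list()
primary_keys = [{"source_column": "id"}, {"source_column": "region"}]
primary_keys_list = [key["source_column"] for key in primary_keys]
assert primary_keys_list == ["id", "region"]

# Stand-in for ConfigManager.get_random_row_batch_size(): the batch size
# only applies when random-row sampling is enabled, otherwise None.
use_random_rows = False
random_row_batch_size = 50  # else consts.DEFAULT_NUM_RANDOM_ROWS
num_random_rows = random_row_batch_size if use_random_rows else None
assert num_random_rows is None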
6 changes: 6 additions & 0 deletions data_validation/consts.py
@@ -134,6 +134,12 @@
"pct_threshold",
"run_id",
"start_time",
"target_table_name",
"target_column_name",
"difference",
"primary_keys",
"group_by_columns",
"num_random_rows",
]
SCHEMA_VALIDATION_COLUMN_FILTER_LIST = [
"run_id",
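Note: the six added names appear to extend the column filter list that TextResultHandler uses to trim the printed summary (see text.py below). A small sketch of how such a filter list gets applied — the call site here is illustrative, not the exact DVT code:

import pandas as pd

COLUMN_FILTER_LIST = ["run_id", "start_time", "primary_keys", "num_random_rows"]
result_df = pd.DataFrame(
    {
        "validation_name": ["count"],
        "run_id": ["abc-123"],
        "start_time": [None],
        "primary_keys": ["{id}"],
        "num_random_rows": [None],
        "source_agg_value": ["10"],
    }
)
# Drop the filtered columns before printing the summary table.
summary = result_df.drop(columns=[c for c in COLUMN_FILTER_LIST if c in result_df])
print(summary)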
2 changes: 2 additions & 0 deletions data_validation/metadata.py
@@ -33,6 +33,8 @@ class ValidationMetadata(object):
target_table_name: str
source_column_name: str
target_column_name: str
primary_keys: list
num_random_rows: int
threshold: float

def get_table_name(self, result_type):
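Note: ValidationMetadata is a field-annotated container, so the two new annotations become constructor arguments everywhere the object is built (see validation_builder.py below). An abbreviated dataclass stand-in covering only the fields visible in this hunk (values hypothetical):

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ValidationMetadataSketch:  # stand-in, not the real class
    target_table_name: str
    source_column_name: str
    target_column_name: str
    primary_keys: List[str]
    num_random_rows: Optional[int]  # None when random-row sampling is off
    threshold: float

meta = ValidationMetadataSketch(
    target_table_name="dataset.table",
    source_column_name="amount",
    target_column_name="amount",
    primary_keys=["id"],
    num_random_rows=None,
    threshold=0.0,
)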
14 changes: 13 additions & 1 deletion data_validation/result_handlers/bigquery.py
@@ -17,6 +17,7 @@
from google.cloud import bigquery

from data_validation import client_info
from data_validation.result_handlers.text import TextResultHandler


class BigQueryResultHandler(object):
@@ -54,6 +55,9 @@ def get_handler_for_project(
return BigQueryResultHandler(client, table_id=table_id)

def execute(self, config, result_df):
text_handler = TextResultHandler("table")
text_handler.print_formatted_(result_df)

table = self._bigquery_client.get_table(self._table_id)
chunk_errors = self._bigquery_client.insert_rows_from_dataframe(
table, result_df
@@ -64,9 +68,17 @@ def execute(self, config, result_df):
== "no such field: validation_status."
):
raise RuntimeError(
f"Please update your BigQuery results table schema using the script : samples/bq_utils/update_schema.sh.\n"
f"Please update your BigQuery results table schema using the script : samples/bq_utils/rename_column_schema.sh.\n"
f"The lastest release of DVT has updated the column name 'status' to 'validation_status': {chunk_errors}"
)
elif (
chunk_errors[0][0]["errors"][0]["message"]
== "no such field: primary_keys."
):
raise RuntimeError(
f"Please update your BigQuery results table schema using the script : samples/bq_utils/add_columns_schema.sh.\n"
f"The lastest release of DVT has added two fields 'primary_keys' and 'num_random_rows': {chunk_errors}"
)
raise RuntimeError(f"could not write rows: {chunk_errors}")

return result_df
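Note: insert_rows_from_dataframe returns one error list per inserted chunk, and each entry pairs a row index with an "errors" list — hence the chunk_errors[0][0]["errors"][0]["message"] lookup above. A sketch of the shape being matched (values illustrative):

# Shape of a failed insert per google-cloud-bigquery's insert_rows contract.
chunk_errors = [
    [
        {
            "index": 0,
            "errors": [
                {"reason": "invalid", "message": "no such field: primary_keys."}
            ],
        }
    ]
]
message = chunk_errors[0][0]["errors"][0]["message"]
if message == "no such field: primary_keys.":
    # Stale results table: run samples/bq_utils/add_columns_schema.sh.
    pass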
2 changes: 1 addition & 1 deletion data_validation/result_handlers/text.py
@@ -44,7 +44,7 @@ def print_formatted_(self, result_df):
else:
print(
result_df.drop(self.cols_filter_list, axis=1).to_markdown(
tablefmt="fancy_grid"
tablefmt="fancy_grid", index=False
)
)

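Note: index=False drops pandas' positional index from the printed grid, which is why the unit test below loses one expected column. A quick sketch (pandas delegates fancy_grid rendering to the tabulate package):

import pandas as pd

result_df = pd.DataFrame({"A": [0, 4, 8], "C": [2, 6, 10]})
# Prints a fancy_grid table without the leading 0/1/2 index column.
print(result_df.to_markdown(tablefmt="fancy_grid", index=False))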
4 changes: 4 additions & 0 deletions data_validation/validation_builder.py
@@ -190,6 +190,8 @@ def add_aggregate(self, aggregate_field):
target_table_name=self.config_manager.target_table,
source_column_name=source_field_name,
target_column_name=target_field_name,
primary_keys=self.config_manager.get_primary_keys_list(),
num_random_rows=self.config_manager.get_random_row_batch_size(),
threshold=self.config_manager.threshold,
)

@@ -296,6 +298,8 @@ def add_comparison_field(self, comparison_field):
target_table_name=self.config_manager.target_table,
source_column_name=source_field_name,
target_column_name=target_field_name,
primary_keys=self.config_manager.get_primary_keys_list(),
num_random_rows=self.config_manager.get_random_row_batch_size(),
threshold=self.config_manager.threshold,
)

9 changes: 9 additions & 0 deletions samples/bq_utils/add_columns_schema.sh
@@ -0,0 +1,9 @@
#!/bin/bash
# This script adds two columns 'primary_keys' and 'num_random_rows' to the schema as per PR #372. BigQuery natively supports adding columns to a schema.
# Reference: https://cloud.google.com/bigquery/docs/managing-table-schemas#adding_columns_to_a_tables_schema_definition

DATASET=pso_data_validator
TABLE=results_test

# The JSON schema includes two additional columns for primary_keys and num_random_rows
bq update $PROJECT_ID:$DATASET.$TABLE ../../terraform/results_schema.json
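Note: bq update replaces the table's schema with the JSON file, so results_schema.json must contain the full schema (all existing columns plus the two new ones). Roughly the same addition with the Python client, under hypothetical project/dataset/table names:

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my-project.pso_data_validator.results_test")
# Appending NULLABLE columns is a schema change BigQuery always allows.
table.schema = list(table.schema) + [
    bigquery.SchemaField(
        "primary_keys", "STRING", description="Primary keys for the validation"
    ),
    bigquery.SchemaField(
        "num_random_rows", "INTEGER", description="Number of random rows (batch size)"
    ),
]
client.update_table(table, ["schema"])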
samples/bq_utils/{update_schema.sh → rename_column_schema.sh}
@@ -12,6 +12,8 @@ bq cp $CURRENT_TABLE $DEPRECATED_TABLE
# To create a new table with column as Validation_status
bq query \
--destination_table $DEST_TABLE \
--clustering_fields validation_name,run_id \
--time_partitioning_field start_time \
--replace \
--use_legacy_sql=false \
"SELECT * EXCEPT(status), status as validation_status FROM $CURRENT_TABLE"
12 changes: 11 additions & 1 deletion terraform/results_schema.json
@@ -52,7 +52,17 @@
{
"name": "group_by_columns",
"type": "STRING",
"description": "Group by columns, stored as a key-value JSON mapping."
"description": "Group by columns, stored as a key-value JSON mapping"
},
{
"name": "primary_keys",
"type": "STRING",
"description": "Primary keys for the validation"
},
{
"name": "num_random_rows",
"type": "INTEGER",
"description": "Number of random row batch size"
},
{
"name": "source_agg_value",
10 changes: 5 additions & 5 deletions tests/unit/result_handlers/test_text.py
@@ -86,16 +86,16 @@ def test_columns_to_print(module_under_test, capsys):
)
result_handler.execute(SAMPLE_CONFIG, result_df)

grid_text = "│A│C││0│0│2││1│4│6││2│8│10│"
grid_text = "│A│C││0│2││4│6││8│10│"
printed_text = capsys.readouterr().out
print(printed_text)
printed_text = (
printed_text.replace("\n", "")
.replace("'", "")
.replace(" ", "")
.replace("╒════╤═════╤═════╕", "")
.replace("╞════╪═════╪═════╡", "")
.replace("├────┼─────┼─────┤", "")
.replace("╘════╧═════╧═════╛", "")
.replace("╒═════╤═════╕", "")
.replace("╞═════╪═════╡", "")
.replace("├─────┼─────┤", "")
.replace("╘═════╧═════╛", "")
)
assert printed_text == grid_text