feat!: Add 'primary_keys' and 'num_random_rows' fields to result handler #372

Merged · 6 commits · May 23, 2022

Changes from all commits
17 changes: 17 additions & 0 deletions data_validation/combiner.py
@@ -237,6 +237,15 @@ def _pivot_result(result, join_on_fields, validations, result_type):
continue
else:
validation = validations[field]
if validation.primary_keys:
primary_keys = (
ibis.literal("{")
+ ibis.literal(", ").join(validation.primary_keys)
+ ibis.literal("}")
).name("primary_keys")
else:
primary_keys = ibis.literal(None).cast("string").name("primary_keys")

pivots.append(
result.projection(
(
@@ -255,6 +264,10 @@ def _pivot_result(result, join_on_fields, validations, result_type):
ibis.literal(validation.get_column_name(result_type))
.cast("string")
.name("column_name"),
primary_keys,
ibis.literal(validation.num_random_rows).name(
"num_random_rows"
),
result[field].cast("string").name("agg_value"),
)
+ join_on_fields
@@ -303,6 +316,8 @@ def _join_pivots(source, target, differences, join_on_fields):
source["aggregation_type"],
source["table_name"],
source["column_name"],
source["primary_keys"],
source["num_random_rows"],
source["agg_value"],
differences["difference"],
differences["pct_difference"],
@@ -325,6 +340,8 @@ def _join_pivots(source, target, differences, join_on_fields):
target["column_name"].name("target_column_name"),
target["agg_value"].name("target_agg_value"),
group_by_columns,
source_difference["primary_keys"],
source_difference["num_random_rows"],
source_difference["difference"],
source_difference["pct_difference"],
source_difference["pct_threshold"],
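
For reference, the ibis expression added to _pivot_result above concatenates the configured key columns into a single brace-wrapped string, falling back to a NULL string literal when no primary keys are set. A plain-Python sketch of the value that expression produces (illustration only, not the DVT source):

# Sketch: plain-Python equivalent of the string the ibis expression builds
# for the new primary_keys result column.
def format_primary_keys(primary_keys):
    """Return '{pk1, pk2}' when keys are configured, else None (NULL in the results)."""
    if primary_keys:
        return "{" + ", ".join(primary_keys) + "}"
    return None

assert format_primary_keys(["id", "course_id"]) == "{id, course_id}"
assert format_primary_keys([]) is None
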
8 changes: 8 additions & 0 deletions data_validation/config_manager.py
@@ -101,6 +101,10 @@ def random_row_batch_size(self):
or consts.DEFAULT_NUM_RANDOM_ROWS
)

def get_random_row_batch_size(self):
"""Return number of random rows or None."""
return self.random_row_batch_size() if self.use_random_rows() else None

def process_in_memory(self):
"""Return whether to process in memory or on a remote platform."""
return True
@@ -194,6 +198,10 @@ def append_primary_keys(self, primary_key_configs):
self.primary_keys + primary_key_configs
)

def get_primary_keys_list(self):
"""Return list of primary key column names"""
return [key[consts.CONFIG_SOURCE_COLUMN] for key in self.primary_keys]

@property
def comparison_fields(self):
"""Return fields from Config"""
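
A rough sketch of how the two new accessors behave (assumed config shapes; in particular, consts.CONFIG_SOURCE_COLUMN is assumed to be the string "source_column" and the default batch size is a stand-in value, not DVT's implementation):

# Sketch only, not the DVT source.
DEFAULT_NUM_RANDOM_ROWS = 50  # assumed default

def get_random_row_batch_size(use_random_rows, configured_batch_size):
    """Return the random-row batch size, or None when random rows are disabled."""
    if not use_random_rows:
        return None
    return configured_batch_size or DEFAULT_NUM_RANDOM_ROWS

def get_primary_keys_list(primary_key_configs):
    """Flatten primary-key config dicts down to their source column names."""
    return [key["source_column"] for key in primary_key_configs]

assert get_random_row_batch_size(False, 100) is None
assert get_primary_keys_list([{"source_column": "id"}]) == ["id"]
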
6 changes: 6 additions & 0 deletions data_validation/consts.py
@@ -134,6 +134,12 @@
"pct_threshold",
"run_id",
"start_time",
"target_table_name",
"target_column_name",
"difference",
"primary_keys",
"group_by_columns",
"num_random_rows",
]
SCHEMA_VALIDATION_COLUMN_FILTER_LIST = [
"run_id",
2 changes: 2 additions & 0 deletions data_validation/metadata.py
@@ -33,6 +33,8 @@ class ValidationMetadata(object):
target_table_name: str
source_column_name: str
target_column_name: str
primary_keys: list
num_random_rows: int
threshold: float

def get_table_name(self, result_type):
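
The two new dataclass fields carry the primary keys and random-row count from the validation builder through to the combiner. A minimal sketch of just the added fields and their expected types (assumed defaults; the real ValidationMetadata has many more fields):

from dataclasses import dataclass, field
from typing import List, Optional

# Sketch of the new fields only, not the full class.
@dataclass
class ValidationMetadataSketch:
    primary_keys: List[str] = field(default_factory=list)  # e.g. ["id", "course_id"]
    num_random_rows: Optional[int] = None  # None unless random-row validation is used
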
14 changes: 13 additions & 1 deletion data_validation/result_handlers/bigquery.py
@@ -17,6 +17,7 @@
from google.cloud import bigquery

from data_validation import client_info
from data_validation.result_handlers.text import TextResultHandler


class BigQueryResultHandler(object):
@@ -54,6 +55,9 @@ def get_handler_for_project(
return BigQueryResultHandler(client, table_id=table_id)

def execute(self, config, result_df):
text_handler = TextResultHandler("table")
text_handler.print_formatted_(result_df)

table = self._bigquery_client.get_table(self._table_id)
chunk_errors = self._bigquery_client.insert_rows_from_dataframe(
table, result_df
@@ -64,9 +68,17 @@ def execute(self, config, result_df):
== "no such field: validation_status."
):
raise RuntimeError(
f"Please update your BigQuery results table schema using the script : samples/bq_utils/update_schema.sh.\n"
f"Please update your BigQuery results table schema using the script : samples/bq_utils/rename_column_schema.sh.\n"
f"The lastest release of DVT has updated the column name 'status' to 'validation_status': {chunk_errors}"
)
elif (
chunk_errors[0][0]["errors"][0]["message"]
== "no such field: primary_keys."
):
raise RuntimeError(
f"Please update your BigQuery results table schema using the script : samples/bq_utils/add_columns_schema.sh.\n"
f"The lastest release of DVT has added two fields 'primary_keys' and 'num_random_rows': {chunk_errors}"
)
raise RuntimeError(f"could not write rows: {chunk_errors}")

return result_df
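
The handler now recognises two schema-mismatch errors from BigQuery and points at the matching migration script. A condensed sketch of that dispatch (hypothetical helper using the same message strings as above; not part of DVT):

# Hypothetical helper: map a known insert error to the remediation script
# recommended by the handler above.
_SCHEMA_FIXES = {
    "no such field: validation_status.": "samples/bq_utils/rename_column_schema.sh",
    "no such field: primary_keys.": "samples/bq_utils/add_columns_schema.sh",
}

def remediation_script(chunk_errors):
    """Return the suggested schema-update script for a known insert error, else None."""
    message = chunk_errors[0][0]["errors"][0]["message"]
    return _SCHEMA_FIXES.get(message)
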
2 changes: 1 addition & 1 deletion data_validation/result_handlers/text.py
@@ -44,7 +44,7 @@ def print_formatted_(self, result_df):
else:
print(
result_df.drop(self.cols_filter_list, axis=1).to_markdown(
tablefmt="fancy_grid"
tablefmt="fancy_grid", index=False
)
)

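
The only behavioural change here is that the printed grid no longer includes the DataFrame index column. A toy example of the call (assumes pandas plus its optional tabulate dependency; not DVT's real result_df):

import pandas as pd

# Toy DataFrame standing in for result_df; index=False drops the leftmost index column.
df = pd.DataFrame({"A": [0, 4, 8], "C": [2, 6, 10]})
print(df.to_markdown(tablefmt="fancy_grid", index=False))
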
4 changes: 4 additions & 0 deletions data_validation/validation_builder.py
@@ -190,6 +190,8 @@ def add_aggregate(self, aggregate_field):
target_table_name=self.config_manager.target_table,
source_column_name=source_field_name,
target_column_name=target_field_name,
primary_keys=self.config_manager.get_primary_keys_list(),
num_random_rows=self.config_manager.get_random_row_batch_size(),
threshold=self.config_manager.threshold,
)

@@ -296,6 +298,8 @@ def add_comparison_field(self, comparison_field):
target_table_name=self.config_manager.target_table,
source_column_name=source_field_name,
target_column_name=target_field_name,
primary_keys=self.config_manager.get_primary_keys_list(),
num_random_rows=self.config_manager.get_random_row_batch_size(),
threshold=self.config_manager.threshold,
)

9 changes: 9 additions & 0 deletions samples/bq_utils/add_columns_schema.sh
@@ -0,0 +1,9 @@
#!/bin/bash
# This script adds two columns 'primary_keys' and 'num_random_rows' to the schema as per PR #372. BigQuery natively supports adding columns to a schema.
# Reference: https://cloud.google.com/bigquery/docs/managing-table-schemas#adding_columns_to_a_tables_schema_definition

DATASET=pso_data_validator
TABLE=results_test

# The JSON schema includes two additional columns for primary_keys and num_random_rows
bq update $PROJECT_ID:$DATASET.$TABLE ../../terraform/results_schema.json
@@ -12,6 +12,8 @@ bq cp $CURRENT_TABLE $DEPRECATED_TABLE
# To create a new table with column as Validation_status
bq query \
--destination_table $DEST_TABLE \
--clustering_fields validation_name,run_id \
--time_partitioning_field start_time \
--replace \
--use_legacy_sql=false \
"SELECT * EXCEPT(status), status as validation_status FROM $CURRENT_TABLE"
12 changes: 11 additions & 1 deletion terraform/results_schema.json
@@ -52,7 +52,17 @@
{
"name": "group_by_columns",
"type": "STRING",
"description": "Group by columns, stored as a key-value JSON mapping."
"description": "Group by columns, stored as a key-value JSON mapping"
},
{
"name": "primary_keys",
"type": "STRING",
"description": "Primary keys for the validation"
},
{
"name": "num_random_rows",
"type": "INTEGER",
"description": "Number of random row batch size"
},
{
"name": "source_agg_value",
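
Hypothetical example of how the two new columns are populated in a result row (values for illustration only): primary_keys holds the brace-wrapped string built in combiner.py, and num_random_rows is NULL unless random-row validation was enabled.

# Illustrative values only, not real validation output.
example_row = {
    "primary_keys": "{id, course_id}",  # NULL when no primary keys are configured
    "num_random_rows": 100,             # NULL unless random-row validation is used
}
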
10 changes: 5 additions & 5 deletions tests/unit/result_handlers/test_text.py
@@ -86,16 +86,16 @@ def test_columns_to_print(module_under_test, capsys):
)
result_handler.execute(SAMPLE_CONFIG, result_df)

grid_text = "│A│C││0│0│2││1│4│6││2│8│10│"
grid_text = "│A│C││0│2││4│6││8│10│"
printed_text = capsys.readouterr().out
print(printed_text)
printed_text = (
printed_text.replace("\n", "")
.replace("'", "")
.replace(" ", "")
.replace("╒════╤═════╤═════╕", "")
.replace("╞════╪═════╪═════╡", "")
.replace("├────┼─────┼─────┤", "")
.replace("╘════╧═════╧═════╛", "")
.replace("╒═════╤═════╕", "")
.replace("╞═════╪═════╡", "")
.replace("├─────┼─────┤", "")
.replace("╘═════╧═════╛", "")
)
assert printed_text == grid_text