feat!: Adds custom query row level hash validation feature. (#440)
* added custom-query sub-option to validate command

* add source and target query option in custom query

* added min,max,sum aggregates with custom query

* fixed hive t0 column name addition issue

* added empty query file check

* linting fixes

* added unit tests

* incorporated black linting changes

* incorporated flake linter changes

* Fixed result schema status to validation_status to avoid duplicate column names

* Fixed linting on tests folder

* BREAKING CHANGE: update BQ results schema column name 'status' to 'validation_status'

* Added script to update Bigquery schema

* Moved bq_utils to right folder

* Updated bash script path and formatting

* Added custom query row validation feature

* Incorporated black and flake8 linting changes.

* Added wildcard-include-string-len sub option

* Fixed custom query column bug

* Made changes as per review from @dhercher

* new changes according to Neha's review requests

* changed custom query type from list to string

* made custom query type argument required=true

* typo changes

Co-authored-by: raniksingh <[email protected]>
Co-authored-by: Neha Nene <[email protected]>
3 people committed Apr 28, 2022
1 parent 57896f4 commit f057fe8
Showing 14 changed files with 473 additions and 89 deletions.
54 changes: 51 additions & 3 deletions README.md
@@ -230,9 +230,9 @@ data-validation (--verbose or -v) validate schema
Defaults to table.
```

#### Custom Query Validations
### Custom Query Column Validations

Below is the command syntax for custom query validations.
Below is the command syntax for custom query column validations.

```
data-validation (--verbose or -v) validate custom-query
@@ -246,7 +246,10 @@ data-validation (--verbose or -v) validate custom-query
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
i.e 'bigquery-public-data.new_york_citibike.citibike_trips'
--source-query-file SOURCE_QUERY_FILE, -sqf SOURCE_QUERY_FILE
--custom-query-type CUSTOM_QUERY_TYPE, -cqt CUSTOM_QUERY_TYPE
Type of custom query validation: ('row'|'column')
Enter 'column' for custom query column validation
--source-query-file SOURCE_QUERY_FILE, -sqf SOURCE_QUERY_FILE
File containing the source sql commands
--target-query-file TARGET_QUERY_FILE, -tqf TARGET_QUERY_FILE
File containing the target sql commands
@@ -273,6 +276,51 @@ The [Examples](docs/examples.md) page provides a few examples of how this tool can be
used to run custom query validations.
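
For illustration, here is a hypothetical column validation invocation (the connection names, table, and query file paths are placeholders; per the commit notes, aggregate flags such as `--min`, `--max`, and `--sum` can be added):

```
data-validation validate custom-query \
  -sc my_source_conn \
  -tc my_target_conn \
  -tbls my_schema.my_table \
  --custom-query-type column \
  -sqf source_query.sql \
  -tqf target_query.sql
```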


### Custom Query Row Validations

#### (Note: Row hash validation is currently only supported for BigQuery, Impala/Hive, and Teradata)

Below is the command syntax for custom query row validations. To run row-level
validations, you must pass the `--hash` flag with the value `*`, which means all
fields of the custom query result will be concatenated and hashed.

```
data-validation (--verbose or -v) validate custom-query
--source-conn or -sc SOURCE_CONN
Source connection details
See: *Data Source Configurations* section for each data source
--target-conn or -tc TARGET_CONN
Target connection details
See: *Connections* section for each data source
--tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
i.e 'bigquery-public-data.new_york_citibike.citibike_trips'
--custom-query-type CUSTOM_QUERY_TYPE, -cqt CUSTOM_QUERY_TYPE
Type of custom query validation: ('row'|'column')
Enter 'row' for custom query row validation
--source-query-file SOURCE_QUERY_FILE, -sqf SOURCE_QUERY_FILE
File containing the source sql commands
--target-query-file TARGET_QUERY_FILE, -tqf TARGET_QUERY_FILE
File containing the target sql commands
--hash '*'
'*' to hash all columns.
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
Defaults to table.
```

The [Examples](docs/examples.md) page provides a few examples of how this tool can be
used to run custom query row validations.
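
For illustration, here is a hypothetical row validation invocation; note `--custom-query-type row` together with the required `--hash '*'` (connection names, table, and file paths are placeholders):

```
data-validation validate custom-query \
  -sc my_source_conn \
  -tc my_target_conn \
  -tbls my_schema.my_table \
  --custom-query-type row \
  -sqf source_query.sql \
  -tqf target_query.sql \
  --hash '*'
```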


### Running Custom SQL Exploration

There are many occasions where you need to explore a data source while running
6 changes: 6 additions & 0 deletions data_validation/__main__.py
@@ -177,6 +177,12 @@ def build_config_from_args(args, config_manager):

if config_manager.validation_type == consts.CUSTOM_QUERY:
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if args.custom_query_type:
config_manager.append_custom_query_type(args.custom_query_type)
else:
raise ValueError(
"Expected custom query type to be given, got empty string."
)
if args.source_query_file is not None:
query_file = cli_tools.get_arg_list(args.source_query_file)
config_manager.append_source_query_file(query_file)
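
For orientation, a sketch of the config entries this block appends (the key strings match `data_validation/consts.py` in this commit; the values are illustrative):

```
# Illustrative only: the custom-query slice of a validation config after
# build_config_from_args. The query-file values are lists because the CLI
# arguments pass through cli_tools.get_arg_list.
{
    "custom_query_type": "row",                 # consts.CONFIG_CUSTOM_QUERY_TYPE
    "source_query_file": ["source_query.sql"],  # consts.CONFIG_SOURCE_QUERY_FILE
    "target_query_file": ["target_query.sql"],  # consts.CONFIG_TARGET_QUERY_FILE
}
```
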
12 changes: 12 additions & 0 deletions data_validation/cli_tools.py
@@ -537,6 +537,12 @@ def _configure_schema_parser(schema_parser):
def _configure_custom_query_parser(custom_query_parser):
"""Configure arguments to run custom-query validations."""
_add_common_arguments(custom_query_parser)
custom_query_parser.add_argument(
"--custom-query-type",
"-cqt",
required=True,
help="Which type of custom query (row/column)",
)
custom_query_parser.add_argument(
"--source-query-file",
"-sqf",
@@ -609,6 +615,12 @@ def _configure_custom_query_parser(custom_query_parser):
"-pk",
help="Comma separated list of primary key columns 'col_a,col_b'",
)
custom_query_parser.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)


def _add_common_arguments(parser):
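
The new `--wildcard-include-string-len` flag mirrors the flag of the same name on the other validate commands; presumably it lets string columns take part in wildcard aggregations, e.g. (a hypothetical combination, placeholders as above):

```
data-validation validate custom-query \
  -sc my_source_conn \
  -tc my_target_conn \
  -tbls my_schema.my_table \
  --custom-query-type column \
  -sqf source_query.sql \
  -tqf target_query.sql \
  --sum '*' --wildcard-include-string-len
```
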
11 changes: 7 additions & 4 deletions data_validation/combiner.py
@@ -75,9 +75,11 @@ def generate_report(
differences_pivot = _calculate_differences(
source, target, join_on_fields, run_metadata.validations, is_value_comparison
)

source_pivot = _pivot_result(
source, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_SOURCE
)

target_pivot = _pivot_result(
target, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_TARGET
)
@@ -161,7 +163,6 @@ def _calculate_difference(field_differences, datatype, validation, is_value_comp
.else_(consts.VALIDATION_STATUS_SUCCESS)
.end()
)

return (
difference.name("difference"),
pct_difference.name("pct_difference"),
@@ -190,7 +191,6 @@ def _calculate_differences(
# When no join_on_fields are present, we expect only one row per table.
# This is validated in generate_report before this function is called.
differences_joined = source.cross_join(target)

differences_pivots = []
for field, field_type in schema.items():
if field not in validations:
@@ -213,7 +213,6 @@
)
]
)

differences_pivot = functools.reduce(
lambda pivot1, pivot2: pivot1.union(pivot2), differences_pivots
)
@@ -222,7 +221,11 @@

def _pivot_result(result, join_on_fields, validations, result_type):
all_fields = frozenset(result.schema().names)
validation_fields = all_fields - frozenset(join_on_fields)
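# When rows are joined on the hash__all column (custom-query row validation),
# keep every column, including the hash itself, as a validation field.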
validation_fields = (
all_fields - frozenset(join_on_fields)
if "hash__all" not in join_on_fields
else all_fields
)
pivots = []

for field in validation_fields:
11 changes: 11 additions & 0 deletions data_validation/config_manager.py
@@ -150,6 +150,17 @@ def append_query_groups(self, grouped_column_configs):
self.query_groups + grouped_column_configs
)

@property
def custom_query_type(self):
"""Return custom query type from config"""
return self._config.get(consts.CONFIG_CUSTOM_QUERY_TYPE, "")

def append_custom_query_type(self, custom_query_type):
"""Append custom query type config to existing config."""
self._config[consts.CONFIG_CUSTOM_QUERY_TYPE] = (
self.custom_query_type + custom_query_type
)

@property
def source_query_file(self):
"""Return SQL Query File from Config"""
2 changes: 1 addition & 1 deletion data_validation/consts.py
@@ -50,7 +50,7 @@
CONFIG_MAX_RECURSIVE_QUERY_SIZE = "max_recursive_query_size"
CONFIG_SOURCE_QUERY_FILE = "source_query_file"
CONFIG_TARGET_QUERY_FILE = "target_query_file"

CONFIG_CUSTOM_QUERY_TYPE = "custom_query_type"
CONFIG_FILTER_SOURCE_COLUMN = "source_column"
CONFIG_FILTER_SOURCE_VALUE = "source_value"
CONFIG_FILTER_TARGET_COLUMN = "target_column"
9 changes: 9 additions & 0 deletions data_validation/data_validation.py
@@ -291,10 +291,19 @@ def _execute_validation(self, validation_builder, process_in_memory=True):
if self.config_manager.validation_type == consts.ROW_VALIDATION
else set(validation_builder.get_group_aliases())
)
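# Custom-query row validations join source and target on the hash__all
# column produced by hashing all fields (--hash '*').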
if (
self.config_manager.validation_type == consts.CUSTOM_QUERY
and self.config_manager.custom_query_type == "row"
):
join_on_fields = set(["hash__all"])

# If row validation from YAML, compare source and target agg values
is_value_comparison = (
self.config_manager.validation_type == consts.ROW_VALIDATION
or (
self.config_manager.validation_type == consts.CUSTOM_QUERY
and self.config_manager.custom_query_type == "row"
)
)

if process_in_memory: