feat!: Adds custom query row level hash validation feature. #440

Merged Apr 28, 2022 · 36 commits
308d5b4
added custom-query sub-option to validate command
Robby29 Mar 2, 2022
614ade6
add source and target query option in custom query
Robby29 Mar 7, 2022
72b55f3
added min,max,sum aggregates with custom query
Robby29 Mar 8, 2022
1567c09
fixed hive t0 column name addition issue
Robby29 Mar 12, 2022
f9ab9f2
added empty query file check
Robby29 Mar 13, 2022
455eeab
linting fixes
Robby29 Mar 13, 2022
f732c41
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Mar 16, 2022
5b2b24b
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Mar 21, 2022
994064f
added unit tests
Robby29 Mar 21, 2022
9b084e4
Merge branch 'develop' of https://github.com/Robby29/professional-ser…
Robby29 Mar 21, 2022
78b8146
incorporated black linting changes
Robby29 Mar 21, 2022
cd43524
incorporated flake linter changes
Robby29 Mar 21, 2022
a6d8ab2
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Mar 24, 2022
3bec521
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Apr 4, 2022
7546fa9
Fixed result schema status to validation_status to avoid duplicate co…
Raniksingh Apr 4, 2022
cb622fd
Fixed linting on tests folder
Raniksingh Apr 5, 2022
dbea3e7
BREAKING CHANGE: update BQ results schema column name 'status' to 'va…
nehanene15 Apr 5, 2022
8d29441
Added script to update Bigquery schema
Raniksingh Apr 9, 2022
e2ed1d3
Moved bq_utils to right folder
Raniksingh Apr 11, 2022
3f26768
Updated bash script path and formatting
Raniksingh Apr 11, 2022
56287fa
Added custom query row validation feature
Robby29 Apr 12, 2022
fc5294d
Added custom query row validation feature.
Robby29 Apr 12, 2022
8c0ea59
Merge branch 'develop' of https://github.com/GoogleCloudPlatform/prof…
Robby29 Apr 12, 2022
c929532
Incorporated black and flake8 linting changes.
Robby29 Apr 12, 2022
d4b0f11
Added wildcard-include-string-len sub option
Robby29 Apr 13, 2022
9169fc4
Fixed custom query column bug
Robby29 Apr 14, 2022
baa511a
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Apr 14, 2022
05ab925
Made changes as per review from @dhercher
Robby29 Apr 14, 2022
b6b374d
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Apr 23, 2022
210222a
new changes according to Neha's review requests
Robby29 Apr 23, 2022
8448587
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Apr 28, 2022
74ab258
changed custom query type from list to string
Robby29 Apr 28, 2022
ee26db9
made custom query type argument required=true
Robby29 Apr 28, 2022
0e30f18
Merge branch 'GoogleCloudPlatform:develop' into develop
Robby29 Apr 28, 2022
70b78f0
typo changes
Robby29 Apr 28, 2022
021865f
typo fix
Robby29 Apr 28, 2022
46 changes: 44 additions & 2 deletions README.md
@@ -230,9 +230,9 @@ data-validation (--verbose or -v) validate schema
Defaults to table.
```

-#### Custom Query Validations
+#### Custom Query Column Validations

-Below is the command syntax for custom query validations.
+Below is the command syntax for custom query column validations.

```
data-validation (--verbose or -v) validate custom-query
@@ -273,6 +273,48 @@ The [Examples](docs/examples.md) page provides a few examples of how this tool can
be used to run custom query validations.


#### Custom Query Row Validations

(Note: Row hash validation is currently only supported for BigQuery and Impala/Hive)

In order to run row-level validations you need to pass the `--hash` flag with
the value `'*'`, which means all the fields of the custom query result will be
concatenated and hashed.

Below is the command syntax for custom query row validations.

```
data-validation (--verbose or -v) validate custom-query
--source-conn or -sc SOURCE_CONN
Source connection details
See: *Data Source Configurations* section for each data source
--target-conn or -tc TARGET_CONN
Target connection details
See: *Connections* section for each data source
--tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
i.e 'bigquery-public-data.new_york_citibike.citibike_trips'
--source-query-file SOURCE_QUERY_FILE, -sqf SOURCE_QUERY_FILE
File containing the source sql commands
--target-query-file TARGET_QUERY_FILE, -tqf TARGET_QUERY_FILE
File containing the target sql commands
--hash '*'
                      '*' to hash all columns.
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
Defaults to table.
```
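
For example, below is a minimal sketch of a BigQuery-to-BigQuery run. The
connection names and query file paths are placeholders, not values shipped
with this repository; the tables list reuses the public dataset from the
syntax above:

```
data-validation validate custom-query \
  -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips \
  -sqf source_query.sql -tqf target_query.sql \
  --hash '*'
```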

The [Examples](docs/examples.md) page provides a few examples of how this tool
can be used to run custom query row validations.


### Running Custom SQL Exploration

There are many occasions where you need to explore a data source while running
6 changes: 6 additions & 0 deletions data_validation/cli_tools.py
@@ -609,6 +609,12 @@ def _configure_custom_query_parser(custom_query_parser):
"-pk",
help="Comma separated list of primary key columns 'col_a,col_b'",
)
    custom_query_parser.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)


def _add_common_arguments(parser):
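Since the new flag is defined with `action="store_true"`, it takes no value. A
hypothetical invocation (all other arguments elided) would simply append it:

```
data-validation validate custom-query ... --wildcard-include-string-len
```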
11 changes: 7 additions & 4 deletions data_validation/combiner.py
@@ -76,9 +76,11 @@ def generate_report(
differences_pivot = _calculate_differences(
source, target, join_on_fields, run_metadata.validations, is_value_comparison
)

source_pivot = _pivot_result(
source, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_SOURCE
)

target_pivot = _pivot_result(
target, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_TARGET
)
@@ -150,7 +152,6 @@ def _calculate_difference(field_differences, datatype, validation, is_value_comp
.else_(consts.VALIDATION_STATUS_SUCCESS)
.end()
)

return (
difference.name("difference"),
pct_difference.name("pct_difference"),
@@ -179,7 +180,6 @@ def _calculate_differences(
# When no join_on_fields are present, we expect only one row per table.
# This is validated in generate_report before this function is called.
differences_joined = source.cross_join(target)

differences_pivots = []
for field, field_type in schema.items():
if field not in validations:
@@ -202,7 +202,6 @@
)
]
)

differences_pivot = functools.reduce(
lambda pivot1, pivot2: pivot1.union(pivot2), differences_pivots
)
@@ -211,7 +210,11 @@

def _pivot_result(result, join_on_fields, validations, result_type):
all_fields = frozenset(result.schema().names)
-    validation_fields = all_fields - frozenset(join_on_fields)
+    validation_fields = (
+        all_fields - frozenset(join_on_fields)
+        if "hash__all" not in join_on_fields
+        else all_fields
+    )
pivots = []

for field in validation_fields:
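For intuition, here is a standalone sketch (with hypothetical values) of how
the conditional above behaves when rows are joined on the hash itself:

```python
all_fields = frozenset({"hash__all"})
join_on_fields = ["hash__all"]

validation_fields = (
    all_fields - frozenset(join_on_fields)
    if "hash__all" not in join_on_fields
    else all_fields
)
# Without the special case, hash__all would be subtracted away and nothing
# would remain to compare between source and target.
assert validation_fields == frozenset({"hash__all"})
```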
9 changes: 9 additions & 0 deletions data_validation/data_validation.py
@@ -291,10 +291,19 @@ def _execute_validation(self, validation_builder, process_in_memory=True):
if self.config_manager.validation_type == consts.ROW_VALIDATION
else set(validation_builder.get_group_aliases())
)
if (
self.config_manager.validation_type == consts.CUSTOM_QUERY
and "hash__all" in self.run_metadata.validations
):
join_on_fields = set(["hash__all"])

        # If row validation from YAML or custom-query hash validation,
        # compare source and target values directly
is_value_comparison = (
self.config_manager.validation_type == consts.ROW_VALIDATION
or (
self.config_manager.validation_type == consts.CUSTOM_QUERY
and "hash__all" in self.run_metadata.validations
)
)

if process_in_memory:
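A minimal sketch of the override above, with stand-in values (the string used
for `consts.CUSTOM_QUERY` here is an assumption, not taken from the source):

```python
CUSTOM_QUERY = "Custom-query"  # stand-in for consts.CUSTOM_QUERY

validation_type = CUSTOM_QUERY
validations = {"hash__all": {}}  # stand-in for run_metadata.validations
join_on_fields = set()

if validation_type == CUSTOM_QUERY and "hash__all" in validations:
    join_on_fields = {"hash__all"}  # rows pair up on the computed hash

is_value_comparison = validation_type == CUSTOM_QUERY and "hash__all" in validations
assert join_on_fields == {"hash__all"} and is_value_comparison
```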
214 changes: 214 additions & 0 deletions data_validation/query_builder/custom_query_row_builder.py
@@ -0,0 +1,214 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" The QueryBuilder for building custom query row validation."""


class CustomQueryRowBuilder(object):
    def __init__(self):
        """Build a CustomQueryRowBuilder object which is ready to build a custom query row validation nested query."""

    def compile_custom_query(self, input_query, client_config):
        """Returns the nested SQL query calculated from the input query
        by adding calculated fields.

        Args:
            input_query (str): User-provided SQL query
            client_config (dict): Client configuration holding the data client
        """
        base_tbl_expr = self.get_table_expression(input_query, client_config)
base_df = self.get_data_frame(base_tbl_expr, client_config)
base_df_columns = self.compile_df_fields(base_df)
calculated_columns = self.get_calculated_columns(base_df_columns)
cast_query = self.compile_cast_df_fields(
calculated_columns, input_query, base_df
)
ifnull_query = self.compile_ifnull_df_fields(
calculated_columns, cast_query, client_config
)
rstrip_query = self.compile_rstrip_df_fields(calculated_columns, ifnull_query)
upper_query = self.compile_upper_df_fields(calculated_columns, rstrip_query)
concat_query = self.compile_concat_df_fields(
calculated_columns, upper_query, client_config
)
sha2_query = self.compile_sha2_df_fields(concat_query, client_config)
return sha2_query

    def get_table_expression(self, input_query, client_config):
        """Returns the ibis table expression for the input query."""
        return client_config["data_client"].sql(input_query)

def get_data_frame(self, base_tbl_expr, client_config):
"""Returns the data frame for the table expression."""
return client_config["data_client"].execute(base_tbl_expr)

def compile_df_fields(self, data_frame):
"""Returns the list of columns in the dataframe.
Args:
            data_frame (DataFrame): pandas DataFrame
"""
return list(data_frame.columns.values)

def get_calculated_columns(self, df_columns):
"""Returns the dictionary containing the calculated fields."""

calculated_columns = {}
calculated_columns["columns"] = df_columns
calculated_columns["cast"] = []
for column in df_columns:
current_column = "cast__" + column
calculated_columns["cast"].append(current_column)

calculated_columns["ifnull"] = []
for column in calculated_columns["cast"]:
current_column = "ifnull__" + column
calculated_columns["ifnull"].append(current_column)

calculated_columns["rstrip"] = []
for column in calculated_columns["ifnull"]:
current_column = "rstrip__" + column
calculated_columns["rstrip"].append(current_column)

calculated_columns["upper"] = []
for column in calculated_columns["rstrip"]:
current_column = "upper__" + column
calculated_columns["upper"].append(current_column)

return calculated_columns

def compile_cast_df_fields(self, calculated_columns, input_query, data_frame):
"""Returns the wrapper cast query for the input query."""

query = "SELECT "
for column in calculated_columns["cast"]:
df_column = column[len("cast__") :]
df_column_dtype = data_frame[df_column].dtype.name
if df_column_dtype != "object" and df_column_dtype != "string":
query = (
query + "CAST(" + df_column + " AS string)" + " AS " + column + ","
)
else:
query += df_column + " AS " + column + ","

query = query[: len(query) - 1] + " FROM (" + input_query + ") AS base_query"
return query

def compile_ifnull_df_fields(self, calculated_columns, cast_query, client_config):
"""Returns the wrapper ifnull query for the input cast_query."""

        client = client_config["data_client"]._source_type
        if client == "Impala":
            operation = "COALESCE"
        elif client == "BigQuery":
            operation = "IFNULL"
        else:
            # Only Impala/Hive and BigQuery are supported for row hash validation.
            raise ValueError("Unsupported source type: " + client)
query = "SELECT "
for column in calculated_columns["ifnull"]:
query = (
query
+ operation
+ "("
+ column[len("ifnull__") :]
+ ",'DEFAULT_REPLACEMENT_STRING')"
+ " AS "
+ column
+ ","
)
query = query[: len(query) - 1] + " FROM (" + cast_query + ") AS cast_query"
return query

def compile_rstrip_df_fields(self, calculated_columns, ifnull_query):
"""Returns the wrapper rstrip query for the input ifnull_query."""

operation = "RTRIM"
query = "SELECT "
for column in calculated_columns["rstrip"]:
query = (
query
+ operation
+ "("
+ column[len("rstrip__") :]
+ ")"
+ " AS "
+ column
+ ","
)
query = query[: len(query) - 1] + " FROM (" + ifnull_query + ") AS ifnull_query"
return query

def compile_upper_df_fields(self, calculated_columns, rstrip_query):
"""Returns the wrapper upper query for the input rstrip_query."""

query = "SELECT "
for column in calculated_columns["upper"]:
query = (
query
+ "UPPER("
+ column[len("upper__") :]
+ ")"
+ " AS "
+ column
+ ","
)
query = query[: len(query) - 1] + " FROM (" + rstrip_query + ") AS rstrip_query"
return query

def compile_concat_df_fields(self, calculated_columns, upper_query, client_config):
"""Returns the wrapper concat query for the input upper_query."""

client = client_config["data_client"]._source_type
if client == "Impala":
operation = "CONCAT_WS"
query = "SELECT " + operation + "(',',"
for column in calculated_columns["upper"]:
query += column + ","
query = (
query[: len(query) - 1]
+ ") AS concat__all FROM("
+ upper_query
+ ") AS upper_query"
)
elif client == "BigQuery":
operation = "ARRAY_TO_STRING"
query = "SELECT " + operation + "(["
for column in calculated_columns["upper"]:
query += column + ","
query = (
query[: len(query) - 1]
+ "],',') AS concat__all FROM("
+ upper_query
+ ") AS upper_query"
)
        else:
            # Only Impala/Hive and BigQuery are supported for row hash validation.
            raise ValueError("Unsupported source type: " + client)
        return query

def compile_sha2_df_fields(self, concat_query, client_config):
"""Returns the wrapper sha2 query for the input concat_query."""

client = client_config["data_client"]._source_type
if client == "Impala":
operation = "SHA2"
query = (
"SELECT "
+ operation
+ "(concat__all,256) AS hash__all FROM ("
+ concat_query
+ ") AS concat_query"
)
elif client == "BigQuery":
operation = "TO_HEX"
query = (
"SELECT "
+ operation
+ "(SHA256(concat__all)) AS hash__all FROM ("
+ concat_query
+ ") AS concat_query"
)
        else:
            # Only Impala/Hive and BigQuery are supported for row hash validation.
            raise ValueError("Unsupported source type: " + client)
        return query
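
To make the wrapping order concrete, here is a hypothetical driver for the
builder. It assumes `data_client` is an ibis client exposing the
`_source_type`, `sql`, and `execute` members that the class relies on; the
helper function itself is illustrative and not part of the diff:

```python
from data_validation.query_builder.custom_query_row_builder import (
    CustomQueryRowBuilder,
)


def build_row_hash_query(data_client, user_sql):
    """Wrap a user query so every row collapses to a single hash__all value."""
    builder = CustomQueryRowBuilder()
    client_config = {"data_client": data_client}
    return builder.compile_custom_query(user_sql, client_config)


# For a BigQuery client, the generated query nests
# cast -> ifnull -> rstrip -> upper -> concat -> hash, ending in roughly:
#
#   SELECT TO_HEX(SHA256(concat__all)) AS hash__all FROM (
#     SELECT ARRAY_TO_STRING([...upper__ columns...], ',') AS concat__all
#     FROM ( ...upper/rstrip/ifnull/cast wrappers... )
#   ) AS concat_query
```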
2 changes: 0 additions & 2 deletions data_validation/query_builder/query_builder.py
@@ -531,7 +531,6 @@ def add_grouped_field(self, grouped_field):
"""Add a GroupedField instance to the query which
represents adding a column to group by in the
query being built.

Args:
grouped_field (GroupedField): A GroupedField instance
"""
@@ -541,7 +540,6 @@ def add_filter_field(self, filter_obj):
"""Add a FilterField instance to your query which
will add the desired filter to your compiled
query (ie. WHERE query_filter=True)

Args:
filter_obj (FilterField): A FilterField instance
"""