Merge branch 'develop' into issue356-db2-test

ngdav committed Mar 16, 2022
2 parents 6718a6f + 0ca0ccf commit 0f9716a
Showing 35 changed files with 1,672 additions and 362 deletions.
67 changes: 61 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ updating the configuration.

### Managing Connections

The Data Validation Tool expects to recieve a source and target connection for
The Data Validation Tool expects to receive a source and target connection for
each validation which is run.

These connections can be supplied directly to the configuration, but more often
@@ -115,7 +115,7 @@ data-validation (--verbose or -v) validate column
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon spearated string values of source and target filters.
Colon separated string values of source and target filters.
If target filter is not provided, the source filter will run on source and target tables.
See: *Filters* section
[--config-file or -c CONFIG_FILE]
@@ -134,6 +134,51 @@ sum, min, etc.) is provided, the default aggregation will run.
The [Examples](docs/examples.md) page provides many examples of how the tool can
be used to run powerful validations without writing any queries.

#### Row Validations

Below is the command syntax for row validations. To run row-level validations
you need to pass a `--primary-keys` flag, which defines the field(s) on which
rows will be joined for comparison, as well as a `--comparison-fields` flag,
which specifies the columns whose raw values will be compared based on the
primary key join. Additionally, you can use
[Calculated Fields](#calculated-fields) to compare derived values such as string
counts and hashes of multiple columns.

```
data-validation (--verbose or -v) validate row
--source-conn or -sc SOURCE_CONN
Source connection details
See: *Data Source Configurations* section for each data source
--target-conn or -tc TARGET_CONN
Target connection details
See: *Connections* section for each data source
--tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
e.g. 'bigquery-public-data.new_york_citibike.citibike_trips'
[--primary-keys or -pk PRIMARY_KEYS]
Comma separated list of columns to use as primary keys
[--comparison-fields or -fields comparison-fields]
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
[--hash COLUMNS] Comma separated list of columns to perform a hash operation on or * for all columns
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
[--service-account or -sa PATH_TO_SA_KEY]
Service account to use for BigQuery result handler output.
[--filters SOURCE_FILTER:TARGET_FILTER]
Colon separated string values of source and target filters.
If target filter is not provided, the source filter will run on source and target tables.
See: *Filters* section
[--config-file or -c CONFIG_FILE]
YAML Config File Path to be used for storing validations.
[--labels or -l KEY1=VALUE1,KEY2=VALUE2]
Comma-separated key value pair labels for the run.
[--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table).
Defaults to table.
```
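As an illustrative sketch (a hypothetical invocation: the connection name, table, and column names below are placeholders, not taken from a real dataset), a row validation joining on a primary key and comparing one column might look like:

```
# Hypothetical example: assumes a connection named "my_bq_conn" already
# exists, and that "id" and "amount" are columns in both tables.
data-validation validate row \
  -sc my_bq_conn -tc my_bq_conn \
  -tbls my_schema.my_table \
  --primary-keys id \
  --comparison-fields amount
```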

#### Schema Validations

Below is the syntax for schema validations. These can be used to compare column
@@ -178,7 +223,7 @@ data-validation query

### Using Beta CLI Features

There may be ocassions we want to release a new CLI feature under a Beta flag.
There may be occasions we want to release a new CLI feature under a Beta flag.
Any features under Beta may or may not make their way to production. However, if
there is a Beta feature you wish to use, then it can be accessed using the
following.
@@ -203,7 +248,9 @@ case-specific CLI arguments or editing the saved YAML configuration file.
For example, the following command creates a YAML file for the validation of the
`new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc
my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c
citibike.yaml`
citibike.yaml`.

The validation config file is saved to the GCS path specified by the `PSO_DV_CONFIG_HOME` environment variable if that has been set; otherwise, it is saved to the directory where the tool is run.
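A rough sketch of that path resolution (an illustration of the documented behavior, not the tool's actual code):

```python
import os

def resolve_config_path(filename: str) -> str:
    """Return where a validation YAML is stored: under the
    PSO_DV_CONFIG_HOME prefix when it is set, otherwise relative
    to the current working directory."""
    config_home = os.environ.get("PSO_DV_CONFIG_HOME")
    if config_home:
        # e.g. a GCS prefix such as gs://my-bucket/dvt-configs
        return config_home.rstrip("/") + "/" + filename
    return filename  # saved wherever the tool is run
```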

Here is the generated YAML file named `citibike.yaml`:

Expand Down Expand Up @@ -233,12 +280,14 @@ Once the file is updated and saved, the following command runs the new
validation:

```
data-validation run-config -c citibike.yaml
data-validation configs run -c citibike.yaml
```

View the complete YAML file for a GroupedColumn validation on the
[examples](docs/examples.md#) page.

You can view a list of all saved validation YAML files using `data-validation configs list`, and print a YAML config using `data-validation configs get -c citibike.yaml`.
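Taken together, the `configs` subcommands cover the whole YAML lifecycle:

```
# List all saved validation YAMLs.
data-validation configs list

# Print a saved config, then run it.
data-validation configs get -c citibike.yaml
data-validation configs run -c citibike.yaml
```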

### Aggregated Fields

Aggregate fields contain the SQL fields that you want to produce an aggregate
@@ -285,6 +334,12 @@ Grouped Columns contain the fields you want your aggregations to be broken out
by, e.g. `SELECT last_updated::DATE, COUNT(*) FROM my.table` will produce a
resultset that breaks down the count of rows per calendar date.
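For instance (a hypothetical invocation: the connection and table names are placeholders, and the `--count` aggregate flag is assumed from the column-validation syntax above):

```
data-validation validate column \
  -sc my_bq_conn -tc my_bq_conn \
  -tbls my_schema.my_table \
  --grouped-columns last_updated \
  --count '*'
```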

### Comparison Fields

For row validations you must specify the columns that you want to compare.
These values are compared via a JOIN on the corresponding primary key and
evaluated for an exact match.
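Conceptually, the comparison behaves like a join keyed on the primary key followed by equality checks. A simplified in-memory sketch (not the tool's implementation, which generates SQL against the databases):

```python
def compare_rows(source, target, primary_key, comparison_fields):
    """Join source and target rows (dicts) on primary_key and check
    each comparison field for an exact match; return mismatches."""
    target_by_pk = {row[primary_key]: row for row in target}
    mismatches = []
    for row in source:
        other = target_by_pk.get(row[primary_key])
        for field in comparison_fields:
            if other is None or row[field] != other[field]:
                mismatches.append((row[primary_key], field))
    return mismatches

source = [{"id": 1, "bikes": 5}, {"id": 2, "bikes": 7}]
target = [{"id": 1, "bikes": 5}, {"id": 2, "bikes": 8}]
```

Here `compare_rows(source, target, "id", ["bikes"])` reports the single mismatching row.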

### Calculated Fields

Sometimes direct comparisons are not feasible between databases due to
@@ -454,7 +509,7 @@ in the Data Validation tool, it is a simple process.

1. In data_validation/data_validation.py

- Import the extened Client for the given source (ie. from
- Import the extended Client for the given source (i.e. from
ibis.sql.mysql.client import MySQLClient).
- Add the "<RefName>": Client to the global CLIENT_LOOKUP dictionary.

2 changes: 1 addition & 1 deletion ci/build.sh
@@ -26,7 +26,7 @@ python3.6 -m nox --version

# When NOX_SESSION is set, it only runs the specified session
if [[ -n "${NOX_SESSION:-}" && ( "$NOX_SESSION" == "integration_postgres" || "$NOX_SESSION" == "integration_sql_server" ) ]]; then
./cloud_sql_proxy -instances=$CLOUD_SQL_CONNECTION & python3.6 -m nox --error-on-missing-interpreters -s "${NOX_SESSION:-}"
./cloud_sql_proxy -instances="$CLOUD_SQL_CONNECTION" & python3.6 -m nox --error-on-missing-interpreters -s "${NOX_SESSION:-}"
elif [[ -n "${NOX_SESSION:-}" ]]; then
python3.6 -m nox --error-on-missing-interpreters -s "${NOX_SESSION:-}"
else
140 changes: 104 additions & 36 deletions data_validation/__main__.py
@@ -13,10 +13,9 @@
# limitations under the License.

import os

import logging
import json
from yaml import dump, load, Dumper, Loader
import sys
from yaml import Dumper, dump

from data_validation import (
cli_tools,
@@ -29,6 +28,10 @@
from data_validation.data_validation import DataValidation


# by default yaml dumps lists as pointers. This disables that feature
Dumper.ignore_aliases = lambda *args: True
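By default PyYAML emits an anchor/alias pair (`&id001` / `*id001`) when the same object appears more than once in a document, which is confusing in a config file. A standalone sketch of the behavior this patch disables (assuming PyYAML is installed):

```python
import yaml

shared = ["col_a", "col_b"]
doc = {"source": shared, "target": shared}  # same list object twice

with_aliases = yaml.dump(doc)  # default: second occurrence becomes *id001
yaml.Dumper.ignore_aliases = lambda *args: True
without_aliases = yaml.dump(doc)  # both lists are written out in full
```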


def _get_arg_config_file(args):
"""Return String yaml config file path."""
if not args.config_file:
@@ -39,10 +42,8 @@ def _get_arg_config_file(args):

def _get_yaml_config_from_file(config_file_path):
"""Return Dict of yaml validation data."""
with open(config_file_path, "r") as yaml_file:
yaml_configs = load(yaml_file.read(), Loader=Loader)

return yaml_configs
yaml_config = cli_tools.get_validation(config_file_path)
return yaml_config


def get_aggregate_config(args, config_manager):
@@ -78,30 +79,73 @@ def get_aggregate_config(args, config_manager):
aggregate_configs += config_manager.build_config_column_aggregates(
"max", col_args, consts.NUMERIC_DATA_TYPES
)
if args.bit_xor:
col_args = None if args.bit_xor == "*" else cli_tools.get_arg_list(args.bit_xor)
aggregate_configs += config_manager.build_config_column_aggregates(
"bit_xor", col_args, consts.NUMERIC_DATA_TYPES
)
return aggregate_configs


def get_calculated_config(args, config_manager):
"""Return list of formatted calculated objects.
Args:
config_manager(ConfigManager): Validation config manager instance.
"""
calculated_configs = []
fields = []
if args.hash:
fields = config_manager._build_dependent_aliases("hash")
if len(fields) > 0:
max_depth = max([x["depth"] for x in fields])
else:
max_depth = 0
for field in fields:
calculated_configs.append(
config_manager.build_config_calculated_fields(
field["reference"],
field["calc_type"],
field["name"],
field["depth"],
None,
)
)
if args.hash:
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(
["hash__all"], depth=max_depth
)
)
return calculated_configs


def build_config_from_args(args, config_manager):
"""Return config manager object ready to execute.
Args:
config_manager (ConfigManager): Validation config manager instance.
"""
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if args.primary_keys and not args.grouped_columns:
if not args.grouped_columns and not config_manager.use_random_rows():
logging.warning(
"No Grouped columns or Random Rows specified, ignoring primary keys."
config_manager.append_calculated_fields(get_calculated_config(args, config_manager))
if config_manager.validation_type == consts.COLUMN_VALIDATION:
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if args.grouped_columns is not None:
grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
config_manager.append_query_groups(
config_manager.build_config_grouped_columns(grouped_columns)
)
if args.grouped_columns:
grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
config_manager.append_query_groups(
config_manager.build_config_grouped_columns(grouped_columns)
)
if args.primary_keys:
primary_keys = cli_tools.get_arg_list(args.primary_keys, default_value=[])
elif config_manager.validation_type == consts.ROW_VALIDATION:
if args.comparison_fields is not None:
comparison_fields = cli_tools.get_arg_list(
args.comparison_fields, default_value=[]
)
config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(comparison_fields)
)
if args.primary_keys is not None:
primary_keys = cli_tools.get_arg_list(args.primary_keys)
config_manager.append_primary_keys(
config_manager.build_config_grouped_columns(primary_keys)
config_manager.build_config_comparison_fields(primary_keys)
)

# TODO(GH#18): Add query filter config logic
Expand All @@ -118,11 +162,9 @@ def build_config_managers_from_args(args):
if validate_cmd == "Schema":
config_type = consts.SCHEMA_VALIDATION
elif validate_cmd == "Column":
# TODO: We need to discuss how GroupedColumn and Row are differentiated.
if args.grouped_columns:
config_type = consts.GROUPED_COLUMN_VALIDATION
else:
config_type = consts.COLUMN_VALIDATION
config_type = consts.COLUMN_VALIDATION
elif validate_cmd == "Row":
config_type = consts.ROW_VALIDATION
else:
raise ValueError(f"Unknown Validation Type: {validate_cmd}")
else:
@@ -153,6 +195,13 @@ def build_config_managers_from_args(args):

format = args.format if args.format else "table"

use_random_rows = (
None if config_type == consts.SCHEMA_VALIDATION else args.use_random_row
)
random_row_batch_size = (
None if config_type == consts.SCHEMA_VALIDATION else args.random_row_batch_size
)

is_filesystem = source_client._source_type == "FileSystem"
tables_list = cli_tools.get_tables_list(
args.tables_list, default_value=[], is_filesystem=is_filesystem
@@ -167,8 +216,8 @@
labels,
threshold,
format,
use_random_rows=args.use_random_row,
random_row_batch_size=args.random_row_batch_size,
use_random_rows=use_random_rows,
random_row_batch_size=random_row_batch_size,
source_client=source_client,
target_client=target_client,
result_handler_config=result_handler_config,
@@ -336,12 +385,9 @@ def store_yaml_config_file(args, config_managers):
Args:
config_managers (list[ConfigManager]): List of config manager instances.
"""
config_file_path = _get_arg_config_file(args)
yaml_configs = convert_config_to_yaml(args, config_managers)
yaml_config_str = dump(yaml_configs, Dumper=Dumper)

with open(config_file_path, "w") as yaml_file:
yaml_file.write(yaml_config_str)
config_file_path = _get_arg_config_file(args)
cli_tools.store_validation(config_file_path, yaml_configs)


def run(args):
@@ -355,7 +401,7 @@


def run_connections(args):
""" Run commands related to connection management."""
"""Run commands related to connection management."""
if args.connect_cmd == "list":
cli_tools.list_connections()
elif args.connect_cmd == "add":
@@ -367,9 +413,30 @@
raise ValueError(f"Connections Argument '{args.connect_cmd}' is not supported")


def run_config(args):
"""Run commands related to validation config YAMLs (legacy - superseded by run_validation_configs)."""
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)


def run_validation_configs(args):
"""Run commands related to validation config YAMLs."""
if args.validation_config_cmd == "run":
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)
elif args.validation_config_cmd == "list":
cli_tools.list_validations()
elif args.validation_config_cmd == "get":
# Get and print yaml file config.
yaml = cli_tools.get_validation(_get_arg_config_file(args))
dump(yaml, sys.stdout)
else:
raise ValueError(f"Configs argument '{args.validate_cmd}' is not supported")


def validate(args):
""" Run commands related to data validation."""
if args.validate_cmd == "column" or args.validate_cmd == "schema":
if args.validate_cmd in ["column", "row", "schema"]:
run(args)
else:
raise ValueError(f"Validation Argument '{args.validate_cmd}' is not supported")
@@ -384,8 +451,9 @@ def main():
elif args.command == "connections":
run_connections(args)
elif args.command == "run-config":
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)
run_config(args)
elif args.command == "configs":
run_validation_configs(args)
elif args.command == "find-tables":
print(find_tables_using_string_matching(args))
elif args.command == "query":