From f6d2b9d6879aaab1ab92f2f907d9334e15f75d7b Mon Sep 17 00:00:00 2001
From: Neha Nene
Date: Thu, 16 Sep 2021 11:13:29 -0500
Subject: [PATCH] feat: Refactor CLI to fit Command Pattern (#303)

* feat: refactor CLI to command pattern
* fix: added format flag for validate command
* fix: update readme formatting
* fix: update sample code to use new CLI options
---
 README.md                                     |  88 ++++++++----
 data_validation/__main__.py                   |  48 +++++--
 data_validation/cli_tools.py                  | 130 +++++++++++++++++-
 data_validation/config_manager.py             |  12 +-
 data_validation/data_validation.py            |   2 +-
 docs/examples.md                              |  38 +++--
 samples/bq_result_handler_grouped.py          |   2 +-
 tests/system/data_sources/test_bigquery.py    |   2 +-
 tests/system/result_handlers/test_bigquery.py |   2 +-
 tests/unit/test__main.py                      |   3 +-
 tests/unit/test_cli_tools.py                  |  31 ++++-
 tests/unit/test_combiner.py                   |  12 +-
 tests/unit/test_config_manager.py             |   9 +-
 tests/unit/test_data_validation.py            |  19 +--
 14 files changed, 304 insertions(+), 94 deletions(-)

diff --git a/README.md b/README.md
index ce0a7efe1..ed8201ea5 100644
--- a/README.md
+++ b/README.md
@@ -70,9 +70,19 @@ Once you have your connections set up, you are ready to run the validations.
 
 ### Validation command syntax and options
 
+Below are the command syntax and options for running validations from the CLI.
+DVT supports column (including grouped column) and schema validations.
+
+#### Column Validations
+
+Below is the command syntax for column validations. To run a grouped column validation,
+simply specify the `--grouped-columns` flag. You can also take grouped column validations
+a step further by providing the `--primary-keys` flag. With this flag, if a mismatch is found,
+DVT will dive deeper into the mismatched slice and find the row (primary key value) with the
+inconsistency.
+
 ```
-data-validation run
-  --type or -t TYPE     Type of Data Validation (Column, GroupedColumn, Row, Schema)
+data-validation (--verbose or -v) validate column
   --source-conn or -sc SOURCE_CONN
                         Source connection details
                         See: *Data Source Configurations* section for each data source
   --target-conn or -tc TARGET_CONN
                         Target connection details
                         See: *Connections* section for each data source
   --tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
                         Comma separated list of tables in the form schema.table=target_schema.target_table
                         Target schema name and table name are optional.
                         i.e 'bigquery-public-data.new_york_citibike.citibike_trips'
-  --grouped-columns or -gc GROUPED_COLUMNS
+  [--grouped-columns or -gc GROUPED_COLUMNS]
                         Comma separated list of columns for Group By i.e col_a,col_b
-                        (Optional) Only used in GroupedColumn validations
-  --primary-keys or -pc PRIMARY_KEYS
+  [--primary-keys or -pk PRIMARY_KEYS]
                         Comma separated list of columns to use as primary keys
-                        (Optional) Only use in Row validations
-  --count COLUMNS       Comma separated list of columns for count or * for all columns
-  --sum COLUMNS         Comma separated list of columns for sum or * for all numeric
-  --min COLUMNS         Comma separated list of columns for min or * for all numeric
-  --max COLUMNS         Comma separated list of columns for max or * for all numeric
-  --avg COLUMNS         Comma separated list of columns for avg or * for all numeric
-  --bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE
-                        (Optional) BigQuery destination for validation results. Defaults to stdout.
+ (Note) Only use with grouped column validation + [--count COLUMNS] Comma separated list of columns for count or * for all columns + [--sum COLUMNS] Comma separated list of columns for sum or * for all numeric + [--min COLUMNS] Comma separated list of columns for min or * for all numeric + [--max COLUMNS] Comma separated list of columns for max or * for all numeric + [--avg COLUMNS] Comma separated list of columns for avg or * for all numeric + [--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE] + BigQuery destination for validation results. Defaults to stdout. See: *Validation Reports* section - --service-account or -sa PATH_TO_SA_KEY - (Optional) Service account to use for BigQuery result handler output. - --filters SOURCE_FILTER:TARGET_FILTER + [--service-account or -sa PATH_TO_SA_KEY] + Service account to use for BigQuery result handler output. + [--filters SOURCE_FILTER:TARGET_FILTER] Colon spearated string values of source and target filters. If target filter is not provided, the source filter will run on source and target tables. See: *Filters* section - --config-file or -c CONFIG_FILE + [--config-file or -c CONFIG_FILE] YAML Config File Path to be used for storing validations. - --threshold or -th THRESHOLD - (Optional) Float value. Maximum pct_difference allowed for validation to be considered a success. Defaults to 0.0 - --labels or -l KEY1=VALUE1,KEY2=VALUE2 - (Optional) Comma-separated key value pair labels for the run. - --verbose or -v Verbose logging will print queries executed - --format or -fmt Format for stdout output, Supported formats are (text, csv, json, table) - It defaults to table. + [--threshold or -th THRESHOLD] + Float value. Maximum pct_difference allowed for validation to be considered a success. Defaults to 0.0 + [--labels or -l KEY1=VALUE1,KEY2=VALUE2] + Comma-separated key value pair labels for the run. + [--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table). + Defaults to table. ``` The default aggregation type is a 'COUNT *'. If no aggregation flag (i.e count, @@ -120,6 +128,33 @@ sum , min, etc.) is provided, the default aggregation will run. The [Examples](docs/examples.md) page provides many examples of how a tool can used to run powerful validations without writing any queries. +#### Schema Validations +Below is the syntax for schema validations. These can be used to compare column types between source +and target. + +``` +data-validation (--verbose or -v) validate schema + --source-conn or -sc SOURCE_CONN + Source connection details + See: *Data Source Configurations* section for each data source + --target-conn or -tc TARGET_CONN + Target connection details + See: *Connections* section for each data source + --tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE + Comma separated list of tables in the form schema.table=target_schema.target_table + Target schema name and table name are optional. + i.e 'bigquery-public-data.new_york_citibike.citibike_trips' + [--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE] + BigQuery destination for validation results. Defaults to stdout. + See: *Validation Reports* section + [--service-account or -sa PATH_TO_SA_KEY] + Service account to use for BigQuery result handler output. + [--config-file or -c CONFIG_FILE] + YAML Config File Path to be used for storing validations. + [--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table). + Defaults to table. 
+``` + ### Running Custom SQL Exploration There are many occasions where you need to explore a data source while running @@ -142,7 +177,7 @@ case specific CLI arguments or editing the saved YAML configuration file. For example, the following command creates a YAML file for the validation of the `new_york_citibike` table: ``` -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c citibike.yaml ``` @@ -360,8 +395,7 @@ View the schema of the results [here](terraform/results_schema.json). ### Configure tool to output to BigQuery ``` -data-validation run - -t Column +data-validation validate column -sc bq_conn -tc bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips diff --git a/data_validation/__main__.py b/data_validation/__main__.py index 446638f54..4a808c2a1 100644 --- a/data_validation/__main__.py +++ b/data_validation/__main__.py @@ -80,15 +80,16 @@ def build_config_from_args(args, config_manager): config_manager (ConfigManager): Validation config manager instance. """ config_manager.append_aggregates(get_aggregate_config(args, config_manager)) - if config_manager.validation_type in [ - consts.GROUPED_COLUMN_VALIDATION, - consts.ROW_VALIDATION, - ]: + if args.primary_keys and not args.grouped_columns: + raise ValueError( + "Grouped columns must be specified for primary key level validation." + ) + if args.grouped_columns: grouped_columns = cli_tools.get_arg_list(args.grouped_columns) config_manager.append_query_groups( config_manager.build_config_grouped_columns(grouped_columns) ) - if config_manager.validation_type in [consts.ROW_VALIDATION]: + if args.primary_keys: primary_keys = cli_tools.get_arg_list(args.primary_keys, default_value=[]) config_manager.append_primary_keys( config_manager.build_config_grouped_columns(primary_keys) @@ -103,12 +104,14 @@ def build_config_managers_from_args(args): """Return a list of config managers ready to execute.""" configs = [] - config_type = args.type + if args.type is None: + config_type = args.validate_cmd.capitalize() + else: + config_type = args.type + source_conn = cli_tools.get_connection(args.source_conn) target_conn = cli_tools.get_connection(args.target_conn) - labels = cli_tools.get_labels(args.labels) - result_handler_config = None if args.bq_result_handler: result_handler_config = cli_tools.get_result_handler( @@ -119,14 +122,18 @@ def build_config_managers_from_args(args): args.result_handler_config, args.service_account ) - filter_config = [] - if args.filters: - filter_config = cli_tools.get_filters(args.filters) + # Schema validation will not accept filters, labels, or threshold as flags + filter_config, labels, threshold = [], [], 0.0 + if config_type != consts.SCHEMA_VALIDATION: + if args.filters: + filter_config = cli_tools.get_filters(args.filters) + if args.threshold: + threshold = args.threshold + labels = cli_tools.get_labels(args.labels) source_client = clients.get_data_client(source_conn) target_client = clients.get_data_client(target_conn) - threshold = args.threshold if args.threshold else 0.0 format = args.format if args.format else "table" is_filesystem = True if source_conn["source_type"] == "FileSystem" else False @@ -149,7 +156,10 @@ def build_config_managers_from_args(args): filter_config=filter_config, verbose=args.verbose, ) - configs.append(build_config_from_args(args, config_manager)) + if config_type != consts.SCHEMA_VALIDATION: + config_manager = 
build_config_from_args(args, config_manager) + + configs.append(config_manager) return configs @@ -302,7 +312,7 @@ def run_validations(args, config_managers): def store_yaml_config_file(args, config_managers): - """Build a YAML config file fromt he supplied configs. + """Build a YAML config file from the supplied configs. Args: config_managers (list[ConfigManager]): List of config manager instances. @@ -338,6 +348,14 @@ def run_connections(args): raise ValueError(f"Connections Argument '{args.connect_cmd}' is not supported") +def validate(args): + """ Run commands related to data validation.""" + if args.validate_cmd == "column" or args.validate_cmd == "schema": + run(args) + else: + raise ValueError(f"Validation Argument '{args.validate_cmd}' is not supported") + + def main(): # Create Parser and Get Deployment Info args = cli_tools.get_parsed_args() @@ -353,6 +371,8 @@ def main(): print(find_tables_using_string_matching(args)) elif args.command == "query": print(run_raw_query_against_connection(args)) + elif args.command == "validate": + validate(args) else: raise ValueError(f"Positional Argument '{args.command}' is not supported") diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py index 40874846e..17996bf8f 100644 --- a/data_validation/cli_tools.py +++ b/data_validation/cli_tools.py @@ -25,20 +25,20 @@ data-validation connections add -c my_bq_conn BigQuery --project-id pso-kokoro-resources Step 2) Run Validation using supplied connections -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn \ +data-validation validate column -sc my_bq_conn -tc my_bq_conn \ -tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations \ --sum '*' --count '*' -python -m data_validation run -t GroupedColumn -sc my_bq_conn -tc my_bq_conn \ +python -m data_validation validate column -sc my_bq_conn -tc my_bq_conn \ -tbls bigquery-public-data.new_york_citibike.citibike_trips \ --grouped-columns starttime \ --sum tripduration --count tripduration -data-validation run -t Column \ +data-validation validate column \ -sc my_bq_conn -tc my_bq_conn \ -tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations \ --sum tripduration,start_station_name --count tripduration,start_station_name \ --rc pso-kokoro-resources.pso_data_validator.results +-bqrh pso-kokoro-resources.pso_data_validator.results -c ex_yaml.yaml data-validation run-config -c ex_yaml.yaml @@ -141,7 +141,7 @@ def configure_arg_parser(): parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging") - # beta feature only available in run command + # beta feature only available in run/validate command if "beta" in sys.argv: parser.add_argument( "beta", @@ -151,13 +151,15 @@ def configure_arg_parser(): ) subparsers = parser.add_subparsers(dest="command") _configure_run_parser(subparsers) + _configure_validate_parser(subparsers) else: subparsers = parser.add_subparsers(dest="command") - _configure_run_parser(subparsers) + _configure_validate_parser(subparsers) _configure_run_config_parser(subparsers) _configure_connection_parser(subparsers) _configure_find_tables(subparsers) _configure_raw_query(subparsers) + _configure_run_parser(subparsers) return parser @@ -211,7 +213,7 @@ def _configure_run_parser(subparsers): # subparsers = parser.add_subparsers(dest="command") run_parser = subparsers.add_parser( - "run", help="Manually run a validation and optionally store to config" + "run", help="Run a 
validation and optionally store to config (deprecated)" ) run_parser.add_argument( @@ -335,6 +337,120 @@ def _configure_database_specific_parsers(parser): db_parser.add_argument(arg_field, help=help_txt) +def _configure_validate_parser(subparsers): + """Configure arguments to run validations.""" + validate_parser = subparsers.add_parser( + "validate", help="Run a validation and optionally store to config" + ) + + # Keep these in order to support data-validation run command for backwards-compatibility + validate_parser.add_argument("--type", "-t", help="Type of Data Validation") + validate_parser.add_argument( + "--result-handler-config", "-rc", help="Result handler config details" + ) + + validate_subparsers = validate_parser.add_subparsers(dest="validate_cmd") + + column_parser = validate_subparsers.add_parser( + "column", help="Run a column validation" + ) + _configure_column_parser(column_parser) + + schema_parser = validate_subparsers.add_parser( + "schema", help="Run a schema validation" + ) + _configure_schema_parser(schema_parser) + + +def _configure_column_parser(column_parser): + """Configure arguments to run column level validations.""" + _add_common_arguments(column_parser) + column_parser.add_argument( + "--count", + "-count", + help="Comma separated list of columns for count 'col_a,col_b' or * for all columns", + ) + column_parser.add_argument( + "--sum", + "-sum", + help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns", + ) + column_parser.add_argument( + "--avg", + "-avg", + help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns", + ) + column_parser.add_argument( + "--min", + "-min", + help="Comma separated list of columns for min 'col_a,col_b' or * for all columns", + ) + column_parser.add_argument( + "--max", + "-max", + help="Comma separated list of columns for max 'col_a,col_b' or * for all columns", + ) + column_parser.add_argument( + "--grouped-columns", + "-gc", + help="Comma separated list of columns to use in GroupBy 'col_a,col_b'", + ) + column_parser.add_argument( + "--primary-keys", + "-pk", + help="Comma separated list of primary key columns 'col_a,col_b'", + ) + column_parser.add_argument( + "--labels", "-l", help="Key value pair labels for validation run" + ) + column_parser.add_argument( + "--threshold", + "-th", + type=threshold_float, + help="Float max threshold for percent difference", + ) + column_parser.add_argument( + "--filters", + "-filters", + help="Filters in the format source_filter:target_filter", + ) + + +def _configure_schema_parser(schema_parser): + """Configure arguments to run column level validations.""" + _add_common_arguments(schema_parser) + + +def _add_common_arguments(parser): + parser.add_argument("--source-conn", "-sc", help="Source connection name") + parser.add_argument("--target-conn", "-tc", help="Target connection name") + parser.add_argument( + "--tables-list", + "-tbls", + help="Comma separated tables list in the form 'schema.table=target_schema.target_table'", + ) + parser.add_argument( + "--bq-result-handler", "-bqrh", help="BigQuery result handler config details" + ) + parser.add_argument( + "--service-account", + "-sa", + help="Path to SA key file for result handler output", + ) + parser.add_argument( + "--config-file", + "-c", + help="Store the validation in the YAML Config File Path specified", + ) + parser.add_argument( + "--format", + "-fmt", + default="table", + help="Set the format for printing command output, Supported formats are (text, csv, json, table). 
Defaults " + "to table", + ) + + def get_connection_config_from_args(args): """ Return dict with connection config supplied.""" config = {consts.SOURCE_TYPE: args.connect_type} diff --git a/data_validation/config_manager.py b/data_validation/config_manager.py index 3bbc31d8a..c1186ada6 100644 --- a/data_validation/config_manager.py +++ b/data_validation/config_manager.py @@ -67,12 +67,12 @@ def target_connection(self): @property def validation_type(self): - """Return string validation type (Column|GroupedColumn|Row).""" + """Return string validation type (Column|Schema).""" return self._config[consts.CONFIG_TYPE] def process_in_memory(self): if ( - self.validation_type == "Row" + self.is_grouped_row_validation and self.source_connection == self.target_connection ): return False @@ -115,7 +115,7 @@ def append_query_groups(self, grouped_column_configs): @property def primary_keys(self): - """ Return Query Groups from Config """ + """ Return Primary keys from Config """ return self._config.get(consts.CONFIG_PRIMARY_KEYS, []) def append_primary_keys(self, primary_key_configs): @@ -124,6 +124,12 @@ def append_primary_keys(self, primary_key_configs): self.primary_keys + primary_key_configs ) + @property + def is_grouped_row_validation(self): + """ Returns boolean indicating if validation type is a Grouped_Column + Row validation. """ + return self.primary_keys != [] + @property def filters(self): """Return Filters from Config """ diff --git a/data_validation/data_validation.py b/data_validation/data_validation.py index 2675dfdc0..50ba409d0 100644 --- a/data_validation/data_validation.py +++ b/data_validation/data_validation.py @@ -88,7 +88,7 @@ def __init__( # Leaving to to swast on the design of how this should look. def execute(self): """ Execute Queries and Store Results """ - if self.config_manager.validation_type == consts.ROW_VALIDATION: + if self.config_manager.is_grouped_row_validation: grouped_fields = self.validation_builder.pop_grouped_fields() result_df = self.execute_recursive_validation( self.validation_builder, grouped_fields diff --git a/docs/examples.md b/docs/examples.md index f854cdc54..edae92974 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -8,17 +8,17 @@ Also, note that if no aggregation flag is provided, the tool will run a 'COUNT * #### Simple COUNT(*) on a table ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips ```` #### Run multiple tables ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations ```` #### Store validation config to the file ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c citibike.yaml +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c citibike.yaml ```` Above command creates a YAML file named citibike.yaml that can be used to run validations in the future. @@ -33,17 +33,17 @@ Above command executes validations stored in a config file named citibike.yaml. 
#### Run COUNT validations for all columns ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count '*' +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count '*' ```` #### Run COUNT validations for selected columns ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count bikeid,gender +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count bikeid,gender ```` #### Store results in a BigQuery table ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count tripduration,start_station_name -bqrh $YOUR_PROJECT_ID.pso_data_validator.results +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count tripduration,start_station_name -bqrh $YOUR_PROJECT_ID.pso_data_validator.results ```` Please replace $YOUR_PROJECT_ID with the correct project-id where you created your results datasets as mentioned in the [installation](installation.md#setup) section. @@ -71,50 +71,48 @@ ORDER BY #### Run a single column GroupBy validation ````shell script -data-validation run -t GroupedColumn -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --grouped-columns bikeid +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --grouped-columns bikeid ```` #### Run a multi-column GroupBy validation ````shell script -data-validation run -t GroupedColumn -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --grouped-columns bikeid,usertype +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --grouped-columns bikeid,usertype ```` #### Apply single aggregation on a single field ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --sum num_bikes_available +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --sum num_bikes_available ```` #### Apply single aggregation on multiple fields ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --sum num_bikes_available,num_docks_available +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --sum num_bikes_available,num_docks_available ```` #### Apply different aggregations on multiple fields ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --sum num_bikes_available,num_docks_available --avg num_bikes_disabled,num_docks_disabled +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --sum num_bikes_available,num_docks_available --avg num_bikes_disabled,num_docks_disabled ```` #### Apply different aggregations on multiple fields and apply GroupBy ````shell script -data-validation run -t GroupedColumn -sc my_bq_conn 
-tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --grouped-columns region_id --sum num_bikes_available,num_docks_available --avg num_bikes_disabled,num_docks_disabled +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --grouped-columns region_id --sum num_bikes_available,num_docks_available --avg num_bikes_disabled,num_docks_disabled ```` #### Apply filters ````shell script -data-validation run -t GroupedColumn -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --grouped-columns region_id --sum num_bikes_available,num_docks_available --filters 'region_id=71' -bqrh $YOUR_PROJECT_ID.pso_data_validator.results +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --grouped-columns region_id --sum num_bikes_available,num_docks_available --filters 'region_id=71' -bqrh $YOUR_PROJECT_ID.pso_data_validator.results ```` #### Apply labels ````shell script -data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count tripduration,start_station_name -l tag=test-run,owner=name +data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count tripduration,start_station_name -l tag=test-run,owner=name ```` #### Run a schema validation ````shell script -# Schema validation will ignore irrelevant flags (count, sum, filters, etc.) -# Labels are not supported for schema validation -data-validation run -t Schema -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -bqrh $YOUR_PROJECT_ID.pso_data_validator.results +data-validation validate schema -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -bqrh $YOUR_PROJECT_ID.pso_data_validator.results ```` #### Run validation on a file @@ -125,7 +123,7 @@ pip install fsspec data-validation connections add --connection-name file_conn FileSystem --table-name my_local_file --file-path gs://path/to/file --file-type csv -data-validation run -t Column -sc file_conn -tc file_conn -tbls my_local_file --count name +data-validation validate column -sc file_conn -tc file_conn -tbls my_local_file --count name ```` #### Run custom SQL @@ -135,7 +133,7 @@ data-validation query --query, -q The Raw query to run against the supplied connection ```` -#### Sample YAML file (GroupedColumn validation) +#### Sample YAML file (Grouped Column validation) ```yaml result_handler: project_id: my-project-id @@ -175,5 +173,5 @@ validations: target_schema_name: bigquery-public-data.new_york_citibike target_table_name: citibike_stations threshold: 0.0 - type: GroupedColumn + type: Column ``` \ No newline at end of file diff --git a/samples/bq_result_handler_grouped.py b/samples/bq_result_handler_grouped.py index f9ba3dcd2..a421eb271 100644 --- a/samples/bq_result_handler_grouped.py +++ b/samples/bq_result_handler_grouped.py @@ -25,7 +25,7 @@ "source_conn": BQ_CONN, "target_conn": BQ_CONN, # Validation Type - "type": "GroupedColumn", + "type": "Column", # Configuration Required Depending on Validator Type "schema_name": "bigquery-public-data.new_york_citibike", "table_name": "citibike_trips", diff --git a/tests/system/data_sources/test_bigquery.py b/tests/system/data_sources/test_bigquery.py index 6cd31e29c..5bebe4d5f 100644 --- a/tests/system/data_sources/test_bigquery.py +++ 
b/tests/system/data_sources/test_bigquery.py @@ -69,7 +69,7 @@ consts.CONFIG_SOURCE_CONN: BQ_CONN, consts.CONFIG_TARGET_CONN: BQ_CONN, # Validation Type - consts.CONFIG_TYPE: "GroupedColumn", + consts.CONFIG_TYPE: "Column", # Configuration Required Depending on Validator Type consts.CONFIG_SCHEMA_NAME: "bigquery-public-data.new_york_citibike", consts.CONFIG_TABLE_NAME: "citibike_trips", diff --git a/tests/system/result_handlers/test_bigquery.py b/tests/system/result_handlers/test_bigquery.py index 5d6c1a182..fa02bd5f8 100644 --- a/tests/system/result_handlers/test_bigquery.py +++ b/tests/system/result_handlers/test_bigquery.py @@ -118,7 +118,7 @@ def test_execute_with_nan(bigquery_client, bigquery_dataset_id): _NAN, _NAN, ], - "validation_type": ["GroupedColumn"] * 6, + "validation_type": ["Column"] * 6, "aggregation_type": ["count"] * 6, "validation_name": ["count"] * 6, "source_agg_value": ["2", "4", _NAN, _NAN, "6", "8"], diff --git a/tests/unit/test__main.py b/tests/unit/test__main.py index 4e1077807..5b553ca50 100644 --- a/tests/unit/test__main.py +++ b/tests/unit/test__main.py @@ -21,8 +21,7 @@ TEST_CONN = '{"source_type":"Example"}' CLI_ARGS = { - "command": "run", - "type": "Column", + "validate_cmd": "column", "source_conn": TEST_CONN, "target_conn": TEST_CONN, "tables_list": "my_schema.my_table", diff --git a/tests/unit/test_cli_tools.py b/tests/unit/test_cli_tools.py index ff7321cb0..7ac30ebce 100644 --- a/tests/unit/test_cli_tools.py +++ b/tests/unit/test_cli_tools.py @@ -21,6 +21,21 @@ TEST_CONN = '{"source_type":"Example"}' CLI_ARGS = { + "command": "validate", + "validate_cmd": "column", + "source_conn": TEST_CONN, + "target_conn": TEST_CONN, + "tables_list": "my_schema.my_table", + "sum": "col_a,col_b", + "count": "col_a,col_b", + "config_file": "example_test.yaml", + "labels": "name=test_run", + "threshold": 30.0, + "verbose": True, +} + +# Define CLI 'Run' command args for backwards compatibility +CLI_RUN_ARGS = { "command": "run", "type": "Column", "source_conn": TEST_CONN, @@ -59,7 +74,21 @@ "argparse.ArgumentParser.parse_args", return_value=argparse.Namespace(**CLI_ARGS), ) def test_get_parsed_args(mock_args): - """Test arg parser values.""" + """Test arg parser values with validate command.""" + args = cli_tools.get_parsed_args() + assert args.command == "validate" + assert args.labels == "name=test_run" + assert args.threshold == 30.0 + assert args.verbose + + +# Test run command for backwards compatibility +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace(**CLI_RUN_ARGS), +) +def test_get_run_parsed_args(mock_args): + """Test arg parser values with run command.""" args = cli_tools.get_parsed_args() assert args.command == "run" assert args.labels == "name=test_run" diff --git a/tests/unit/test_combiner.py b/tests/unit/test_combiner.py index 7c6721ff4..64461633e 100644 --- a/tests/unit/test_combiner.py +++ b/tests/unit/test_combiner.py @@ -369,7 +369,7 @@ def test_generate_report_without_group_by( target_table_name="test_target", target_table_schema="bq-public.target_dataset", target_column_name=None, - validation_type="GroupedColumn", + validation_type="Column", aggregation_type="count", threshold=7.0, ), @@ -388,7 +388,7 @@ def test_generate_report_without_group_by( "source_column_name": [None] * 4, "target_table_name": ["bq-public.target_dataset.test_target"] * 4, "target_column_name": [None] * 4, - "validation_type": ["GroupedColumn"] * 4, + "validation_type": ["Column"] * 4, "aggregation_type": ["count"] * 4, 
"validation_name": ["count"] * 4, "source_agg_value": ["2", "4", "8", "16"], @@ -420,7 +420,7 @@ def test_generate_report_without_group_by( target_table_name="test_target", target_table_schema="bq-public.target_dataset", target_column_name=None, - validation_type="GroupedColumn", + validation_type="Column", aggregation_type="count", threshold=100.0, ), @@ -439,7 +439,7 @@ def test_generate_report_without_group_by( "source_column_name": [None] * 2, "target_table_name": ["bq-public.target_dataset.test_target"] * 2, "target_column_name": [None] * 2, - "validation_type": ["GroupedColumn"] * 2, + "validation_type": ["Column"] * 2, "aggregation_type": ["count"] * 2, "validation_name": ["count"] * 2, "source_agg_value": ["1", "2"], @@ -478,7 +478,7 @@ def test_generate_report_without_group_by( target_table_name="test_target", target_table_schema="bq-public.target_dataset", target_column_name=None, - validation_type="GroupedColumn", + validation_type="Column", aggregation_type="count", threshold=25.0, ), @@ -511,7 +511,7 @@ def test_generate_report_without_group_by( _NAN, ], "target_column_name": [None] * 6, - "validation_type": ["GroupedColumn"] * 6, + "validation_type": ["Column"] * 6, "aggregation_type": ["count"] * 6, "validation_name": ["count"] * 6, "source_agg_value": ["2", "4", _NAN, _NAN, "6", "8"], diff --git a/tests/unit/test_config_manager.py b/tests/unit/test_config_manager.py index 35ba739df..fca9b9640 100644 --- a/tests/unit/test_config_manager.py +++ b/tests/unit/test_config_manager.py @@ -165,7 +165,14 @@ def test_process_in_memory(module_under_test): assert config_manager.process_in_memory() is True config_manager._config = copy.deepcopy(config_manager._config) - config_manager._config[consts.CONFIG_TYPE] = "Row" + config_manager._config[consts.CONFIG_PRIMARY_KEYS] = [ + { + consts.CONFIG_FIELD_ALIAS: "id", + consts.CONFIG_SOURCE_COLUMN: "id", + consts.CONFIG_TARGET_COLUMN: "id", + consts.CONFIG_CAST: None, + }, + ] assert config_manager.process_in_memory() is False diff --git a/tests/unit/test_data_validation.py b/tests/unit/test_data_validation.py index 1b3d3efe3..1ca079d22 100644 --- a/tests/unit/test_data_validation.py +++ b/tests/unit/test_data_validation.py @@ -101,12 +101,13 @@ consts.CONFIG_FORMAT: "table", } -SAMPLE_ROW_CONFIG = { +# Grouped Column Row confg +SAMPLE_GC_ROW_CONFIG = { # BigQuery Specific Connection Config "source_conn": SOURCE_CONN_CONFIG, "target_conn": TARGET_CONN_CONFIG, # Validation Type - consts.CONFIG_TYPE: "Row", + consts.CONFIG_TYPE: "Column", consts.CONFIG_MAX_RECURSIVE_QUERY_SIZE: 50, # Configuration Required Depending on Validator Type "schema_name": None, @@ -141,12 +142,12 @@ consts.CONFIG_FORMAT: "table", } -SAMPLE_ROW_CALC_CONFIG = { +SAMPLE_GC_ROW_CALC_CONFIG = { # BigQuery Specific Connection Config "source_conn": SOURCE_CONN_CONFIG, "target_conn": TARGET_CONN_CONFIG, # Validation Type - consts.CONFIG_TYPE: "Row", + consts.CONFIG_TYPE: "Column", consts.CONFIG_MAX_RECURSIVE_QUERY_SIZE: 50, # Configuration Required Depending on Validator Type "schema_name": None, @@ -427,7 +428,7 @@ def test_row_level_validation_perfect_match(module_under_test, fs): _create_table_file(SOURCE_TABLE_FILE_PATH, json_data) _create_table_file(TARGET_TABLE_FILE_PATH, json_data) - client = module_under_test.DataValidation(SAMPLE_ROW_CONFIG) + client = module_under_test.DataValidation(SAMPLE_GC_ROW_CONFIG) result_df = client.execute() expected_date_result = '{"date_value": "%s"}' % str(datetime.now().date()) @@ -444,7 +445,7 @@ def 
test_calc_field_validation_calc_match(module_under_test, fs): _create_table_file(SOURCE_TABLE_FILE_PATH, json_data) _create_table_file(TARGET_TABLE_FILE_PATH, json_data) - client = module_under_test.DataValidation(SAMPLE_ROW_CALC_CONFIG) + client = module_under_test.DataValidation(SAMPLE_GC_ROW_CALC_CONFIG) result_df = client.execute() calc_val_df = result_df[result_df["validation_name"] == "sum_length"] calc_val_df2 = result_df[result_df["validation_name"] == "sum_concat_length"] @@ -466,7 +467,7 @@ def test_row_level_validation_non_matching(module_under_test, fs): _create_table_file(SOURCE_TABLE_FILE_PATH, source_json_data) _create_table_file(TARGET_TABLE_FILE_PATH, target_json_data) - client = module_under_test.DataValidation(SAMPLE_ROW_CONFIG, verbose=True) + client = module_under_test.DataValidation(SAMPLE_GC_ROW_CONFIG, verbose=True) result_df = client.execute() validation_df = result_df[result_df["validation_name"] == "count_text_value"] @@ -492,7 +493,7 @@ def test_row_level_validation_smart_count(module_under_test, fs): _create_table_file(SOURCE_TABLE_FILE_PATH, source_json_data) _create_table_file(TARGET_TABLE_FILE_PATH, target_json_data) - client = module_under_test.DataValidation(SAMPLE_ROW_CONFIG) + client = module_under_test.DataValidation(SAMPLE_GC_ROW_CONFIG) result_df = client.execute() expected_date_result = '{"date_value": "%s"}' % str(datetime.now().date()) @@ -511,7 +512,7 @@ def test_row_level_validation_multiple_aggregations(module_under_test, fs): _create_table_file(SOURCE_TABLE_FILE_PATH, source_json_data) _create_table_file(TARGET_TABLE_FILE_PATH, target_json_data) - client = module_under_test.DataValidation(SAMPLE_ROW_CONFIG, verbose=True) + client = module_under_test.DataValidation(SAMPLE_GC_ROW_CONFIG, verbose=True) result_df = client.execute() validation_df = result_df[result_df["validation_name"] == "count_text_value"]
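Taken together, these changes map the old `run` invocations onto the new `validate` subcommands while keeping `run` available (now marked deprecated) for backwards compatibility. The before/after sketch below is illustrative only, assembled from the command forms shown in the README and docs diffs above; it assumes the same placeholder connections and public citibike table used throughout this patch.

````shell script
# Old syntax: still parsed for backwards compatibility, but marked deprecated
data-validation run -t Column -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips --count '*'

# New syntax introduced by this patch: column validations become a subcommand
data-validation validate column -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips --count '*'

# Schema validation moves from '-t Schema' to its own subcommand
data-validation validate schema -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips
````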