From 7a218d2b516d480dad05f6fb52ed6347aef429ed Mon Sep 17 00:00:00 2001 From: Robby <40561036+Robby29@users.noreply.github.com> Date: Wed, 23 Mar 2022 21:08:24 +0530 Subject: [PATCH] feat: Support for custom query (#390) * added custom-query sub-option to validate command * add source and target query option in custom query * added min,max,sum aggregates with custom query * fixed hive t0 column name addition issue * added empty query file check * linting fixes * added unit tests * incorporated black linting changes * incorporated flake linter changes --- README.md | 43 ++++++++++++++ data_validation/__main__.py | 12 +++- data_validation/cli_tools.py | 82 ++++++++++++++++++++++++- data_validation/combiner.py | 3 + data_validation/config_manager.py | 22 +++++++ data_validation/consts.py | 4 ++ data_validation/validation_builder.py | 86 ++++++++++++++++++++++++++- docs/examples.md | 17 ++++++ tests/resources/custom-query.sql | 1 + tests/unit/test_validation_builder.py | 75 +++++++++++++++++++++++ 10 files changed, 340 insertions(+), 5 deletions(-) create mode 100644 tests/resources/custom-query.sql diff --git a/README.md b/README.md index 9e90aca80..88c32989b 100644 --- a/README.md +++ b/README.md @@ -216,6 +216,49 @@ data-validation (--verbose or -v) validate schema Defaults to table. ``` +#### Custom Query Validations + +Below is the command syntax for custom query validations. + +``` +data-validation (--verbose or -v) validate custom-query + --source-conn or -sc SOURCE_CONN + Source connection details + See: *Data Source Configurations* section for each data source + --target-conn or -tc TARGET_CONN + Target connection details + See: *Connections* section for each data source + --tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE + Comma separated list of tables in the form schema.table=target_schema.target_table + Target schema name and table name are optional. + i.e 'bigquery-public-data.new_york_citibike.citibike_trips' + --source-query-file SOURCE_QUERY_FILE, -sqf SOURCE_QUERY_FILE + File containing the source sql commands + --target-query-file TARGET_QUERY_FILE, -tqf TARGET_QUERY_FILE + File containing the target sql commands + [--count COLUMNS] Comma separated list of columns for count or * for all columns + [--sum COLUMNS] Comma separated list of columns for sum or * for all numeric + [--min COLUMNS] Comma separated list of columns for min or * for all numeric + [--max COLUMNS] Comma separated list of columns for max or * for all numeric + [--avg COLUMNS] Comma separated list of columns for avg or * for all numeric + [--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE] + BigQuery destination for validation results. Defaults to stdout. + See: *Validation Reports* section + [--service-account or -sa PATH_TO_SA_KEY] + Service account to use for BigQuery result handler output. + [--labels or -l KEY1=VALUE1,KEY2=VALUE2] + Comma-separated key value pair labels for the run. + [--format or -fmt] Format for stdout output. Supported formats are (text, csv, json, table). + Defaults to table. +``` + +The default aggregation type is a 'COUNT *'. If no aggregation flag (i.e count, +sum , min, etc.) is provided, the default aggregation will run. + +The [Examples](docs/examples.md) page provides few examples of how this tool can +used to run custom query validations. + + ### Running Custom SQL Exploration There are many occasions where you need to explore a data source while running diff --git a/data_validation/__main__.py b/data_validation/__main__.py index c3c75e66a..0a35e7664 100644 --- a/data_validation/__main__.py +++ b/data_validation/__main__.py @@ -150,6 +150,14 @@ def build_config_from_args(args, config_manager): # TODO(GH#18): Add query filter config logic + if config_manager.validation_type == consts.CUSTOM_QUERY: + config_manager.append_aggregates(get_aggregate_config(args, config_manager)) + if args.source_query_file is not None: + query_file = cli_tools.get_arg_list(args.source_query_file) + config_manager.append_source_query_file(query_file) + if args.target_query_file is not None: + query_file = cli_tools.get_arg_list(args.target_query_file) + config_manager.append_target_query_file(query_file) return config_manager @@ -165,6 +173,8 @@ def build_config_managers_from_args(args): config_type = consts.COLUMN_VALIDATION elif validate_cmd == "Row": config_type = consts.ROW_VALIDATION + elif validate_cmd == "Custom-query": + config_type = consts.CUSTOM_QUERY else: raise ValueError(f"Unknown Validation Type: {validate_cmd}") else: @@ -436,7 +446,7 @@ def run_validation_configs(args): def validate(args): """ Run commands related to data validation.""" - if args.validate_cmd in ["column", "row", "schema"]: + if args.validate_cmd in ["column", "row", "schema", "custom-query"]: run(args) else: raise ValueError(f"Validation Argument '{args.validate_cmd}' is not supported") diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py index b50ba3e9a..530c48ee2 100644 --- a/data_validation/cli_tools.py +++ b/data_validation/cli_tools.py @@ -151,7 +151,6 @@ def configure_arg_parser(): _configure_raw_query(subparsers) _configure_run_parser(subparsers) _configure_beta_parser(subparsers) - return parser @@ -385,6 +384,11 @@ def _configure_validate_parser(subparsers): ) _configure_schema_parser(schema_parser) + custom_query_parser = validate_subparsers.add_parser( + "custom-query", help="Run a custom query validation" + ) + _configure_custom_query_parser(custom_query_parser) + def _configure_row_parser(row_parser): """Configure arguments to run row level validations.""" @@ -531,6 +535,82 @@ def _configure_schema_parser(schema_parser): _add_common_arguments(schema_parser) +def _configure_custom_query_parser(custom_query_parser): + """Configure arguments to run custom-query validations.""" + _add_common_arguments(custom_query_parser) + custom_query_parser.add_argument( + "--source-query-file", "-sqf", help="File containing the source sql query", + ) + custom_query_parser.add_argument( + "--target-query-file", "-tqf", help="File containing the target sql query", + ) + custom_query_parser.add_argument( + "--count", + "-count", + help="Comma separated list of columns for count 'col_a,col_b' or * for all columns", + ) + custom_query_parser.add_argument( + "--sum", + "-sum", + help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns", + ) + custom_query_parser.add_argument( + "--avg", + "-avg", + help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns", + ) + custom_query_parser.add_argument( + "--min", + "-min", + help="Comma separated list of columns for min 'col_a,col_b' or * for all columns", + ) + custom_query_parser.add_argument( + "--max", + "-max", + help="Comma separated list of columns for max 'col_a,col_b' or * for all columns", + ) + custom_query_parser.add_argument( + "--bit_xor", + "-bit_xor", + help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns", + ) + custom_query_parser.add_argument( + "--hash", + "-hash", + help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns", + ) + custom_query_parser.add_argument( + "--filters", + "-filters", + help="Filters in the format source_filter:target_filter", + ) + custom_query_parser.add_argument( + "--labels", "-l", help="Key value pair labels for validation run" + ) + custom_query_parser.add_argument( + "--threshold", + "-th", + type=threshold_float, + help="Float max threshold for percent difference", + ) + custom_query_parser.add_argument( + "--use-random-row", + "-rr", + action="store_true", + help="Finds a set of random rows of the first primary key supplied.", + ) + custom_query_parser.add_argument( + "--random-row-batch-size", + "-rbs", + help="Row batch size used for random row filters (default 10,000).", + ) + custom_query_parser.add_argument( + "--primary-keys", + "-pk", + help="Comma separated list of primary key columns 'col_a,col_b'", + ) + + def _add_common_arguments(parser): parser.add_argument("--source-conn", "-sc", help="Source connection name") parser.add_argument("--target-conn", "-tc", help="Target connection name") diff --git a/data_validation/combiner.py b/data_validation/combiner.py index 328607e8a..8c5fff5c7 100644 --- a/data_validation/combiner.py +++ b/data_validation/combiner.py @@ -72,6 +72,7 @@ def generate_report( "Expected source and target to have same schema, got " f"source: {source_names} target: {target_names}" ) + differences_pivot = _calculate_differences( source, target, join_on_fields, run_metadata.validations, is_value_comparison ) @@ -194,6 +195,7 @@ def _calculate_differences( ) ] ) + differences_pivot = functools.reduce( lambda pivot1, pivot2: pivot1.union(pivot2), differences_pivots ) @@ -233,6 +235,7 @@ def _pivot_result(result, join_on_fields, validations, result_type): + join_on_fields ) ) + pivot = functools.reduce(lambda pivot1, pivot2: pivot1.union(pivot2), pivots) return pivot diff --git a/data_validation/config_manager.py b/data_validation/config_manager.py index c384672e5..939217df7 100644 --- a/data_validation/config_manager.py +++ b/data_validation/config_manager.py @@ -139,6 +139,28 @@ def append_query_groups(self, grouped_column_configs): self.query_groups + grouped_column_configs ) + @property + def source_query_file(self): + """ Return SQL Query File from Config """ + return self._config.get(consts.CONFIG_SOURCE_QUERY_FILE, []) + + def append_source_query_file(self, query_file_configs): + """Append grouped configs to existing config.""" + self._config[consts.CONFIG_SOURCE_QUERY_FILE] = ( + self.source_query_file + query_file_configs + ) + + @property + def target_query_file(self): + """ Return SQL Query File from Config """ + return self._config.get(consts.CONFIG_TARGET_QUERY_FILE, []) + + def append_target_query_file(self, query_file_configs): + """Append grouped configs to existing config.""" + self._config[consts.CONFIG_TARGET_QUERY_FILE] = ( + self.target_query_file + query_file_configs + ) + @property def primary_keys(self): """ Return Primary keys from Config """ diff --git a/data_validation/consts.py b/data_validation/consts.py index 09ffbf2b7..5743e511b 100644 --- a/data_validation/consts.py +++ b/data_validation/consts.py @@ -47,6 +47,8 @@ CONFIG_FILTER_SOURCE = "source" CONFIG_FILTER_TARGET = "target" CONFIG_MAX_RECURSIVE_QUERY_SIZE = "max_recursive_query_size" +CONFIG_SOURCE_QUERY_FILE = "source_query_file" +CONFIG_TARGET_QUERY_FILE = "target_query_file" CONFIG_FILTER_SOURCE_COLUMN = "source_column" CONFIG_FILTER_SOURCE_VALUE = "source_value" @@ -71,12 +73,14 @@ GROUPED_COLUMN_VALIDATION = "GroupedColumn" ROW_VALIDATION = "Row" SCHEMA_VALIDATION = "Schema" +CUSTOM_QUERY = "Custom-query" CONFIG_TYPES = [ COLUMN_VALIDATION, GROUPED_COLUMN_VALIDATION, ROW_VALIDATION, SCHEMA_VALIDATION, + CUSTOM_QUERY, ] # State Manager Fields diff --git a/data_validation/validation_builder.py b/data_validation/validation_builder.py index 2af152e40..6020d2bdc 100644 --- a/data_validation/validation_builder.py +++ b/data_validation/validation_builder.py @@ -73,7 +73,13 @@ def clone(self): @staticmethod def get_query_builder(validation_type): """ Return Query Builder object given validation type """ - if validation_type in ["Column", "GroupedColumn", "Row", "Schema"]: + if validation_type in [ + "Column", + "GroupedColumn", + "Row", + "Schema", + "Custom-query", + ]: builder = QueryBuilder.build_count_validator() else: msg = "Validation Builder supplied unknown type: %s" % validation_type @@ -327,7 +333,21 @@ def get_source_query(self): "schema_name": self.config_manager.source_schema, "table_name": self.config_manager.source_table, } - query = self.source_builder.compile(**source_config) + if self.validation_type == consts.CUSTOM_QUERY: + source_input_query = self.get_query_from_file( + self.config_manager.source_query_file[0] + ) + source_aggregate_query = "SELECT " + for aggregate in self.config_manager.aggregates: + source_aggregate_query += self.get_aggregation_query( + aggregate.get("type"), aggregate.get("target_column") + ) + source_aggregate_query = self.get_wrapper_aggregation_query( + source_aggregate_query, source_input_query + ) + query = self.source_client.sql(source_aggregate_query) + else: + query = self.source_builder.compile(**source_config) if self.verbose: print(source_config) print("-- ** Source Query ** --") @@ -342,7 +362,22 @@ def get_target_query(self): "schema_name": self.config_manager.target_schema, "table_name": self.config_manager.target_table, } - query = self.target_builder.compile(**target_config) + if self.validation_type == consts.CUSTOM_QUERY: + target_input_query = self.get_query_from_file( + self.config_manager.target_query_file[0] + ) + target_aggregate_query = "SELECT " + for aggregate in self.config_manager.aggregates: + target_aggregate_query += self.get_aggregation_query( + aggregate.get("type"), aggregate.get("target_column") + ) + + target_aggregate_query = self.get_wrapper_aggregation_query( + target_aggregate_query, target_input_query + ) + query = self.target_client.sql(target_aggregate_query) + else: + query = self.target_builder.compile(**target_config) if self.verbose: print(target_config) print("-- ** Target Query ** --") @@ -358,3 +393,48 @@ def add_query_limit(self): limit = self.config_manager.query_limit self.source_builder.limit = limit self.target_builder.limit = limit + + def get_query_from_file(self, filename): + """ Return query from input file """ + query = "" + try: + file = open(filename, "r") + query = file.read() + except IOError: + print("Cannot read query file: ", filename) + + if not query or query.isspace(): + raise ValueError( + "Expected file with sql query, got empty file or file with white spaces. " + f"input file: {filename}" + ) + file.close() + return query + + def get_aggregation_query(self, agg_type, column_name): + """ Return aggregation query """ + aggregation_query = "" + if column_name is None: + aggregation_query = agg_type + "(*) as " + agg_type + "," + else: + aggregation_query = ( + agg_type + + "(" + + column_name + + ") as " + + agg_type + + "__" + + column_name + + "," + ) + return aggregation_query + + def get_wrapper_aggregation_query(self, aggregate_query, base_query): + """ Return wrapper aggregation query """ + + return ( + aggregate_query[: len(aggregate_query) - 1] + + " FROM (" + + base_query + + ") as base_query" + ) diff --git a/docs/examples.md b/docs/examples.md index d136cb41e..650703e67 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -184,3 +184,20 @@ validations: threshold: 0.0 type: Column ``` + +#### Run a custom query validation +````shell script +data-validation validate custom-query --source-query-file source_query.sql --target-query-file target_query.sql -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations +```` + +#### Run a custom query validation with sum aggregation +````shell script +data-validation validate custom-query --source-query-file source_query.sql --target-query-file target_query.sql -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --sum num_bikes_available +```` + +#### Run a custom query validation with max aggregation +````shell script +data-validation validate custom-query --source-query-file source_query.sql --target-query-file target_query.sql -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --max num_bikes_available +```` + +Please replace source_query.sql and target_query.sql with the correct files containing sql query for source and target database respectively. \ No newline at end of file diff --git a/tests/resources/custom-query.sql b/tests/resources/custom-query.sql new file mode 100644 index 000000000..38cd9b8d6 --- /dev/null +++ b/tests/resources/custom-query.sql @@ -0,0 +1 @@ +SELECT * FROM bigquery-public-data.usa_names.usa_1910_2013 \ No newline at end of file diff --git a/tests/unit/test_validation_builder.py b/tests/unit/test_validation_builder.py index cb50fcf73..a537fae42 100644 --- a/tests/unit/test_validation_builder.py +++ b/tests/unit/test_validation_builder.py @@ -40,6 +40,22 @@ ], } +CUSTOM_QUERY_VALIDATION_CONFIG = { + # BigQuery Specific Connection Config + "source_conn": None, + "target_conn": None, + # Validation Type + consts.CONFIG_TYPE: "Custom-query", + # Configuration Required Depending on Validator Type + consts.CONFIG_SCHEMA_NAME: "bigquery-public-data.new_york_citibike", + consts.CONFIG_TABLE_NAME: "citibike_trips", + consts.CONFIG_CALCULATED_FIELDS: [], + consts.CONFIG_GROUPED_COLUMNS: [], + consts.CONFIG_FILTERS: [], + consts.CONFIG_SOURCE_QUERY_FILE: "tests/resources/custom-query.sql", + consts.CONFIG_TARGET_QUERY_FILE: "tests/resources/custom-query.sql", +} + QUERY_LIMIT = 100 COLUMN_VALIDATION_CONFIG_LIMIT = deepcopy(COLUMN_VALIDATION_CONFIG) COLUMN_VALIDATION_CONFIG_LIMIT[consts.CONFIG_LIMIT] = QUERY_LIMIT @@ -116,6 +132,9 @@ }, ] +AGGREGATION_QUERY = "sum(starttime) as sum_starttime," +BASE_QUERY = "SELECT * FROM bigquery-public-data.usa_names.usa_1910_2013" + class MockIbisClient(object): _source_type = "BigQuery" @@ -208,3 +227,59 @@ def test_validation_add_filters(module_under_test): filter_field = builder.source_builder.filters[0] assert filter_field.left == "column_name > 100" + + +def test_custom_query_validation(module_under_test): + mock_config_manager = ConfigManager( + CUSTOM_QUERY_VALIDATION_CONFIG, + MockIbisClient(), + MockIbisClient(), + verbose=False, + ) + builder = module_under_test.ValidationBuilder(mock_config_manager) + + assert not builder.verbose + assert ( + builder.config_manager.source_query_file == "tests/resources/custom-query.sql" + ) + + +def test_custom_query_get_query_from_file(module_under_test): + mock_config_manager = ConfigManager( + CUSTOM_QUERY_VALIDATION_CONFIG, + MockIbisClient(), + MockIbisClient(), + verbose=False, + ) + builder = module_under_test.ValidationBuilder(mock_config_manager) + query = builder.get_query_from_file(builder.config_manager.source_query_file) + assert query == "SELECT * FROM bigquery-public-data.usa_names.usa_1910_2013" + + +def test_custom_query_get_aggregation_query(module_under_test): + mock_config_manager = ConfigManager( + CUSTOM_QUERY_VALIDATION_CONFIG, + MockIbisClient(), + MockIbisClient(), + verbose=False, + ) + builder = module_under_test.ValidationBuilder(mock_config_manager) + aggregation_query = builder.get_aggregation_query( + AGGREGATES_TEST[0]["type"], AGGREGATES_TEST[0]["source_column"] + ) + assert aggregation_query == "sum(starttime) as sum__starttime," + + +def test_custom_query_get_wrapper_aggregation_query(module_under_test): + mock_config_manager = ConfigManager( + CUSTOM_QUERY_VALIDATION_CONFIG, + MockIbisClient(), + MockIbisClient(), + verbose=False, + ) + builder = module_under_test.ValidationBuilder(mock_config_manager) + wrapper_query = builder.get_wrapper_aggregation_query(AGGREGATION_QUERY, BASE_QUERY) + assert ( + wrapper_query + == "sum(starttime) as sum_starttime FROM (SELECT * FROM bigquery-public-data.usa_names.usa_1910_2013) as base_query" + )