From f79c30873e9ec7a0377cdbce4e781cdf69a2a305 Mon Sep 17 00:00:00 2001
From: Mohammed Turky <87987893+mohdt786@users.noreply.github.com>
Date: Wed, 25 Jan 2023 09:49:43 +0530
Subject: [PATCH] feat: Added Partition support to generate multiple YAML
 config files (#653) (Issue #619,#662)

Features:
1. New command 'generate-table-partitions' added to generate partitions for `row` type validation
2. --partition-num: Number of partitions/config files to create. Range=[1,1000]. If the specified value is greater than count(*), it is coalesced to count(*)
3. --config-dir: Directory path to store the YAML config files. Either a local or a GCS path can be supplied
4. Added a required arguments group to distinguish required from optional arguments
5. Added a mutually exclusive arguments group for --hash and --concat
6. --partition-key: Column on which the partitions are generated. Column type must be integer. Defaults to the primary key

Tests:
Added unit tests for partition_builder.py, which also provide coverage for partition_row_builder.py

README.md & examples.md:
1. Added a description of the 'generate-table-partitions' command
2. Added examples
---
 .gitignore                                    |   6 +
 README.md                                     |  43 ++
 data_validation/__main__.py                   |  52 +-
 data_validation/cli_tools.py                  | 180 ++++++-
 data_validation/partition_builder.py          | 277 ++++++++++
 .../query_builder/partition_row_builder.py    |  73 +++
 data_validation/state_manager.py              |  28 ++
 docs/examples.md                              |   6 +
 tests/unit/test_partition_builder.py          | 475 ++++++++++++++++++
 9 files changed, 1124 insertions(+), 16 deletions(-)
 create mode 100644 data_validation/partition_builder.py
 create mode 100644 data_validation/query_builder/partition_row_builder.py
 create mode 100644 tests/unit/test_partition_builder.py

diff --git a/.gitignore b/.gitignore
index ac377563f..740f991dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -84,7 +84,13 @@ terraform.rc
 
 # Custom
 *.yaml
+partitions_dir
 
 # Test temp files
 source_table_data.json
 target_table_data.json
+source-sql.sql
+
+# Commit msg
+commits.txt
+test.py
\ No newline at end of file

diff --git a/README.md b/README.md
index fa55fa692..b9636964c 100644
--- a/README.md
+++ b/README.md
@@ -197,7 +197,50 @@ data-validation (--verbose or -v) (--log-level or -ll) validate row
   [--filter-status or -fs STATUSES_LIST]
                         Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned.
 ```
+#### Generate Table Partitions for Large Table Row Validations
+Below is the command syntax for generating table partitions, used to perform row validations on large tables under memory constraints.
+
+The command generates and stores multiple YAML configs that represent chunks of the large table, expressed as filters (`WHERE partition_key > X AND partition_key < Y`). You can then run the configs in the directory serially with the `data-validation configs run --config-dir PATH` command as described [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator#yaml-configuration-files).
+
+The command takes the same parameters as required for `Row Validation`, *plus* a few parameters to implement the partitioning logic.
+
+(Note: As of now, only a monotonically increasing key is supported for `--partition-key`.)
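+
+For illustration, with `--partition-num 3` and an integer partition key `id` whose values span 0 to 1000, the three generated configs carry filters like the ones below (these exact values appear in the unit tests added in this PR; note the last upper bound is extended by one so the maximum key value is included):
+
+```
+id >= 0 and id < 333
+id >= 333 and id < 666
+id >= 666 and id < 1001
+```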
+
+```
+data-validation (--verbose or -v) (--log-level or -ll) generate-table-partitions
+
+  --source-conn or -sc SOURCE_CONN
+                        Source connection details
+                        See: *Data Source Configurations* section for each data source
+  --target-conn or -tc TARGET_CONN
+                        Target connection details
+                        See: *Connections* section for each data source
+  --tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE
+                        Comma separated list of tables in the form schema.table=target_schema.target_table
+                        Target schema name and table name are optional.
+                        e.g. 'bigquery-public-data.new_york_citibike.citibike_trips'
+  --primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS
+                        Comma separated list of primary key columns 'col_a,col_b'
+  --comparison-fields or -comp-fields FIELDS
+                        Comma separated list of columns to compare. Can either be a physical column or an alias
+                        See: *Calculated Fields* section for details
+  --hash COLUMNS        Comma separated list of columns to hash or * for all columns
+  --concat COLUMNS      Comma separated list of columns to concatenate or * for all columns (use if a common hash function is not available between databases)
+  --config-dir CONFIG_DIR, -cdir CONFIG_DIR
+                        Directory path to store YAML config files
+                        GCS: Provide a full gs:// path of the target directory. Eg: `gs:///partitions_dir`
+                        Local: Provide a relative path of the target directory. Eg: `partitions_dir`
+  --partition-num [1-1000], -pn [1-1000]
+                        Number of partitions/config files to generate
+                        If this value exceeds the row count of the source/target table, it will be decreased to max(source_row_count, target_row_count)
+  [--partition-key PARTITION_KEY, -partkey PARTITION_KEY]
+                        Column on which the partitions will be generated. Column type must be integer. Defaults to the primary key
+  [--filters SOURCE_FILTER:TARGET_FILTER]
+                        Colon separated string values of source and target filters.
+                        If the target filter is not provided, the source filter will run on both the source and target tables.
+                        See: *Filters* section
+```
 
 #### Schema Validations
 Below is the syntax for schema validations. These can be used to compare case insensitive column names and

diff --git a/data_validation/__main__.py b/data_validation/__main__.py
index 202e68577..7dbe81ed6 100644
--- a/data_validation/__main__.py
+++ b/data_validation/__main__.py
@@ -18,7 +18,7 @@
 import sys
 
 from yaml import Dumper, dump
-
+from argparse import Namespace
 from data_validation import (
     cli_tools,
     clients,
@@ -28,6 +28,7 @@
 )
 from data_validation.config_manager import ConfigManager
 from data_validation.data_validation import DataValidation
+from data_validation.partition_builder import PartitionBuilder
 
 # by default yaml dumps lists as pointers. This disables that feature
 Dumper.ignore_aliases = lambda *args: True
@@ -50,15 +51,7 @@ def _get_arg_config_file(args):
     return args.config_file
 
 
-def _get_arg_config_dir(args):
-    """Return String yaml config directory path."""
-    if not args.config_dir:
-        raise ValueError("YAML Config Directory was not supplied.")
-
-    return args.config_dir
-
-
-def get_aggregate_config(args, config_manager):
+def get_aggregate_config(args, config_manager: ConfigManager):
     """Return list of formatted aggregation objects.
 
Args: @@ -217,11 +210,15 @@ def build_config_from_args(args, config_manager): return config_manager -def build_config_managers_from_args(args): +def build_config_managers_from_args(args: Namespace, validate_cmd: str = None): """Return a list of config managers ready to execute.""" configs = [] - validate_cmd = args.validate_cmd.capitalize() + # Since `generate-table-partitions` defaults to `validate_cmd=row`, + # `validate_cmd` is passed along while calling this method + if validate_cmd is None: + validate_cmd = args.validate_cmd.capitalize() + if validate_cmd == "Schema": config_type = consts.SCHEMA_VALIDATION elif validate_cmd == "Column": @@ -492,10 +489,33 @@ def store_yaml_config_file(args, config_managers): cli_tools.store_validation(config_file_path, yaml_configs) -def run(args): - """ """ - config_managers = build_config_managers_from_args(args) +def partition_and_store_config_files(args: Namespace) -> None: + """Build multiple YAML Config files using user specified partition logic + + Args: + args (Namespace): User specified Arguments + Returns: + None + """ + # Default Validate Type + config_managers = build_config_managers_from_args(args, consts.ROW_VALIDATION) + partition_builder = PartitionBuilder(config_managers, args) + partition_builder.partition_configs() + + +def run(args) -> None: + """Splits execution into: + 1. Build and save single Yaml Config file + 2. Run Validations + + Args: + args (Namespace): User specified Arguments. + + Returns: + None + """ + config_managers = build_config_managers_from_args(args) if args.config_file: store_yaml_config_file(args, config_managers) else: @@ -564,6 +584,8 @@ def main(): print(run_raw_query_against_connection(args)) elif args.command == "validate": validate(args) + elif args.command == "generate-table-partitions": + partition_and_store_config_files(args) elif args.command == "deploy": from data_validation import app diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py index 8117d68de..224d8f72b 100644 --- a/data_validation/cli_tools.py +++ b/data_validation/cli_tools.py @@ -40,6 +40,9 @@ -c ex_yaml.yaml data-validation run-config -c ex_yaml.yaml + +command: +data-validation """ import argparse @@ -48,6 +51,7 @@ import logging import sys import uuid +from argparse import Namespace from data_validation import consts, state_manager @@ -138,7 +142,7 @@ } -def get_parsed_args(): +def get_parsed_args() -> Namespace: """Return ArgParser with configured CLI arguments.""" parser = configure_arg_parser() args = ["--help"] if len(sys.argv) == 1 else None @@ -168,9 +172,104 @@ def configure_arg_parser(): _configure_find_tables(subparsers) _configure_raw_query(subparsers) _configure_beta_parser(subparsers) + _configure_partition_parser(subparsers) return parser +def _configure_partition_parser(subparsers): + """Configure arguments to generate partitioned config files.""" + partition_parser = subparsers.add_parser( + "generate-table-partitions", + help=( + "Generate partitions for validation and store the Config files in " + "a directory" + ), + ) + + # Group all optional arguments together + optional_arguments = partition_parser.add_argument_group("optional arguments") + optional_arguments.add_argument( + "--threshold", + "-th", + type=threshold_float, + help="Float max threshold for percent difference", + ) + optional_arguments.add_argument( + "--filters", + "-filters", + help="Filters in the format source_filter:target_filter", + ) + optional_arguments.add_argument( + "--use-random-row", + "-rr", + action="store_true", + 
help="Finds a set of random rows based on the first primary key supplied.",
+    )
+    optional_arguments.add_argument(
+        "--random-row-batch-size",
+        "-rbs",
+        help="Row batch size used for random row filters (default 10,000).",
+    )
+    # Keep these in order to support data-validation run command for
+    # backwards-compatibility
+    optional_arguments.add_argument("--type", "-t", help="Type of Validation")
+    optional_arguments.add_argument(
+        "--result-handler-config", "-rc", help="Result handler config details"
+    )
+
+    # Configure arguments to generate partitions for row based validation
+
+    # Group all required arguments together
+    required_arguments = partition_parser.add_argument_group("required arguments")
+    required_arguments.add_argument(
+        "--primary-keys",
+        "-pk",
+        required=True,
+        help="Comma separated list of primary key columns 'col_a,col_b'",
+    )
+    required_arguments.add_argument(
+        "--tables-list",
+        "-tbls",
+        required=True,
+        help=(
+            "Comma separated tables list in the form "
+            "'schema.table=target_schema.target_table'"
+        ),
+    )
+
+    # Group for mutually exclusive required arguments. Either must be supplied
+    mutually_exclusive_arguments = required_arguments.add_mutually_exclusive_group(
+        required=True
+    )
+    mutually_exclusive_arguments.add_argument(
+        "--hash",
+        "-hash",
+        help=(
+            "Comma separated list of columns for hash 'col_a,col_b' or * for "
+            "all columns"
+        ),
+    )
+    mutually_exclusive_arguments.add_argument(
+        "--concat",
+        "-concat",
+        help=(
+            "Comma separated list of columns for concat 'col_a,col_b' or * "
+            "for all columns"
+        ),
+    )
+
+    mutually_exclusive_arguments.add_argument(
+        "--comparison-fields",
+        "-comp-fields",
+        help=(
+            "Individual columns to compare. If comparing a calculated field use "
+            "the column alias."
+        ),
+    )
+
+    _add_common_partition_arguments(optional_arguments, required_arguments)
+
+
 def _configure_beta_parser(subparsers):
     """Configure beta commands for the parser."""
     connection_parser = subparsers.add_parser(
@@ -646,6 +745,79 @@ def _add_common_arguments(parser):
     )
 
 
+def _add_common_partition_arguments(optional_arguments, required_arguments=None):
+    """Add all arguments common to the generate-table-partitions command."""
+
+    # Group all Required Arguments together
+    required_arguments.add_argument(
+        "--source-conn", "-sc", required=True, help="Source connection name"
+    )
+    required_arguments.add_argument(
+        "--target-conn", "-tc", required=True, help="Target connection name"
+    )
+    required_arguments.add_argument(
+        "--config-dir",
+        "-cdir",
+        required=True,
+        help="Directory path to store YAML config files. "
+        "GCS: Provide a full gs:// path of the target directory. "
+        "Eg: `gs:///partitions_dir`. "
+        "Local: Provide a relative path of the target directory. "
+        "Eg: `partitions_dir`",
+    )
+    required_arguments.add_argument(
+        "--partition-num",
+        "-pn",
+        required=True,
+        help="Number of partitions/config files to generate",
+        type=int,
+        choices=range(1, 1001),
+        metavar="[1-1000]",
+    )
+
+    # Optional arguments
+    optional_arguments.add_argument(
+        "--bq-result-handler",
+        "-bqrh",
+        help="BigQuery result handler config details",
+    )
+    optional_arguments.add_argument(
+        "--labels", "-l", help="Key value pair labels for validation run"
+    )
+    optional_arguments.add_argument(
+        "--service-account",
+        "-sa",
+        help="Path to SA key file for result handler output",
+    )
+    optional_arguments.add_argument(
+        "--format",
+        "-fmt",
+        default="table",
+        help=(
+            "Set the format for printing command output. Supported formats are "
+            "(text, csv, json, table). 
Defaults to table" + ), + ) + optional_arguments.add_argument( + "--filter-status", + "-fs", + # TODO: update if we start to support other statuses + help=( + "Comma separated list of statuses to filter the validation results. " + "Supported statuses are (success, fail). If no list is provided, " + "all statuses are returned" + ), + ) + optional_arguments.add_argument( + "--partition-key", + "-partkey", + help=( + "Column on which the partitions would be generated. " + "Column type must be integer. Defaults to Primary key" + ), + ) + + def get_connection_config_from_args(args): """Return dict with connection config supplied.""" config = {consts.SOURCE_TYPE: args.connect_type} @@ -710,6 +882,12 @@ def store_validation(validation_file_name, yaml_config): mgr.create_validation_yaml(validation_file_name, yaml_config) +def store_partition(target_file_path, yaml_config, target_folder_path=None): + """Store the partition YAML config under the given name.""" + mgr = state_manager.StateManager(target_folder_path) + mgr.create_partition_yaml(target_file_path, yaml_config) + + def get_validation(validation_name, config_dir=None): """Return validation YAML for a specific connection.""" if config_dir: diff --git a/data_validation/partition_builder.py b/data_validation/partition_builder.py new file mode 100644 index 000000000..60466bbcb --- /dev/null +++ b/data_validation/partition_builder.py @@ -0,0 +1,277 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import logging +import numpy +from typing import List, Dict +from argparse import Namespace + +from data_validation import cli_tools, consts +from data_validation.config_manager import ConfigManager +from data_validation.query_builder.partition_row_builder import PartitionRowBuilder +from data_validation.validation_builder import ValidationBuilder + + +class PartitionBuilder: + def __init__(self, config_managers: List[ConfigManager], args: Namespace) -> None: + + self.config_managers = config_managers + self.table_count = len(config_managers) + self.args = args + self.config_dir = self._get_arg_config_dir() + self.primary_key = self._get_primary_key() + self.partition_key = self._get_arg_partition_key() + + def _get_arg_config_dir(self) -> str: + """Return String yaml config folder path.""" + if not self.args.config_dir: + raise ValueError("YAML Config Dir Path was not supplied.") + + return self.args.config_dir + + def _get_primary_key(self) -> str: + """Return the first Primary Key""" + # Filter for only first primary key (multi-pk filter not supported) + primary_keys = cli_tools.get_arg_list(self.args.primary_keys) + primary_key = primary_keys[0] + return primary_key + + def _get_arg_partition_key(self) -> str: + """Return Partition Key. If not supplied, defaults to Primary Key""" + if not self.args.partition_key: + logging.warning( + "Partition key cannot be found. 
Will default to Primary key"
+            )
+            return self.primary_key
+
+        return self.args.partition_key
+
+    def _get_yaml_from_config(self, config_manager: ConfigManager) -> Dict:
+        """Return a dict object formatted for yaml validations.
+
+        Args:
+            config_manager (ConfigManager): Config manager instance.
+        """
+        yaml_config = {
+            consts.YAML_SOURCE: self.args.source_conn,
+            consts.YAML_TARGET: self.args.target_conn,
+            consts.YAML_RESULT_HANDLER: config_manager.result_handler_config,
+            consts.YAML_VALIDATIONS: [config_manager.get_yaml_validation_block()],
+        }
+        return yaml_config
+
+    def partition_configs(self) -> None:
+        """Take a list of ConfigManager objects and split each into multiple
+        ConfigManager objects by applying the supplied partition logic.
+
+        Returns:
+            None
+        """
+        # Default partition logic: partition key
+        # Add necessary checks and routes when support for hashmod or
+        # partitionkeymod is added
+        partition_filters = self._get_partition_key_filters()
+        yaml_configs_list = self._add_partition_filters(partition_filters)
+        self._store_partitions(yaml_configs_list)
+
+    def _get_partition_key_filters(self) -> List[List[str]]:
+        """Generate partition-key partition filters for all configs/tables.
+
+        Returns:
+            A list of lists of partition filters, one list per table
+        """
+        master_filter_list = []
+
+        for config_manager in self.config_managers:
+
+            validation_builder = ValidationBuilder(config_manager)
+
+            source_partition_row_builder = PartitionRowBuilder(
+                self.partition_key,
+                config_manager.source_client,
+                config_manager.source_schema,
+                config_manager.source_table,
+                validation_builder.source_builder,
+            )
+
+            target_partition_row_builder = PartitionRowBuilder(
+                self.partition_key,
+                config_manager.target_client,
+                config_manager.target_schema,
+                config_manager.target_table,
+                validation_builder.target_builder,
+            )
+
+            # Get Source and Target row Count
+            source_count_query = source_partition_row_builder.get_count_query()
+            source_count = source_count_query.execute()
+
+            target_count_query = target_partition_row_builder.get_count_query()
+            target_count = target_count_query.execute()
+
+            if source_count != target_count:
+                logging.warning(
+                    "Source and Target table row counts do not match, "
+                    "proceeding with max(source_count, target_count)"
+                )
+            row_count = max(source_count, target_count)
+
+            # If the supplied partition_num is greater than count(*), coalesce it
+            if self.args.partition_num > row_count:
+                partition_count = row_count
+                logging.warning(
+                    "Supplied partition num is greater than row count, "
+                    "truncating it to row count"
+                )
+            else:
+                partition_count = self.args.partition_num
+
+            # Get Source and Target Primary key Min
+            source_min_query = source_partition_row_builder.get_min_query()
+            source_min = source_min_query.execute()
+
+            target_min_query = target_partition_row_builder.get_min_query()
+            target_min = target_min_query.execute()
+
+            # If the partition key is non-numeric, raise a TypeError
+            accepted_data_types = [int, numpy.int32, numpy.int64]
+            if not (
+                type(source_min) in accepted_data_types
+                and type(target_min) in accepted_data_types
+            ):
+                raise TypeError(
+                    f"Supplied partition key is not of a numeric type: "
+                    f"{self.partition_key}"
+                )
+
+            if source_min != target_min:
+                logging.warning(
+                    "min(partition_key) for Source and Target tables do not "
+                    "match, proceeding with min(source_min, target_min)"
+                )
+            lower_bound = min(source_min, target_min)
+
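+            # Illustration of the partitioning below (values mirror the unit
+            # tests): with lower_bound=0, upper_bound=1000 and
+            # partition_count=3, the step is (1000 - 0) // 3 = 333, producing
+            # filters over [0, 333), [333, 666) and [666, 1001); the last
+            # range is widened to upper_bound + 1 so the maximum key value is
+            # included.
+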
+            # Get Source and Target Primary key Max
+            source_max_query = source_partition_row_builder.get_max_query()
+            source_max = source_max_query.execute()
+
+            target_max_query = target_partition_row_builder.get_max_query()
+            target_max = target_max_query.execute()
+
+            if source_max != target_max:
+                logging.warning(
+                    "max(partition_key) for Source and Target tables do not "
+                    "match, proceeding with max(source_max, target_max)"
+                )
+            upper_bound = max(source_max, target_max)
+
+            filter_list = []  # Store partition filters per config/table
+            i = 0
+            marker = lower_bound
+            partition_step = (upper_bound - lower_bound) // partition_count
+            while i < partition_count:
+                lower_val = marker
+                upper_val = marker + partition_step
+
+                # Widen the last partition so the maximum key value is included
+                if i == partition_count - 1:
+                    upper_val = upper_bound + 1
+
+                partition_filter = (
+                    f"{self.partition_key} >= {lower_val} "
+                    f"and {self.partition_key} < {upper_val}"
+                )
+                filter_list.append(partition_filter)
+
+                i += 1
+                marker += partition_step
+
+            master_filter_list.append(filter_list)
+
+        return master_filter_list
+
+    def _add_partition_filters(
+        self,
+        partition_filters: List[List[str]],
+    ) -> List[Dict]:
+        """Add partition filters to the ConfigManager objects and return a
+        list of dict ConfigManager objects.
+
+        Args:
+            partition_filters (List[List[str]]): List of lists of partition
+                filters, one list per table/ConfigManager object
+
+        Returns:
+            yaml_configs_list (List[Dict]): List of YAML configs for all tables
+        """
+        table_count = len(self.config_managers)
+        yaml_configs_list = [None] * table_count
+        for ind in range(table_count):
+            config_manager = self.config_managers[ind]
+            filter_list = partition_filters[ind]
+
+            yaml_configs_list[ind] = {
+                "target_folder_name": config_manager.full_source_table,
+                "partitions": [],
+            }
+            for pos in range(len(filter_list)):
+                filter_dict = {
+                    "type": "custom",
+                    "source": filter_list[pos],
+                    "target": filter_list[pos],
+                }
+                # Append the new partition filter
+                config_manager.filters.append(filter_dict)
+
+                # Build and append partition YAML
+                yaml_config = self._get_yaml_from_config(config_manager)
+                target_file_name = "0" * (4 - len(str(pos))) + str(pos) + ".yaml"
+                yaml_configs_list[ind]["partitions"].append(
+                    {"target_file_name": target_file_name, "yaml_config": yaml_config}
+                )
+
+                # Pop the partition filter appended above
+                config_manager.filters.pop()
+
+        return yaml_configs_list
+
+    def _store_partitions(self, yaml_configs_list: List[Dict]) -> None:
+        """Save partitions to the target folder.
+
+        Args:
+            yaml_configs_list (List[Dict]): List of YAML configs for all tables
+
+        Returns:
+            None
+        """
+        logging.info(
+            f"Writing table partition configs to directory: {self.config_dir}"
+        )
+
+        for table in yaml_configs_list:
+            target_folder_name = table["target_folder_name"]
+            target_folder_path = os.path.join(self.config_dir, target_folder_name)
+            for partition in table["partitions"]:
+                yaml_config = partition["yaml_config"]
+                target_file_name = partition["target_file_name"]
+                target_file_path = os.path.join(target_folder_path, target_file_name)
+                cli_tools.store_partition(
+                    target_file_path, yaml_config, target_folder_path
+                )
+
+        logging.info(
+            f"Success! Table partition configs written to directory: "
+            f"{self.config_dir}"
+        )
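With the naming scheme above, a run with `-cdir partitions_dir` against a single table produces a directory layout like the following sketch (the folder name comes from `full_source_table`, the file count from `--partition-num`; the table name is illustrative):

```
partitions_dir/
└── bigquery-public-data.new_york_citibike.citibike_stations/
    ├── 0000.yaml
    ├── 0001.yaml
    └── ...
```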
diff --git a/data_validation/query_builder/partition_row_builder.py b/data_validation/query_builder/partition_row_builder.py
new file mode 100644
index 000000000..0ee4337a2
--- /dev/null
+++ b/data_validation/query_builder/partition_row_builder.py
@@ -0,0 +1,73 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ibis
+from data_validation import clients
+from data_validation.query_builder.query_builder import QueryBuilder
+
+
+class PartitionRowBuilder(object):
+    def __init__(
+        self,
+        partition_key: str,
+        data_client: ibis.client,
+        schema_name: str,
+        table_name: str,
+        query_builder: QueryBuilder,
+    ) -> None:
+        """Build a PartitionRowBuilder object which is ready to build a partition row filter query.
+
+        Args:
+            partition_key (str): Partition key used to generate partitions.
+            data_client (IbisClient): The client used to query the table.
+            schema_name (String): The name of the schema for the given table.
+            table_name (String): The name of the table to query.
+            query_builder (QueryBuilder): QueryBuilder object.
+        """
+        self.partition_key = partition_key
+        self.query = self._compile_query(
+            data_client, schema_name, table_name, query_builder
+        )
+
+    def _compile_query(
+        self,
+        data_client: ibis.client,
+        schema_name: str,
+        table_name: str,
+        query_builder: QueryBuilder,
+    ) -> ibis.Expr:
+        """Return an Ibis table expression with any supplied filters applied.
+
+        Args:
+            data_client (IbisClient): The client used to query the table.
+            schema_name (String): The name of the schema for the given table.
+            table_name (String): The name of the table to query.
+            query_builder (QueryBuilder): QueryBuilder object.
+        """
+        table = clients.get_ibis_table(data_client, schema_name, table_name)
+        compiled_filters = query_builder.compile_filter_fields(table)
+        filtered_table = table.filter(compiled_filters) if compiled_filters else table
+        return filtered_table
+
+    def get_max_query(self) -> ibis.Expr:
+        """Return an Ibis query object to get the max of the partition key column"""
+        return self.query[self.partition_key].max()
+
+    def get_min_query(self) -> ibis.Expr:
+        """Return an Ibis query object to get the min of the partition key column"""
+        return self.query[self.partition_key].min()
+
+    def get_count_query(self) -> ibis.Expr:
+        """Return an Ibis query object to get the count of the partition key column"""
+        return self.query[self.partition_key].count()
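For orientation, a minimal usage sketch of this class follows. The schema, table, and key names are illustrative; in the PR itself the builders are constructed inside `PartitionBuilder._get_partition_key_filters` from a `ConfigManager`:

```python
# Hypothetical sketch, assuming `client` is an Ibis client and
# `source_builder` is a QueryBuilder, e.g. ValidationBuilder(cm).source_builder.
builder = PartitionRowBuilder("id", client, "my_schema", "my_table", source_builder)

row_count = builder.get_count_query().execute()  # COUNT of the partition key
lower = builder.get_min_query().execute()        # MIN(id)
upper = builder.get_max_query().execute()        # MAX(id)
```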
diff --git a/data_validation/state_manager.py b/data_validation/state_manager.py
index 587776d5f..5be76b9bd 100644
--- a/data_validation/state_manager.py
+++ b/data_validation/state_manager.py
@@ -113,6 +113,17 @@ def create_validation_yaml(self, name: str, yaml_config: Dict[str, str]):
         yaml_config_str = dump(yaml_config, Dumper=Dumper)
         self._write_file(validation_path, yaml_config_str)
 
+    def create_partition_yaml(self, target_file_path: str, yaml_config: Dict[str, str]):
+        """Create a partition YAML file and store the given config in it.
+
+        Args:
+            target_file_path (String): The path of the partition YAML file to create.
+            yaml_config (Dict): A dictionary with the validation details.
+        """
+        partition_path = self._get_partition_path(target_file_path)
+        yaml_config_str = dump(yaml_config, Dumper=Dumper)
+        self._write_partition(partition_path, yaml_config_str)
+
     def get_validation_config(self, name: str, config_dir=None) -> Dict[str, str]:
         """Get a validation configuration from the expected file.
 
@@ -159,6 +170,16 @@ def _get_validation_path(self, name: str) -> str:
         """
         return os.path.join(self._get_validations_directory(), f"{name}")
 
+    def _get_partition_path(self, name: str) -> str:
+        """Return the full path to a partition YAML file.
+
+        Args:
+            name: The path/name of the partition file.
+        """
+        if self.file_system == FileSystem.LOCAL:
+            return os.path.join("./", name)
+        return name
+
     def _read_file(self, file_path: str) -> str:
         if self.file_system == FileSystem.GCS:
             return self._read_gcs_file(file_path)
@@ -174,6 +195,13 @@ def _write_file(self, file_path: str, data: str):
 
         logging.info("Success! Config output written to {}".format(file_path))
 
+    def _write_partition(self, file_path: str, data: str):
+        if self.file_system == FileSystem.GCS:
+            self._write_gcs_file(file_path, data)
+        else:
+            with open(file_path, "w") as file:
+                file.write(data)
+
     def _list_directory(self, directory_path: str) -> List[str]:
         if self.file_system == FileSystem.GCS:
             return self._list_gcs_directory(directory_path)

diff --git a/docs/examples.md b/docs/examples.md
index 528792c47..9dc9de731 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -30,6 +30,12 @@ Above command creates a YAML file named citibike.yaml that can be used to run va
 data-validation run-config -c citibike.yaml
 ````
 Above command executes validations stored in a config file named citibike.yaml.
+
+#### Generate partitions and save as multiple configuration files
+````shell script
+data-validation generate-table-partitions -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --primary-keys station_id --hash '*' --filters 'station_id>3000' -cdir partitions_dir --partition-key station_id --partition-num 200
+````
+Above command creates multiple partitions based on `--partition-key`. The number of generated configuration files is determined by `--partition-num`.
 
 #### Run COUNT validations for all columns
 ````shell script

diff --git a/tests/unit/test_partition_builder.py b/tests/unit/test_partition_builder.py
new file mode 100644
index 000000000..0778489d4
--- /dev/null
+++ b/tests/unit/test_partition_builder.py
@@ -0,0 +1,475 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import os +import shutil +import pytest +import json +import random +from datetime import datetime, timedelta + +from data_validation import cli_tools +from data_validation import consts +from data_validation.config_manager import ConfigManager + +SOURCE_TABLE_FILE_PATH = "source_table_data.json" +TARGET_TABLE_FILE_PATH = "target_table_data.json" + +STRING_CONSTANT = "constant" +RANDOM_STRINGS = ["a", "b", "c", "d"] + +SOURCE_CONN_CONFIG = { + "source_type": "FileSystem", + "table_name": "my_table", + "file_path": SOURCE_TABLE_FILE_PATH, + "file_type": "json", +} + +TARGET_CONN_CONFIG = { + "source_type": "FileSystem", + "table_name": "my_table", + "file_path": TARGET_TABLE_FILE_PATH, + "file_type": "json", +} + +TEST_CONN = "{'source_type':'Example'}" +PARTITION_NUM = 3 +PARTITIONS_DIR = "test_partitions_dir" + +CLI_ARGS_JSON_SOURCE = [ + "generate-table-partitions", + "-sc", + TEST_CONN, + "-tc", + TEST_CONN, + "-tbls", + "test_table", + "--primary-keys", + "station_id", + "--hash", + "*", + "--filter-status", + "fail", + "--filters", + "station_id>3000", + "-cdir", + PARTITIONS_DIR, + "--partition-num", + f"{PARTITION_NUM}", + "--partition-key", + "id", +] + +# partition_key is not passed +CLI_ARGS_ABSENT_PARTITION_KEY = [ + "generate-table-partitions", + "-sc", + TEST_CONN, + "-tc", + TEST_CONN, + "-tbls", + "bigquery-public-data.new_york_citibike.citibike_stations,bigquery-public-data.new_york_citibike.citibike_stations", + "--primary-keys", + "station_id", + "--hash", + "*", + "--filter-status", + "fail", + "-cdir", + PARTITIONS_DIR, + "--partition-num", + f"{PARTITION_NUM}", + "--labels", + "name=test_run", +] + +PARTITION_FILTERS_LIST = [ + "id >= 0 and id < 333", + "id >= 333 and id < 666", + "id >= 666 and id < 1001", +] + +YAML_CONFIGS_LIST = [ + { + "target_folder_name": "test_table", + "partitions": [ + { + "target_file_name": "0000.yaml", + "yaml_config": { + "source": "{'source_type':'Example'}", + "target": "{'source_type':'Example'}", + "result_handler": {}, + "validations": [ + { + "type": "Row", + "schema_name": None, + "table_name": "test_table", + "target_schema_name": None, + "target_table_name": "test_table", + "primary_keys": [ + { + "field_alias": "id", + "source_column": "id", + "target_column": "id", + "cast": None, + } + ], + "comparison_fields": [ + { + "field_alias": "int_value", + "source_column": "int_value", + "target_column": "int_value", + "cast": None, + }, + { + "field_alias": "text_value", + "source_column": "text_value", + "target_column": "text_value", + "cast": None, + }, + ], + "format": "table", + "filter_status": None, + "filters": [ + { + "type": "custom", + "source": "id >= 0 and id < 333", + "target": "id >= 0 and id < 333", + } + ], + } + ], + }, + }, + { + "target_file_name": "0001.yaml", + "yaml_config": { + "source": "{'source_type':'Example'}", + "target": "{'source_type':'Example'}", + "result_handler": {}, + "validations": [ + { + "type": "Row", + "schema_name": None, + "table_name": "test_table", + "target_schema_name": None, + "target_table_name": "test_table", + "primary_keys": [ + { + "field_alias": "id", + "source_column": "id", + "target_column": "id", + "cast": None, + } + ], + "comparison_fields": [ + { + "field_alias": "int_value", + "source_column": "int_value", + "target_column": "int_value", + "cast": None, + }, + { + "field_alias": "text_value", + "source_column": "text_value", + "target_column": "text_value", + "cast": None, + }, + ], + "format": "table", + "filter_status": None, + "filters": [ + { + "type": "custom", + 
"source": "id >= 333 and id < 666", + "target": "id >= 333 and id < 666", + } + ], + } + ], + }, + }, + { + "target_file_name": "0002.yaml", + "yaml_config": { + "source": "{'source_type':'Example'}", + "target": "{'source_type':'Example'}", + "result_handler": {}, + "validations": [ + { + "type": "Row", + "schema_name": None, + "table_name": "test_table", + "target_schema_name": None, + "target_table_name": "test_table", + "primary_keys": [ + { + "field_alias": "id", + "source_column": "id", + "target_column": "id", + "cast": None, + } + ], + "comparison_fields": [ + { + "field_alias": "int_value", + "source_column": "int_value", + "target_column": "int_value", + "cast": None, + }, + { + "field_alias": "text_value", + "source_column": "text_value", + "target_column": "text_value", + "cast": None, + }, + ], + "format": "table", + "filter_status": None, + "filters": [ + { + "type": "custom", + "source": "id >= 666 and id < 1001", + "target": "id >= 666 and id < 1001", + } + ], + } + ], + }, + }, + ], + } +] + + +@pytest.fixture +def module_under_test(): + import data_validation.partition_builder + + return data_validation.partition_builder + + +def teardown_module(module): + # Clean up: Delete test partitions directory and its contents + folder_path = os.path.join("./", PARTITIONS_DIR) + if os.path.exists(folder_path): + shutil.rmtree(folder_path) + + +def _generate_fake_data( + rows=10, initial_id=0, second_range=60 * 60 * 24, int_range=100, random_strings=None +): + """Return a list of dicts with given number of rows. + + Data Keys: + id: a unique int per row + timestamp_value: a random timestamp in the past {second_range} back + date_value: a random date in the past {second_range} back + int_value: a random int value inside 0 to {int_range} + text_value: a random string from supplied list + """ + data = [] + random_strings = random_strings or RANDOM_STRINGS + for i in range(initial_id, initial_id + rows): + rand_seconds = random.randint(0, second_range) + rand_timestamp = datetime.now() - timedelta(seconds=rand_seconds) + rand_date = rand_timestamp.date() + + row = { + "id": i, + "date_value": rand_date, + "timestamp_value": rand_timestamp, + "int_value": random.randint(0, int_range), + "text_constant": STRING_CONSTANT, + "text_numeric": "2", + "text_value": random.choice(random_strings), + "text_value_two": random.choice(random_strings), + } + data.append(row) + + return data + + +def _generate_config_manager(table_name: str = "my_table") -> ConfigManager: + """Returns a Dummy ConfigManager Object""" + + row_config = { + # BigQuery Specific Connection Config + "source_conn": SOURCE_CONN_CONFIG, + "target_conn": TARGET_CONN_CONFIG, + # Validation Type + consts.CONFIG_TYPE: consts.ROW_VALIDATION, + # Configuration Required Depending on Validator Type + "schema_name": None, + "table_name": table_name, + "target_schema_name": None, + "target_table_name": table_name, + consts.CONFIG_PRIMARY_KEYS: [ + { + consts.CONFIG_FIELD_ALIAS: "id", + consts.CONFIG_SOURCE_COLUMN: "id", + consts.CONFIG_TARGET_COLUMN: "id", + consts.CONFIG_CAST: None, + }, + ], + consts.CONFIG_COMPARISON_FIELDS: [ + { + consts.CONFIG_FIELD_ALIAS: "int_value", + consts.CONFIG_SOURCE_COLUMN: "int_value", + consts.CONFIG_TARGET_COLUMN: "int_value", + consts.CONFIG_CAST: None, + }, + { + consts.CONFIG_FIELD_ALIAS: "text_value", + consts.CONFIG_SOURCE_COLUMN: "text_value", + consts.CONFIG_TARGET_COLUMN: "text_value", + consts.CONFIG_CAST: None, + }, + ], + consts.CONFIG_RESULT_HANDLER: None, + consts.CONFIG_FORMAT: "table", + 
consts.CONFIG_FILTER_STATUS: None, + consts.CONFIG_FILTERS: [], + } + return ConfigManager(row_config) + + +def _get_fake_json_data(data): + for row in data: + row["date_value"] = str(row["date_value"]) + row["timestamp_value"] = str(row["timestamp_value"]) + row["text_constant"] = row["text_constant"] + row["text_numeric"] = row["text_numeric"] + row["text_value"] = row["text_value"] + row["text_value_two"] = row["text_value_two"] + + return json.dumps(data) + + +def _create_table_file(table_path, data): + """Create JSON File""" + with open(table_path, "w") as f: + f.write(data) + + +def test_import(module_under_test): + assert module_under_test is not None + + +def test_class_object_creation(module_under_test): + """Create a PartitionBuilder object passing 2 tables and assert the following + 1. Table/Configs count + 2. config_dir value present and absent + 3. primary_key value + 4. partition_key value present and absent + """ + mock_config_manager = _generate_config_manager("test_table") + config_managers = [mock_config_manager] + + parser = cli_tools.configure_arg_parser() + + # partition_key is present and different from primary_key + # config_dir is passed + args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + builder = module_under_test.PartitionBuilder(config_managers, args) + assert builder.table_count == len(config_managers) + assert builder.partition_key == "id" + assert builder.primary_key == "station_id" + + # partition_key is absent, expected to default to primary_key + args = parser.parse_args(CLI_ARGS_ABSENT_PARTITION_KEY) + builder = module_under_test.PartitionBuilder(config_managers, args) + assert builder.table_count == len(config_managers) + assert builder.partition_key == builder.primary_key + + +def test_get_partition_key_filters(module_under_test): + """Build partitions filters and assert: + 1. Table count + 2. Filters count + 3. 
Partition filters + """ + data = _generate_fake_data(rows=1001, second_range=0) + + source_json_data = _get_fake_json_data(data) + target_json_data = _get_fake_json_data(data) + + _create_table_file(SOURCE_TABLE_FILE_PATH, source_json_data) + _create_table_file(TARGET_TABLE_FILE_PATH, target_json_data) + + config_manager = _generate_config_manager("my_table") + config_managers = [config_manager] + + parser = cli_tools.configure_arg_parser() + mock_args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + + expected_partition_filters_list = PARTITION_FILTERS_LIST + + builder = module_under_test.PartitionBuilder(config_managers, mock_args) + partition_filters_list = builder._get_partition_key_filters() + assert len(partition_filters_list) == len(config_managers) + assert len(partition_filters_list[0]) == mock_args.partition_num + assert partition_filters_list[0] == expected_partition_filters_list + + +def test_add_partition_filters_to_config(module_under_test): + """Add partition filters to ConfigManager object, build YAML config list + and assert YAML configs + """ + # Generate dummy YAML configs list + config_manager = _generate_config_manager("test_table") + config_managers = [config_manager] + + parser = cli_tools.configure_arg_parser() + mock_args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + + expected_yaml_configs_list = YAML_CONFIGS_LIST + + partition_filters = PARTITION_FILTERS_LIST + master_filter_list = [partition_filters] + + # Create PartitionBuilder object and get YAML configs list + builder = module_under_test.PartitionBuilder(config_managers, mock_args) + yaml_configs_list = builder._add_partition_filters(master_filter_list) + assert yaml_configs_list == expected_yaml_configs_list + + +def test_store_yaml_partitions_local(module_under_test): + """Store all the Partition YAMLs for a table to specified local directory""" + + # Generate dummy YAML configs list + config_manager = _generate_config_manager("test_table") + config_managers = [config_manager] + + parser = cli_tools.configure_arg_parser() + mock_args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + + # Dummy YAML configs list + yaml_configs_list = YAML_CONFIGS_LIST + + # Create test partitions directory to store results + folder_path = os.path.join("./", PARTITIONS_DIR) + if not os.path.exists(folder_path): + os.mkdir(folder_path) + + # Store YAML partition configs to local directory + builder = module_under_test.PartitionBuilder(config_managers, mock_args) + builder._store_partitions(yaml_configs_list) + + # Assert file count for 1 table and sample file names + partition_dir_contents = os.listdir(os.path.join(PARTITIONS_DIR, "test_table")) + + assert len(partition_dir_contents) == PARTITION_NUM + assert "0000.yaml" in partition_dir_contents + assert "0002.yaml" in partition_dir_contents
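
Once the partition YAMLs are written, each table's directory can be validated serially with the `configs run` command referenced in the README. For the directory produced by the examples.md command above, a hypothetical invocation would be:

````shell script
data-validation configs run --config-dir partitions_dir/bigquery-public-data.new_york_citibike.citibike_stations
````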