diff --git a/.gitignore b/.gitignore index ac377563f..740f991dd 100644 --- a/.gitignore +++ b/.gitignore @@ -84,7 +84,13 @@ terraform.rc # Custom *.yaml +partitions_dir # Test temp files source_table_data.json target_table_data.json +source-sql.sql + +# Commit msg +commits.txt +test.py \ No newline at end of file diff --git a/README.md b/README.md index fa55fa692..b9636964c 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,50 @@ data-validation (--verbose or -v) (--log-level or -ll) validate row [--filter-status or -fs STATUSES_LIST] Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned. ``` +#### Generate Table Partitions for Large Table Row Validations +Below is the command syntax for generating table partitions in order to perform row validations on large tables with memory constraints. + +The command generates and stores multiple YAML configs that represent chunks of the large table using filters (`WHERE partition_key >= X AND partition_key < Y`). You can then run the configs in the directory serially with the `data-validation configs run --config-dir PATH` command as described [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator#yaml-configuration-files). + +The command takes the same parameters as required for `Row Validation` *plus* a few additional arguments to implement the partitioning logic. + +(Note: As of now, only a monotonically increasing key is supported for `--partition-key`.) + +``` +data-validation (--verbose or -v) (--log-level or -ll) generate-table-partitions + + --source-conn or -sc SOURCE_CONN + Source connection details + See: *Data Source Configurations* section for each data source + --target-conn or -tc TARGET_CONN + Target connection details + See: *Connections* section for each data source + --tables-list or -tbls SOURCE_SCHEMA.SOURCE_TABLE=TARGET_SCHEMA.TARGET_TABLE + Comma separated list of tables in the form schema.table=target_schema.target_table + Target schema name and table name are optional. + e.g. 'bigquery-public-data.new_york_citibike.citibike_trips' + --primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS + Comma separated list of primary key columns 'col_a,col_b' + --comparison-fields or -comp-fields FIELDS + Comma separated list of columns to compare. Can either be a physical column or an alias + See: *Calculated Fields* section for details + --hash COLUMNS Comma separated list of columns to hash or * for all columns + --concat COLUMNS Comma separated list of columns to concatenate or * for all columns (use if a common hash function is not available between databases) + --config-dir CONFIG_DIR, -cdir CONFIG_DIR + Directory Path to store YAML Config Files + GCS: Provide a full gs:// path of the target directory. Eg: `gs:///partitions_dir` + Local: Provide a relative path of the target directory. Eg: `partitions_dir` + --partition-num [1-1000], -pn [1-1000] + Number of partitions/config files to generate + If this value exceeds the row count of the source/target table, it will be decreased to max(source_row_count, target_row_count) + [--partition-key PARTITION_KEY, -partkey PARTITION_KEY] + Column on which the partitions would be generated. Column type must be integer. Defaults to Primary key + [--filters SOURCE_FILTER:TARGET_FILTER] + Colon separated string values of source and target filters. + If target filter is not provided, the source filter will run on source and target tables. + See: *Filters* section +```
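For orientation, each generated file is an ordinary row-validation YAML whose `filters` entry pins one partition's key range. Below is a minimal sketch of the dict that gets dumped to YAML; the shape follows the unit test fixtures later in this change, and the connection, table and column names are placeholders:

```python
# Sketch of one generated partition config (first of N); names are placeholders.
partition_yaml = {
    "source": "my_bq_conn",
    "target": "my_bq_conn",
    "result_handler": {},
    "validations": [
        {
            "type": "Row",
            "table_name": "citibike_stations",
            "target_table_name": "citibike_stations",
            # ...primary_keys, comparison_fields, etc. as in any row validation
            "filters": [
                {
                    "type": "custom",
                    "source": "station_id >= 0 and station_id < 333",
                    "target": "station_id >= 0 and station_id < 333",
                }
            ],
        }
    ],
}
```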
#### Schema Validations Below is the syntax for schema validations. These can be used to compare case insensitive column names and diff --git a/data_validation/__main__.py b/data_validation/__main__.py index 202e68577..7dbe81ed6 100644 --- a/data_validation/__main__.py +++ b/data_validation/__main__.py @@ -18,7 +18,7 @@ import sys from yaml import Dumper, dump - +from argparse import Namespace from data_validation import ( cli_tools, clients, @@ -28,6 +28,7 @@ ) from data_validation.config_manager import ConfigManager from data_validation.data_validation import DataValidation +from data_validation.partition_builder import PartitionBuilder # by default yaml dumps lists as pointers. This disables that feature Dumper.ignore_aliases = lambda *args: True @@ -50,15 +51,7 @@ def _get_arg_config_file(args): return args.config_file -def _get_arg_config_dir(args): - """Return String yaml config directory path.""" - if not args.config_dir: - raise ValueError("YAML Config Directory was not supplied.") - - return args.config_dir - - -def get_aggregate_config(args, config_manager): +def get_aggregate_config(args, config_manager: ConfigManager): """Return list of formatted aggregation objects. Args: @@ -217,11 +210,15 @@ def build_config_from_args(args, config_manager): return config_manager -def build_config_managers_from_args(args): +def build_config_managers_from_args(args: Namespace, validate_cmd: str = None): """Return a list of config managers ready to execute.""" configs = [] - validate_cmd = args.validate_cmd.capitalize() + # `generate-table-partitions` always performs row validation, so the caller + # passes `validate_cmd` explicitly instead of reading it from args + if validate_cmd is None: + validate_cmd = args.validate_cmd.capitalize() + if validate_cmd == "Schema": config_type = consts.SCHEMA_VALIDATION elif validate_cmd == "Column": @@ -492,10 +489,33 @@ def store_yaml_config_file(args, config_managers): cli_tools.store_validation(config_file_path, yaml_configs) -def run(args): - """ """ - config_managers = build_config_managers_from_args(args) +def partition_and_store_config_files(args: Namespace) -> None: + """Build multiple YAML Config files using user specified partition logic + + Args: + args (Namespace): User specified Arguments + Returns: + None + """ + # Default Validate Type + config_managers = build_config_managers_from_args(args, consts.ROW_VALIDATION) + partition_builder = PartitionBuilder(config_managers, args) + partition_builder.partition_configs() + + +def run(args) -> None: + """Splits execution into: + 1. Build and save a single YAML Config file + 2. Run Validations + + Args: + args (Namespace): User specified Arguments. + + Returns: + None + """ + config_managers = build_config_managers_from_args(args) if args.config_file: store_yaml_config_file(args, config_managers) else:
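As a hedged illustration of the flow above: `main()` routes `generate-table-partitions` to `partition_and_store_config_files`, which builds row-validation config managers and hands them to `PartitionBuilder`. A sketch of the same path driven programmatically (connection and table names are placeholders, and the named connections must already exist for the partition step to run):

```python
from data_validation import cli_tools

# Parse the same argv the CLI would receive for generate-table-partitions.
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
    [
        "generate-table-partitions",
        "-sc", "my_bq_conn",
        "-tc", "my_bq_conn",
        "-tbls", "bigquery-public-data.new_york_citibike.citibike_stations",
        "--primary-keys", "station_id",
        "--hash", "*",
        "-cdir", "partitions_dir",
        "--partition-num", "50",
    ]
)

# main() would now call partition_and_store_config_files(args), which is
# equivalent to:
#   config_managers = build_config_managers_from_args(args, consts.ROW_VALIDATION)
#   PartitionBuilder(config_managers, args).partition_configs()
```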
@@ -564,6 +584,8 @@ def main(): print(run_raw_query_against_connection(args)) elif args.command == "validate": validate(args) + elif args.command == "generate-table-partitions": + partition_and_store_config_files(args) elif args.command == "deploy": from data_validation import app diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py index 8117d68de..224d8f72b 100644 --- a/data_validation/cli_tools.py +++ b/data_validation/cli_tools.py @@ -40,6 +40,9 @@ -c ex_yaml.yaml data-validation run-config -c ex_yaml.yaml + +command: +data-validation """ import argparse @@ -48,6 +51,7 @@ import logging import sys import uuid +from argparse import Namespace from data_validation import consts, state_manager @@ -138,7 +142,7 @@ } -def get_parsed_args(): +def get_parsed_args() -> Namespace: """Return ArgParser with configured CLI arguments.""" parser = configure_arg_parser() args = ["--help"] if len(sys.argv) == 1 else None @@ -168,9 +172,104 @@ def configure_arg_parser(): _configure_find_tables(subparsers) _configure_raw_query(subparsers) _configure_beta_parser(subparsers) + _configure_partition_parser(subparsers) return parser +def _configure_partition_parser(subparsers): + """Configure arguments to generate partitioned config files.""" + partition_parser = subparsers.add_parser( + "generate-table-partitions", + help=( + "Generate partitions for validation and store the Config files in " + "a directory" + ), + ) + + # Group all optional arguments together + optional_arguments = partition_parser.add_argument_group("optional arguments") + optional_arguments.add_argument( + "--threshold", + "-th", + type=threshold_float, + help="Float max threshold for percent difference", + ) + optional_arguments.add_argument( + "--filters", + "-filters", + help="Filters in the format source_filter:target_filter", + ) + optional_arguments.add_argument( + "--use-random-row", + "-rr", + action="store_true", + help="Finds a set of random rows of the first primary key supplied.", + ) + optional_arguments.add_argument( + "--random-row-batch-size", + "-rbs", + help="Row batch size used for random row filters (default 10,000).", + ) + # Keep these in order to support data-validation run command for + # backwards-compatibility + optional_arguments.add_argument("--type", "-t", help="Type of Validation") + optional_arguments.add_argument( + "--result-handler-config", "-rc", help="Result handler config details" + ) + + # Configure arguments to generate partitions for row based validation + + # Group all required arguments together + required_arguments = partition_parser.add_argument_group("required arguments") + required_arguments.add_argument( + "--primary-keys", + "-pk", + required=True, + help="Comma separated list of primary key columns 'col_a,col_b'", + ) + required_arguments.add_argument( + "--tables-list", + "-tbls", + required=True, + help=( + "Comma separated tables list in the form " + "'schema.table=target_schema.target_table'" + ), + ) + + # Group for mutually exclusive required arguments. Either must be supplied
+ mutually_exclusive_arguments = required_arguments.add_mutually_exclusive_group( + required=True + ) + mutually_exclusive_arguments.add_argument( + "--hash", + "-hash", + help=( + "Comma separated list of columns for hash 'col_a,col_b' or * for " + "all columns" + ), + ) + mutually_exclusive_arguments.add_argument( + "--concat", + "-concat", + help=( + "Comma separated list of columns for concat 'col_a,col_b' or * " + "for all columns" + ), + ) + + mutually_exclusive_arguments.add_argument( + "--comparison-fields", + "-comp-fields", + help=( + "Individual columns to compare. If comparing a calculated field use " + "the column alias." + ), + ) + + _add_common_partition_arguments(optional_arguments, required_arguments) + + def _configure_beta_parser(subparsers): """Configure beta commands for the parser.""" connection_parser = subparsers.add_parser( @@ -646,6 +745,79 @@ def _add_common_arguments(parser): ) + +def _add_common_partition_arguments(optional_arguments, required_arguments=None): + """Add arguments common to the generate-table-partitions command""" + + # Group all Required Arguments together + required_arguments.add_argument( + "--source-conn", "-sc", required=True, help="Source connection name" + ) + required_arguments.add_argument( + "--target-conn", "-tc", required=True, help="Target connection name" + ) + required_arguments.add_argument( + "--config-dir", + "-cdir", + required=True, + help="Directory Path to store YAML Config Files. " + "GCS: Provide a full gs:// path of the target directory. " + "Eg: `gs:///partitions_dir`. " + "Local: Provide a relative path of the target directory. " + "Eg: `partitions_dir`", + ) + required_arguments.add_argument( + "--partition-num", + "-pn", + required=True, + help="Number of partitions/config files to generate", + type=int, + choices=range(1, 1001), + metavar="[1-1000]", + ) + + # Optional arguments + optional_arguments.add_argument( + "--bq-result-handler", + "-bqrh", + help="BigQuery result handler config details", + ) + optional_arguments.add_argument( + "--labels", "-l", help="Key value pair labels for validation run" + ) + optional_arguments.add_argument( + "--service-account", + "-sa", + help="Path to SA key file for result handler output", + ) + optional_arguments.add_argument( + "--format", + "-fmt", + default="table", + help=( + "Set the format for printing command output. Supported formats are " + "(text, csv, json, table). Defaults to table" + ), + ) + optional_arguments.add_argument( + "--filter-status", + "-fs", + # TODO: update if we start to support other statuses + help=( + "Comma separated list of statuses to filter the validation results. " + "Supported statuses are (success, fail). If no list is provided, " + "all statuses are returned" + ), + ) + optional_arguments.add_argument( + "--partition-key", + "-partkey", + help=( + "Column on which the partitions would be generated. " + "Column type must be integer. Defaults to Primary key" + ), + )
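The mutually exclusive group above is the stock argparse pattern for "exactly one of these flags is required". A self-contained sketch of the same pattern with a toy parser (names are illustrative only):

```python
import argparse

parser = argparse.ArgumentParser(prog="toy-validator")
required = parser.add_argument_group("required arguments")
# Exactly one of --hash / --concat / --comparison-fields must be supplied.
exclusive = required.add_mutually_exclusive_group(required=True)
exclusive.add_argument("--hash", help="Columns to hash, or * for all")
exclusive.add_argument("--concat", help="Columns to concatenate, or * for all")
exclusive.add_argument("--comparison-fields", help="Columns to compare directly")

args = parser.parse_args(["--hash", "*"])
assert args.hash == "*" and args.concat is None
# parser.parse_args(["--hash", "*", "--concat", "*"]) would exit with an error.
```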
Defaults to Primary key" + ), + ) + + def get_connection_config_from_args(args): """Return dict with connection config supplied.""" config = {consts.SOURCE_TYPE: args.connect_type} @@ -710,6 +882,12 @@ def store_validation(validation_file_name, yaml_config): mgr.create_validation_yaml(validation_file_name, yaml_config) +def store_partition(target_file_path, yaml_config, target_folder_path=None): + """Store the partition YAML config under the given name.""" + mgr = state_manager.StateManager(target_folder_path) + mgr.create_partition_yaml(target_file_path, yaml_config) + + def get_validation(validation_name, config_dir=None): """Return validation YAML for a specific connection.""" if config_dir: diff --git a/data_validation/partition_builder.py b/data_validation/partition_builder.py new file mode 100644 index 000000000..60466bbcb --- /dev/null +++ b/data_validation/partition_builder.py @@ -0,0 +1,277 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import logging +import numpy +from typing import List, Dict +from argparse import Namespace + +from data_validation import cli_tools, consts +from data_validation.config_manager import ConfigManager +from data_validation.query_builder.partition_row_builder import PartitionRowBuilder +from data_validation.validation_builder import ValidationBuilder + + +class PartitionBuilder: + def __init__(self, config_managers: List[ConfigManager], args: Namespace) -> None: + + self.config_managers = config_managers + self.table_count = len(config_managers) + self.args = args + self.config_dir = self._get_arg_config_dir() + self.primary_key = self._get_primary_key() + self.partition_key = self._get_arg_partition_key() + + def _get_arg_config_dir(self) -> str: + """Return String yaml config folder path.""" + if not self.args.config_dir: + raise ValueError("YAML Config Dir Path was not supplied.") + + return self.args.config_dir + + def _get_primary_key(self) -> str: + """Return the first Primary Key""" + # Filter for only first primary key (multi-pk filter not supported) + primary_keys = cli_tools.get_arg_list(self.args.primary_keys) + primary_key = primary_keys[0] + return primary_key + + def _get_arg_partition_key(self) -> str: + """Return Partition Key. If not supplied, defaults to Primary Key""" + if not self.args.partition_key: + logging.warning( + "Partition key cannot be found. Will default to Primary key" + ) + return self.primary_key + + return self.args.partition_key + + def _get_yaml_from_config(self, config_manager: ConfigManager) -> Dict: + """Return dict objects formatted for yaml validations. + + Args: + config_managers (list[ConfigManager]): List of config manager instances. 
+ """ + yaml_config = { + consts.YAML_SOURCE: self.args.source_conn, + consts.YAML_TARGET: self.args.target_conn, + consts.YAML_RESULT_HANDLER: config_manager.result_handler_config, + consts.YAML_VALIDATIONS: [config_manager.get_yaml_validation_block()], + } + return yaml_config + + def partition_configs(self) -> None: + """Takes a list of ConfigManager object and splits each it into multiple + ConfigManager objects applying supplied partition logic. + + Returns: + None + """ + + # Default partition logic: Partition key + # Add necessary checks and routes when support for hashmod or partitionkeymod is added + partition_filters = self._get_partition_key_filters() + yaml_configs_list = self._add_partition_filters(partition_filters) + self._store_partitions(yaml_configs_list) + + def _get_partition_key_filters(self) -> List[List[str]]: + """Generate Partition filters for primary_key type partition for all + Configs/Tables. + + Returns: + A list of lists of partition filters for each table + """ + + master_filter_list = [] + + for config_manager in self.config_managers: + + validation_builder = ValidationBuilder(config_manager) + + source_partition_row_builder = PartitionRowBuilder( + self.partition_key, + config_manager.source_client, + config_manager.source_schema, + config_manager.source_table, + validation_builder.source_builder, + ) + + target_partition_row_builder = PartitionRowBuilder( + self.partition_key, + config_manager.target_client, + config_manager.target_schema, + config_manager.target_table, + validation_builder.target_builder, + ) + + # Get Source and Target row Count + source_count_query = source_partition_row_builder.get_count_query() + source_count = source_count_query.execute() + + target_count_query = target_partition_row_builder.get_count_query() + target_count = target_count_query.execute() + + if source_count != target_count: + logging.warning( + "Source and Target table row counts do not match," + "proceeding with max(source_count, target_count)" + ) + row_count = max(source_count, target_count) + + # If supplied partition_num is greater than count(*) coalesce it + if self.args.partition_num > row_count: + partition_count = row_count + logging.warning( + "Supplied partition num is greater than row count, " + "truncating it to row count" + ) + else: + partition_count = self.args.partition_num + + # Get Source and Target Primary key Min + source_min_query = source_partition_row_builder.get_min_query() + source_min = source_min_query.execute() + + target_min_query = target_partition_row_builder.get_min_query() + target_min = target_min_query.execute() + + # If Primary key is non numeric, raise Type Error + accepted_data_types = [int, numpy.int32, numpy.int64] + if not ( + type(source_min) in accepted_data_types + and type(target_min) in accepted_data_types + ): + raise TypeError( + f"Supplied Partition key is not of type Numeric: " + f"{self.partition_key}" + ) + + if source_min != target_min: + logging.warning( + "min(partition_key) for Source and Target tables do not" + "match, proceeding with min(source_min, target_min)" + ) + lower_bound = min(source_min, target_min) + + # Get Source and Target Primary key Max + source_max_query = source_partition_row_builder.get_max_query() + source_max = source_max_query.execute() + + target_max_query = target_partition_row_builder.get_max_query() + target_max = target_max_query.execute() + + if source_min != target_min: + logging.warning( + "max(partition_key) for Source and Target tables do not" + "match, proceeding with 
+ def _add_partition_filters( + self, + partition_filters: List[List[str]], + ) -> List[Dict]: + """Add Partition Filters to ConfigManager and return a list of dict + ConfigManager objects. + + Args: + partition_filters (List[List[str]]): List of List of Partition filters + for all Table/ConfigManager objects + + Returns: + yaml_configs_list (List[Dict]): List of YAML configs for all tables + """ + + table_count = len(self.config_managers) + yaml_configs_list = [None] * table_count + for ind in range(table_count): + config_manager = self.config_managers[ind] + filter_list = partition_filters[ind] + + yaml_configs_list[ind] = { + "target_folder_name": config_manager.full_source_table, + "partitions": [], + } + for pos in range(len(filter_list)): + filter_dict = { + "type": "custom", + "source": filter_list[pos], + "target": filter_list[pos], + } + # Append the new partition filter + config_manager.filters.append(filter_dict) + + # Build and append partition YAML; file names are zero-padded, e.g. 0001.yaml + yaml_config = self._get_yaml_from_config(config_manager) + target_file_name = "0" * (4 - len(str(pos))) + str(pos) + ".yaml" + yaml_configs_list[ind]["partitions"].append( + {"target_file_name": target_file_name, "yaml_config": yaml_config} + ) + + # Pop the last appended partition filter + config_manager.filters.pop() + + return yaml_configs_list + + def _store_partitions(self, yaml_configs_list: List[Dict]) -> None: + """Save Partitions to target folder + + Args: + yaml_configs_list (List[Dict]): List of YAML configs for all tables + + Returns: + None + """ + logging.info(f"Writing table partition configs to directory: {self.config_dir}") + + for table in yaml_configs_list: + target_folder_name = table["target_folder_name"] + target_folder_path = os.path.join(self.config_dir, target_folder_name) + for partition in table["partitions"]: + yaml_config = partition["yaml_config"] + target_file_name = partition["target_file_name"] + target_file_path = os.path.join(target_folder_path, target_file_name) + cli_tools.store_partition( + target_file_path, yaml_config, target_folder_path + ) + + logging.info( + f"Success! Table partition configs written to directory: {self.config_dir}" + ) diff --git a/data_validation/query_builder/partition_row_builder.py b/data_validation/query_builder/partition_row_builder.py new file mode 100644 index 000000000..0ee4337a2 --- /dev/null +++ b/data_validation/query_builder/partition_row_builder.py @@ -0,0 +1,73 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import ibis +from data_validation import clients +from data_validation.query_builder.query_builder import QueryBuilder + + +class PartitionRowBuilder(object): + def __init__( + self, + partition_key: str, + data_client: ibis.client, + schema_name: str, + table_name: str, + query_builder: QueryBuilder, + ) -> None: + """Build a PartitionRowBuilder object which is ready to build a partition row filter query. + + Args: + partition_key (str): Partition key used to generate Partitions. + data_client (IbisClient): The client used to query the table. + schema_name (String): The name of the schema for the given table. + table_name (String): The name of the table to query. + query_builder (QueryBuilder): QueryBuilder object. + """ + self.partition_key = partition_key + self.query = self._compile_query( + data_client, schema_name, table_name, query_builder + ) + + def _compile_query( + self, + data_client: ibis.client, + schema_name: str, + table_name: str, + query_builder: QueryBuilder, + ) -> ibis.Expr: + """Return an Ibis query object + + Args: + data_client (IbisClient): The client used to query the table. + schema_name (String): The name of the schema for the given table. + table_name (String): The name of the table to query. + query_builder (QueryBuilder): QueryBuilder object. + """ + table = clients.get_ibis_table(data_client, schema_name, table_name) + compiled_filters = query_builder.compile_filter_fields(table) + filtered_table = table.filter(compiled_filters) if compiled_filters else table + return filtered_table + + def get_max_query(self) -> ibis.Expr: + """Return an Ibis query object to get Max of the partition key column""" + return self.query[self.partition_key].max() + + def get_min_query(self) -> ibis.Expr: + """Return an Ibis query object to get Min of the partition key column""" + return self.query[self.partition_key].min() + + def get_count_query(self) -> ibis.Expr: + """Return an Ibis query object to get count of the partition key column""" + return self.query[self.partition_key].count()
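For context, the three helpers just wrap simple aggregations over the (optionally filtered) table expression. A sketch with an unbound ibis table standing in for `clients.get_ibis_table(...)` (schema and names are illustrative):

```python
import ibis

# Stand-in for the table returned by clients.get_ibis_table(...).
table = ibis.table([("station_id", "int64"), ("num_bikes", "int64")], name="stations")

partition_key = "station_id"
min_expr = table[partition_key].min()      # what get_min_query() returns
max_expr = table[partition_key].max()      # what get_max_query() returns
count_expr = table[partition_key].count()  # what get_count_query() returns

# PartitionBuilder executes these against the real backend, e.g.:
#   source_min = source_partition_row_builder.get_min_query().execute()
```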
diff --git a/data_validation/state_manager.py b/data_validation/state_manager.py index 587776d5f..5be76b9bd 100644 --- a/data_validation/state_manager.py +++ b/data_validation/state_manager.py @@ -113,6 +113,17 @@ def create_validation_yaml(self, name: str, yaml_config: Dict[str, str]): yaml_config_str = dump(yaml_config, Dumper=Dumper) self._write_file(validation_path, yaml_config_str) + def create_partition_yaml(self, target_file_path: str, yaml_config: Dict[str, str]): + """Create a partition YAML file and store the given config. + + Args: + target_file_path (String): The path of the partition YAML file. + yaml_config (Dict): A dictionary with the validation details. + """ + partition_path = self._get_partition_path(target_file_path) + yaml_config_str = dump(yaml_config, Dumper=Dumper) + self._write_partition(partition_path, yaml_config_str) + + def get_validation_config(self, name: str, config_dir=None) -> Dict[str, str]: """Get a validation configuration from the expected file. @@ -159,6 +170,16 @@ def _get_validation_path(self, name: str) -> str: """ return os.path.join(self._get_validations_directory(), f"{name}") + def _get_partition_path(self, name: str) -> str: + """Returns the full path to a partition file. + + Args: + name: The path of the partition file. + """ + if self.file_system == FileSystem.LOCAL: + return os.path.join("./", name) + return name + def _read_file(self, file_path: str) -> str: if self.file_system == FileSystem.GCS: return self._read_gcs_file(file_path) @@ -174,6 +195,13 @@ def _write_file(self, file_path: str, data: str): logging.info("Success! Config output written to {}".format(file_path)) + def _write_partition(self, file_path: str, data: str): + if self.file_system == FileSystem.GCS: + self._write_gcs_file(file_path, data) + else: + with open(file_path, "w") as file: + file.write(data) + def _list_directory(self, directory_path: str) -> List[str]: if self.file_system == FileSystem.GCS: return self._list_gcs_directory(directory_path) diff --git a/docs/examples.md b/docs/examples.md index 528792c47..9dc9de731 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -30,6 +30,12 @@ Above command creates a YAML file named citibike.yaml that can be used to run va data-validation run-config -c citibike.yaml ```` Above command executes validations stored in a config file named citibike.yaml. + +#### Generate partitions and save as multiple configuration files +````shell script +data-validation generate-table-partitions -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --primary-keys station_id --hash '*' --filters 'station_id>3000' -cdir partitions_dir --partition-key station_id --partition-num 200 +```` +Above command creates multiple partitions based on `--partition-key`. The number of configuration files generated is set by `--partition-num`. #### Run COUNT validations for all columns ````shell script diff --git a/tests/unit/test_partition_builder.py b/tests/unit/test_partition_builder.py new file mode 100644 index 000000000..0778489d4 --- /dev/null +++ b/tests/unit/test_partition_builder.py @@ -0,0 +1,475 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import os +import shutil +import pytest +import json +import random +from datetime import datetime, timedelta + +from data_validation import cli_tools +from data_validation import consts +from data_validation.config_manager import ConfigManager + +SOURCE_TABLE_FILE_PATH = "source_table_data.json" +TARGET_TABLE_FILE_PATH = "target_table_data.json" + +STRING_CONSTANT = "constant" +RANDOM_STRINGS = ["a", "b", "c", "d"] + +SOURCE_CONN_CONFIG = { + "source_type": "FileSystem", + "table_name": "my_table", + "file_path": SOURCE_TABLE_FILE_PATH, + "file_type": "json", +} + +TARGET_CONN_CONFIG = { + "source_type": "FileSystem", + "table_name": "my_table", + "file_path": TARGET_TABLE_FILE_PATH, + "file_type": "json", +} + +TEST_CONN = "{'source_type':'Example'}" +PARTITION_NUM = 3 +PARTITIONS_DIR = "test_partitions_dir" + +CLI_ARGS_JSON_SOURCE = [ + "generate-table-partitions", + "-sc", + TEST_CONN, + "-tc", + TEST_CONN, + "-tbls", + "test_table", + "--primary-keys", + "station_id", + "--hash", + "*", + "--filter-status", + "fail", + "--filters", + "station_id>3000", + "-cdir", + PARTITIONS_DIR, + "--partition-num", + f"{PARTITION_NUM}", + "--partition-key", + "id", +] + +# partition_key is not passed +CLI_ARGS_ABSENT_PARTITION_KEY = [ + "generate-table-partitions", + "-sc", + TEST_CONN, + "-tc", + TEST_CONN, + "-tbls", + "bigquery-public-data.new_york_citibike.citibike_stations,bigquery-public-data.new_york_citibike.citibike_stations", + "--primary-keys", + "station_id", + "--hash", + "*", + "--filter-status", + "fail", + "-cdir", + PARTITIONS_DIR, + "--partition-num", + f"{PARTITION_NUM}", + "--labels", + "name=test_run", +] + +PARTITION_FILTERS_LIST = [ + "id >= 0 and id < 333", + "id >= 333 and id < 666", + "id >= 666 and id < 1001", +] + +YAML_CONFIGS_LIST = [ + { + "target_folder_name": "test_table", + "partitions": [ + { + "target_file_name": "0000.yaml", + "yaml_config": { + "source": "{'source_type':'Example'}", + "target": "{'source_type':'Example'}", + "result_handler": {}, + "validations": [ + { + "type": "Row", + "schema_name": None, + "table_name": "test_table", + "target_schema_name": None, + "target_table_name": "test_table", + "primary_keys": [ + { + "field_alias": "id", + "source_column": "id", + "target_column": "id", + "cast": None, + } + ], + "comparison_fields": [ + { + "field_alias": "int_value", + "source_column": "int_value", + "target_column": "int_value", + "cast": None, + }, + { + "field_alias": "text_value", + "source_column": "text_value", + "target_column": "text_value", + "cast": None, + }, + ], + "format": "table", + "filter_status": None, + "filters": [ + { + "type": "custom", + "source": "id >= 0 and id < 333", + "target": "id >= 0 and id < 333", + } + ], + } + ], + }, + }, + { + "target_file_name": "0001.yaml", + "yaml_config": { + "source": "{'source_type':'Example'}", + "target": "{'source_type':'Example'}", + "result_handler": {}, + "validations": [ + { + "type": "Row", + "schema_name": None, + "table_name": "test_table", + "target_schema_name": None, + "target_table_name": "test_table", + "primary_keys": [ + { + "field_alias": "id", + "source_column": "id", + "target_column": "id", + "cast": None, + } + ], + "comparison_fields": [ + { + "field_alias": "int_value", + "source_column": "int_value", + "target_column": "int_value", + "cast": None, + }, + { + "field_alias": "text_value", + "source_column": "text_value", + "target_column": "text_value", + "cast": None, + }, + ], + "format": "table", + "filter_status": None, + "filters": [ + { + "type": "custom", + 
"source": "id >= 333 and id < 666", + "target": "id >= 333 and id < 666", + } + ], + } + ], + }, + }, + { + "target_file_name": "0002.yaml", + "yaml_config": { + "source": "{'source_type':'Example'}", + "target": "{'source_type':'Example'}", + "result_handler": {}, + "validations": [ + { + "type": "Row", + "schema_name": None, + "table_name": "test_table", + "target_schema_name": None, + "target_table_name": "test_table", + "primary_keys": [ + { + "field_alias": "id", + "source_column": "id", + "target_column": "id", + "cast": None, + } + ], + "comparison_fields": [ + { + "field_alias": "int_value", + "source_column": "int_value", + "target_column": "int_value", + "cast": None, + }, + { + "field_alias": "text_value", + "source_column": "text_value", + "target_column": "text_value", + "cast": None, + }, + ], + "format": "table", + "filter_status": None, + "filters": [ + { + "type": "custom", + "source": "id >= 666 and id < 1001", + "target": "id >= 666 and id < 1001", + } + ], + } + ], + }, + }, + ], + } +] + + +@pytest.fixture +def module_under_test(): + import data_validation.partition_builder + + return data_validation.partition_builder + + +def teardown_module(module): + # Clean up: Delete test partitions directory and its contents + folder_path = os.path.join("./", PARTITIONS_DIR) + if os.path.exists(folder_path): + shutil.rmtree(folder_path) + + +def _generate_fake_data( + rows=10, initial_id=0, second_range=60 * 60 * 24, int_range=100, random_strings=None +): + """Return a list of dicts with given number of rows. + + Data Keys: + id: a unique int per row + timestamp_value: a random timestamp in the past {second_range} back + date_value: a random date in the past {second_range} back + int_value: a random int value inside 0 to {int_range} + text_value: a random string from supplied list + """ + data = [] + random_strings = random_strings or RANDOM_STRINGS + for i in range(initial_id, initial_id + rows): + rand_seconds = random.randint(0, second_range) + rand_timestamp = datetime.now() - timedelta(seconds=rand_seconds) + rand_date = rand_timestamp.date() + + row = { + "id": i, + "date_value": rand_date, + "timestamp_value": rand_timestamp, + "int_value": random.randint(0, int_range), + "text_constant": STRING_CONSTANT, + "text_numeric": "2", + "text_value": random.choice(random_strings), + "text_value_two": random.choice(random_strings), + } + data.append(row) + + return data + + +def _generate_config_manager(table_name: str = "my_table") -> ConfigManager: + """Returns a Dummy ConfigManager Object""" + + row_config = { + # BigQuery Specific Connection Config + "source_conn": SOURCE_CONN_CONFIG, + "target_conn": TARGET_CONN_CONFIG, + # Validation Type + consts.CONFIG_TYPE: consts.ROW_VALIDATION, + # Configuration Required Depending on Validator Type + "schema_name": None, + "table_name": table_name, + "target_schema_name": None, + "target_table_name": table_name, + consts.CONFIG_PRIMARY_KEYS: [ + { + consts.CONFIG_FIELD_ALIAS: "id", + consts.CONFIG_SOURCE_COLUMN: "id", + consts.CONFIG_TARGET_COLUMN: "id", + consts.CONFIG_CAST: None, + }, + ], + consts.CONFIG_COMPARISON_FIELDS: [ + { + consts.CONFIG_FIELD_ALIAS: "int_value", + consts.CONFIG_SOURCE_COLUMN: "int_value", + consts.CONFIG_TARGET_COLUMN: "int_value", + consts.CONFIG_CAST: None, + }, + { + consts.CONFIG_FIELD_ALIAS: "text_value", + consts.CONFIG_SOURCE_COLUMN: "text_value", + consts.CONFIG_TARGET_COLUMN: "text_value", + consts.CONFIG_CAST: None, + }, + ], + consts.CONFIG_RESULT_HANDLER: None, + consts.CONFIG_FORMAT: "table", + 
consts.CONFIG_FILTER_STATUS: None, + consts.CONFIG_FILTERS: [], + } + return ConfigManager(row_config) + + +def _get_fake_json_data(data): + for row in data: + row["date_value"] = str(row["date_value"]) + row["timestamp_value"] = str(row["timestamp_value"]) + row["text_constant"] = row["text_constant"] + row["text_numeric"] = row["text_numeric"] + row["text_value"] = row["text_value"] + row["text_value_two"] = row["text_value_two"] + + return json.dumps(data) + + +def _create_table_file(table_path, data): + """Create JSON File""" + with open(table_path, "w") as f: + f.write(data) + + +def test_import(module_under_test): + assert module_under_test is not None + + +def test_class_object_creation(module_under_test): + """Create a PartitionBuilder object and assert the following + 1. Table/Configs count + 2. config_dir value present and absent + 3. primary_key value + 4. partition_key value present and absent + """ + mock_config_manager = _generate_config_manager("test_table") + config_managers = [mock_config_manager] + + parser = cli_tools.configure_arg_parser() + + # partition_key is present and different from primary_key + # config_dir is passed + args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + builder = module_under_test.PartitionBuilder(config_managers, args) + assert builder.table_count == len(config_managers) + assert builder.partition_key == "id" + assert builder.primary_key == "station_id" + + # partition_key is absent, expected to default to primary_key + args = parser.parse_args(CLI_ARGS_ABSENT_PARTITION_KEY) + builder = module_under_test.PartitionBuilder(config_managers, args) + assert builder.table_count == len(config_managers) + assert builder.partition_key == builder.primary_key + + +def test_get_partition_key_filters(module_under_test): + """Build partition filters and assert: + 1. Table count + 2. Filters count + 3. 
Partition filters + """ + data = _generate_fake_data(rows=1001, second_range=0) + + source_json_data = _get_fake_json_data(data) + target_json_data = _get_fake_json_data(data) + + _create_table_file(SOURCE_TABLE_FILE_PATH, source_json_data) + _create_table_file(TARGET_TABLE_FILE_PATH, target_json_data) + + config_manager = _generate_config_manager("my_table") + config_managers = [config_manager] + + parser = cli_tools.configure_arg_parser() + mock_args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + + expected_partition_filters_list = PARTITION_FILTERS_LIST + + builder = module_under_test.PartitionBuilder(config_managers, mock_args) + partition_filters_list = builder._get_partition_key_filters() + assert len(partition_filters_list) == len(config_managers) + assert len(partition_filters_list[0]) == mock_args.partition_num + assert partition_filters_list[0] == expected_partition_filters_list + + +def test_add_partition_filters_to_config(module_under_test): + """Add partition filters to ConfigManager object, build YAML config list + and assert YAML configs + """ + # Generate dummy YAML configs list + config_manager = _generate_config_manager("test_table") + config_managers = [config_manager] + + parser = cli_tools.configure_arg_parser() + mock_args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + + expected_yaml_configs_list = YAML_CONFIGS_LIST + + partition_filters = PARTITION_FILTERS_LIST + master_filter_list = [partition_filters] + + # Create PartitionBuilder object and get YAML configs list + builder = module_under_test.PartitionBuilder(config_managers, mock_args) + yaml_configs_list = builder._add_partition_filters(master_filter_list) + assert yaml_configs_list == expected_yaml_configs_list + + +def test_store_yaml_partitions_local(module_under_test): + """Store all the Partition YAMLs for a table to specified local directory""" + + # Generate dummy YAML configs list + config_manager = _generate_config_manager("test_table") + config_managers = [config_manager] + + parser = cli_tools.configure_arg_parser() + mock_args = parser.parse_args(CLI_ARGS_JSON_SOURCE) + + # Dummy YAML configs list + yaml_configs_list = YAML_CONFIGS_LIST + + # Create test partitions directory to store results + folder_path = os.path.join("./", PARTITIONS_DIR) + if not os.path.exists(folder_path): + os.mkdir(folder_path) + + # Store YAML partition configs to local directory + builder = module_under_test.PartitionBuilder(config_managers, mock_args) + builder._store_partitions(yaml_configs_list) + + # Assert file count for 1 table and sample file names + partition_dir_contents = os.listdir(os.path.join(PARTITIONS_DIR, "test_table")) + + assert len(partition_dir_contents) == PARTITION_NUM + assert "0000.yaml" in partition_dir_contents + assert "0002.yaml" in partition_dir_contents
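Finally, tying the pieces together end to end: generate the partition configs, then run the stored configs serially with the existing runner, as described in the README above. A hedged sketch shelling out to the CLI (connection names and paths are placeholders; the per-table subdirectory name comes from `_store_partitions`, which uses the full source table name):

```python
import subprocess

# 1. Generate partition configs under partitions_dir/ (command from docs/examples.md).
subprocess.run(
    [
        "data-validation", "generate-table-partitions",
        "-sc", "my_bq_conn", "-tc", "my_bq_conn",
        "-tbls", "bigquery-public-data.new_york_citibike.citibike_stations",
        "--primary-keys", "station_id",
        "--hash", "*",
        "-cdir", "partitions_dir",
        "--partition-num", "200",
    ],
    check=True,
)

# 2. Run the stored configs serially against the directory for this table.
table_dir = "partitions_dir/bigquery-public-data.new_york_citibike.citibike_stations"
subprocess.run(
    ["data-validation", "configs", "run", "--config-dir", table_dir],
    check=True,
)
```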