diff --git a/README.md b/README.md
index 533ca31b5..f71bb9440 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ The Data Validation Tool (DVT) provides an automated and repeatable solution to
 perform this task. DVT supports the following validations:
 
-* Column validation (count, sum, avg, min, max, group by)
+* Column validation (count, sum, avg, min, max, stddev, group by)
 * Row validation (Not supported for FileSystem connections)
 * Schema validation
 * Custom Query validation
@@ -106,6 +106,7 @@ data-validation (--verbose or -v) (--log-level or -ll) validate column
                        [--min COLUMNS]     Comma separated list of columns for min or * for all numeric
                        [--max COLUMNS]     Comma separated list of columns for max or * for all numeric
                        [--avg COLUMNS]     Comma separated list of columns for avg or * for all numeric
+                       [--std COLUMNS]     Comma separated list of columns for stddev_samp or * for all numeric
                        [--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
                                            BigQuery destination for validation results. Defaults to stdout.
                                            See: *Validation Reports* section
@@ -309,6 +310,7 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query col
                        [--min COLUMNS]     Comma separated list of columns for min or * for all numeric
                        [--max COLUMNS]     Comma separated list of columns for max or * for all numeric
                        [--avg COLUMNS]     Comma separated list of columns for avg or * for all numeric
+                       [--std COLUMNS]     Comma separated list of columns for stddev_samp or * for all numeric
                        [--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
                                            BigQuery destination for validation results. Defaults to stdout.
                                            See: *Validation Reports* section
@@ -518,8 +520,8 @@ Functions, and other deployment services.
 ### Aggregated Fields
 
 Aggregate fields contain the SQL fields that you want to produce an aggregate
-for. Currently the functions `COUNT()`, `AVG()`, `SUM()`, `MIN()`, and `MAX()`
-are supported.
+for. Currently the functions `COUNT()`, `AVG()`, `SUM()`, `MIN()`, `MAX()`,
+and `STDDEV_SAMP()` are supported.
 
 Here is a sample aggregate config:
 ```yaml
diff --git a/data_validation/__main__.py b/data_validation/__main__.py
index 845e3b09a..6eb276349 100644
--- a/data_validation/__main__.py
+++ b/data_validation/__main__.py
@@ -114,6 +114,11 @@ def get_aggregate_config(args, config_manager: ConfigManager):
         aggregate_configs += config_manager.build_config_column_aggregates(
             "bit_xor", col_args, supported_data_types, cast_to_bigint=cast_to_bigint
         )
+    if args.std:
+        col_args = None if args.std == "*" else cli_tools.get_arg_list(args.std)
+        aggregate_configs += config_manager.build_config_column_aggregates(
+            "std", col_args, supported_data_types, cast_to_bigint=cast_to_bigint
+        )
     return aggregate_configs
diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py
index 884371c47..292a6c544 100644
--- a/data_validation/cli_tools.py
+++ b/data_validation/cli_tools.py
@@ -545,6 +545,11 @@ def _configure_column_parser(column_parser):
         "-bit_xor",
         help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
     )
+    optional_arguments.add_argument(
+        "--std",
+        "-std",
+        help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns",
+    )
     optional_arguments.add_argument(
         "--grouped-columns",
         "-gc",
@@ -785,6 +790,11 @@ def _configure_custom_query_column_parser(custom_query_column_parser):
         "-bit_xor",
         help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
     )
+    optional_arguments.add_argument(
+        "--std",
+        "-std",
+        help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns",
+    )
     optional_arguments.add_argument(
         "--wildcard-include-string-len",
         "-wis",
diff --git a/data_validation/query_builder/query_builder.py b/data_validation/query_builder/query_builder.py
index 72c02baf3..604a94c0c 100644
--- a/data_validation/query_builder/query_builder.py
+++ b/data_validation/query_builder/query_builder.py
@@ -96,6 +96,15 @@ def bit_xor(field_name=None, alias=None, cast=None):
             cast=cast,
         )
 
+    @staticmethod
+    def std(field_name=None, alias=None, cast=None):
+        return AggregateField(
+            ibis.expr.types.NumericColumn.std,
+            field_name=field_name,
+            alias=alias,
+            cast=cast,
+        )
+
     def compile(self, ibis_table):
         if self.field_name:
             agg_field = self.expr(ibis_table[self.field_name])
diff --git a/docs/internal/partition_table_prd.md b/docs/internal/partition_table_prd.md
index 2b99fa144..fce3149db 100644
--- a/docs/internal/partition_table_prd.md
+++ b/docs/internal/partition_table_prd.md
@@ -1,9 +1,9 @@
 # Partition Table
 ## Why Partition ?
-Data Validation Tool performs row validation by comparing every row in the source database with the corresponding row in the target database. Since the comparison is done in memory, all the rows have to be in memory. Databases typically have a large number of rows that don't all fit in memory, therefore Data Validation Tool can run into MemoryError and fail. One way to address this error is to partitition the source and target table into several corresponding partititions. Then validate the corresponding source and target partitions one at a time (either sequentially or in parallel). If the validation is performed in parallel on multiple VMs or containers, this approach has the added benefit of speeding up data validation. There are other approaches available to address the MemoryError - see future work below. Column validation and schema validation do not bring each row of the table into memory. Therefore, they do not run into MemoryErrors.
+Data Validation Tool performs row validation by comparing every row in the source database with the corresponding row in the target database. Since the comparison is done in memory, all the rows have to be in memory. Databases typically have a large number of rows that don't all fit in memory, therefore Data Validation Tool can run into MemoryError and fail. One way to address this error is to partition the source and target table into several corresponding partitions. Then validate the corresponding source and target partitions one at a time (either sequentially or in parallel). If the validation is performed in parallel on multiple VMs or containers, this approach has the added benefit of speeding up data validation. There are other approaches available to address the MemoryError - see future work below. Column validation and schema validation do not bring each row of the table into memory. Therefore, they do not run into MemoryErrors.
-## How to paritition?
+## How to partition?
 Data Validation Tool matches rows based on the specified primary key(s). If we split up the table into small enough partitions, then each partition can be validated without running into MemoryError. This can be depicted pictorially as shown below:
 ![Alt text](./partition_picture.png?raw=true "Title")
 Here, both tables are sorted by primary key(s). The blue line denotes the first row of each partition. With tables partitioned as shown, the complete tables can be row validated by concatenating the results of validating each partition source table against the corresponding partition of the target table.
@@ -30,7 +30,7 @@ SELECT
 ORDER BY row_num ASC;
 ```
 ### How to generate the where clauses
-Once we have the first row of each partition, we have to generate the where clauses for each partition in the source and target tables. The best way may be to generate the ibis table expression including the provided filter clause and the additional filter clause from the first rows we have calculated. We can then have _ibis_ `to_sql` convert the table expression into plain text, extract the where clause and use that. _ibis_ depends on _sqlalchemy_, which has a bug in that it does not support rendering date and timestamps by `to_sql` for versions of _sqlalchemy_ prior to 2.0. Until. we migrate to using _sqlalchemy_ 2.0, we may not be able to support dates and timestamps as a primary key column.
+Once we have the first row of each partition, we have to generate the where clauses for each partition in the source and target tables. The best way may be to generate the ibis table expression including the provided filter clause and the additional filter clause from the first rows we have calculated. We can then have _ibis_ `to_sql` convert the table expression into plain text, extract the where clause and use that. _ibis_ depends on _sqlalchemy_, which has a bug in that it does not support rendering date and timestamps by `to_sql` for versions of _sqlalchemy_ prior to 2.0. Until we migrate to using _sqlalchemy_ 2.0, we may not be able to support dates and timestamps as a primary key column.
 ## Future Work
 ### How many partitions do I need?
 Partition table requires that the user decide on the number of partitions into which they need to divide the table to avoid MemoryError. Data Validation Tool can run on different VMs with different shapes, so the number of partitions depends on the amount of memory available. How does the user figure out the number of partitions they need? Right now, it is by trial and error, say start with 10, then try 100, try to see if 50 still results in MemoryError etc. This is not optimal. Python's `psutil` package has a function [virtual_memory()](https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory) which tell us the total and available memory. `generate-table-partitions` is provided with all the parameters used in `validate row`, and the memory grows linearly to the number of rows being validated. `generate-table-partitions` can bring say 10,000 rows into memory as though performing a row validation. Using the virtual_memory() function in `psutil`, `generate-table-partitions` can estimate the number of rows that will fit in memory for row validation. Since we can calculate the total number of rows, we can estimate the number of partitions needed. This may need some experimentation, as we may need to allow for memory usage by other functions/objects in Data Validation Tool.
diff --git a/tests/system/data_sources/test_bigquery.py b/tests/system/data_sources/test_bigquery.py
index ecd7ceeb2..7349c8400 100644
--- a/tests/system/data_sources/test_bigquery.py
+++ b/tests/system/data_sources/test_bigquery.py
@@ -66,6 +66,12 @@
             consts.CONFIG_TARGET_COLUMN: "birth_year",
             consts.CONFIG_FIELD_ALIAS: "min_birth_year",
         },
+        {
+            consts.CONFIG_TYPE: "std",
+            consts.CONFIG_SOURCE_COLUMN: "tripduration",
+            consts.CONFIG_TARGET_COLUMN: "tripduration",
+            consts.CONFIG_FIELD_ALIAS: "std_tripduration",
+        },
     ],
     consts.CONFIG_FORMAT: "table",
     consts.CONFIG_FILTER_STATUS: None,
@@ -215,19 +221,6 @@
     consts.CONFIG_FILTER_STATUS: None,
 }
 
-CONFIG_SCHEMA_VALIDATION = {
-    # BigQuery Specific Connection Config
-    consts.CONFIG_SOURCE_CONN: BQ_CONN,
-    consts.CONFIG_TARGET_CONN: BQ_CONN,
-    # Validation Type
-    consts.CONFIG_TYPE: "Schema",
-    # Configuration Required Depending on Validator Type
-    consts.CONFIG_SCHEMA_NAME: "bigquery-public-data.new_york_citibike",
-    consts.CONFIG_TABLE_NAME: "citibike_trips",
-    consts.CONFIG_FORMAT: "table",
-    consts.CONFIG_FILTER_STATUS: None,
-}
-
 BQ_CONN_NAME = "bq-integration-test"
 CLI_CONFIG_FILE = "example_test.yaml"
@@ -427,12 +420,16 @@ def test_count_validator():
     min_birth_year_value = df[df["validation_name"] == "min_birth_year"][
         "source_agg_value"
     ].values[0]
+    std_tripduration_value = df[df["validation_name"] == "std_tripduration"][
+        "source_agg_value"
+    ].values[0]
 
     assert float(count_value) > 0
     assert float(count_tripduration_value) > 0
     assert float(avg_tripduration_value) > 0
     assert float(max_birth_year_value) > 0
     assert float(min_birth_year_value) > 0
+    assert float(std_tripduration_value) > 0
     assert (
         df["source_agg_value"].astype(float).sum()
         == df["target_agg_value"].astype(float).sum()
     )
@@ -465,14 +462,6 @@ def test_numeric_types():
     )
 
 
-def test_schema_validation():
-    validator = data_validation.DataValidation(CONFIG_SCHEMA_VALIDATION, verbose=True)
-    df = validator.execute()
-
-    for validation in df.to_dict(orient="records"):
-        assert validation["validation_status"] == consts.VALIDATION_STATUS_SUCCESS
-
-
 def test_cli_store_yaml_then_run_gcs():
     """Test storing and retrieving validation YAML when GCS env var is set."""
     # Store BQ Connection
diff --git a/third_party/ibis/ibis_db2/registry.py b/third_party/ibis/ibis_db2/registry.py
index 80e832022..592834c74 100644
--- a/third_party/ibis/ibis_db2/registry.py
+++ b/third_party/ibis/ibis_db2/registry.py
@@ -517,7 +517,6 @@ def _day_of_week_name(t, op):
     ops.Min: _reduction('min'),
     ops.Max: _reduction('max'),
     ops.Variance: variance_reduction('var', suffix={'sample': '', 'pop': 'p'}),
-    ops.StandardDev: variance_reduction('stdev', suffix={'sample': '', 'pop': 'p'}),
     ops.RandomScalar: _random,
     ops.TimestampNow: lambda *args: sa.func.timezone('UTC', sa.func.now()),
     ops.CumulativeAll: unary(sa.func.bool_and),
diff --git a/third_party/ibis/ibis_teradata/datatypes.py b/third_party/ibis/ibis_teradata/datatypes.py
index 486df69cc..03bdcf0a8 100644
--- a/third_party/ibis/ibis_teradata/datatypes.py
+++ b/third_party/ibis/ibis_teradata/datatypes.py
@@ -197,7 +197,7 @@ def trans_float64(t, context):
 
 @ibis_type_to_teradata_type.register(dt.Integer, TypeTranslationContext)
 def trans_integer(t, context):
-    return "INT64"
+    return "BIGINT"
 
 
 @ibis_type_to_teradata_type.register(dt.UInt64, (TypeTranslationContext, UDFContext))
@@ -224,4 +224,4 @@ def trans_type(t, context):
 
 @ibis_type_to_teradata_type.register(dt.Decimal, TypeTranslationContext)
 def trans_numeric(t, context):
-    return "NUMERIC"
+    return "DECIMAL"
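
Note for reviewers (not part of the diff): the new `--std` option flows from `cli_tools.py` through `get_aggregate_config`, which registers a `"std"` aggregate, and `AggregateField.std` binds the column to ibis' `NumericColumn.std`, the sample standard deviation documented in the README as `stddev_samp`. Below is a minimal sketch of exercising the new factory directly; the table and column names are invented, and the rendered SQL may vary by backend and ibis version.

```python
# Illustrative sketch only; not part of this change. The table and column
# names are made up, and any numeric column would work.
import ibis

from data_validation.query_builder.query_builder import AggregateField

# Build the aggregate field that this patch adds in query_builder.py.
agg = AggregateField.std(field_name="tripduration", alias="std_tripduration")

# A throwaway unbound ibis table expression with a single numeric column.
table = ibis.table([("tripduration", "float64")], name="citibike_trips")

# compile() resolves to table["tripduration"].std(), i.e. the sample standard
# deviation, which SQL backends typically render as STDDEV_SAMP.
expr = agg.compile(table)
```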
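Also not part of the diff: the future-work section of `docs/internal/partition_table_prd.md` (touched above only for typo fixes) proposes sizing partitions from `psutil.virtual_memory()`. Here is a rough sketch of that idea under stated assumptions; `estimate_partition_count`, `sample_rows`, and `sample_bytes` are hypothetical names, and no such helper exists in DVT today.

```python
# Hypothetical helper sketching the psutil-based sizing idea from the PRD;
# none of these names exist in the Data Validation Tool.
import math

import psutil


def estimate_partition_count(total_rows: int, sample_rows: int, sample_bytes: int) -> int:
    """Estimate how many partitions keep one row-validation batch in memory.

    sample_bytes is the memory observed while holding sample_rows rows
    (e.g. the drop in psutil.virtual_memory().available around a trial load),
    as suggested in the PRD.
    """
    # Assumption: leave half of the currently available memory as headroom for
    # DVT's own objects, since the PRD notes the estimate needs some slack.
    budget = psutil.virtual_memory().available * 0.5
    bytes_per_row = max(sample_bytes, 1) / max(sample_rows, 1)
    rows_per_partition = max(int(budget // bytes_per_row), 1)
    return max(math.ceil(total_rows / rows_per_partition), 1)
```

For example, sampling 10,000 rows (the figure the PRD mentions) and measuring their footprint would give `estimate_partition_count(total_rows, 10_000, sample_bytes)` as a starting partition count for `generate-table-partitions`.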