feat: Support standard deviation for column agg (#964)
* feat: support standard deviation for column agg

* feat: lint

* docs: fix typo

* fix: teradata cast to decimal instead of invalid numeric
nehanene15 committed Aug 31, 2023
1 parent c53f2fc commit bb81701
Showing 8 changed files with 44 additions and 30 deletions.
8 changes: 5 additions & 3 deletions README.md
@@ -13,7 +13,7 @@ The Data Validation Tool (DVT) provides an automated and repeatable solution to
perform this task.

DVT supports the following validations:
* Column validation (count, sum, avg, min, max, group by)
* Column validation (count, sum, avg, min, max, stddev, group by)
* Row validation (Not supported for FileSystem connections)
* Schema validation
* Custom Query validation
@@ -106,6 +106,7 @@ data-validation (--verbose or -v) (--log-level or -ll) validate column
[--min COLUMNS] Comma separated list of columns for min or * for all numeric
[--max COLUMNS] Comma separated list of columns for max or * for all numeric
[--avg COLUMNS] Comma separated list of columns for avg or * for all numeric
[--std COLUMNS] Comma separated list of columns for stddev_samp or * for all numeric
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
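
For example, a hypothetical invocation exercising the new flag (connection names are placeholders; the table is the one used in the system tests) might be:

```
data-validation validate column \
  -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips \
  --std tripduration
```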
@@ -309,6 +310,7 @@ data-validation (--verbose or -v) (--log-level or -ll) validate custom-query col
[--min COLUMNS] Comma separated list of columns for min or * for all numeric
[--max COLUMNS] Comma separated list of columns for max or * for all numeric
[--avg COLUMNS] Comma separated list of columns for avg or * for all numeric
[--std COLUMNS] Comma separated list of columns for stddev_samp or * for all numeric
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
@@ -518,8 +520,8 @@ Functions, and other deployment services.
### Aggregated Fields

Aggregate fields contain the SQL fields that you want to produce an aggregate
for. Currently the functions `COUNT()`, `AVG()`, `SUM()`, `MIN()`, and `MAX()`
are supported.
for. Currently the functions `COUNT()`, `AVG()`, `SUM()`, `MIN()`, `MAX()`,
and `STDDEV_SAMP()` are supported.

Here is a sample aggregate config:
```yaml
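# Hypothetical sketch only; the collapsed diff hides the repository's actual sample.
# Key names mirror the aggregate entries in the BigQuery system test in this commit.
aggregates:
- field_alias: std_tripduration
  source_column: tripduration
  target_column: tripduration
  type: std
```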
5 changes: 5 additions & 0 deletions data_validation/__main__.py
@@ -114,6 +114,11 @@ def get_aggregate_config(args, config_manager: ConfigManager):
aggregate_configs += config_manager.build_config_column_aggregates(
"bit_xor", col_args, supported_data_types, cast_to_bigint=cast_to_bigint
)
if args.std:
col_args = None if args.std == "*" else cli_tools.get_arg_list(args.std)
aggregate_configs += config_manager.build_config_column_aggregates(
"std", col_args, supported_data_types, cast_to_bigint=cast_to_bigint
)
return aggregate_configs


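The entries appended by the new `std` branch above are plain aggregate dicts; a hypothetical sketch of one such entry (the literal key names are assumptions standing in for the `consts` constants, mirroring the BigQuery system test further down):

```python
# Hypothetical shape of one element of aggregate_configs for `--std tripduration`;
# literal keys are assumed values of consts.CONFIG_TYPE, CONFIG_SOURCE_COLUMN, etc.
std_aggregate = {
    "type": "std",
    "source_column": "tripduration",
    "target_column": "tripduration",
    "field_alias": "std_tripduration",
}
```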
10 changes: 10 additions & 0 deletions data_validation/cli_tools.py
@@ -545,6 +545,11 @@ def _configure_column_parser(column_parser):
"-bit_xor",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--std",
"-std",
help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--grouped-columns",
"-gc",
@@ -785,6 +790,11 @@ def _configure_custom_query_column_parser(custom_query_column_parser):
"-bit_xor",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--std",
"-std",
help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--wildcard-include-string-len",
"-wis",
9 changes: 9 additions & 0 deletions data_validation/query_builder/query_builder.py
@@ -96,6 +96,15 @@ def bit_xor(field_name=None, alias=None, cast=None):
cast=cast,
)

@staticmethod
def std(field_name=None, alias=None, cast=None):
return AggregateField(
ibis.expr.types.NumericColumn.std,
field_name=field_name,
alias=alias,
cast=cast,
)

def compile(self, ibis_table):
if self.field_name:
agg_field = self.expr(ibis_table[self.field_name])
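A minimal sketch of the expression the new `std` aggregate builds, assuming a recent ibis where `NumericColumn.std` defaults to the sample standard deviation (rendered as `STDDEV_SAMP` by most SQL backends):

```python
import ibis

# Hypothetical table standing in for the validated source table.
t = ibis.table([("tripduration", "int64")], name="citibike_trips")

# Roughly what AggregateField.std compiles to for this column:
expr = t["tripduration"].std().name("std_tripduration")  # sample std by default
```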
6 changes: 3 additions & 3 deletions docs/internal/partition_table_prd.md
@@ -1,9 +1,9 @@
# Partition Table

## Why Partition ?
Data Validation Tool performs row validation by comparing every row in the source database with the corresponding row in the target database. Since the comparison is done in memory, all the rows have to be in memory. Databases typically have a large number of rows that don't all fit in memory, therefore Data Validation Tool can run into MemoryError and fail. One way to address this error is to partitition the source and target table into several corresponding partititions. Then validate the corresponding source and target partitions one at a time (either sequentially or in parallel). If the validation is performed in parallel on multiple VMs or containers, this approach has the added benefit of speeding up data validation. There are other approaches available to address the MemoryError - see future work below. Column validation and schema validation do not bring each row of the table into memory. Therefore, they do not run into MemoryErrors.
Data Validation Tool performs row validation by comparing every row in the source database with the corresponding row in the target database. Since the comparison is done in memory, all the rows have to be in memory. Databases typically have a large number of rows that don't all fit in memory, therefore Data Validation Tool can run into MemoryError and fail. One way to address this error is to partition the source and target table into several corresponding partitions. Then validate the corresponding source and target partitions one at a time (either sequentially or in parallel). If the validation is performed in parallel on multiple VMs or containers, this approach has the added benefit of speeding up data validation. There are other approaches available to address the MemoryError - see future work below. Column validation and schema validation do not bring each row of the table into memory. Therefore, they do not run into MemoryErrors.

## How to paritition?
## How to partition?
Data Validation Tool matches rows based on the specified primary key(s). If we split up the table into small enough partitions, then each partition can be validated without running into MemoryError. This can be depicted pictorially as shown below:
![Alt text](./partition_picture.png?raw=true "Title")
Here, both tables are sorted by primary key(s). The blue line denotes the first row of each partition. With tables partitioned as shown, the complete tables can be row validated by concatenating the results of validating each partition source table against the corresponding partition of the target table.
@@ -30,7 +30,7 @@ SELECT
ORDER BY row_num ASC;
```
### How to generate the where clauses
Once we have the first row of each partition, we have to generate the where clauses for each partition in the source and target tables. The best way may be to generate the ibis table expression including the provided filter clause and the additional filter clause from the first rows we have calculated. We can then have _ibis_ `to_sql` convert the table expression into plain text, extract the where clause and use that. _ibis_ depends on _sqlalchemy_, which has a bug in that it does not support rendering date and timestamps by `to_sql` for versions of _sqlalchemy_ prior to 2.0. Until. we migrate to using _sqlalchemy_ 2.0, we may not be able to support dates and timestamps as a primary key column.
Once we have the first row of each partition, we have to generate the where clauses for each partition in the source and target tables. The best way may be to generate the ibis table expression including the provided filter clause and the additional filter clause from the first rows we have calculated. We can then have _ibis_ `to_sql` convert the table expression into plain text, extract the where clause and use that. _ibis_ depends on _sqlalchemy_, which has a bug in that it does not support rendering date and timestamps by `to_sql` for versions of _sqlalchemy_ prior to 2.0. Until we migrate to using _sqlalchemy_ 2.0, we may not be able to support dates and timestamps as a primary key column.
## Future Work
### How many partitions do I need?
Partition table requires that the user decide on the number of partitions into which they need to divide the table to avoid MemoryError. Data Validation Tool can run on different VMs with different shapes, so the number of partitions depends on the amount of memory available. How does the user figure out the number of partitions they need? Right now, it is by trial and error, say start with 10, then try 100, try to see if 50 still results in MemoryError etc. This is not optimal. Python's `psutil` package has a function [virtual_memory()](https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory) which tell us the total and available memory. `generate-table-partitions` is provided with all the parameters used in `validate row`, and the memory grows linearly to the number of rows being validated. `generate-table-partitions` can bring say 10,000 rows into memory as though performing a row validation. Using the virtual_memory() function in `psutil`, `generate-table-partitions` can estimate the number of rows that will fit in memory for row validation. Since we can calculate the total number of rows, we can estimate the number of partitions needed. This may need some experimentation, as we may need to allow for memory usage by other functions/objects in Data Validation Tool.
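A hypothetical sketch of that estimate, assuming memory use grows linearly with the number of rows brought into memory:

```python
import psutil

def estimate_partition_count(total_rows: int, sample_rows: int, sample_bytes: int,
                             safety_factor: float = 0.5) -> int:
    """Rough number of partitions so each partition's rows fit in available memory."""
    bytes_per_row = sample_bytes / sample_rows
    usable = psutil.virtual_memory().available * safety_factor
    rows_per_partition = max(1, int(usable / bytes_per_row))
    return -(-total_rows // rows_per_partition)  # ceiling division
```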
31 changes: 10 additions & 21 deletions tests/system/data_sources/test_bigquery.py
@@ -66,6 +66,12 @@
consts.CONFIG_TARGET_COLUMN: "birth_year",
consts.CONFIG_FIELD_ALIAS: "min_birth_year",
},
{
consts.CONFIG_TYPE: "std",
consts.CONFIG_SOURCE_COLUMN: "tripduration",
consts.CONFIG_TARGET_COLUMN: "tripduration",
consts.CONFIG_FIELD_ALIAS: "std_tripduration",
},
],
consts.CONFIG_FORMAT: "table",
consts.CONFIG_FILTER_STATUS: None,
@@ -215,19 +221,6 @@
consts.CONFIG_FILTER_STATUS: None,
}

CONFIG_SCHEMA_VALIDATION = {
# BigQuery Specific Connection Config
consts.CONFIG_SOURCE_CONN: BQ_CONN,
consts.CONFIG_TARGET_CONN: BQ_CONN,
# Validation Type
consts.CONFIG_TYPE: "Schema",
# Configuration Required Depending on Validator Type
consts.CONFIG_SCHEMA_NAME: "bigquery-public-data.new_york_citibike",
consts.CONFIG_TABLE_NAME: "citibike_trips",
consts.CONFIG_FORMAT: "table",
consts.CONFIG_FILTER_STATUS: None,
}

BQ_CONN_NAME = "bq-integration-test"
CLI_CONFIG_FILE = "example_test.yaml"

@@ -427,12 +420,16 @@ def test_count_validator():
min_birth_year_value = df[df["validation_name"] == "min_birth_year"][
"source_agg_value"
].values[0]
std_tripduration_value = df[df["validation_name"] == "std_tripduration"][
"source_agg_value"
].values[0]

assert float(count_value) > 0
assert float(count_tripduration_value) > 0
assert float(avg_tripduration_value) > 0
assert float(max_birth_year_value) > 0
assert float(min_birth_year_value) > 0
assert float(std_tripduration_value) > 0
assert (
df["source_agg_value"].astype(float).sum()
== df["target_agg_value"].astype(float).sum()
@@ -465,14 +462,6 @@ def test_numeric_types():
)


def test_schema_validation():
validator = data_validation.DataValidation(CONFIG_SCHEMA_VALIDATION, verbose=True)
df = validator.execute()

for validation in df.to_dict(orient="records"):
assert validation["validation_status"] == consts.VALIDATION_STATUS_SUCCESS


def test_cli_store_yaml_then_run_gcs():
"""Test storing and retrieving validation YAML when GCS env var is set."""
# Store BQ Connection
1 change: 0 additions & 1 deletion third_party/ibis/ibis_db2/registry.py
@@ -517,7 +517,6 @@ def _day_of_week_name(t, op):
ops.Min: _reduction('min'),
ops.Max: _reduction('max'),
ops.Variance: variance_reduction('var', suffix={'sample': '', 'pop': 'p'}),
ops.StandardDev: variance_reduction('stdev', suffix={'sample': '', 'pop': 'p'}),
ops.RandomScalar: _random,
ops.TimestampNow: lambda *args: sa.func.timezone('UTC', sa.func.now()),
ops.CumulativeAll: unary(sa.func.bool_and),
4 changes: 2 additions & 2 deletions third_party/ibis/ibis_teradata/datatypes.py
@@ -197,7 +197,7 @@ def trans_float64(t, context):

@ibis_type_to_teradata_type.register(dt.Integer, TypeTranslationContext)
def trans_integer(t, context):
return "INT64"
return "BIGINT"


@ibis_type_to_teradata_type.register(dt.UInt64, (TypeTranslationContext, UDFContext))
@@ -224,4 +224,4 @@ def trans_type(t, context):

@ibis_type_to_teradata_type.register(dt.Decimal, TypeTranslationContext)
def trans_numeric(t, context):
return "NUMERIC"
return "DECIMAL"
